#!/usr/bin/env python
#\if DOXYGEN_IGNORE ############################################################
# #
# Copyright (C) 2016 by John Spitzak #
# #
# This program is free software; you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation; either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program; if not, write to the #
# Free Software Foundation, Inc., #
# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. #
# #
#\endif ########################################################################
################################################################################
#\defgroup difxrun DiFXrun
#
#\brief Run a list of jobs on the DiFX server with limited controls.
#
# Usage: DiFXrun [options] [.input path list]
#
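# DiFXrun runs a list of DiFX jobs, specified by .input file paths, on a DiFX
# cluster. By default it tries to distribute the jobs on the cluster in a
# reasonable way such that the cluster will not be overwhelmed; otherwise it
# distributes them based on user command-line settings (reasonable or
# otherwise). Jobs are distributed amongst existing DiFX cluster processors by
# actively altering the .machines and .threads files associated with them.
# There are three basic "modes" of operation:
#
#   - simultaneous (the default): jobs are started side by side, each one on
#     processors that are not already working on another job;
#   - sequential (-s): jobs are run one after another;
#   - all (-a): every job is given all available processors, with no attempt
#     to balance or distribute processing.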
#
#
#   -a, --all              | Run the entire list of .input files using all available
#                          | processors, rather than trying to balance or distribute
#                          | processing.
#                          |
#   -c, --config_only      | Run the configuration test on each job ONLY. The job will
#                          | not actually be run. The test results (pass/fail) will be
#                          | displayed.
#                          |
#   -d, --datasource NODE  | Use NODE as a data source. This is needed if mark5 modules
#                          | are employed or if only specific processor nodes have access
#                          | to data files. This option must be used once for each data
#                          | source. Data sources will be blindly used in the order they
#                          | are specified.
#                          |
#   -D, --difx VERSION     | Run using a specific DiFX version. If not specified
#                          | the value of the DIFX_VERSION environment variable will
#                          | be used. Failing that, "DIFX-DEVEL" will be used.
#                          |
#   -e, --eliminate NODE   | Eliminate node names matching the regular expression NODE
#                          | from the list of nodes that will be used for processing. This
#                          | argument can be used multiple times to include multiple
#                          | regular expressions.
#                          |
#   -g, --generate         | Use whatever existing .machines and .threads files exist
#                          | for each job. If no such files exist, new ones will be
#                          | created (this option is actually "don't generate" but that
#                          | was too clumsy).
#                          |
#   -h, --help             | Print help information and quit.
#                          |
#   -H, --hostname NAME    | Use NAME as the host of the DiFX Server program.
#                          | Default is to use DIFX_CONTROL_HOST environment variable.
#                          |
#   -m, --manager NODE     | Make NODE the head node used for processing. If this is
#                          | not specified DiFXrun will use the node where guiServer
#                          | is located.
#                          |
#   -p, --processors NUM   | Use NUM processors for each job. If this option is not used
#                          | DiFXrun will use two (an arbitrary number, admittedly).
#                          |
#   -P, --port PORT        | Use PORT as the TCP port to communicate with the DiFX Server.
#                          | Default is to use DIFX_CONTROL_PORT environment variable.
#                          |
#   -s, --sequential       | Run jobs sequentially instead of simultaneously (which is the
#                          | default). All processors will be employed unless -p specifies
#                          | a number.
#                          |
#   -t, --timeout SEC      | Use SEC seconds as the timeout value for each job. This is the
#                          | amount of time DiFXrun will wait before it gives up on a
#                          | "silent" (i.e. no messages received from) job and declares it
#                          | non-responsive. Default value is 300.0.
#
0:
# This little mess clears the screen (cross-platform I think) and starts
# subsequent printing at the top.
os.system( 'cls' if os.name == 'nt' else 'clear' )
# Find some ideal column sizes.
maxCol1 = 12
maxCol2 = len( "Initializing" )
# Find some ideal column sizes
for job in list(jobs.keys()):
if len( job ) > maxCol1:
maxCol1 = len( job )
if len( jobs[job][0] ) > maxCol2:
maxCol2 = len( jobs[job][0] )
for job in sorted( jobs.keys() ):
newStr = str( job )
while len( newStr ) < maxCol1 + 3:
newStr += " "
shortStr = jobs[job][0]
while len( shortStr ) < maxCol2 + 3:
shortStr += " "
newStr += shortStr
prog = int( jobs[job][1] )
if prog < 100:
newStr += " "
if prog < 10:
newStr += " "
if jobs[job][0] != "not started" and not configOnly:
newStr += str( prog ) + "% ["
shortStr = ""
if prog > 100:
prog = 100
if prog < 0:
prog = 0
while prog > 0:
shortStr += "X"
prog -= 2
while len( shortStr ) < 50:
shortStr += " "
newStr += shortStr + "]"
if showProcessorUsage:
newStr += " ["
for proc in sorted( processorsList.keys() ):
try:
index = processorsList[proc][1].index( job )
newStr += "*"
except:
try:
index = processorsList[proc][2].index( job )
newStr += "O"
except:
newStr += "."
newStr += "]"
print(newStr)
#===============================================================================
# Get the final status of this job and cause the job display to reflect it.
#===============================================================================
def produceFinalStatus( thisJob, jobName, failure ):
    """Record the final state of a job in the global ``jobs`` table.

    Each ``jobs`` entry is a ``( status text, progress, finished )`` tuple.

    thisJob -- job object whose ``finalMessage()`` is examined, or None.
    jobName -- key of the job in ``jobs``, or None.
    failure -- True when the caller has already decided the job stopped
               responding; the job is then marked "Not Responding" and
               ``difx.stop()`` is called on its input file.

    Calling with both ``thisJob`` and ``jobName`` set to None signals a
    connection problem: every unfinished job is marked "Broken Socket" so the
    program can terminate.  That path returns without refreshing the display.
    """
    if thisJob is None and jobName is None:
        # A call with "None" occurs when there is a connection problem.  Close
        # all of the open jobs "as is" so the program will terminate.
        for job in list( jobs.keys() ):
            print(str( jobs[job] ))
            if not jobs[job][2]:
                jobs[job] = ( "Broken Socket", jobs[job][1], True )
        return

    if failure:
        jobs[jobName] = ( "Not Responding", jobs[jobName][1], True )
        difx.stop( thisJob.inputFile )
    elif not jobs[jobName][2]:
        # Ordered table of ( job constant, status text, force 100%, stop job ).
        # Constants are looked up one at a time, in order, so only the ones
        # actually reached need to exist on the job object.
        outcomes = (
            ( "RUN_DIFX_JOB_ENDED_GRACEFULLY", "Done", True, False ),
            ( "RUN_DIFX_JOB_ENDED_WITH_ERRORS", "Ran w/Errors", True, False ),
            ( "RUN_DIFX_JOB_FAILED", "Failed", False, False ),
            ( "RUN_DIFX_JOB_TERMINATED", "Terminated", False, False ),
            ( "RUN_DIFX_FAILURE_INPUTFILE_BAD_CONFIG", "Config Failed", False, False ),
            ( "RUN_DIFX_CONFIG_PASSED", "Config Passed", False, False ),
            ( "RUN_DIFX_NOT_RESPONDING", "Not Responding", False, True ),
        )
        finalMessage = thisJob.finalMessage()
        for constantName, statusText, complete, stopJob in outcomes:
            if finalMessage == getattr( thisJob, constantName ):
                progress = 100.0 if complete else jobs[jobName][1]
                jobs[jobName] = ( statusText, progress, True )
                if stopJob:
                    difx.stop( thisJob.inputFile )
                break
        # An unrecognized final message leaves the entry untouched.
    # A job that was already marked finished is left alone as well.

    if responder is not None:
        responder.updateDisplay()
#===============================================================================
# Thread to monitor a running job and figure out when it terminates.
#===============================================================================
class JobEndMonitor( threading.Thread ):
    """Thread that watches one running job until it completes or times out.

    The thread polls ``thisJob.jobComplete`` every 0.1 seconds and counts
    ``thisJob.wait`` down by the same amount.  When the counter drops below
    zero the job is flagged as not responding.  On exit the thread reports the
    result through ``produceFinalStatus()`` and closes the job's channel.
    """

    def __init__( self, thisJob, jobName ):
        """Remember the job object and the name it is displayed under."""
        threading.Thread.__init__( self )
        self.jobName = jobName
        self.thisJob = thisJob
        # Set here rather than in run() so both flags can be read (or quitNow
        # set) before the thread has started.
        self.quitNow = False
        self.failure = False

    def run( self ):
        """Poll the job until it finishes, times out, or quitNow is set."""
        while not self.thisJob.jobComplete and not self.quitNow:
            try:
                time.sleep( 0.1 )
            except KeyboardInterrupt:
                self.quitNow = True
            self.thisJob.wait = self.thisJob.wait - 0.1
            if self.thisJob.wait < 0.0:
                # Use this thread's own job object; the original read a
                # module-level "thisJob", which may refer to a different job.
                self.thisJob.setFinalMessage( self.thisJob.RUN_DIFX_NOT_RESPONDING )
                self.failure = True
                self.quitNow = True
        produceFinalStatus( self.thisJob, self.jobName, self.failure )
        self.thisJob.closeChannel()
def messageCallback( argstr ):
    """Print argstr on one line, prefixed with "MESSAGE:"."""
    print("MESSAGE: " + str( argstr ))
def warningCallback( argstr ):
    """Print argstr on one line, prefixed with "WARNING:"."""
    print("WARNING: " + str( argstr ))
def errorCallback( argstr ):
    """Print argstr on one line, prefixed with "ERROR:"."""
    print("ERROR: " + str( argstr ))
def failureCallback( arg ):
    """React to a connection failure reported by the DiFX client.

    Sets the module-level ``connectionFailure`` flag, which the main
    scheduling loop tests before starting further jobs, and closes out every
    open job via ``produceFinalStatus( None, None, False )``.

    arg -- value passed by the caller; unused here.
    """
    # Without this declaration the assignment below would only create a local
    # variable and the main loop would never notice the failure.
    global connectionFailure
    connectionFailure = True
    produceFinalStatus( None, None, False )
    print("GET OUT OF HERE NOW!!!")
#===============================================================================
# MAIN
#===============================================================================
# Default settings; most can be overridden from the command line.
host = None                 # server host name (-H)
port = None                 # server TCP port (-P)
dataSources = []            # node names collected from -d
processors = 2              # processors per job (-p NUM)
maxProcessors = None        # upper processor limit (-p NUM,MAX)
maxProcessorsAll = False    # True when MAX was given as "#"
usedAsDataSources = 2       # how many processors are made data sources
headNode = None             # head ("manager") node (-m)
inputFiles = None           # .input file list to run
bail = False                # set when the arguments are unusable
generate = True             # cleared by -g to reuse existing .machines/.threads
timeout = 300.0             # seconds of silence tolerated per job (-t)
# Locate a "default" DiFX Version from environment variables.  User may change this
# with command line arguments.  A missing variable falls back to "DIFX-DEVEL";
# the lookup no longer hides unrelated errors behind a bare "except".
DiFXVersion = os.environ.get( "DIFX_VERSION", "DIFX-DEVEL" )
# Parse the command line.  Options that take a value consume two entries of
# sys.argv; anything that is not a recognized option is taken to be the .input
# file list (the last such argument wins).
try:
    i = 1
    otherArgs = []
    argStr = None
    pathStr = None
    while i < len( sys.argv ):
        # Check against legal argument types. Anything we don't recognize is assumed
        # to be an argument or a path.
        if sys.argv[i] in [ "-h", "--help" ]:
            print('\n%s ver %s %s %s' % (program, version, author, verdate))
            print("Run a job on the DiFX software correlator using its .input file path.")
            print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
            print("")
            print("Options can include:")
            print("")
            print(" --all")
            print(" -a Run the entire list of .input files using all available")
            print(" processors, rather than trying to balance or distribute")
            print(" processing.")
            print("")
            print(" --config_only")
            print(" -c Run the configuration test on each job ONLY. The job will")
            print(" not actually be run. The test results (pass/fail) will be")
            print(" displayed.")
            print("")
            print(" --datasource NODE")
            print(" -d NODE Add a data source node to the list of such sources.")
            print("")
            print(" --difx VERSION")
            print(" -D VERSION Run using a specific DiFX version. If not specified")
            print(" the value of the DIFX_VERSION environment variable will")
            print(" be used. Failing that, \"DIFX-DEVEL\" will be used.")
            print("")
            print(" --eliminate NODE")
            print(" -e NODE Eliminate node names matching the regular expression NODE")
            print(" from the list of nodes that will be used for processing.")
            print("")
            print(" --generate")
            print(" -g Use whatever existing .machines and .threads files exist")
            print(" for each job. If no such files exist, new ones will be")
            print(" created.")
            print("")
            print(" --help")
            print(" -h Print this help information and quit.")
            print("")
            print(" --hostname NAME")
            print(" -H NAME Use NAME as the host of the difxServer program.")
            print(" Default is to use DIFX_CONTROL_HOST environment variable.")
            print("")
            print(" --manager NODE")
            print(" -m NODE Use NODE as the \"head node\" (or \"manager\" node)")
            print("")
            print(" --processors NUM[,MAX]")
            print(" -p NUM Use NUM processors for working on each job. By default")
            print(" two will be used. If MAX is included, NUM will become")
            print(" the minimum number of processors to be used and MAX will")
            print(" become the maximum used (and the desired number to use,")
            print(" depending on availability). If MAX is the character \"#\"")
            print(" then all available processors will be the maximum.")
            print("")
            print(" --port PORT")
            print(" -P PORT Use PORT as the TCP port to communicated with the difxServer.")
            print(" Default is to use DIFX_CONTROL_PORT environment variable.")
            print("")
            print(" --sequential")
            print(" -s Process jobs sequentially instead of attempting to do them")
            print(" simultaneously (default is simultaneous).")
            print("")
            # The timeout option was accepted below but missing from this help.
            print(" --timeout SEC")
            print(" -t SEC Use SEC seconds as the timeout value for each job. This is")
            print(" the time to wait before giving up on a \"silent\" job and")
            print(" declaring it non-responsive. Default value is 300.0.")
            print("")
            exit( 0 )
        elif sys.argv[i] in [ "-a", "--all" ]:
            runAll = True
            #showProcessorUsage = False
            i = i + 1
        elif sys.argv[i] in [ "-c", "--config_only" ]:
            configOnly = True
            i = i + 1
        elif sys.argv[i] in [ "-g", "--generate" ]:
            # The flag means "don't generate": reuse existing files.
            generate = False
            i = i + 1
        elif sys.argv[i] in [ "-d", "--datasource" ]:
            dataSources.append( sys.argv[i+1] )
            i = i + 2
        elif sys.argv[i] in [ "-e", "--eliminate" ]:
            eliminateNodes.append( re.compile( sys.argv[i+1] ) )
            i = i + 2
        elif sys.argv[i] in [ "-p", "--processors" ]:
            # Accepts "NUM" or "NUM,MAX", where MAX may be "#" (all available).
            # partition() leaves the separator empty when there is no comma, so
            # a plain multi-digit NUM such as "12" is taken whole instead of
            # being split into processors=1 and maxProcessors=12.
            minStr, comma, maxStr = sys.argv[i+1].partition( "," )
            processors = int( minStr )
            if comma:
                if maxStr == "#":
                    maxProcessorsAll = True
                else:
                    maxProcessors = int( maxStr )
            i = i + 2
        elif sys.argv[i] in [ "-H", "--hostname" ]:
            host = sys.argv[i+1]
            i = i + 2
        elif sys.argv[i] in [ "-m", "--manager" ]:
            headNode = sys.argv[i+1]
            i = i + 2
        elif sys.argv[i] in [ "-D", "--difx" ]:
            DiFXVersion = sys.argv[i+1]
            i = i + 2
        elif sys.argv[i] in [ "-P", "--port" ]:
            port = int( sys.argv[i+1] )
            i = i + 2
        elif sys.argv[i] in [ "-s", "--sequential" ]:
            sequential = True
            #showProcessorUsage = False
            i = i + 1
        elif sys.argv[i] in [ "-t", "--timeout" ]:
            # The timeout is a float (default 300.0), so fractional seconds
            # are accepted.
            timeout = float( sys.argv[i+1] )
            i = i + 2
        else:
            # The final argument is assumed to be the .input file list
            inputFiles = sys.argv[i]
            i = i + 1
except RuntimeError:
    print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
    exit( 0 )
except ( IndexError, ValueError, re.error ):
    # An option at the end of the line with no value, a non-numeric number, or
    # a malformed regular expression.  These used to end in a traceback; the
    # exit status stays non-zero as it was then.
    print("ERROR: missing or invalid value for option \"%s\"" % ( sys.argv[i] ))
    print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
    exit( 1 )
# Check a few things...
if inputFiles is None:
    print("ERROR: An .input file list is required to identify the jobs")
    bail = True
if bail:
    print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
    exit( 0 )
# Start the JobControl class, set the version, etc.
# NOTE(review): "host" and "port" parsed from the command line are not passed
# to the client anywhere in this section - confirm whether connect() is
# expected to pick them up some other way.
print("Making client connection...")
difx = DiFXJobControl.Client()
difx.connect()
difx.monitor()
difx.version( DiFXVersion )
# failureCallback marks all open jobs as broken when the connection fails.
difx.addFailCallback( failureCallback )
# Give up quietly if the socket did not come up.
if not difx.socketOK:
    difx.close()
    exit( 0 )
#difx.messageCallback( messageCallback )
#difx.warningCallback( warningCallback )
#difx.errorCallback( errorCallback )
# Identify the node running the server - this will become the head node unless
# the user has selected otherwise.
if len( difx.serverEnvironment ):
    for key in list(difx.serverEnvironment.keys()):
        # The variable name is matched without regard to case.
        if key.upper() == "HOSTNAME":
            # Put this machine in our list of processors with a guess as to the number
            # of cores. This number should be fixed later with DifxLoadMessages.
            # Each processorsList entry is ( cores, job list, job list ); the
            # display code marks jobs in the first list "*" and the second "O".
            if keepThisNode( difx.serverEnvironment[key] ):
                processorsList[difx.serverEnvironment[key]] = ( 8, [], [] )
            # Make it the head node unless the user has specified one.
            # NOTE(review): indentation was lost in this copy; this test is
            # assumed to sit beside, not inside, the keepThisNode() check.
            if headNode == None:
                headNode = difx.serverEnvironment[key]
# Without a head node nothing can be scheduled.
if headNode == None:
    print("cannot identify a head node - please use \"-m\" to specify one.")
    difx.close()
    exit( 0 )
# Select the message types to receive and relay them to the Responder.
difx.messageSelection( ( "DifxStatusMessage", "Mark5StatusMessage", "DifxLoadMessage" ) )
responder = Responder()
difx.addRelayCallback( responder.difxRelayCallback )
difx.relayPackets()
# presumably the per-job silence timeout in seconds (-t) - confirm in DiFXJobControl
difx.waitTime( timeout )
# Ask the server which .input files match the user's request.
print("Locating .input files...")
lsList = difx.ls( inputFiles )
# No match at all is fatal.
if lsList is None:
    print("ERROR: None of the requested .input files were found on the server.")
    difx.close()
    exit( 0 )
# Register every job by name (file name without directory or extension) so
# that incoming message traffic can be matched to it.
for inputFile in lsList:
    nameStart = inputFile.rfind( "/" ) + 1
    nameEnd = inputFile.rfind( "." )
    jobs[inputFile[nameStart:nameEnd]] = ( "not started", 0.0, False )
# Hold off until enough processors have been reported.  A configuration-only
# run does not wait.
keepGoing = True
explained = False
strlength = 0
if not configOnly:
    while keepGoing and len( processorsList ) < processors:
        if not explained:
            # Build the progress prefix once; the count is appended each pass.
            newstr = "Waiting for at least " + str( processors ) + " processors to be available.....currently "
            strlength = len( newstr )
            explained = True
        sys.stdout.write( newstr + str( len( processorsList ) ) + "\r" )
        try:
            time.sleep( .1 )
        except KeyboardInterrupt:
            keepGoing = False
# Finish the progress line, or quit if the user interrupted the wait.
if explained and keepGoing:
    print(newstr + str( len( processorsList ) ))
elif explained:
    print("\nKeyboard Interrupt!")
    difx.close()
    exit( 0 )
# This is where we actually run things.
# Each .input file is scheduled in one of three ways: "all" (runAll),
# "sequential", or the default simultaneous mode.  Nothing further is started
# once a connection failure has been flagged.
# NOTE(review): indentation was lost in this copy.  The runAll/sequential/
# default chain is assumed to sit beside "if generate:" rather than inside it
# (otherwise -g with existing files would start no job) - confirm.
for inputFile in lsList:
    if not connectionFailure:
        difx.inputFile( inputFile )
        # Find the job name
        jobName = inputFile[inputFile.rfind( "/" ) + 1:inputFile.rfind( "." )]
        jobs[jobName] = ( "Initializing", 0.0, False )
        # See if we need to generate .machines and .threads files because they are missing.
        print("trying to locate machines files")
        if not difx.getMachines():
            # Once set, generate stays True for all remaining jobs as well.
            generate = True
        # Assign machines and threads based on our instructions.
        if generate:
            difx.setHeadNode( headNode )
            difx.clearDataSources()
            difx.clearProcessors()
        if runAll:
            # Make the first processors data sources unless we have only
            # one...in which case use that.
            count = 0
            for proc in list(processorsList.keys()):
                if count < usedAsDataSources:
                    difx.addDataSource( proc )
                    count += 1
            # Make every machine a processor.
            for proc in list(processorsList.keys()):
                difx.addProcessor( proc, processorsList[proc][0] )
                # Record the job against this processor (list at index 1).
                processorsList[proc][1].append( jobName )
                # Re-stores the same tuple contents; the list was already
                # changed in place by the append above.
                processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] )
            difx.defineMachines()
            thisJob = difx.newJob()
            if configOnly:
                thisJob.configOnly( True )
            # A monitor thread reports the job's final status when it ends.
            mon = JobEndMonitor( thisJob, jobName )
            mon.start()
            # presumably False means "do not block until the job ends" - confirm
            thisJob.start( False )
        elif sequential:
            # Allocated the requested number of data sources.
            count = 0
            for proc in list(processorsList.keys()):
                if count < usedAsDataSources:
                    difx.addDataSource( proc )
                    count += 1
            # Make every machine a processor until we reach the number specified by the user.
            count = 0
            # Preference order: explicit MAX, then "#" (all known processors),
            # then the plain -p NUM value.
            if maxProcessors != None:
                desiredProcessors = maxProcessors
            elif maxProcessorsAll:
                desiredProcessors = len( list(processorsList.keys()) )
            else:
                desiredProcessors = processors
            for proc in list(processorsList.keys()):
                if count < desiredProcessors:
                    difx.addProcessor( proc, processorsList[proc][0] )
                    processorsList[proc][1].append( jobName )
                    processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] )
                    count += 1
            difx.defineMachines()
            thisJob = difx.newJob()
            if configOnly:
                thisJob.configOnly( True )
            # presumably True blocks until the job has finished - confirm
            thisJob.start( True )
            # Wait a second to allow any job-related messages to come through.
            time.sleep( 1 )
            # No monitor thread in this mode; the status is recorded directly.
            produceFinalStatus( thisJob, jobName, False )
        else:
            # Default mode: wait for enough idle processors, then start the
            # job without waiting for it to end.
            notStarted = True
            while notStarted and not connectionFailure:
                # Make sure sufficient processors are idle (not working on another job)
                # before we schedule them here.
                count = 0
                goodList = []
                for proc in list(processorsList.keys()):
                    if len( processorsList[proc][1] ) == 0:
                        count += 1
                        goodList.append( proc )
                # Only continue if we have sufficient free processors.
                if count >= usedAsDataSources and count >= processors:
                    count = 0
                    for proc in goodList:
                        if count < usedAsDataSources:
                            difx.addDataSource( proc )
                            count += 1
                    count = 0
                    # NOTE(review): unlike the sequential branch, a MAX of "#"
                    # (maxProcessorsAll) is not considered here - confirm
                    # whether that is intended.
                    if maxProcessors != None:
                        desiredProcessors = maxProcessors
                    else:
                        desiredProcessors = processors
                    for proc in goodList:
                        if count < desiredProcessors:
                            difx.addProcessor( proc, processorsList[proc][0] )
                            processorsList[proc][1].append( jobName )
                            processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] )
                            count += 1
                    difx.defineMachines()
                    thisJob = difx.newJob()
                    if configOnly:
                        thisJob.configOnly( True )
                    mon = JobEndMonitor( thisJob, jobName )
                    mon.start()
                    thisJob.start( False )
                    notStarted = False
                else:
                    # Not enough idle processors yet; look again in a second.
                    try:
                        time.sleep( 1 )
                    except KeyboardInterrupt:
                        difx.close()
                        exit( 0 )
# Idle here, checking once a second, until every job entry carries its
# "finished" flag (third tuple element) or the user interrupts.
keepGoing = True
while keepGoing:
    try:
        time.sleep( 1 )
        # The key list is copied before scanning the job table.
        somethingGoing = any( jobs[job][2] != True for job in list( jobs.keys() ) )
        keepGoing = somethingGoing
    except KeyboardInterrupt:
        keepGoing = False
# Show the final state of all jobs, then drop the server connection.
responder.updateDisplay()
difx.close()