#!/usr/bin/env python #\if DOXYGEN_IGNORE ############################################################ # # # Copyright (C) 2016 by John Spitzak # # # # This program is free software; you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation; either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program; if not, write to the # # Free Software Foundation, Inc., # # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # # #\endif ######################################################################## ################################################################################ #\defgroup difxrun DiFXrun # #\brief Run a list of jobs on the DiFX server with limited controls. # # Usage: DiFXrun [options] [.input path list] # # DiFXrun runs a list of DiFX jobs, specified by .input file paths, on a DiFX cluster. # By default it tries to distribute the jobs on the cluster in a reasonable way such that the cluster will not # be overwhelmed, or it will do so based on # user command-line settings (reasonable or otherwise). Jobs are distributed amongst # existing DiFX cluster processors by actively altering the .machines and .threads files associated # with them. There are three basic # "modes" of operation: # #

Scheduler Mode (Default)

# # By default DiFXrun runs a primitive scheduler. It attempts to assign # a number of processor nodes to each job (by default this number is 2 - it can # be set with the "-p" command-line option) before running them. It will continue to do this # until it runs out of jobs (in which case it is done), or until it runs out of processors. # When the latter happens DiFXrun will wait to start any remaining jobs until sufficient # allocated resources are freed as previous jobs complete. # #
![DiFXrun running a series of jobs in scheduler mode (6 processors used for each).](DiFXrun_default.png)
# #

Sequential Mode

# # In sequential mode (selected by the command-line option "-s"), DiFXrun will run only one job at a time # until all are done. Two processing nodes will be used on each job unless # otherwise requested using the "-p" option. # #
![DiFXrun running a series of jobs in sequential mode (6 processors used for each).](DiFXrun_sequential.png)
# #

"All" Mode

# # If the "-a" command line option is used (for "all" mode), DiFXrun will simultaneously run # all jobs listed using all available processors. This works okay and might be the fastest way to run a limited list # of jobs but breaks down rapidly as the list gets long. Remember, if you use this option you are starting # all listed jobs at the same time. # #
![DiFXrun running a series of jobs simultaneously in \"all\" mode (all processors employed).](DiFXrun_all.png)
# # In all cases DiFXrun applies all available processor threads to each job # being run on a processor (the number of threads comes from the number of "cores" of each # machine). Machines assigned as data sources will be treated as regular # processors - no accommodation is made for the load put on a machine by its being # a data source (experience has shown this to be an acceptable assumption). The # "head node" is also treated as a processor. # # The list of .input files that will be run is specified using a # regular expression, and needs to include the full path to the files. # Matching .input files actually located on the server will be run. # Because of the infinite possible matches that can occur with regular expressions # (depending on how you word them), # no warnings will be given about "missing" .input files (file names that # match the regular expression but do not exist). # # To detect processors, DiFXrun depends on mk5daemon being run on each. # If mk5daemon is missing on a processor, DiFXrun will simply never see # it, and will be unable to utilize it. # #

Feedback

# # For each job it runs, DiFXrun provides a text-based "progress bar", as well as a # status and completion percentage. In addition, a node-usage map shows which processors # a job is currently using (the "*" character), which processors it used (the "O" character), # and which processors it did not touch (the "." character). The nodes in the node-usage # map are sorted by name, but nodes are chosen for processing in the order messages are # received from them, so the nodes used for a job will often be scattered about the map. # # All of these displays are updated when new job-related information is received. # #

If A Job Stops Responding

# # There are myriad (and sometimes unknown) reasons why a DiFX job might stop or stop # sending feedback. To avoid having DiFXrun hang forever on a job that will # never report completion, failure, or whatever hung state it is in, a timeout scheme # is implemented. When a job has been silent for a timeout interval (you can set the # interval using the -t argument, by default it is 300 seconds), its state will be # reported as "Not Responding" and an effort will be made to shut the job, or whatever # is left of the job down on the server (using aggressive kill signals). # #

What If I Want To Share the Correlator?

# # DiFXrun assumes that it has the entire correlator to itself, so that any processor # it detects it will try to use for correlating jobs. You can, however, block processors # by name, and thus utilize only a portion of the correlator. To do this, use the "-e" # option, and list the node names you do not wish to use using a regular expression. If # two or more parties agree on named blocks of processors that are allocated to them # (and more importantly which ones are NOT allocated to them), they can use this option # to keep multiple sessions of DiFXrun from stepping # on one another. # #

Command Line Arguments

# # #
-a, --all
Run the entire list of .input files using all available # processors, rather than trying to balance or distribute # processing. #
-c, --config_only
Run the configuration test on each job ONLY. The job will # not actually be run. The test results (pass/fail) will be # displayed. #
-d, --datasource NODE
Use NODE as a data sources. This is needed if mark5 modules # are employed or if only specific processor nodes have access to data # files. This option must be used once for each data source. Data sources # will be blindly used in the order they are specified. #
-D, --difx VERSION
Run using a specific DiFX version. If not specified # the value of the DIFX_VERSION environment variable will # be used. Failing that, "DIFX-DEVEL" will be used. #
-e, --eliminate NODE
Eliminate node names matching the regular expression NODE # from the list of nodes that will be used for processing. This # argument can be used multiple times to include multiple regular expressions. #
-g, --generate
Use whatever existing .machines and .threads files exist # for each job. If no such files exist, new ones will be # created (this option is actually "don't generate" but that was too clumsy). #
-h, --help
Print help information and quit. #
-H, --hostname NAME
Use NAME as the host of the DiFX Server program. # Default is to use DIFX_CONTROL_HOST environment variable. #
-m, --manager NODE
Make NODE the head node used for processing. If this is # not specified DiFXrun will use the node where guiServer # is located. #
-p, --processors NUM
Use NUM processors for each job. If this option is not used # DiFXrun will use two (an arbitrary number, admittedly). #
-P, --port PORT
Use PORT as the TCP port to communicated with the DiFX Server. # Default is to use DIFX_CONTROL_PORT environment variable. #
-s, --sequential
Run jobs sequentially instead of simultaneously (which is the # default). All processors will be employed unless
-p
specifies # a number. #
-t, --timeout SEC
Use SEC seconds as the timeout value for each job. This is the # amount of time DiFXrun will wait before it gives up on a # "silent" (i.e. no messages received from) job and declares it # non-responsive. Default value is 300.0. #
0: # This little mess clears the screen (cross-platform I think) and starts # subsequent printing at the top. os.system( 'cls' if os.name == 'nt' else 'clear' ) # Find some ideal column sizes. maxCol1 = 12 maxCol2 = len( "Initializing" ) # Find some ideal column sizes for job in jobs.keys(): if len( job ) > maxCol1: maxCol1 = len( job ) if len( jobs[job][0] ) > maxCol2: maxCol2 = len( jobs[job][0] ) for job in sorted( jobs.keys() ): newStr = str( job ) while len( newStr ) < maxCol1 + 3: newStr += " " shortStr = jobs[job][0] while len( shortStr ) < maxCol2 + 3: shortStr += " " newStr += shortStr prog = int( jobs[job][1] ) if prog < 100: newStr += " " if prog < 10: newStr += " " if jobs[job][0] != "not started" and not configOnly: newStr += str( prog ) + "% [" shortStr = "" if prog > 100: prog = 100 if prog < 0: prog = 0 while prog > 0: shortStr += "X" prog -= 2 while len( shortStr ) < 50: shortStr += " " newStr += shortStr + "]" if showProcessorUsage: newStr += " [" for proc in sorted( processorsList.keys() ): try: index = processorsList[proc][1].index( job ) newStr += "*" except: try: index = processorsList[proc][2].index( job ) newStr += "O" except: newStr += "." newStr += "]" print newStr #=============================================================================== # Get the final status of this job and cause the job display to reflect it. #=============================================================================== def produceFinalStatus( thisJob, jobName, failure ): if thisJob == None and jobName == None: # A call with "None" occurs when there is a connection problem. Close # all of the open jobs "as is" so the program will terminate. for job in jobs.keys(): print str( jobs[job] ) if not jobs[job][2]: jobs[job] = ( "Broken Socket", jobs[job][1], True ) return elif failure: jobs[jobName] = ( "Not Responding", jobs[jobName][1], True ) difx.stop( thisJob.inputFile ) elif jobs[jobName][2]: pass else: if thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_ENDED_GRACEFULLY: jobs[jobName] = ( "Done", 100.0, True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_ENDED_WITH_ERRORS: jobs[jobName] = ( "Ran w/Errors", 100.0, True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_FAILED: jobs[jobName] = ( "Failed", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_TERMINATED: jobs[jobName] = ( "Terminated", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_FAILURE_INPUTFILE_BAD_CONFIG: jobs[jobName] = ( "Config Failed", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_CONFIG_PASSED: jobs[jobName] = ( "Config Passed", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_NOT_RESPONDING: jobs[jobName] = ( "Not Responding", jobs[jobName][1], True ) difx.stop( thisJob.inputFile ) if responder != None: responder.updateDisplay() #=============================================================================== # Thread to monitor a running job and figure out when it terminates. #=============================================================================== class JobEndMonitor( threading.Thread ): def __init__( self, thisJob, jobName ): threading.Thread.__init__( self ) self.jobName = jobName self.thisJob = thisJob def run( self ): self.quitNow = False self.failure = False while not self.thisJob.jobComplete and not self.quitNow: try: time.sleep( 0.1 ) except KeyboardInterrupt: self.quitNow = True self.thisJob.wait = self.thisJob.wait - 0.1 if self.thisJob.wait < 0.0: self.thisJob.setFinalMessage( thisJob.RUN_DIFX_NOT_RESPONDING ) self.failure = True self.quitNow = True produceFinalStatus( self.thisJob, self.jobName, self.failure ) self.thisJob.closeChannel() def messageCallback( argstr ): print "MESSAGE:", argstr def warningCallback( argstr ): print "WARNING:", argstr def errorCallback( argstr ): print "ERROR:", argstr def failureCallback( arg ): connectionFailure = True produceFinalStatus( None, None, False ) print "GET OUT OF HERE NOW!!!" #=============================================================================== # MAIN #=============================================================================== host = None port = None dataSources = [] processors = 2 maxProcessors = None maxProcessorsAll = False usedAsDataSources = 2 headNode = None inputFiles = None bail = False generate = True timeout = 300.0 # Locate a "default" DiFX Version from environment variables. User may change this # with command line arguments. try: DiFXVersion = os.environ["DIFX_VERSION"] except: DiFXVersion = "DIFX-DEVEL" try: i = 1 otherArgs = [] argStr = None pathStr = None while i < len( sys.argv ): # Check against legal argument types. Anything we don't recognize is assumed # to be an argument or a path. if sys.argv[i] in [ "-h", "--help" ]: print '\n%s ver %s %s %s' % (program, version, author, verdate) print "Run a job on the DiFX software correlator using its .input file path." print "Usage: %s [options] <.input path>" % ( sys.argv[0] ) print "" print "Options can include:" print "" print " --all" print " -a Run the entire list of .input files using all available" print " processors, rather than trying to balance or distribute" print " processing." print "" print " --config_only" print " -c Run the configuration test on each job ONLY. The job will" print " not actually be run. The test results (pass/fail) will be" print " displayed." print "" print " --datasource NODE" print " -d NODE Add a data source node to the list of such sources." print "" print " --difx VERSION" print " -D VERSION Run using a specific DiFX version. If not specified" print " the value of the DIFX_VERSION environment variable will" print " be used. Failing that, \"DIFX-DEVEL\" will be used." print "" print " --eliminate NODE" print " -e NODE Eliminate node names matching the regular expression NODE" print " from the list of nodes that will be used for processing." print "" print " --generate" print " -g Use whatever existing .machines and .threads files exist" print " for each job. If no such files exist, new ones will be" print " created." print "" print " --help" print " -h Print this help information and quit." print "" print " --hostname NAME" print " -H NAME Use NAME as the host of the difxServer program." print " Default is to use DIFX_CONTROL_HOST environment variable." print "" print " --manager NODE" print " -m NODE Use NODE as the \"head node\" (or \"manager\" node)" print "" print " --processors NUM[,MAX]" print " -p NUM Use NUM processors for working on each job. By default" print " two will be used. If MAX is included, NUM will become" print " the minimum number of processors to be used and MAX will" print " become the maximum used (and the desired number to use," print " depending on availability). If MAX is the character \"#\"" print " then all available processors will be the maximum." print "" print " --port PORT" print " -P PORT Use PORT as the TCP port to communicated with the difxServer." print " Default is to use DIFX_CONTROL_PORT environment variable." print "" print " --sequential" print " -s Process jobs sequentially instead of attempting to do them" print " simultaneously (default is simultaneous)." print "" exit( 0 ) elif sys.argv[i] in [ "-a", "--all" ]: runAll = True #showProcessorUsage = False i = i + 1 elif sys.argv[i] in [ "-c", "--config_only" ]: configOnly = True i = i + 1 elif sys.argv[i] in [ "-g", "--generate" ]: generate = False i = i + 1 elif sys.argv[i] in [ "-d", "--datasource" ]: dataSources.append( sys.argv[i+1] ) i = i + 2 elif sys.argv[i] in [ "-e", "--eliminate" ]: eliminateNodes.append( re.compile( sys.argv[i+1] ) ) i = i + 2 elif sys.argv[i] in [ "-p", "--processors" ]: # See if the string argument contains a comma, indicating two numbers. try: cpt = sys.argv[i+1].find( "," ) processors = int( sys.argv[i+1][:cpt]) if sys.argv[i+1][cpt+1:] == "#": maxProcessorsAll = True else: maxProcessors = int( sys.argv[i+1][cpt+1:]) except: processors = int( sys.argv[i+1] ) i = i + 2 elif sys.argv[i] in [ "-H", "--hostname" ]: host = sys.argv[i+1] i = i + 2 elif sys.argv[i] in [ "-m", "--manager" ]: headNode = sys.argv[i+1] i = i + 2 elif sys.argv[i] in [ "-D", "--difx" ]: DiFXVersion = sys.argv[i+1] i = i + 2 elif sys.argv[i] in [ "-P", "--port" ]: port = int( sys.argv[i+1] ) i = i + 2 elif sys.argv[i] in [ "-s", "--sequential" ]: sequential = True #showProcessorUsage = False i = i + 1 elif sys.argv[i] in [ "-t", "--timeout" ]: timeout = int( sys.argv[i+1] ) i = i + 2 else: # The final argument is assumed to be the .input file list inputFiles = sys.argv[i] i = i + 1 except RuntimeError: print "Usage: %s [options] <.input path>" % ( sys.argv[0] ) exit( 0 ) # Check a few things... if inputFiles == None: print "ERROR: An .input file list is required to identify the jobs" bail = True if bail: print "Usage: %s [options] <.input path>" % ( sys.argv[0] ) exit( 0 ) # Start the JobControl class, set the version, etc. print "Making client connection..." difx = DiFXJobControl.Client() difx.connect() difx.monitor() difx.version( DiFXVersion ) difx.addFailCallback( failureCallback ) if not difx.socketOK: difx.close() exit( 0 ) #difx.messageCallback( messageCallback ) #difx.warningCallback( warningCallback ) #difx.errorCallback( errorCallback ) # Identify the node running the server - this will become the head node unless # the user has selected otherwise. if len( difx.serverEnvironment ): for key in difx.serverEnvironment.keys(): if key.upper() == "HOSTNAME": # Put this machine in our list of processors with a guess as to the number # of cores. This number should be fixed later with DifxLoadMessages. if keepThisNode( difx.serverEnvironment[key] ): processorsList[difx.serverEnvironment[key]] = ( 8, [], [] ) # Make it the head node unless the user has specified one. if headNode == None: headNode = difx.serverEnvironment[key] if headNode == None: print "cannot identify a head node - please use \"-m\" to specify one." difx.close() exit( 0 ) difx.messageSelection( ( "DifxStatusMessage", "Mark5StatusMessage", "DifxLoadMessage" ) ) responder = Responder() difx.addRelayCallback( responder.difxRelayCallback ) difx.relayPackets() difx.waitTime( timeout ) # Get a list of all .input files that match the user request. print "Locating .input files..." lsList = difx.ls( inputFiles ) # It is an error if no matches exist. if lsList == None: print "ERROR: None of the requested .input files were found on the server." difx.close() exit( 0 ) # Compile a dictionary of job names - these will be used to associate message traffic # with each job. for inputFile in lsList: jobs[inputFile[inputFile.rfind( "/" ) + 1:inputFile.rfind( "." )]] = ( "not started", 0.0, False ) # Wait until sufficient processors are listed. keepGoing = True explained = False strlength = 0 if not configOnly: while keepGoing and len( processorsList.keys() ) < processors: if not explained: newstr = "Waiting for at least " + str( processors ) + " processors to be available.....currently " strlength = len( newstr ) explained = True sys.stdout.write( newstr + str( len( processorsList.keys() ) ) + "\r" ) try: time.sleep( .1 ) except KeyboardInterrupt: keepGoing = False if explained: if keepGoing: print newstr + str( len( processorsList.keys() ) ) else: print "\nKeyboard Interrupt!" difx.close() exit( 0 ) # This is where we actually run things. for inputFile in lsList: if not connectionFailure: difx.inputFile( inputFile ) # Find the job name jobName = inputFile[inputFile.rfind( "/" ) + 1:inputFile.rfind( "." )] jobs[jobName] = ( "Initializing", 0.0, False ) # See if we need to generate .machines and .threads files because they are missing. print "trying to locate machines files" if not difx.getMachines(): generate = True # Assign machines and threads based on our instructions. if generate: difx.setHeadNode( headNode ) difx.clearDataSources() difx.clearProcessors() if runAll: # Make the first processors data sources unless we have only # one...in which case use that. count = 0 for proc in processorsList.keys(): if count < usedAsDataSources: difx.addDataSource( proc ) count += 1 # Make every machine a processor. for proc in processorsList.keys(): difx.addProcessor( proc, processorsList[proc][0] ) processorsList[proc][1].append( jobName ) processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] ) difx.defineMachines() thisJob = difx.newJob() if configOnly: thisJob.configOnly( True ) mon = JobEndMonitor( thisJob, jobName ) mon.start() thisJob.start( False ) elif sequential: # Allocated the requested number of data sources. count = 0 for proc in processorsList.keys(): if count < usedAsDataSources: difx.addDataSource( proc ) count += 1 # Make every machine a processor until we reach the number specified by the user. count = 0 if maxProcessors != None: desiredProcessors = maxProcessors elif maxProcessorsAll: desiredProcessors = len( processorsList.keys() ) else: desiredProcessors = processors for proc in processorsList.keys(): if count < desiredProcessors: difx.addProcessor( proc, processorsList[proc][0] ) processorsList[proc][1].append( jobName ) processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] ) count += 1 difx.defineMachines() thisJob = difx.newJob() if configOnly: thisJob.configOnly( True ) thisJob.start( True ) # Wait a second to allow any job-related messages to come through. time.sleep( 1 ) produceFinalStatus( thisJob, jobName, False ) else: notStarted = True while notStarted and not connectionFailure: # Make sure sufficient processors are idle (not working on another job) # before we schedule them here. count = 0 goodList = [] for proc in processorsList.keys(): if len( processorsList[proc][1] ) == 0: count += 1 goodList.append( proc ) # Only continue if we have sufficient free processors. if count >= usedAsDataSources and count >= processors: count = 0 for proc in goodList: if count < usedAsDataSources: difx.addDataSource( proc ) count += 1 count = 0 if maxProcessors != None: desiredProcessors = maxProcessors else: desiredProcessors = processors for proc in goodList: if count < desiredProcessors: difx.addProcessor( proc, processorsList[proc][0] ) processorsList[proc][1].append( jobName ) processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] ) count += 1 difx.defineMachines() thisJob = difx.newJob() if configOnly: thisJob.configOnly( True ) mon = JobEndMonitor( thisJob, jobName ) mon.start() thisJob.start( False ) notStarted = False else: try: time.sleep( 1 ) except KeyboardInterrupt: difx.close() exit( 0 ) # Hang out until all jobs are complete. keepGoing = True while keepGoing: try: time.sleep( 1 ) somethingGoing = False for job in jobs.keys(): if jobs[job][2] != True: somethingGoing = True if not somethingGoing: keepGoing = False except KeyboardInterrupt: keepGoing = False responder.updateDisplay() difx.close()