#!/usr/bin/env python #\if DOXYGEN_IGNORE ############################################################ # # # Copyright (C) 2016 by John Spitzak # # # # This program is free software; you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation; either version 3 of the License, or # # (at your option) any later version. # # # # This program is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with this program; if not, write to the # # Free Software Foundation, Inc., # # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # # #\endif ######################################################################## ################################################################################ #\defgroup difxrun DiFXrun # #\brief Run a list of jobs on the DiFX server with limited controls. # # Usage: DiFXrun [options] [.input path list] # # DiFXrun runs a list of DiFX jobs, specified by .input file paths, on a DiFX cluster. # By default it tries to distribute the jobs on the cluster in a reasonable way such that the cluster will not # be overwhelmed, or it will do so based on # user command-line settings (reasonable or otherwise). Jobs are distributed amongst # existing DiFX cluster processors by actively altering the .machines and .threads files associated # with them. There are three basic # "modes" of operation: # #

Scheduler Mode (Default)

# # By default DiFXrun runs a primitive scheduler. It attempts to assign # a number of processor nodes to each job (by default this number is 2 - it can # be set with the "-p" command-line option) before running them. It will continue to do this # until it runs out of jobs (in which case it is done), or until it runs out of processors. # When the latter happens DiFXrun will wait to start any remaining jobs until sufficient # allocated resources are freed as previous jobs complete. # #
![DiFXrun running a series of jobs in scheduler mode (6 processors used for each).](DiFXrun_default.png)
# #

Sequential Mode

# # In sequential mode (selected by the command-line option "-s"), DiFXrun will run only one job at a time # until all are done. Two processing nodes will be used on each job unless # otherwise requested using the "-p" option. # #
![DiFXrun running a series of jobs in sequential mode (6 processors used for each).](DiFXrun_sequential.png)
# #

"All" Mode

# # If the "-a" command line option is used (for "all" mode), DiFXrun will simultaneously run # all jobs listed using all available processors. This works okay and might be the fastest way to run a limited list # of jobs but breaks down rapidly as the list gets long. Remember, if you use this option you are starting # all listed jobs at the same time. # #
![DiFXrun running a series of jobs simultaneously in \"all\" mode (all processors employed).](DiFXrun_all.png)
# # In all cases DiFXrun applies all available processor threads to each job # being run on a processor (the number of threads comes from the number of "cores" of each # machine). Machines assigned as data sources will be treated as regular # processors - no accommodation is made for the load put on a machine by its being # a data source (experience has shown this to be an acceptable assumption). The # "head node" is also treated as a processor. # # The list of .input files that will be run is specified using a # regular expression, and needs to include the full path to the files. # Matching .input files actually located on the server will be run. # Because a regular expression can match an effectively unlimited number of names # (depending on how you word it), # no warnings will be given about "missing" .input files (file names that # match the regular expression but do not exist). # # To detect processors, DiFXrun depends on mk5daemon running on each one. # If mk5daemon is missing on a processor, DiFXrun will simply never see # it, and will be unable to utilize it. # #

Feedback

# # For each job it runs, DiFXrun provides a text-based "progress bar", as well as a # status and completion percentage. In addition, a node-usage map shows which processors # a job is currently using (the "*" character), which processors it used (the "O" character), # and which processors it did not touch (the "." character). The nodes in the node-usage # map are sorted by name, but nodes are chosen for processing in the order messages are # received from them, so the nodes used for a job will often be scattered about the map. # # All of these displays are updated when new job-related information is received. # #

If A Job Stops Responding

# # There are myriad (and sometimes unknown) reasons why a DiFX job might stop running or stop # sending feedback. To avoid having DiFXrun hang forever on a job that will # never report completion, failure, or whatever hung state it is in, a timeout scheme # is implemented. When a job has been silent for a timeout interval (you can set the # interval using the -t argument; by default it is 300 seconds), its state will be # reported as "Not Responding" and an effort will be made to shut down the job (or whatever # is left of it) on the server (using aggressive kill signals). # #

What If I Want To Share the Correlator?

# # DiFXrun assumes that it has the entire correlator to itself, so it will try to use any processor # it detects for correlating jobs. You can, however, block processors # by name, and thus utilize only a portion of the correlator. To do this, use the "-e" # option, and specify the node names you do not wish to use with a regular expression. If # two or more parties agree on named blocks of processors that are allocated to them # (and more importantly which ones are NOT allocated to them), they can use this option # to keep multiple sessions of DiFXrun from stepping # on one another. # #

Command Line Arguments

# # #
-a, --all
Run the entire list of .input files using all available # processors, rather than trying to balance or distribute # processing. #
-c, --config_only
Run the configuration test on each job ONLY. The job will # not actually be run. The test results (pass/fail) will be # displayed. #
-d, --datasource NODE
Use NODE as a data source. This is needed if Mark5 modules # are employed or if only specific processor nodes have access to data # files. This option must be used once for each data source. Data sources # will be blindly used in the order they are specified. #
-D, --difx VERSION
Run using a specific DiFX version. If not specified # the value of the DIFX_VERSION environment variable will # be used. Failing that, "DIFX-DEVEL" will be used. #
-e, --eliminate NODE
Eliminate node names matching the regular expression NODE # from the list of nodes that will be used for processing. This # argument can be used multiple times to include multiple regular expressions. #
-g, --generate
Use whatever existing .machines and .threads files exist # for each job. If no such files exist, new ones will be # created (this option is actually "don't generate" but that was too clumsy). #
-h, --help
Print help information and quit. #
-H, --hostname NAME
Use NAME as the host of the DiFX Server program. # Default is to use DIFX_CONTROL_HOST environment variable. #
-m, --manager NODE
Make NODE the head node used for processing. If this is # not specified DiFXrun will use the node where guiServer # is located. #
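-p, --processors NUM[,MAX]
Use NUM processors for each job. If this option is not used # DiFXrun will use two (an arbitrary number, admittedly). If MAX is # also given, NUM becomes the minimum number of processors and MAX the # maximum (and desired) number; a MAX of "#" means all available processors. #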
-P, --port PORT
Use PORT as the TCP port to communicate with the DiFX Server. # Default is to use DIFX_CONTROL_PORT environment variable. #
-s, --sequential
Run jobs sequentially instead of simultaneously (which is the # default). Two processors will be used for each job unless
-p
specifies # a different number. #
-t, --timeout SEC
Use SEC seconds as the timeout value for each job. This is the # amount of time DiFXrun will wait before it gives up on a # "silent" job (i.e. one from which no messages have been received) and declares it # non-responsive. Default value is 300.0. #
0: # This little mess clears the screen (cross-platform I think) and starts # subsequent printing at the top. os.system( 'cls' if os.name == 'nt' else 'clear' ) # Find some ideal column sizes. maxCol1 = 12 maxCol2 = len( "Initializing" ) # Find some ideal column sizes for job in list(jobs.keys()): if len( job ) > maxCol1: maxCol1 = len( job ) if len( jobs[job][0] ) > maxCol2: maxCol2 = len( jobs[job][0] ) for job in sorted( jobs.keys() ): newStr = str( job ) while len( newStr ) < maxCol1 + 3: newStr += " " shortStr = jobs[job][0] while len( shortStr ) < maxCol2 + 3: shortStr += " " newStr += shortStr prog = int( jobs[job][1] ) if prog < 100: newStr += " " if prog < 10: newStr += " " if jobs[job][0] != "not started" and not configOnly: newStr += str( prog ) + "% [" shortStr = "" if prog > 100: prog = 100 if prog < 0: prog = 0 while prog > 0: shortStr += "X" prog -= 2 while len( shortStr ) < 50: shortStr += " " newStr += shortStr + "]" if showProcessorUsage: newStr += " [" for proc in sorted( processorsList.keys() ): try: index = processorsList[proc][1].index( job ) newStr += "*" except: try: index = processorsList[proc][2].index( job ) newStr += "O" except: newStr += "." newStr += "]" print(newStr) #=============================================================================== # Get the final status of this job and cause the job display to reflect it. #=============================================================================== def produceFinalStatus( thisJob, jobName, failure ): if thisJob == None and jobName == None: # A call with "None" occurs when there is a connection problem. Close # all of the open jobs "as is" so the program will terminate. 
for job in list(jobs.keys()): print(str( jobs[job] )) if not jobs[job][2]: jobs[job] = ( "Broken Socket", jobs[job][1], True ) return elif failure: jobs[jobName] = ( "Not Responding", jobs[jobName][1], True ) difx.stop( thisJob.inputFile ) elif jobs[jobName][2]: pass else: if thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_ENDED_GRACEFULLY: jobs[jobName] = ( "Done", 100.0, True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_ENDED_WITH_ERRORS: jobs[jobName] = ( "Ran w/Errors", 100.0, True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_FAILED: jobs[jobName] = ( "Failed", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_JOB_TERMINATED: jobs[jobName] = ( "Terminated", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_FAILURE_INPUTFILE_BAD_CONFIG: jobs[jobName] = ( "Config Failed", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_CONFIG_PASSED: jobs[jobName] = ( "Config Passed", jobs[jobName][1], True ) elif thisJob.finalMessage() == thisJob.RUN_DIFX_NOT_RESPONDING: jobs[jobName] = ( "Not Responding", jobs[jobName][1], True ) difx.stop( thisJob.inputFile ) if responder != None: responder.updateDisplay() #=============================================================================== # Thread to monitor a running job and figure out when it terminates. 
#===============================================================================
class JobEndMonitor( threading.Thread ):
    """Thread that waits for one running job to finish or to fall silent.

    Every 0.1 s the job's ``wait`` attribute is reduced by 0.1.  If it drops
    below zero before the job reports completion, the job is flagged as not
    responding.  Either way produceFinalStatus() is called for the job and
    the job's channel is closed when the loop ends.
    """

    def __init__( self, thisJob, jobName ):
        threading.Thread.__init__( self )
        self.jobName = jobName
        self.thisJob = thisJob

    def run( self ):
        self.quitNow = False
        self.failure = False
        while not self.thisJob.jobComplete and not self.quitNow:
            try:
                time.sleep( 0.1 )
            except KeyboardInterrupt:
                self.quitNow = True
            # NOTE(review): "wait" is presumably refreshed by the job object
            # whenever traffic for it arrives, making this an inactivity
            # timeout - confirm in DiFXJobControl.
            self.thisJob.wait = self.thisJob.wait - 0.1
            if self.thisJob.wait < 0.0:
                # Use this monitor's own job: the module-level name "thisJob"
                # refers to whichever job the main loop started most recently.
                self.thisJob.setFinalMessage( self.thisJob.RUN_DIFX_NOT_RESPONDING )
                self.failure = True
                self.quitNow = True
        produceFinalStatus( self.thisJob, self.jobName, self.failure )
        self.thisJob.closeChannel()


#===============================================================================
#  Callbacks for the job control client.  The message/warning/error echoes are
#  not installed by default (see the commented-out calls in MAIN).
#===============================================================================
def messageCallback( argstr ):
    print("MESSAGE:", argstr)


def warningCallback( argstr ):
    print("WARNING:", argstr)


def errorCallback( argstr ):
    print("ERROR:", argstr)


def failureCallback( arg ):
    """Handle a connection failure reported by the job control client.

    Sets the module-level ``connectionFailure`` flag, which stops the main
    loop from starting further jobs, and closes all unfinished jobs.
    """
    # Without this declaration the assignment only created a local variable
    # and the main loop never noticed the failure.
    global connectionFailure
    connectionFailure = True
    produceFinalStatus( None, None, False )
    print("GET OUT OF HERE NOW!!!")


#===============================================================================
#  Helpers for the main program.
#===============================================================================
def _printHelp():
    """Print the command line help text."""
    print('\n%s ver %s %s %s' % (program, version, author, verdate))
    print("Run a job on the DiFX software correlator using its .input file path.")
    print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
    print("")
    print("Options can include:")
    print("")
    print(" --all")
    print(" -a Run the entire list of .input files using all available")
    print(" processors, rather than trying to balance or distribute")
    print(" processing.")
    print("")
    print(" --config_only")
    print(" -c Run the configuration test on each job ONLY. The job will")
    print(" not actually be run. The test results (pass/fail) will be")
    print(" displayed.")
    print("")
    print(" --datasource NODE")
    print(" -d NODE Add a data source node to the list of such sources.")
    print("")
    print(" --difx VERSION")
    print(" -D VERSION Run using a specific DiFX version. If not specified")
    print(" the value of the DIFX_VERSION environment variable will")
    print(" be used. Failing that, \"DIFX-DEVEL\" will be used.")
    print("")
    print(" --eliminate NODE")
    print(" -e NODE Eliminate node names matching the regular expression NODE")
    print(" from the list of nodes that will be used for processing.")
    print("")
    print(" --generate")
    print(" -g Use whatever existing .machines and .threads files exist")
    print(" for each job. If no such files exist, new ones will be")
    print(" created.")
    print("")
    print(" --help")
    print(" -h Print this help information and quit.")
    print("")
    print(" --hostname NAME")
    print(" -H NAME Use NAME as the host of the difxServer program.")
    print(" Default is to use DIFX_CONTROL_HOST environment variable.")
    print("")
    print(" --manager NODE")
    print(" -m NODE Use NODE as the \"head node\" (or \"manager\" node)")
    print("")
    print(" --processors NUM[,MAX]")
    print(" -p NUM Use NUM processors for working on each job. By default")
    print(" two will be used. If MAX is included, NUM will become")
    print(" the minimum number of processors to be used and MAX will")
    print(" become the maximum used (and the desired number to use,")
    print(" depending on availability). If MAX is the character \"#\"")
    print(" then all available processors will be the maximum.")
    print("")
    print(" --port PORT")
    print(" -P PORT Use PORT as the TCP port to communicate with the difxServer.")
    print(" Default is to use DIFX_CONTROL_PORT environment variable.")
    print("")
    print(" --sequential")
    print(" -s Process jobs sequentially instead of attempting to do them")
    print(" simultaneously (default is simultaneous).")
    print("")
    print(" --timeout SEC")
    print(" -t SEC Use SEC seconds as the timeout for a job that has stopped")
    print(" sending messages. Default is 300.")
    print("")


def _parseProcessorsArg( arg ):
    """Parse a "-p" value of the form "NUM", "NUM,MAX" or "NUM,#".

    Returns ( minimum, maximum, useAll ): maximum is None unless MAX was a
    number, and useAll is True when MAX was "#".  Raises ValueError for
    non-numeric values.
    """
    num, comma, rest = arg.partition( "," )
    if not comma:
        return int( num ), None, False
    if rest == "#":
        return int( num ), None, True
    return int( num ), int( rest ), False


def _jobNameFromPath( path ):
    """Return the job name of an .input path: the file name without its
    directory and without its final extension."""
    return path[path.rfind( "/" ) + 1:path.rfind( "." )]


def _desiredProcessors( available ):
    """Number of processors to request for a job when ``available`` are free."""
    if maxProcessors is not None:
        return maxProcessors
    if maxProcessorsAll:
        return available
    return processors


def _assignDataSources( candidates ):
    """Register the data source nodes for the next job.

    Nodes named with "-d" are used, in the order given, when any were
    supplied; otherwise the first ``usedAsDataSources`` candidates are used
    (or fewer, if fewer exist).
    """
    if dataSources:
        for node in dataSources:
            difx.addDataSource( node )
    else:
        for node in list( candidates )[:usedAsDataSources]:
            difx.addDataSource( node )


def _assignProcessors( candidates, limit, jobName ):
    """Add up to ``limit`` candidates (all of them if None) as processors.

    Each node is registered with the thread count stored in
    processorsList[node][0], and jobName is appended to that node's list of
    current jobs (processorsList[node][1]).
    """
    for node in list( candidates )[:limit]:
        difx.addProcessor( node, processorsList[node][0] )
        processorsList[node][1].append( jobName )


#===============================================================================
#  MAIN
#===============================================================================
host = None
port = None
dataSources = []
processors = 2
maxProcessors = None
maxProcessorsAll = False
usedAsDataSources = 2
headNode = None
inputFiles = None
bail = False
generate = True
timeout = 300.0

# Locate a "default" DiFX Version from environment variables.  User may change
# this with command line arguments.
DiFXVersion = os.environ.get( "DIFX_VERSION", "DIFX-DEVEL" )

try:
    i = 1
    while i < len( sys.argv ):
        # Check against legal argument types.  Anything we don't recognize is
        # assumed to be the .input file list.
        arg = sys.argv[i]
        if arg in ( "-h", "--help" ):
            _printHelp()
            sys.exit( 0 )
        elif arg in ( "-a", "--all" ):
            runAll = True
            i += 1
        elif arg in ( "-c", "--config_only" ):
            configOnly = True
            i += 1
        elif arg in ( "-g", "--generate" ):
            generate = False
            i += 1
        elif arg in ( "-d", "--datasource" ):
            dataSources.append( sys.argv[i+1] )
            i += 2
        elif arg in ( "-e", "--eliminate" ):
            eliminateNodes.append( re.compile( sys.argv[i+1] ) )
            i += 2
        elif arg in ( "-p", "--processors" ):
            processors, maxProcessors, maxProcessorsAll = _parseProcessorsArg( sys.argv[i+1] )
            i += 2
        elif arg in ( "-H", "--hostname" ):
            host = sys.argv[i+1]
            i += 2
        elif arg in ( "-m", "--manager" ):
            headNode = sys.argv[i+1]
            i += 2
        elif arg in ( "-D", "--difx" ):
            DiFXVersion = sys.argv[i+1]
            i += 2
        elif arg in ( "-P", "--port" ):
            port = int( sys.argv[i+1] )
            i += 2
        elif arg in ( "-s", "--sequential" ):
            sequential = True
            i += 1
        elif arg in ( "-t", "--timeout" ):
            # Fractional seconds are allowed; the default is 300.0.
            timeout = float( sys.argv[i+1] )
            i += 2
        else:
            # The final argument is assumed to be the .input file list.
            inputFiles = arg
            i += 1
except ( IndexError, ValueError, re.error ):
    # Missing option value, non-numeric number, or bad "-e" regular expression.
    print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
    sys.exit( 0 )

# Check a few things...
if inputFiles is None:
    print("ERROR: An .input file list is required to identify the jobs")
    bail = True
if bail:
    print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
    sys.exit( 0 )

# Start the JobControl class, set the version, etc.
# NOTE(review): "host" and "port" (-H/-P) are parsed above but never handed to
# the client below, so those options appear to have no effect here - confirm
# how DiFXJobControl.Client expects them to be supplied.
print("Making client connection...")
difx = DiFXJobControl.Client()
difx.connect()
difx.monitor()
difx.version( DiFXVersion )
difx.addFailCallback( failureCallback )
if not difx.socketOK:
    difx.close()
    sys.exit( 0 )
# Uncomment to echo server messages, warnings and errors:
#difx.messageCallback( messageCallback )
#difx.warningCallback( warningCallback )
#difx.errorCallback( errorCallback )

# Identify the node running the server - this will become the head node unless
# the user has selected otherwise.
if difx.serverEnvironment:
    for key in list( difx.serverEnvironment.keys() ):
        if key.upper() == "HOSTNAME":
            serverHost = difx.serverEnvironment[key]
            # Put this machine in our list of processors with a guess as to the
            # number of cores.  This number should be fixed later with
            # DifxLoadMessages.
            if keepThisNode( serverHost ):
                processorsList[serverHost] = ( 8, [], [] )
            # Make it the head node unless the user has specified one.
            if headNode is None:
                headNode = serverHost
if headNode is None:
    print("cannot identify a head node - please use \"-m\" to specify one.")
    difx.close()
    sys.exit( 0 )

difx.messageSelection( ( "DifxStatusMessage", "Mark5StatusMessage", "DifxLoadMessage" ) )
responder = Responder()
difx.addRelayCallback( responder.difxRelayCallback )
difx.relayPackets()
difx.waitTime( timeout )

# Get a list of all .input files that match the user request.  It is an error
# if no matches exist.
print("Locating .input files...")
lsList = difx.ls( inputFiles )
if not lsList:
    print("ERROR: None of the requested .input files were found on the server.")
    difx.close()
    sys.exit( 0 )

# Compile a dictionary of job names - these will be used to associate message
# traffic with each job.
for inputFile in lsList:
    jobs[_jobNameFromPath( inputFile )] = ( "not started", 0.0, False )

# Wait until sufficient processors are listed.  processorsList is presumably
# filled in by the responder as load messages arrive (hence the polling).
keepGoing = True
explained = False
if not configOnly:
    while keepGoing and len( processorsList ) < processors:
        if not explained:
            newstr = "Waiting for at least " + str( processors ) + " processors to be available.....currently "
            explained = True
        sys.stdout.write( newstr + str( len( processorsList ) ) + "\r" )
        sys.stdout.flush()
        try:
            time.sleep( .1 )
        except KeyboardInterrupt:
            keepGoing = False
    if explained:
        if keepGoing:
            print(newstr + str( len( processorsList ) ))
        else:
            print("\nKeyboard Interrupt!")
            difx.close()
            sys.exit( 0 )

# This is where we actually run things.  The list() snapshots of processorsList
# guard against it growing while we iterate.
for inputFile in lsList:
    if connectionFailure:
        break
    difx.inputFile( inputFile )
    jobName = _jobNameFromPath( inputFile )
    jobs[jobName] = ( "Initializing", 0.0, False )

    # Generate .machines and .threads files unless "-g" was given AND this job
    # already has them.  Decided per job: previously one missing file switched
    # generation on for every later job too.
    print("trying to locate machines files")
    haveMachines = difx.getMachines()
    regenerate = generate or not haveMachines

    # NOTE(review): the original layout was lost; the processor assignment and
    # defineMachines() below run whether or not files are regenerated, which
    # may undermine "-g" - confirm intended structure.
    if regenerate:
        difx.setHeadNode( headNode )
        difx.clearDataSources()
        difx.clearProcessors()

    if runAll:
        # Every known machine works on every job simultaneously.
        nodes = list( processorsList.keys() )
        _assignDataSources( nodes )
        _assignProcessors( nodes, None, jobName )
        difx.defineMachines()
        thisJob = difx.newJob()
        if configOnly:
            thisJob.configOnly( True )
        mon = JobEndMonitor( thisJob, jobName )
        mon.start()
        thisJob.start( False )
    elif sequential:
        # One job at a time; start( True ) is expected to block until it ends.
        nodes = list( processorsList.keys() )
        _assignDataSources( nodes )
        _assignProcessors( nodes, _desiredProcessors( len( nodes ) ), jobName )
        difx.defineMachines()
        thisJob = difx.newJob()
        if configOnly:
            thisJob.configOnly( True )
        thisJob.start( True )
        # Wait a second to allow any job-related messages to come through.
        time.sleep( 1 )
        produceFinalStatus( thisJob, jobName, False )
    else:
        # Scheduler: start the job only once enough idle processors (those not
        # listed as working on another job) are available.
        notStarted = True
        while notStarted and not connectionFailure:
            idle = [ proc for proc in list( processorsList.keys() )
                     if not processorsList[proc][1] ]
            if len( idle ) >= usedAsDataSources and len( idle ) >= processors:
                _assignDataSources( idle )
                _assignProcessors( idle, _desiredProcessors( len( idle ) ), jobName )
                difx.defineMachines()
                thisJob = difx.newJob()
                if configOnly:
                    thisJob.configOnly( True )
                mon = JobEndMonitor( thisJob, jobName )
                mon.start()
                thisJob.start( False )
                notStarted = False
            else:
                try:
                    time.sleep( 1 )
                except KeyboardInterrupt:
                    difx.close()
                    sys.exit( 0 )

# Hang out until all jobs are complete (or the user interrupts).
keepGoing = True
while keepGoing:
    try:
        time.sleep( 1 )
        if all( state[2] for state in list( jobs.values() ) ):
            keepGoing = False
    except KeyboardInterrupt:
        keepGoing = False
responder.updateDisplay()
difx.close()