#!/usr/bin/env python
#\if DOXYGEN_IGNORE ############################################################
# #
# Copyright (C) 2016 by John Spitzak #
# #
# This program is free software; you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation; either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program; if not, write to the #
# Free Software Foundation, Inc., #
# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. #
# #
#\endif ########################################################################
################################################################################
#\defgroup difxrun DiFXrun
#
#\brief Run a list of jobs on the DiFX server with limited controls.
#
# Usage: DiFXrun [options] [.input path list]
#
# DiFXrun runs a list of DiFX jobs, specified by .input
# file paths, on a DiFX cluster.
# By default it tries to distribute the jobs on the cluster in a reasonable way such that the cluster will not
# be overwhelmed, or it will do so based on
# user command-line settings (reasonable or otherwise). Jobs are distributed amongst
# existing DiFX cluster processors by actively altering the .machines and .threads files associated
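# with them. There are three basic "modes" of operation: run every job on all
# available processors (-a), run jobs one after another (-s), or, by default,
# run jobs simultaneously on separate groups of idle processors. The options are: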
#
#
# -a, --all | Run the entire list of .input files using all available
# processors, rather than trying to balance or distribute
# processing.
# |
# -c, --config_only | Run the configuration test on each job ONLY. The job will
# not actually be run. The test results (pass/fail) will be
# displayed.
# |
# -d, --datasource NODE | Use NODE as a data source. This is needed if mark5 modules
# are employed or if only specific processor nodes have access to data
# files. This option must be used once for each data source. Data sources
# will be blindly used in the order they are specified.
# |
# -D, --difx VERSION | Run using a specific DiFX version. If not specified
# the value of the DIFX_VERSION environment variable will
# be used. Failing that, "DIFX-DEVEL" will be used.
# |
# -e, --eliminate NODE | Eliminate node names matching the regular expression NODE
# from the list of nodes that will be used for processing. This
# argument can be used multiple times to include multiple regular expressions.
# |
# -g, --generate | Use whatever existing .machines and .threads files exist
# for each job. If no such files exist, new ones will be
# created (this option is actually "don't generate" but that was too clumsy).
# |
# -h, --help | Print help information and quit.
# |
# -H, --hostname NAME | Use NAME as the host of the DiFX Server program.
# Default is to use DIFX_CONTROL_HOST environment variable.
# |
# -m, --manager NODE | Make NODE the head node used for processing. If this is
# not specified DiFXrun will use the node where guiServer
# is located.
# |
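# -p, --processors NUM[,MAX] | Use NUM processors for each job. If this option is not used
# DiFXrun will use two (an arbitrary number, admittedly). If MAX is given, NUM
# becomes the minimum and MAX the desired maximum; a MAX of "#" means all
# available processors.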
# |
# -P, --port PORT | Use PORT as the TCP port to communicate with the DiFX Server.
# Default is to use DIFX_CONTROL_PORT environment variable.
# |
# -s, --sequential | Run jobs sequentially instead of simultaneously (which is the
# default). All processors will be employed unless -p specifies
# a number.
# |
# -t, --timeout SEC | Use SEC seconds as the timeout value for each job. This is the
# amount of time DiFXrun will wait before it gives up on a
# "silent" (i.e. no messages received from) job and declares it
# non-responsive. Default value is 300.0.
# |
0:
# This little mess clears the screen (cross-platform I think) and starts
# subsequent printing at the top.
os.system( 'cls' if os.name == 'nt' else 'clear' )
# Find some ideal column sizes.
maxCol1 = 12
maxCol2 = len( "Initializing" )
# Find some ideal column sizes
for job in jobs.keys():
if len( job ) > maxCol1:
maxCol1 = len( job )
if len( jobs[job][0] ) > maxCol2:
maxCol2 = len( jobs[job][0] )
for job in sorted( jobs.keys() ):
newStr = str( job )
while len( newStr ) < maxCol1 + 3:
newStr += " "
shortStr = jobs[job][0]
while len( shortStr ) < maxCol2 + 3:
shortStr += " "
newStr += shortStr
prog = int( jobs[job][1] )
if prog < 100:
newStr += " "
if prog < 10:
newStr += " "
if jobs[job][0] != "not started" and not configOnly:
newStr += str( prog ) + "% ["
shortStr = ""
if prog > 100:
prog = 100
if prog < 0:
prog = 0
while prog > 0:
shortStr += "X"
prog -= 2
while len( shortStr ) < 50:
shortStr += " "
newStr += shortStr + "]"
if showProcessorUsage:
newStr += " ["
for proc in sorted( processorsList.keys() ):
try:
index = processorsList[proc][1].index( job )
newStr += "*"
except:
try:
index = processorsList[proc][2].index( job )
newStr += "O"
except:
newStr += "."
newStr += "]"
print newStr
#===============================================================================
# Get the final status of this job and cause the job display to reflect it.
#===============================================================================
def produceFinalStatus( thisJob, jobName, failure ):
    """Record the terminal state of a job in the global ``jobs`` table.

    Each ``jobs`` entry is a ``( status, progress, finished )`` tuple.

    thisJob -- the job object returned by ``difx.newJob()``, or None.
    jobName -- the job's key in ``jobs``, or None.
    failure -- True when the caller has given up on a silent job. The job is
               then marked "Not Responding" and ``difx.stop`` is called on it.

    When ``thisJob`` and ``jobName`` are both None (a lost connection), every
    unfinished job is marked "Broken Socket" so the main loop can terminate.
    The function then returns without refreshing the display.

    Otherwise an entry that is already finished is left untouched (unless
    ``failure`` is set). A still-running entry gets the label that matches
    ``thisJob.finalMessage()``. Finally the display is refreshed if a
    responder exists.
    """
    if thisJob is None and jobName is None:
        # Connection problem: close every open job "as is".
        for job in jobs.keys():
            if not jobs[job][2]:
                jobs[job] = ( "Broken Socket", jobs[job][1], True )
        return
    if failure:
        jobs[jobName] = ( "Not Responding", jobs[jobName][1], True )
        difx.stop( thisJob.inputFile )
    elif not jobs[jobName][2]:
        # Ask the job once, then translate its final message into a label.
        # Successful runs are shown at 100%; other outcomes keep the last
        # progress value that was reported.
        final = thisJob.finalMessage()
        progress = jobs[jobName][1]
        if final == thisJob.RUN_DIFX_JOB_ENDED_GRACEFULLY:
            jobs[jobName] = ( "Done", 100.0, True )
        elif final == thisJob.RUN_DIFX_JOB_ENDED_WITH_ERRORS:
            jobs[jobName] = ( "Ran w/Errors", 100.0, True )
        elif final == thisJob.RUN_DIFX_JOB_FAILED:
            jobs[jobName] = ( "Failed", progress, True )
        elif final == thisJob.RUN_DIFX_JOB_TERMINATED:
            jobs[jobName] = ( "Terminated", progress, True )
        elif final == thisJob.RUN_DIFX_FAILURE_INPUTFILE_BAD_CONFIG:
            jobs[jobName] = ( "Config Failed", progress, True )
        elif final == thisJob.RUN_DIFX_CONFIG_PASSED:
            jobs[jobName] = ( "Config Passed", progress, True )
        elif final == thisJob.RUN_DIFX_NOT_RESPONDING:
            jobs[jobName] = ( "Not Responding", progress, True )
            difx.stop( thisJob.inputFile )
    if responder is not None:
        responder.updateDisplay()
#===============================================================================
# Thread to monitor a running job and figure out when it terminates.
#===============================================================================
class JobEndMonitor( threading.Thread ):
    """Thread that watches one running job and reports when it terminates.

    ``run()`` polls every 0.1 s until ``thisJob.jobComplete`` becomes true.
    Each poll subtracts 0.1 s from ``thisJob.wait``. That value is presumably
    refreshed elsewhere when the job reports activity (TODO confirm). If it
    drops below zero, the job is flagged RUN_DIFX_NOT_RESPONDING and treated
    as a failure. In every case ``produceFinalStatus`` is called and the
    job's channel is closed.
    """
    def __init__( self, thisJob, jobName ):
        threading.Thread.__init__( self )
        self.jobName = jobName
        self.thisJob = thisJob
        # Defined up front so the flags exist even before the thread runs.
        self.quitNow = False
        self.failure = False

    def run( self ):
        self.quitNow = False
        self.failure = False
        while not self.thisJob.jobComplete and not self.quitNow:
            try:
                time.sleep( 0.1 )
            except KeyboardInterrupt:
                self.quitNow = True
            self.thisJob.wait = self.thisJob.wait - 0.1
            if self.thisJob.wait < 0.0:
                # Use the job this monitor owns, not whatever module-level
                # job object the main loop happens to hold right now.
                self.thisJob.setFinalMessage( self.thisJob.RUN_DIFX_NOT_RESPONDING )
                self.failure = True
                self.quitNow = True
        produceFinalStatus( self.thisJob, self.jobName, self.failure )
        self.thisJob.closeChannel()
def messageCallback( argstr ):
    """Echo an informational message from the server, prefixed "MESSAGE:".

    Intended for difx.messageCallback(); that registration is currently
    commented out in the main section.
    """
    print( "MESSAGE: %s" % ( argstr, ) )
def warningCallback( argstr ):
    """Echo a warning from the server, prefixed "WARNING:"."""
    print( "WARNING: %s" % ( argstr, ) )
def errorCallback( argstr ):
    """Echo an error from the server, prefixed "ERROR:"."""
    print( "ERROR: %s" % ( argstr, ) )
def failureCallback( arg ):
    """Handle a lost connection to the DiFX server (registered via addFailCallback).

    Sets the module-level ``connectionFailure`` flag, which the job launch
    loop checks so that it stops scheduling work. It then closes every open
    job via ``produceFinalStatus( None, None, False )``. ``arg`` is ignored.
    """
    # Without this declaration the assignment would create a local variable,
    # and the main loop would never see the failure.
    global connectionFailure
    connectionFailure = True
    produceFinalStatus( None, None, False )
    print( "GET OUT OF HERE NOW!!!" )
#===============================================================================
# MAIN
#===============================================================================
# Default settings; most can be overridden on the command line below.
host = None                 # -H: host of the DiFX server program
port = None                 # -P: TCP port of the DiFX server
dataSources = []            # -d: data source nodes, in the order given
processors = 2              # -p NUM: processors to use for each job
maxProcessors = None        # -p NUM,MAX: desired maximum processors per job
maxProcessorsAll = False    # -p NUM,#: allow every available processor
usedAsDataSources = 2       # how many nodes are assigned as data sources per job
headNode = None             # -m: head ("manager") node
inputFiles = None           # .input file list (the non-option argument)
bail = False                # set when the argument checks fail
generate = True             # -g clears this to reuse existing .machines/.threads
timeout = 300.0             # -t: seconds a job may stay silent
# Locate a "default" DiFX Version from environment variables. User may change this
# with command line arguments.
DiFXVersion = os.environ.get( "DIFX_VERSION", "DIFX-DEVEL" )
# Parse the command line. Options that take a value consume the next
# argument. Anything unrecognised is taken to be the .input file list (the
# last such argument wins). A missing or malformed option value raises
# IndexError/ValueError, which is reported with the usage line.
try:
    i = 1
    otherArgs = []
    argStr = None
    pathStr = None
    while i < len( sys.argv ):
        # Check against legal argument types. Anything we don't recognize is assumed
        # to be an argument or a path.
        if sys.argv[i] in [ "-h", "--help" ]:
            print( '\n%s ver %s %s %s' % (program, version, author, verdate) )
            print( "Run a job on the DiFX software correlator using its .input file path." )
            print( "Usage: %s [options] <.input path>" % ( sys.argv[0] ) )
            print( "" )
            print( "Options can include:" )
            print( "" )
            print( " --all" )
            print( " -a Run the entire list of .input files using all available" )
            print( " processors, rather than trying to balance or distribute" )
            print( " processing." )
            print( "" )
            print( " --config_only" )
            print( " -c Run the configuration test on each job ONLY. The job will" )
            print( " not actually be run. The test results (pass/fail) will be" )
            print( " displayed." )
            print( "" )
            print( " --datasource NODE" )
            print( " -d NODE Add a data source node to the list of such sources." )
            print( "" )
            print( " --difx VERSION" )
            print( " -D VERSION Run using a specific DiFX version. If not specified" )
            print( " the value of the DIFX_VERSION environment variable will" )
            print( " be used. Failing that, \"DIFX-DEVEL\" will be used." )
            print( "" )
            print( " --eliminate NODE" )
            print( " -e NODE Eliminate node names matching the regular expression NODE" )
            print( " from the list of nodes that will be used for processing." )
            print( "" )
            print( " --generate" )
            print( " -g Use whatever existing .machines and .threads files exist" )
            print( " for each job. If no such files exist, new ones will be" )
            print( " created." )
            print( "" )
            print( " --help" )
            print( " -h Print this help information and quit." )
            print( "" )
            print( " --hostname NAME" )
            print( " -H NAME Use NAME as the host of the difxServer program." )
            print( " Default is to use DIFX_CONTROL_HOST environment variable." )
            print( "" )
            print( " --manager NODE" )
            print( " -m NODE Use NODE as the \"head node\" (or \"manager\" node)" )
            print( "" )
            print( " --processors NUM[,MAX]" )
            print( " -p NUM Use NUM processors for working on each job. By default" )
            print( " two will be used. If MAX is included, NUM will become" )
            print( " the minimum number of processors to be used and MAX will" )
            print( " become the maximum used (and the desired number to use," )
            print( " depending on availability). If MAX is the character \"#\"" )
            print( " then all available processors will be the maximum." )
            print( "" )
            print( " --port PORT" )
            print( " -P PORT Use PORT as the TCP port to communicated with the difxServer." )
            print( " Default is to use DIFX_CONTROL_PORT environment variable." )
            print( "" )
            print( " --sequential" )
            print( " -s Process jobs sequentially instead of attempting to do them" )
            print( " simultaneously (default is simultaneous)." )
            print( "" )
            print( " --timeout SEC" )
            print( " -t SEC Use SEC seconds as the timeout for a \"silent\" job." )
            print( " Default is 300.0." )
            print( "" )
            exit( 0 )
        elif sys.argv[i] in [ "-a", "--all" ]:
            runAll = True
            i = i + 1
        elif sys.argv[i] in [ "-c", "--config_only" ]:
            configOnly = True
            i = i + 1
        elif sys.argv[i] in [ "-g", "--generate" ]:
            generate = False
            i = i + 1
        elif sys.argv[i] in [ "-d", "--datasource" ]:
            dataSources.append( sys.argv[i+1] )
            i = i + 2
        elif sys.argv[i] in [ "-e", "--eliminate" ]:
            eliminateNodes.append( re.compile( sys.argv[i+1] ) )
            i = i + 2
        elif sys.argv[i] in [ "-p", "--processors" ]:
            # Accept NUM or NUM,MAX, where MAX may be "#" (all processors).
            # partition() keeps "-p 12" intact; slicing with find() == -1
            # used to drop the last digit when no comma was present.
            minStr, comma, maxStr = sys.argv[i+1].partition( "," )
            processors = int( minStr )
            if comma:
                if maxStr == "#":
                    maxProcessorsAll = True
                else:
                    maxProcessors = int( maxStr )
            i = i + 2
        elif sys.argv[i] in [ "-H", "--hostname" ]:
            host = sys.argv[i+1]
            i = i + 2
        elif sys.argv[i] in [ "-m", "--manager" ]:
            headNode = sys.argv[i+1]
            i = i + 2
        elif sys.argv[i] in [ "-D", "--difx" ]:
            DiFXVersion = sys.argv[i+1]
            i = i + 2
        elif sys.argv[i] in [ "-P", "--port" ]:
            port = int( sys.argv[i+1] )
            i = i + 2
        elif sys.argv[i] in [ "-s", "--sequential" ]:
            sequential = True
            i = i + 1
        elif sys.argv[i] in [ "-t", "--timeout" ]:
            # The timeout is in seconds and defaults to 300.0, so accept fractions.
            timeout = float( sys.argv[i+1] )
            i = i + 2
        else:
            # The final argument is assumed to be the .input file list
            inputFiles = sys.argv[i]
            i = i + 1
except ( IndexError, ValueError ):
    # IndexError: an option is missing its value; ValueError: a numeric
    # option value could not be parsed.
    print( "Usage: %s [options] <.input path>" % ( sys.argv[0] ) )
    exit( 0 )
# Sanity-check the parsed arguments before contacting the server: a job list
# is mandatory, and any failed check prints the usage line and exits.
missingInput = inputFiles is None
if missingInput:
    print( "ERROR: An .input file list is required to identify the jobs" )
bail = bail or missingInput
if bail:
    print( "Usage: %s [options] <.input path>" % ( sys.argv[0], ) )
    exit( 0 )
# Start the JobControl class, set the version, etc.
# NOTE(review): `host` and `port` (set by -H/-P) are not passed to Client()
# or connect() anywhere in this view; presumably the client falls back to
# the DIFX_CONTROL_HOST/DIFX_CONTROL_PORT environment variables - confirm.
print "Making client connection..."
difx = DiFXJobControl.Client()
difx.connect()
difx.monitor()
difx.version( DiFXVersion )
# failureCallback is presumably invoked by the client when the connection
# breaks; it closes all open jobs so the program can terminate.
difx.addFailCallback( failureCallback )
# Give up if the socket is not usable (presumably the connect failed).
if not difx.socketOK:
difx.close()
exit( 0 )
#difx.messageCallback( messageCallback )
#difx.warningCallback( warningCallback )
#difx.errorCallback( errorCallback )
# Identify the node running the server - this will become the head node unless
# the user has selected otherwise.
# serverEnvironment appears to map the server's environment variable names to
# values; only the HOSTNAME entry (matched case-insensitively) is used here.
if len( difx.serverEnvironment ):
for key in difx.serverEnvironment.keys():
if key.upper() == "HOSTNAME":
# Put this machine in our list of processors with a guess as to the number
# of cores. This number should be fixed later with DifxLoadMessages.
# Entries are ( core count, list of job names assigned, second list ); the
# meaning of the third element is not visible in this part of the file.
# Nodes rejected by keepThisNode (presumably the -e patterns) are not added.
if keepThisNode( difx.serverEnvironment[key] ):
processorsList[difx.serverEnvironment[key]] = ( 8, [], [] )
# Make it the head node unless the user has specified one.
# NOTE(review): indentation is lost in this copy, so it is unclear whether
# this assignment also depends on keepThisNode() above - confirm.
if headNode == None:
headNode = difx.serverEnvironment[key]
# Without a head node (from -m or the server's HOSTNAME) nothing can run.
if headNode == None:
print "cannot identify a head node - please use \"-m\" to specify one."
difx.close()
exit( 0 )
# Presumably restricts relayed traffic to the status/load message types.
difx.messageSelection( ( "DifxStatusMessage", "Mark5StatusMessage", "DifxLoadMessage" ) )
# Relayed packets go to the Responder, which presumably updates the job and
# processor tables as messages arrive.
responder = Responder()
difx.addRelayCallback( responder.difxRelayCallback )
difx.relayPackets()
# Presumably the per-job "silence" timeout set with -t.
difx.waitTime( timeout )
# Get a list of all .input files that match the user request.
print( "Locating .input files..." )
lsList = difx.ls( inputFiles )
# It is an error if no matches exist. An empty result is treated the same as
# None, so the run stops here instead of waiting and then doing nothing.
if not lsList:
    print( "ERROR: None of the requested .input files were found on the server." )
    difx.close()
    exit( 0 )
# Compile a dictionary of job names - these will be used to associate message
# traffic with each job. A job name is the path's final component with its
# extension removed, e.g. "/data/exp/job_1.input" -> "job_1".
for inputFile in lsList:
    jobs[inputFile[inputFile.rfind( "/" ) + 1:inputFile.rfind( "." )]] = ( "not started", 0.0, False )
# Wait until sufficient processors are listed. processorsList is filled in
# while messages are relayed (presumably by the Responder's load-message
# handling), so poll it every 0.1 s until it holds at least `processors`
# nodes. Config-only runs skip the wait. Ctrl-C abandons the run.
keepGoing = True
explained = False
strlength = 0
if not configOnly:
    while keepGoing and len( processorsList ) < processors:
        if not explained:
            newstr = "Waiting for at least " + str( processors ) + " processors to be available.....currently "
            strlength = len( newstr )
            explained = True
        sys.stdout.write( newstr + str( len( processorsList ) ) + "\r" )
        # The status line ends in "\r" rather than "\n"; flush explicitly or
        # a line-buffered terminal never shows it.
        sys.stdout.flush()
        try:
            time.sleep( .1 )
        except KeyboardInterrupt:
            keepGoing = False
    if explained:
        if keepGoing:
            # Leave the final count on its own line.
            print( newstr + str( len( processorsList ) ) )
        else:
            print( "\nKeyboard Interrupt!" )
            difx.close()
            exit( 0 )
# This is where we actually run things.
# For each matching .input file: register the job as "Initializing", build
# its machine assignment (unless existing .machines/.threads files are
# reused), then start it in one of three modes:
#   runAll     (-a): every known node is a processor for every job;
#   sequential (-s): each job runs to completion before the next starts;
#   otherwise      : wait (polling every 1 s) until enough idle nodes exist.
# Scheduling stops once connectionFailure is set.
# NOTE(review): indentation is lost in this copy; it is unclear whether the
# runAll/sequential/parallel branches sit inside `if generate:`. If they do,
# jobs run with -g whose .machines files already exist are never started -
# confirm against the original file.
for inputFile in lsList:
if not connectionFailure:
difx.inputFile( inputFile )
# Find the job name
jobName = inputFile[inputFile.rfind( "/" ) + 1:inputFile.rfind( "." )]
jobs[jobName] = ( "Initializing", 0.0, False )
# See if we need to generate .machines and .threads files because they are missing.
# NOTE(review): nothing here resets `generate`, so once one job lacks these
# files, machine files are regenerated for every later job as well.
print "trying to locate machines files"
if not difx.getMachines():
generate = True
# Assign machines and threads based on our instructions.
if generate:
difx.setHeadNode( headNode )
difx.clearDataSources()
difx.clearProcessors()
if runAll:
# Make the first usedAsDataSources nodes (in dict iteration order) data
# sources; with fewer nodes than that, every node becomes a data source.
count = 0
for proc in processorsList.keys():
if count < usedAsDataSources:
difx.addDataSource( proc )
count += 1
# Make every machine a processor.
# Each node records the job in its assigned-jobs list (element [1]).
for proc in processorsList.keys():
difx.addProcessor( proc, processorsList[proc][0] )
processorsList[proc][1].append( jobName )
# Rebuilds an identical tuple; the list was already mutated in place above.
processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] )
difx.defineMachines()
thisJob = difx.newJob()
if configOnly:
thisJob.configOnly( True )
# A JobEndMonitor thread reports the final status; start( False )
# presumably returns without waiting for the job.
mon = JobEndMonitor( thisJob, jobName )
mon.start()
thisJob.start( False )
elif sequential:
# Allocate the requested number of data sources.
count = 0
for proc in processorsList.keys():
if count < usedAsDataSources:
difx.addDataSource( proc )
count += 1
# Make every machine a processor until we reach the number specified by the user.
# Priority: explicit MAX from -p, then "#" (all nodes), then NUM.
count = 0
if maxProcessors != None:
desiredProcessors = maxProcessors
elif maxProcessorsAll:
desiredProcessors = len( processorsList.keys() )
else:
desiredProcessors = processors
for proc in processorsList.keys():
if count < desiredProcessors:
difx.addProcessor( proc, processorsList[proc][0] )
processorsList[proc][1].append( jobName )
processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] )
count += 1
difx.defineMachines()
thisJob = difx.newJob()
if configOnly:
thisJob.configOnly( True )
# start( True ) is presumably a blocking run; no monitor thread is used,
# and the final status is recorded directly below.
thisJob.start( True )
# Wait a second to allow any job-related messages to come through.
time.sleep( 1 )
produceFinalStatus( thisJob, jobName, False )
else:
notStarted = True
while notStarted and not connectionFailure:
# Make sure sufficient processors are idle (not working on another job)
# before we schedule them here.
# "Idle" means an empty assigned-jobs list. Nothing in this part of the
# file removes finished jobs from those lists - presumably code elsewhere
# (e.g. the Responder) does; TODO confirm, otherwise this waits forever.
count = 0
goodList = []
for proc in processorsList.keys():
if len( processorsList[proc][1] ) == 0:
count += 1
goodList.append( proc )
# Only continue if we have sufficient free processors.
# Data sources and processors are both drawn from goodList, so they overlap.
if count >= usedAsDataSources and count >= processors:
count = 0
for proc in goodList:
if count < usedAsDataSources:
difx.addDataSource( proc )
count += 1
count = 0
# NOTE(review): unlike the sequential branch, maxProcessorsAll ("-p NUM,#")
# is not consulted here.
if maxProcessors != None:
desiredProcessors = maxProcessors
else:
desiredProcessors = processors
for proc in goodList:
if count < desiredProcessors:
difx.addProcessor( proc, processorsList[proc][0] )
processorsList[proc][1].append( jobName )
processorsList[proc] = ( processorsList[proc][0], processorsList[proc][1], processorsList[proc][2] )
count += 1
difx.defineMachines()
thisJob = difx.newJob()
if configOnly:
thisJob.configOnly( True )
mon = JobEndMonitor( thisJob, jobName )
mon.start()
thisJob.start( False )
notStarted = False
else:
# Not enough idle nodes yet: poll again in a second; Ctrl-C quits.
try:
time.sleep( 1 )
except KeyboardInterrupt:
difx.close()
exit( 0 )
# Hang out until all jobs are complete.
# Poll once per second until every `jobs` entry has its "finished" flag (the
# third tuple element, set by produceFinalStatus) set. Ctrl-C stops waiting.
keepGoing = True
while keepGoing:
try:
time.sleep( 1 )
somethingGoing = False
for job in jobs.keys():
if jobs[job][2] != True:
somethingGoing = True
if not somethingGoing:
keepGoing = False
except KeyboardInterrupt:
keepGoing = False
# NOTE(review): indentation is lost in this copy; confirm whether this
# refresh runs on every poll or once after the loop.
responder.updateDisplay()
difx.close()