#!/usr/bin/env python
################################################################################
#\defgroup difxjobcontrol DiFXJobControl
#
#\brief Run a job on the DiFX server with limited controls.
#
# Usage: DiFXJobControl [options] [.input path]
#
# DiFXJobControl attempts to run a single job on the DiFX server. The job is
# identified by the complete path to its associated .input file on
# the server - this is the only required argument.
#
# DiFXJobControl allows you to specify the head node, data source nodes,
# processors, and threads used to run the job (if you specify one, you must specify
# them all!). Left to its own devices it will attempt to use .machines
# and .threads files, if they exist. No check will be made that the
# content of these files is valid.
#
# This program is meant to show how to use the DiFXJobControl.Client class to
# run a job. However it is by no means a complete test of that class or its
# capabilities.
#
# Command Line Arguments
#
# -c, --config_only     | Run only the configuration test, not the actual job.
# -d, --datasource NODE | Add NODE to the list of data sources used to process the job.
#                       | If data sources are included, the head node, processors and
#                       | threads must be specified as well. This argument can be used
#                       | multiple times.
# -D, --difx VERSION    | Run using a specific DiFX version. If not specified
#                       | the value of the DIFX_VERSION environment variable will
#                       | be used. Failing that, "DIFX-DEVEL" will be used.
# -h, --help            | Print help information and quit.
# -H, --hostname NAME   | Use NAME as the host of the DiFX Server program.
#                       | Default is to use DIFX_CONTROL_HOST environment variable.
# -m, --manager NODE    | Make NODE the head node used for the job.
#                       | If the head node is specified, processors, data sources and
#                       | threads must be specified as well.
# -p, --processor NODE  | Add NODE to the list of processors used for the job.
#                       | If processors are included, the head node, data sources and
#                       | threads must be specified as well. The total number of thread
#                       | specifications must match the number of processor
#                       | specifications. This argument can be used multiple times.
# -P, --port PORT       | Use PORT as the TCP port to communicate with the DiFX Server.
#                       | Default is to use DIFX_CONTROL_PORT environment variable.
# -r, --realtime        | Turn on the real time monitor - this will produce a list of
#                       | real time "products" from which the user will be prompted to
#                       | select one or more.
# -t, --threads NUM     | Use NUM threads for the corresponding processor.
#                       | If threads are included, the head node, data sources and
#                       | processors must be specified as well. The total number of
#                       | thread specifications must match the number of processor
#                       | specifications. This argument can be used multiple times.
# -w, --waitTime NUM    | Wait NUM seconds before timing out of an operation due to
#                       | inactivity. This applies only to the start() method call.
#
" % ( sys.argv[0] ))
print("")
print("Options can include:")
print("")
print(" --config_only")
print(" -c Run only the configuration test, not the job itself.")
print("")
print(" --datasource NODE")
print(" -d NODE Add a data source node to the list of such sources.")
print("")
print(" --difx VERSION")
print(" -D VERSION Run using a specific DiFX version. If not specified")
print(" the value of the DIFX_VERSION environment variable will")
print(" be used. Failing that, \"DIFX-DEVEL\" will be used.")
print("")
print(" --help")
print(" -h Print this help information and quit.")
print("")
print(" --hostname NAME")
print(" -H NAME Use NAME as the host of the difxServer program.")
print(" Default is to use DIFX_CONTROL_HOST environment variable.")
print("")
print(" --manager NODE")
print(" -m NODE Use NODE as the \"head node\" (or \"manager\" node)")
print("")
print(" --processor NODE")
print(" -p NODE Add a processor node to the list of processors. There")
print(" must be a corresponding threads entry.")
print("")
print(" --port PORT")
print(" -P PORT Use PORT as the TCP port to communicated with the difxServer.")
print(" Default is to use DIFX_CONTROL_PORT environment variable.")
print("")
print(" --realtime")
print(" -r Turn on the real time monitor - this will produce a list of")
print(" real time \"products\" from which the user will be prompted")
print(" to select one or more.")
print("")
print(" --threads NUM")
print(" -t NUM Assign NUM threads to the corresponding processor entry.")
print("")
print(" --waitTime NUM")
print(" -w NUM Wait NUM seconds before timing out of an operation due to inactivity.")
print(" This applies only to the \"start()\" method call.")
print("")
exit( 0 )
elif sys.argv[i] in [ "-c", "--config_only" ]:
configOnly = True
i = i + 1
elif sys.argv[i] in [ "-d", "--datasource" ]:
dataSources.append( sys.argv[i+1] )
i = i + 1
elif sys.argv[i] in [ "-p", "--processor" ]:
processors.append( sys.argv[i+1] )
i = i + 1
elif sys.argv[i] in [ "-t", "--threads" ]:
threads.append( int( sys.argv[i+1] ) )
i = i + 1
elif sys.argv[i] in [ "-H", "--hostname" ]:
host = sys.argv[i+1]
i = i + 1
elif sys.argv[i] in [ "-m", "--manager" ]:
headNode = sys.argv[i+1]
i = i + 1
elif sys.argv[i] in [ "-D", "--difx" ]:
DiFXVersion = sys.argv[i+1]
i = i + 1
elif sys.argv[i] in [ "-P", "--port" ]:
port = int( sys.argv[i+1] )
i = i + 1
elif sys.argv[i] in [ "-r", "--realtime" ]:
realtime = True
i = i + 1
elif sys.argv[i] in [ "-w", "--waitTime" ]:
waitTime = int( sys.argv[i+1] )
i = i + 1
else:
# The final argument is assumed to be the .input file name
inputFile = sys.argv[i]
i = i + 1
except RuntimeError:
print("Usage: %s [options] <.input path>" % ( sys.argv[0] ))
exit( 0 )
# Validate the parsed command line before contacting the server.  Every
# problem found is reported; the script then prints the usage line and quits.
# NOTE(review): the indentation of this section was lost in the source; the
# nesting below is reconstructed from the statement order - confirm.
if inputFile is None:
    print("ERROR: .input file required to identify the job")
    bail = True

# The head node, data sources, processors and threads are all-or-nothing: if
# any one of them appears on the command line, all of them must.  Thread
# specifications are part of the trigger so that a lone "-t" is reported
# instead of being silently ignored.
if dataSources or headNode is not None or processors or threads:
    problem = False
    if not processors:
        print("ERROR: No processors included!")
        problem = True
    if not dataSources:
        print("ERROR: No data sources included!")
        problem = True
    if headNode is None:
        print("ERROR: Head node not defined!")
        problem = True
    if problem:
        print("ERROR: ALL datasources, processors and the head node must be specified if any of them are.")
        bail = True
    # Processors and thread counts are matched up by position later on, so
    # the two lists must be the same length.
    if len(processors) != len(threads):
        print("ERROR: There must be one thread specification for each processor specification.")
        bail = True

if bail:
    print("Usage: %s [options] <.input path>" % (sys.argv[0]))
    # Exit non-zero so a calling shell or script can detect the failure.
    sys.exit(1)
# Open a client connection to the DiFX server and start monitoring it.
# NOTE(review): the indentation of this section was lost in the source; the
# nesting below is reconstructed from the statement order - confirm.
# NOTE(review): the "host" and "port" values parsed from the command line are
# not passed to connect() here - presumably the client falls back to the
# DIFX_CONTROL_HOST / DIFX_CONTROL_PORT environment variables; confirm.
difx = DiFXJobControl.Client()
difx.connect()
difx.monitor()

# Check the existence of the .input file.
if difx.ls(inputFile) is None:
    print("ERROR: .input file not found")
    difx.close()
    # Exit non-zero so a calling shell or script can detect the failure.
    sys.exit(1)

difx.version(DiFXVersion)
difx.inputFile(inputFile)

# If machine specifications were included on the command line, use them.
if headNode is not None:
    difx.clearDataSources()
    difx.clearProcessors()
    difx.setHeadNode(headNode)
    for node in dataSources:
        difx.addDataSource(node)
    # Each processor is paired with the thread count given at the same
    # position on the command line.
    for node, threadCount in zip(processors, threads):
        difx.addProcessor(node, threadCount)
    # Not sure if we should always do this!
    difx.testProcessors(True)
    # But this is certainly necessary
    difx.defineMachines()
elif configOnly:
    # A configuration-only run does not require the machines/threads files.
    pass
else:
    # Make sure threads and machines files exist.
    machinesFile = inputFile.replace(".input", ".machines")
    threadsFile = inputFile.replace(".input", ".threads")
    if difx.ls(machinesFile) is None:
        print("ERROR: .machines file \"" + machinesFile + "\" does not exist")
        bail = True
    if difx.ls(threadsFile) is None:
        print("ERROR: .threads file \"" + threadsFile + "\" does not exist")
        bail = True
    if bail:
        difx.close()
        sys.exit(1)
    # Get the existing machines specifications.
    difx.getMachines()
# Hook up the message, warning and error handlers, restrict the relayed
# traffic to "DifxStatusMessage" packets, and route those packets to a
# Responder instance.
difx.messageCallback(messageCallback)
difx.warningCallback(warningCallback)
difx.errorCallback(errorCallback)
difx.messageSelection(("DifxStatusMessage",))
responder = Responder()
difx.addRelayCallback(responder.difxRelayCallback)
difx.relayPackets()

# Find the job name: the text between the last "/" and the last "." of the
# .input path.
nameStart = inputFile.rfind("/") + 1
nameEnd = inputFile.rfind(".")
responder.jobName = inputFile[nameStart:nameEnd]
# Real-time monitoring: list the products the server offers and let the user
# pick which ones to receive, as a comma-separated list of integers.
# NOTE(review): the indentation of this section was lost in the source; the
# nesting below is reconstructed from the statement order - confirm.
if realtime:
    difx.startMonitor()
    monScans = difx.getMonitorProducts()
    if monScans is not None:
        print("The following products are available for real time monitoring:")
        for scan in monScans:
            print("", scan[0])
            # presumably scan[7] holds the per-baseline entries - verify
            # against DiFXJobControl.Client.getMonitorProducts()
            for baseline in scan[7]:
                if len(baseline[3]) > 0:
                    print("\t", baseline[0] + " -", baseline[1], "frequencies")
                    for frequency in baseline[3]:
                        print(frequency[0], "\t", frequency[1])
                if len(baseline[4]) > 0:
                    print("\tAutocorrelations (frequency, telescope)")
                    for ac in baseline[4]:
                        print(ac[0], "\t", ac[2], ac[1])
        selection = input("select products: ")
        # Split by commas.  Blank entries (empty reply, trailing or doubled
        # commas) are skipped rather than handed to int(), which would raise
        # ValueError on them.  Non-numeric entries still raise ValueError.
        productList = [int(token) for token in selection.split(",") if token.strip()]
        if productList:
            difx.requestProducts(productList)
            difx.runWithMonitor(True)
            difx.monitorDataCallbacks(None, None, None, None, None, meanLagCB)
# Make the wait time huge to avoid a timeout while the job is starting: use
# the value from the command line when one was given, otherwise 300 seconds.
difx.waitTime(300.0 if waitTime is None else waitTime)

# Create the job, optionally limit it to the configuration test, and run it.
newJob = difx.newJob()
if configOnly:
    newJob.configOnly(True)
newJob.start()

if realtime:
    difx.stopMonitor()
# NOTE(review): the indentation was lost in the source, so it is unclear
# whether this pause belongs inside the "if realtime" branch; it is kept
# unconditional here - confirm.
time.sleep(4)
difx.close()