#!/usr/bin/env python
################################################################################
#\defgroup difxjobcontrol DiFXJobControl
#
#\brief Run a job on the DiFX server with limited controls.
#
#  Usage:  DiFXJobControl [options] [.input path]
#
#  DiFXJobControl attempts to run a single job on the DiFX server.  The job is
#  identified by the complete path to its associated .input file on
#  the server - this is the only required argument.
#
#  DiFXJobControl allows you to specify the head node, data source nodes,
#  processors, and threads used to run the job (if you specify one, you must
#  specify them all!).  Left to its own devices, it will attempt to use
#  .machines and .threads files, if they exist.  No check will be made that the
#  content of these files is valid.
#
#  This program is meant to show how to use the DiFXJobControl.Client class to
#  run a job.  However, it is by no means a complete test of that class or its
#  capabilities.
#

#  Command Line Arguments
#
#  -c, --config_only
#      Run only the configuration test, not the actual job.
#  -d, --datasource NODE
#      Add NODE to the list of data sources used to process the job.
#      If data sources are included, the head node, processors and threads
#      must be specified as well.  This argument can be used multiple times.
#  -D, --difx VERSION
#      Run using a specific DiFX version.  If not specified,
#      the value of the DIFX_VERSION environment variable will
#      be used.  Failing that, "DIFX-DEVEL" will be used.
#  -h, --help
#      Print help information and quit.
#  -H, --hostname NAME
#      Use NAME as the host of the DiFX Server program.
#      Default is to use the DIFX_CONTROL_HOST environment variable.
#  -m, --manager NODE
#      Make NODE the head node used for the job.
#      If the head node is specified, processors, data sources and threads
#      must be specified as well.
#  -p, --processor NODE
#      Add NODE to the list of processors used for the job.
#      If processors are included, the head node, data sources and threads
#      must be specified as well.  The total number of thread specifications
#      must match the number of processor specifications.  This argument can
#      be used multiple times.
#  -P, --port PORT
#      Use PORT as the TCP port to communicate with the DiFX Server.
#      Default is to use the DIFX_CONTROL_PORT environment variable.
#  -r, --realtime
#      Turn on the real time monitor - this will produce a list of real time
#      "products" from which the user will be prompted to select one or more.
#  -t, --threads NUM
#      Use NUM threads for the corresponding processor.
#      If threads are included, the head node, data sources and processors
#      must be specified as well.  The total number of thread specifications
#      must match the number of processor specifications.  This argument can
#      be used multiple times.
#  -w, --waitTime NUM
#      Wait NUM seconds before timing out of an operation due to inactivity.
#      This applies only to the start() method call.
" % ( sys.argv[0] ) print "" print "Options can include:" print "" print " --config_only" print " -c Run only the configuration test, not the job itself." print "" print " --datasource NODE" print " -d NODE Add a data source node to the list of such sources." print "" print " --difx VERSION" print " -D VERSION Run using a specific DiFX version. If not specified" print " the value of the DIFX_VERSION environment variable will" print " be used. Failing that, \"DIFX-DEVEL\" will be used." print "" print " --help" print " -h Print this help information and quit." print "" print " --hostname NAME" print " -H NAME Use NAME as the host of the difxServer program." print " Default is to use DIFX_CONTROL_HOST environment variable." print "" print " --manager NODE" print " -m NODE Use NODE as the \"head node\" (or \"manager\" node)" print "" print " --processor NODE" print " -p NODE Add a processor node to the list of processors. There" print " must be a corresponding threads entry." print "" print " --port PORT" print " -P PORT Use PORT as the TCP port to communicated with the difxServer." print " Default is to use DIFX_CONTROL_PORT environment variable." print "" print " --realtime" print " -r Turn on the real time monitor - this will produce a list of" print " real time \"products\" from which the user will be prompted" print " to select one or more." print "" print " --threads NUM" print " -t NUM Assign NUM threads to the corresponding processor entry." print "" print " --waitTime NUM" print " -w NUM Wait NUM seconds before timing out of an operation due to inactivity." print " This applies only to the \"start()\" method call." 
print "" exit( 0 ) elif sys.argv[i] in [ "-c", "--config_only" ]: configOnly = True i = i + 1 elif sys.argv[i] in [ "-d", "--datasource" ]: dataSources.append( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-p", "--processor" ]: processors.append( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-t", "--threads" ]: threads.append( int( sys.argv[i+1] ) ) i = i + 1 elif sys.argv[i] in [ "-H", "--hostname" ]: host = sys.argv[i+1] i = i + 1 elif sys.argv[i] in [ "-m", "--manager" ]: headNode = sys.argv[i+1] i = i + 1 elif sys.argv[i] in [ "-D", "--difx" ]: DiFXVersion = sys.argv[i+1] i = i + 1 elif sys.argv[i] in [ "-P", "--port" ]: port = int( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-r", "--realtime" ]: realtime = True i = i + 1 elif sys.argv[i] in [ "-w", "--waitTime" ]: waitTime = int( sys.argv[i+1] ) i = i + 1 else: # The final argument is assumed to be the .input file name inputFile = sys.argv[i] i = i + 1 except RuntimeError: print "Usage: %s [options] <.input path>" % ( sys.argv[0] ) exit( 0 ) # Check a few things... if inputFile == None: print "ERROR: .input file required to identify the job" bail = True if len( dataSources ) > 0 or headNode != None or len( processors ) > 0: problem = False if len( processors ) == 0: print "ERROR: No processors included!" problem = True if len( dataSources ) == 0: print "ERROR: No data sources included!" problem = True if headNode == None: print "ERROR: Head node not defined!" problem = True if problem: print "ERROR: ALL datasources, processors and the head node must be specified if any of them are." bail = True if len( processors ) != len( threads ): print "ERROR: There must be one thread specification for each processor specification." bail = True if bail: print "Usage: %s [options] <.input path>" % ( sys.argv[0] ) exit( 0 ) difx = DiFXJobControl.Client() difx.connect() difx.monitor() # Check the existence of the .input file. 
if difx.ls( inputFile ) == None: print "ERROR: .input file not found" difx.close() exit( 0 ) difx.version( DiFXVersion ) difx.inputFile( inputFile ) # If machine specifications were included on the command line, use them. if headNode != None: difx.clearDataSources() difx.clearProcessors() difx.setHeadNode( headNode ) for node in dataSources: difx.addDataSource( node ) i = 0; for node in processors: difx.addProcessor( node, threads[i] ) i += 1 # Not sure if we should always do this! difx.testProcessors( True ) # But this is certainly necessary difx.defineMachines() elif configOnly: pass else: # Make sure threads and machines files exist. if difx.ls( inputFile.replace( ".input", ".machines" ) ) == None: print "ERROR: .machines file \"" + inputFile.replace( ".input", ".machines" ) + "\" does not exist" bail = True if difx.ls( inputFile.replace( ".input", ".threads" ) ) == None: print "ERROR: .threads file \"" + inputFile.replace( ".input", ".threads" ) + "\" does not exist" bail = True if bail: difx.close() exit( 0 ) # Get the existing machines specifications. difx.getMachines() difx.messageCallback( messageCallback ) difx.warningCallback( warningCallback ) difx.errorCallback( errorCallback ) difx.messageSelection( ( "DifxStatusMessage", ) ) responder = Responder() difx.addRelayCallback( responder.difxRelayCallback ) difx.relayPackets() # Find the job name responder.jobName = inputFile[inputFile.rfind( "/" ) + 1:inputFile.rfind( "." 
)] if realtime: difx.startMonitor() monScans = difx.getMonitorProducts() if monScans != None: print "The following products are available for real time monitoring:" for scan in monScans: print "", scan[0] for baseline in scan[7]: if len( baseline[3] ) > 0: print "\t", baseline[0] + " -", baseline[1], "frequencies" for frequency in baseline[3]: print frequency[0], "\t", frequency[1] if len( baseline[4] ) > 0: print "\tAutocorrelations (frequency, telescope)" for ac in baseline[4]: print ac[0], "\t", ac[2], ac[1] l = raw_input( "select products: " ) # Assuming the user typed something... if len( l.strip() ) > 0: # split by commas productList = [] for product in l.split( "," ): productList.append( int( product.strip() ) ) if len( productList ) > 0: difx.requestProducts( productList ) difx.runWithMonitor( True ) difx.monitorDataCallbacks( None, None, None, None, None, meanLagCB ) # Make the wait time huge to avoid a timeout while the job is starting if waitTime != None: difx.waitTime( waitTime ) else: difx.waitTime( 300.0 ) newJob = difx.newJob() if configOnly: newJob.configOnly( True ) newJob.start() if realtime: difx.stopMonitor() time.sleep( 4 ) difx.close()