#!/usr/bin/env python ################################################################################ #\defgroup difxjobcontrol DiFXJobControl # #\brief Run a job on the DiFX server with limited controls. # # Usage: DiFXJobControl [options] [.input path] # # DiFXJobControl attempts to run a single job on the DiFX server. The job is # identified by the complete path to its associated .input file on # the server - this is the only required argument. # # DiFXJobControl allows you to specify the head node, data source nodes, # processors, and threads used to run the job (if you specify one, you must specify # them all!). Left to its own devices it will attempt to use .machines # and .threads files, if they exist. No check will be made that the # content of these files is valid. # # This program is meant to show how to use the DiFXJobControl.Client class to # run a job. However it is by no means a complete test of that class or its # capabilities. # #

Command Line Arguments

# # #
-c, --config_only
Run only the configuration test, not the actual job. #
-d, --datasource NODE
Add NODE to the list of data sources used to process the job. # If data sources are included, the head node, processors and threads must # be specified as well. This argument can be used multiple times. #
-D, --difx VERSION
Run using a specific DiFX version. If not specified # the value of the DIFX_VERSION environment variable will # be used. Failing that, "DIFX-DEVEL" will be used. #
-h, --help
Print help information and quit. #
-H, --hostname NAME
Use NAME as the host of the DiFX Server program. # Default is to use DIFX_CONTROL_HOST environment variable. #
-m, --manager NODE
Make NODE the head node used for the job. # If the head node is specified, processors, data sources and threads must # be specified as well. #
-p, --processor NODE
Add NODE to the list of processors used for the job. # If processors are included, the head node, data sources and threads must # be specified as well. The total number of thread specifications # must match the number of processor specifications. This argument can be used multiple times. #
-P, --port PORT
Use PORT as the TCP port to communicate with the DiFX Server. # Default is to use the DIFX_CONTROL_PORT environment variable. #
-r, --realtime
Turn on the real time monitor - this will produce a list of real time # "products" from which the user will be prompted to select one or more. #
-t, --threads NUM
Use NUM threads for the corresponding processor. # If threads are included, the head node, data sources and processors must # be specified as well. The total number of thread specifications # must match the number of processor specifications. This argument can be used multiple times. #
-w, --waitTime NUM
Wait NUM seconds before timing out of an operation due to inactivity. # This applies only to the start() method call. #
" % ( sys.argv[0] )) print("") print("Options can include:") print("") print(" --config_only") print(" -c Run only the configuration test, not the job itself.") print("") print(" --datasource NODE") print(" -d NODE Add a data source node to the list of such sources.") print("") print(" --difx VERSION") print(" -D VERSION Run using a specific DiFX version. If not specified") print(" the value of the DIFX_VERSION environment variable will") print(" be used. Failing that, \"DIFX-DEVEL\" will be used.") print("") print(" --help") print(" -h Print this help information and quit.") print("") print(" --hostname NAME") print(" -H NAME Use NAME as the host of the difxServer program.") print(" Default is to use DIFX_CONTROL_HOST environment variable.") print("") print(" --manager NODE") print(" -m NODE Use NODE as the \"head node\" (or \"manager\" node)") print("") print(" --processor NODE") print(" -p NODE Add a processor node to the list of processors. There") print(" must be a corresponding threads entry.") print("") print(" --port PORT") print(" -P PORT Use PORT as the TCP port to communicated with the difxServer.") print(" Default is to use DIFX_CONTROL_PORT environment variable.") print("") print(" --realtime") print(" -r Turn on the real time monitor - this will produce a list of") print(" real time \"products\" from which the user will be prompted") print(" to select one or more.") print("") print(" --threads NUM") print(" -t NUM Assign NUM threads to the corresponding processor entry.") print("") print(" --waitTime NUM") print(" -w NUM Wait NUM seconds before timing out of an operation due to inactivity.") print(" This applies only to the \"start()\" method call.") print("") exit( 0 ) elif sys.argv[i] in [ "-c", "--config_only" ]: configOnly = True i = i + 1 elif sys.argv[i] in [ "-d", "--datasource" ]: dataSources.append( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-p", "--processor" ]: processors.append( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-t", 
"--threads" ]: threads.append( int( sys.argv[i+1] ) ) i = i + 1 elif sys.argv[i] in [ "-H", "--hostname" ]: host = sys.argv[i+1] i = i + 1 elif sys.argv[i] in [ "-m", "--manager" ]: headNode = sys.argv[i+1] i = i + 1 elif sys.argv[i] in [ "-D", "--difx" ]: DiFXVersion = sys.argv[i+1] i = i + 1 elif sys.argv[i] in [ "-P", "--port" ]: port = int( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-r", "--realtime" ]: realtime = True i = i + 1 elif sys.argv[i] in [ "-w", "--waitTime" ]: waitTime = int( sys.argv[i+1] ) i = i + 1 else: # The final argument is assumed to be the .input file name inputFile = sys.argv[i] i = i + 1 except RuntimeError: print("Usage: %s [options] <.input path>" % ( sys.argv[0] )) exit( 0 ) # Check a few things... if inputFile == None: print("ERROR: .input file required to identify the job") bail = True if len( dataSources ) > 0 or headNode != None or len( processors ) > 0: problem = False if len( processors ) == 0: print("ERROR: No processors included!") problem = True if len( dataSources ) == 0: print("ERROR: No data sources included!") problem = True if headNode == None: print("ERROR: Head node not defined!") problem = True if problem: print("ERROR: ALL datasources, processors and the head node must be specified if any of them are.") bail = True if len( processors ) != len( threads ): print("ERROR: There must be one thread specification for each processor specification.") bail = True if bail: print("Usage: %s [options] <.input path>" % ( sys.argv[0] )) exit( 0 ) difx = DiFXJobControl.Client() difx.connect() difx.monitor() # Check the existence of the .input file. if difx.ls( inputFile ) == None: print("ERROR: .input file not found") difx.close() exit( 0 ) difx.version( DiFXVersion ) difx.inputFile( inputFile ) # If machine specifications were included on the command line, use them. 
# Configure machines, install callbacks, optionally set up real-time
# monitoring, then start the job.  Everything here uses the open server
# connection, so it is wrapped in try/finally: the connection is closed on
# every exit path - normal completion, a bail-out through exit(), or an
# exception raised by any of the calls.
try:
    if headNode is not None:
        # Machine specifications came from the command line: replace whatever
        # the client currently holds with them.
        difx.clearDataSources()
        difx.clearProcessors()
        difx.setHeadNode( headNode )
        for node in dataSources:
            difx.addDataSource( node )
        # Each processor is paired with the thread count given at the same
        # position on the command line (the argument checks earlier in the
        # script require both lists to have the same length).
        for node, numThreads in zip( processors, threads ):
            difx.addProcessor( node, numThreads )
        # Not sure if we should always do this!
        difx.testProcessors( True )
        # But this is certainly necessary
        difx.defineMachines()
    elif configOnly:
        # Configuration-only run without machine specifications: nothing is
        # set up or checked here.
        pass
    else:
        # Make sure threads and machines files exist.
        machinesFile = inputFile.replace( ".input", ".machines" )
        threadsFile = inputFile.replace( ".input", ".threads" )
        if difx.ls( machinesFile ) is None:
            print("ERROR: .machines file \"" + machinesFile + "\" does not exist")
            bail = True
        if difx.ls( threadsFile ) is None:
            print("ERROR: .threads file \"" + threadsFile + "\" does not exist")
            bail = True
        if bail:
            # exit() raises SystemExit, so the finally clause below still
            # closes the connection before the program terminates.
            exit( 0 )
        # Get the existing machines specifications.
        difx.getMachines()

    difx.messageCallback( messageCallback )
    difx.warningCallback( warningCallback )
    difx.errorCallback( errorCallback )
    difx.messageSelection( ( "DifxStatusMessage", ) )
    responder = Responder()
    difx.addRelayCallback( responder.difxRelayCallback )
    difx.relayPackets()

    # Find the job name: the file name of the .input path with its final
    # extension removed.  Splitting (rather than slicing between rfind()
    # results) keeps the name intact when the file name contains no ".".
    responder.jobName = inputFile.rsplit( "/", 1 )[-1].rsplit( ".", 1 )[0]

    if realtime:
        difx.startMonitor()
        monScans = difx.getMonitorProducts()
        if monScans is not None:
            print("The following products are available for real time monitoring:")
            for scan in monScans:
                print("", scan[0])
                for baseline in scan[7]:
                    if len( baseline[3] ) > 0:
                        print("\t", baseline[0] + " -", baseline[1], "frequencies")
                        for frequency in baseline[3]:
                            print(frequency[0], "\t", frequency[1])
                    if len( baseline[4] ) > 0:
                        print("\tAutocorrelations (frequency, telescope)")
                        for ac in baseline[4]:
                            print(ac[0], "\t", ac[2], ac[1])
            selection = input( "select products: " )
            # The user types a comma-separated list of product numbers.
            # Empty entries (nothing typed, or a stray/trailing comma) are
            # skipped and non-numeric entries are reported and ignored, so a
            # typo does not abort the run with the monitor already started.
            productList = []
            for token in selection.split( "," ):
                token = token.strip()
                if not token:
                    continue
                try:
                    productList.append( int( token ) )
                except ValueError:
                    print("WARNING: ignoring invalid product selection \"" + token + "\"")
            # NOTE(review): the original indentation was lost; monitoring is
            # assumed to be enabled only when products were selected - confirm.
            if productList:
                difx.requestProducts( productList )
                difx.runWithMonitor( True )
                difx.monitorDataCallbacks( None, None, None, None, None, meanLagCB )

    # Make the wait time huge to avoid a timeout while the job is starting
    difx.waitTime( waitTime if waitTime is not None else 300.0 )

    newJob = difx.newJob()
    if configOnly:
        newJob.configOnly( True )
    newJob.start()

    if realtime:
        difx.stopMonitor()

    # Pause before closing - presumably to let trailing messages from the
    # server arrive; TODO confirm.
    time.sleep( 4 )
finally:
    difx.close()