#!/usr/bin/env python ################################################################################ #\defgroup difxbusy DiFXBusy # #\brief Monitors messages that help indicate how "busy" a correlator is. # # Usage: DiFXBusy [options] # # DiFXBusy can be used to monitor jobs that a DiFX correlator is # working on. It provides a continuously-updating output showing which # jobs are running, how far they have to go, and what processors and/or # Mark5 units are being utilized. When jobs complete they "linger" in # the display for a period, but are then removed to avoid clutter. # #
![DiFXBusy (text-based) showing two running jobs \"try1_01\" and \"try1_02\".](DiFXBusy.png)
# # Three # different aspects of correlator operations can be monitored (by default they # all are, but you can shut things off): # #
JOBS
# # List the job(s) the correlator is working on currently, along with their # completion percentage and current "state". Shortly after a job is "complete" it is removed unless # the user requests a "linger" option. Jobs that are doing nothing for a period # of time are also removed. # #
PROCESSORS and MARK5s
# # Processors and Mark5s are listed as long as they are working on a job # currently, along with their state, busy percentage, and any indication we # have of what they are up to (usually listing the job they are working on). # #

GUI Option

# # DiFXBusy has two display options - a text-based display and a limited GUI based # on the pyFltk package. The GUI version is a little more experimental and is # currently limited to monitoring the progress of running jobs. The text version # has more information, arguably too much. # #

Command Line Arguments

# # DiFXBusy has a number of command line arguments for tailoring its # behavior and output. The "-h" argument can be used to get an on-line list. # # #
-h, --help
Print help information and quit. #
-H, --hostname NAME
Use NAME as the host of the difxServer program. # Default is to use the DIFX_CONTROL_HOST environment variable. #
-i, --identifier NAME
Parse only messages with the named identifier. This is (often) # the DiFX job name, so this option can be used to monitor a # job or jobs exclusively. The named identifier is a regular # expression, so multiple jobs can be matched. #
-j, --jobs
Display jobs - by turning this on other items (processors # and Mark5's) will be turned off unless requested # specifically (by default everything is displayed). #
-k, --keep
Keep jobs on the list that are incomplete, even if they have # been idled or they exceed the linger limit. This can be used # to see what happened when jobs fail or stall, but can clutter # the display. By default we don't do this. #
-l, --linger [NUM]
Permanently display jobs - by default they are removed # from the display shortly after they finish or if they don't # do anything for a while. When included, a maximum of NUM # jobs will be displayed at any one time - oldest jobs are # deleted. In GUI mode a NUM of 20 is the default. #
-m, --mark5s
Display Mark5s - by turning this on other items (jobs # and processors) will be turned off unless requested # specifically (by default everything is displayed). #
-p, --processors
Display processors - by turning this on other items (jobs # and Mark5's) will be turned off unless requested # specifically (by default everything is displayed). #
-P, --port PORT
Use PORT as the TCP port to communicate with the difxServer. # Default is to use DIFX_CONTROL_PORT environment variable. #
-t, --text
#                 Produce only text-based output.  The default behavior is to
#                 try to launch a small GUI (text will be used on failure).
#
################################################################################
program = 'DiFXBusy'
version = '0.1'
author = 'John Spitzak'
verdate = '20150602'

import sys
import os
import time
import DiFXControl
try:
    import fltk
    import FleWidgets
    useGUI = True
except Exception:
    #  Any failure to load the GUI toolkit drops us back to text output.
    useGUI = False
import threading
import xml.parsers.expat
import re

window = None
titleList = []
stateList = []
progressList = []
difx = None
box = None
lock = None
responder = None
showJobs = True
showProcessors = True
showMark5s = True
userShowJobs = False
userShowProcessors = False
userShowMark5s = False
identifierRE = None

#  These are the number of seconds jobs will "linger" on the display after they
#  finish or fail to do anything for a while.
doneLinger = 20
idleLinger = 100
#  Set this to true and no jobs will ever be removed from the display.
linger = False
#  Well, okay, if this value is set only this many jobs will be displayed in
#  "linger" mode.
lingerLimit = None
if useGUI:
    lingerLimit = 20
#  True once the user has supplied NUM with --linger (see the end of argument
#  parsing, where the GUI-only default is withdrawn for text mode).
userLingerLimit = False
absoluteLimit = 100
keepFailures = False

#  Identifiers that show up in messages but are not correlator jobs - they are
#  never displayed.
HIDDEN_JOBS = ( "difxlog", "mk5cp", "mk5daemon", "mk5dir" )

#  Width (in characters) of each CPU/RX/TX column in the text display; a load
#  bar is at most LOAD_BAR_MAX "X" characters long.
LOAD_COLUMN_WIDTH = 11
LOAD_BAR_MAX = 10


#===============================================================================
#  Build the fixed-width "XXXX" bar used in the text display for one load value.
#  "value" is scaled against "maximum" (the largest value seen so far); a bar is
#  blank when either is unavailable, and never longer than LOAD_BAR_MAX.
#===============================================================================
def _loadBar( value, maximum ):
    if value is None or not maximum > 0.0:
        return " " * LOAD_COLUMN_WIDTH
    num = int( 10.0 * value / maximum + .5 )
    if num > LOAD_BAR_MAX:
        num = LOAD_BAR_MAX
    return ( num * "X" ).ljust( LOAD_COLUMN_WIDTH )


#===============================================================================
#  Class to monitor callbacks from the Client class and act on them.
#
#  Three tables are maintained, all protected by the module-level "lock":
#
#    jobs[identifier]  = ( title, state, progress, idleSeconds )
#    processors[node]  = ( cpu, rx, tx, jobsList )
#    mark5s[node]      = ( cpu, rx, tx, state, activeBank, jobsList, mk5Data )
#                        where mk5Data = ( state, scanNumber, scanName )
#
#  "jobsNameList" holds job identifiers in the order they were first seen, so
#  the oldest job is always at index 0.
#===============================================================================
class Responder:

    def __init__( self ):
        self.keepGoing = True
        self.jobs = {}
        self.jobsNameList = []
        self.processors = {}
        self.mark5s = {}
        self.maxProcCPU = 0.0
        self.maxMarkCPU = 0.0
        self.maxProcTX = 0.0
        self.maxMarkTX = 0.0
        self.maxProcRX = 0.0
        self.maxMarkRX = 0.0
        self.oldNJobs = 0
        self.displayChange = True
        #  The monitor thread is needed whenever jobs can be removed - that is
        #  unless we are lingering with no limit on the number of jobs.
        if not linger or lingerLimit != None:
            self.lingerMonitor = self.LingerMonitor( self )
            self.lingerMonitor.start()

    #---------------------------------------------------------------------------
    #  Callback triggered when the monitoring client receives a new DiFX message.
    #  The message is parsed and handed to the handler for its type; the display
    #  is refreshed immediately when a handler reports a change to the job list.
    #---------------------------------------------------------------------------
    def messageCallback( self, data ):
        try:
            xmlDat = DiFXControl.parseXML( data )
            doUpdate = False
            if xmlDat.typeStr == "DifxStatusMessage":
                doUpdate = self._handleStatus( xmlDat )
            elif xmlDat.typeStr == "DifxAlertMessage" or xmlDat.typeStr == "DifxDiagnosticMessage":
                doUpdate = self._handleAlert( xmlDat )
            elif xmlDat.typeStr == "DifxLoadMessage":
                self._handleLoad( xmlDat )
            elif xmlDat.typeStr == "Mark5StatusMessage":
                self._handleMark5Status( xmlDat )
            if doUpdate:
                self.updateDisplay()
        except xml.parsers.expat.ExpatError:
            #  Expat errors are caused by mangled XML, which probably is the result
            #  of a broken connection.
            self.keepGoing = False

    #---------------------------------------------------------------------------
    #  True if the identifier (job name) passes the user's --identifier filter.
    #---------------------------------------------------------------------------
    def _identifierWanted( self, identifier ):
        return identifierRE == None or identifierRE.match( identifier ) != None

    #---------------------------------------------------------------------------
    #  Status message: record the state and progress of a job, creating the job
    #  if it is new, and reset its idle counter.  Returns True if the display
    #  should be refreshed.
    #---------------------------------------------------------------------------
    def _handleStatus( self, xmlDat ):
        if not self._identifierWanted( xmlDat.identifier ):
            return False
        with lock:
            #  Compute a progress value.  The data for this might not be there
            #  (or the job may have zero length), so we have some defaults.
            try:
                newProgress = 100.0 * ( float( xmlDat.visibilityMJD ) - float( xmlDat.jobstartMJD ) ) / ( float( xmlDat.jobstopMJD ) - float( xmlDat.jobstartMJD ) )
            except Exception:
                if xmlDat.state == "Done" or xmlDat.state == "MpiDone":
                    newProgress = 100
                elif xmlDat.state == "Ending":
                    #  Ending state is annoying - use the job's known progress
                    #  (if it is known!)
                    known = self.jobs.get( xmlDat.identifier )
                    if known is not None:
                        newProgress = known[2]
                    else:
                        newProgress = 0
                else:
                    newProgress = 0
            progress = int( newProgress )
            if xmlDat.identifier not in self.jobs:
                self.jobsNameList.append( xmlDat.identifier )
            self.jobs[ xmlDat.identifier ] = ( xmlDat.identifier, xmlDat.state, progress, 0 )
        return True

    #---------------------------------------------------------------------------
    #  Alert/diagnostic message: these carry very little information about a
    #  job, however they DO indicate that a job exists and which node is working
    #  on it.  Returns True if anything changed.
    #---------------------------------------------------------------------------
    def _handleAlert( self, xmlDat ):
        identifier = xmlDat.identifier
        #  Eliminate "difxlog" as it keeps popping up.
        if identifier.strip() == "difxlog" or not self._identifierWanted( identifier ):
            return False
        doUpdate = False
        with lock:
            if identifier not in self.jobs:
                self.jobs[identifier] = ( identifier, "Initializing", 0, 0 )
                self.jobsNameList.append( identifier )
                doUpdate = True
            #  The message also has a "source" node.  Mark5s first...
            node = xmlDat.fromNode
            if node in self.mark5s:
                if self._addJobToNode( self.mark5s, node, 5, identifier ):
                    doUpdate = True
            elif node in self.processors:
                if self._addJobToNode( self.processors, node, 3, identifier ):
                    doUpdate = True
            else:
                #  Not found in either - guess that it is a processor.  It will
                #  be moved later if a Mark5 status message says otherwise.
                self.processors[node] = ( None, None, None, [ identifier ] )
                doUpdate = True
        return doUpdate

    #---------------------------------------------------------------------------
    #  Add a job to the jobs list stored at "index" of table[node].  Returns True
    #  if the job was not already listed.  Call with the lock held.
    #---------------------------------------------------------------------------
    def _addJobToNode( self, table, node, index, identifier ):
        entry = table[node]
        jobsList = entry[index]
        if jobsList is None:
            jobsList = []
        if identifier in jobsList:
            return False
        jobsList.append( identifier )
        table[node] = entry[:index] + ( jobsList, ) + entry[index + 1:]
        return True

    #---------------------------------------------------------------------------
    #  Load message: these can come from processors or mark5s.  Update the
    #  cpu/rx/tx values of the node (creating a processor entry for an unknown
    #  node) and the running maxima used to scale the display bars.
    #---------------------------------------------------------------------------
    def _handleLoad( self, xmlDat ):
        with lock:
            node = xmlDat.fromNode
            cpu = float( xmlDat.cpuLoad )
            rx = float( xmlDat.netRXRate )
            tx = float( xmlDat.netTXRate )
            if node in self.mark5s:
                if cpu > self.maxMarkCPU:
                    self.maxMarkCPU = cpu
                if rx > self.maxMarkRX:
                    self.maxMarkRX = rx
                if tx > self.maxMarkTX:
                    self.maxMarkTX = tx
                mark5 = self.mark5s[node]
                self.mark5s[node] = ( cpu, rx, tx, mark5[3], mark5[4], mark5[5], mark5[6] )
            else:
                if cpu > self.maxProcCPU:
                    self.maxProcCPU = cpu
                if rx > self.maxProcRX:
                    self.maxProcRX = rx
                if tx > self.maxProcTX:
                    self.maxProcTX = tx
                #  Keep the jobs of a known processor, or start an empty list.
                if node in self.processors:
                    jobsList = self.processors[node][3]
                else:
                    jobsList = []
                self.processors[node] = ( cpu, rx, tx, jobsList )
            self.displayChange = True

    #---------------------------------------------------------------------------
    #  Mark5 status message: only interesting if one or the other of the bays
    #  has a module in it.  A node we had guessed to be a processor is moved to
    #  the mark5 table.
    #---------------------------------------------------------------------------
    def _handleMark5Status( self, xmlDat ):
        if xmlDat.bankAVSN.strip() == "none" and xmlDat.bankBVSN.strip() == "none":
            return
        with lock:
            activeBank = "unknown"
            if xmlDat.activeBank == "A":
                activeBank = xmlDat.bankAVSN
            elif xmlDat.activeBank == "B":
                activeBank = xmlDat.bankBVSN
            #  Save some additional data for mk5 operations.
            mk5Data = ( xmlDat.state, xmlDat.scanNumber, xmlDat.scanName )
            node = xmlDat.fromNode
            if node in self.mark5s:
                mark5 = self.mark5s[node]
                self.mark5s[node] = ( mark5[0], mark5[1], mark5[2], xmlDat.state, activeBank, mark5[5], mk5Data )
            elif node in self.processors:
                proc = self.processors.pop( node )
                jobsList = proc[3]
                if jobsList is None:
                    jobsList = []
                self.mark5s[node] = ( proc[0], proc[1], proc[2], xmlDat.state, activeBank, jobsList, mk5Data )
            else:
                self.mark5s[node] = ( None, None, None, xmlDat.state, activeBank, [], mk5Data )
            self.displayChange = True

    #---------------------------------------------------------------------------
    #  Forget a job: drop it from the job table, the ordered name list, and the
    #  job list of every processor and mark5.  The node job lists are edited in
    #  place (list.remove() returns None, so its result must never be stored).
    #  Call with the lock held.
    #---------------------------------------------------------------------------
    def _removeJob( self, job ):
        self.jobs.pop( job, None )
        if job in self.jobsNameList:
            self.jobsNameList.remove( job )
        for table, index in ( ( self.processors, 3 ), ( self.mark5s, 5 ) ):
            for node in table.values():
                jobsList = node[index]
                if jobsList is not None and job in jobsList:
                    jobsList.remove( job )

    #---------------------------------------------------------------------------
    #  Update the existing display (with new data presumably).
    #---------------------------------------------------------------------------
    def updateDisplay( self ):
        with lock:
            if useGUI:
                self._drawGUI()
            else:
                self._printText()

    #---------------------------------------------------------------------------
    #  GUI display: one row (title, state, progress bar) per visible job.  The
    #  window is resized to show only active jobs...the remaining pre-built
    #  rows are hidden outside the window.  Call with the lock held.
    #---------------------------------------------------------------------------
    def _drawGUI( self ):
        if window is None or not showJobs:
            return
        visible = [ job for job in sorted( self.jobs.keys() ) if str( job ) not in HIDDEN_JOBS ]
        nJobs = len( visible )
        if nJobs != self.oldNJobs:
            window.resize( window.x(), window.y(), window.w(), 25 * nJobs )
        #  Never index past the rows that were actually created.
        for i, job in enumerate( visible[:len( titleList )] ):
            titleList[i].value( job.encode() )
            stateList[i].value( self.jobs[job][1].encode() )
            progressList[i].value( float( self.jobs[job][2] ) )
            progressList[i].redraw()
        self.oldNJobs = nJobs
        window.redraw()

    #---------------------------------------------------------------------------
    #  Text display: clear the screen and print the job list followed by the
    #  processors and mark5s that are working on jobs.  Call with the lock held.
    #---------------------------------------------------------------------------
    def _printText( self ):
        #  This little mess clears the screen (cross-platform I think) and starts
        #  subsequent printing at the top.
        os.system( 'cls' if os.name == 'nt' else 'clear' )
        if showJobs and len( self.jobs ) > 0:
            self._printJobs()
        if showProcessors or showMark5s:
            self._printNodes()

    #---------------------------------------------------------------------------
    #  Print one line per job: name, state, percentage and a 50-character bar
    #  (one "X" per 2% of progress).
    #---------------------------------------------------------------------------
    def _printJobs( self ):
        maxCol1 = 12
        maxCol2 = len( "Initializing" )
        #  Find some ideal column sizes
        for job in self.jobs.keys():
            if len( job ) > maxCol1:
                maxCol1 = len( job )
            if len( self.jobs[job][1] ) > maxCol2:
                maxCol2 = len( self.jobs[job][1] )
        for job in sorted( self.jobs.keys() ):
            if str( job ) in HIDDEN_JOBS:
                continue
            prog = self.jobs[job][2]
            newStr = str( job ).ljust( maxCol1 + 3 ) + self.jobs[job][1].ljust( maxCol2 + 3 )
            #  Right-align the percentage in three columns.
            if prog < 100:
                newStr += " "
            if prog < 10:
                newStr += " "
            newStr += str( prog ) + "% ["
            barProg = min( max( prog, 0 ), 100 )
            newStr += ( "X" * ( ( barProg + 1 ) // 2 ) ).ljust( 50 ) + "]"
            print( newStr )

    #---------------------------------------------------------------------------
    #  Print the processors and/or mark5s that have jobs assigned, with bars for
    #  CPU load and network RX/TX rates.  Processor bars are scaled against the
    #  processor maxima and mark5 bars against the mark5 maxima.
    #---------------------------------------------------------------------------
    def _printNodes( self ):
        #  Find out column widths for processors and mark5s, even if we aren't
        #  showing both.
        maxCol1 = 12
        maxJob = 10
        for key in self.processors.keys():
            node = self.processors[key]
            if len( key ) > maxCol1:
                maxCol1 = len( key )
            for job in node[3] or []:
                if len( job ) > maxJob:
                    maxJob = len( job )
        for key in self.mark5s.keys():
            node = self.mark5s[key]
            if len( key + "()" + node[4] ) > maxCol1:
                maxCol1 = len( key + "()" + node[4] )
            for job in node[5] or []:
                if len( job ) > maxJob:
                    maxJob = len( job )
        #  The header is built from the column width so the titles always sit
        #  above their bars.
        header = " " * ( maxCol1 + 3 ) + "CPU".ljust( LOAD_COLUMN_WIDTH ) + "RX".ljust( LOAD_COLUMN_WIDTH ) + "TX"
        printHeader = True
        if showProcessors:
            printExtra = True
            for key in sorted( self.processors.keys() ):
                node = self.processors[key]
                #  Only processors that are working on something are listed.
                if not node[3]:
                    continue
                if printExtra:
                    print( "" )
                    printExtra = False
                if printHeader:
                    print( header )
                    printHeader = False
                newStr = key.ljust( maxCol1 + 3 )
                newStr += _loadBar( node[0], self.maxProcCPU )
                newStr += _loadBar( node[1], self.maxProcRX )
                newStr += _loadBar( node[2], self.maxProcTX )
                #  List of jobs
                for job in sorted( node[3] ):
                    newStr += job.ljust( maxJob + 2 )
                print( newStr )
        if showMark5s:
            printExtra = True
            for key in sorted( self.mark5s.keys() ):
                node = self.mark5s[key]
                if not node[5]:
                    continue
                if printExtra:
                    print( "" )
                    printExtra = False
                if printHeader:
                    print( header )
                    printHeader = False
                newStr = ( key + "(" + node[4] + ")" ).ljust( maxCol1 + 3 )
                newStr += _loadBar( node[0], self.maxMarkCPU )
                newStr += _loadBar( node[1], self.maxMarkRX )
                newStr += _loadBar( node[2], self.maxMarkTX )
                #  Show the state and data appropriate to it (the scan name
                #  when the unit is copying).
                if node[6] != None and node[6][0] != None:
                    shortStr = node[6][0].ljust( 12 )
                    if node[6][0] == "Copy" and node[6][2] != None:
                        shortStr += node[6][2]
                    newStr += shortStr
                print( newStr )

    #---------------------------------------------------------------------------
    #  Callback triggered when the monitoring client fails (it returns an
    #  identifier so we know what failed).
    #---------------------------------------------------------------------------
    def failCallback( self, failureID ):
        if self.keepGoing:
            if failureID == DiFXControl.Client.BROKEN_SOCKET:
                print( "broken socket" )
            elif failureID == DiFXControl.Client.FAILED_CONNECTION:
                print( "connection failed" )
        self.keepGoing = False
        if window != None:
            fltk.Fl.remove_fd( difx.sock.fileno() )
            window.hide()

    #---------------------------------------------------------------------------
    #  Threaded class used to monitor the list of jobs, removing those that are
    #  idle and/or complete after a period of time.  It wakes once a second for
    #  as long as the responder's "keepGoing" flag is set.
    #---------------------------------------------------------------------------
    class LingerMonitor( threading.Thread ):

        def __init__( self, responder ):
            threading.Thread.__init__( self )
            self.responder = responder

        def run( self ):
            resp = self.responder
            while resp.keepGoing:
                changed = False
                time.sleep( 1 )
                with lock:
                    if linger:
                        #  "Linger" mode - jobs stay, but only up to a limit.
                        if lingerLimit != None and self._trimToLingerLimit():
                            changed = True
                    else:
                        #  Not in "linger" mode - i.e. we are deleting jobs
                        #  after a delay.
                        if self._expireJobs():
                            changed = True
                    #  This is slightly more brutal, but necessary - if the number
                    #  of jobs displayed exceeds the "absolute limit" we wipe out
                    #  the oldest.  This will keep the display from crashing.
                    while len( resp.jobsNameList ) > absoluteLimit:
                        resp._removeJob( resp.jobsNameList[0] )
                        changed = True
                if changed or resp.displayChange:
                    resp.updateDisplay()
                    resp.displayChange = False

        #  Remove jobs until no more than "lingerLimit" remain.  Normally the
        #  oldest job goes first; when failures are being kept only jobs that
        #  are 100% done may go (oldest first).  Returns True if any job was
        #  removed.  Call with the lock held.
        def _trimToLingerLimit( self ):
            resp = self.responder
            removed = False
            while len( resp.jobsNameList ) > lingerLimit:
                job = None
                if keepFailures:
                    for name in resp.jobsNameList:
                        entry = resp.jobs.get( name )
                        if entry is not None and entry[2] == 100:
                            job = name
                            break
                else:
                    job = resp.jobsNameList[0]
                if job is None:
                    #  Nothing left that we are allowed to remove.
                    break
                resp._removeJob( job )
                removed = True
            return removed

        #  Add a second to the idle time of every job, then remove those that
        #  have been done for "doneLinger" seconds or (unless failures are
        #  being kept) idle for "idleLinger" seconds.  Returns True if any job
        #  was removed.  Call with the lock held.
        def _expireJobs( self ):
            resp = self.responder
            removed = False
            #  Iterate over a copy of the keys - jobs are deleted as we go.
            for job in list( resp.jobs.keys() ):
                title, state, progress, idle = resp.jobs[job]
                idle = idle + 1
                resp.jobs[job] = ( title, state, progress, idle )
                deleteJob = False
                if ( state == "Done" or state == "MpiDone" ) and idle >= doneLinger:
                    deleteJob = True
                elif idle >= idleLinger and not keepFailures:
                    deleteJob = True
                if deleteJob:
                    resp._removeJob( job )
                    removed = True
            return removed


#===============================================================================
#  Callback function for a "select" hit on the socket connecting to difxServer
#  (only used with the pyFltk windowing system).  All this does is call
#  "monitor" functions to read packet data - existing callbacks for those will
#  process the data (and handle errors in reading).
#===============================================================================
def fdCallback( data ):
    if difx != None and difx.monitorThread != None and responder != None and responder.keepGoing:
        if difx.monitorThread.status == difx.monitorThread.ok:
            difx.monitorThread.newPacket()


#===============================================================================
#  Print the on-line help.
#===============================================================================
def printHelp():
    print( '\n%s ver %s  %s  %s' % ( program, version, author, verdate ) if False else '\n%s ver %s %s %s' % ( program, version, author, verdate ) )
    print( "A program for monitoring active jobs on a DiFX correlator." )
    print( "Usage: %s [options]" % ( sys.argv[0] ) )
    for helpLine in (
        "",
        "Options can include:",
        "",
        " --help",
        " -h Print this help information and quit.",
        "",
        " --hostname NAME",
        " -H NAME Use NAME as the host of the difxServer program.",
        " Default is to use DIFX_CONTROL_HOST environment variable.",
        "",
        " --identifier NAME",
        " -i Parse only messages with the named identifier. This is (often)",
        " the DiFX job name, so this option can be used to monitor a",
        " job or jobs exclusively. The named identifier is a regular",
        " expression, so multiple jobs can be matched.",
        "",
        " --jobs",
        " -j Display jobs - by turning this on other items (processors",
        " and Mark5's) will be turned off unless requested",
        " specifically - by default everything is displayed).",
        "",
        " --keep",
        " -k Keep jobs on the list that are incomplete, even if they have",
        " been idled or they exceed the linger limit. This can be used",
        " to see what happened when jobs fail or stall, but can clutter",
        " the display. By default we don't do this.",
        "",
        " --linger [NUM]",
        " -l [NUM] Permanently display jobs - by default they are removed",
        " from the display shortly after they finish or if they don't",
        " do anything for a while. When included, a maximum of NUM",
        " jobs will be displayed at any one time - oldest jobs are",
        " deleted. In GUI mode a NUM of 20 is the default.",
        "",
        " --mark5s",
        " -m Display Mark5s - by turning this on other items (jobs",
        " and processors) will be turned off unless requested",
        " specifically - by default everything is displayed).",
        "",
        " --processors",
        " -p Display processors - by turning this on other items (jobs",
        " and Mark5's) will be turned off unless requested",
        " specifically - by default everything is displayed).",
        "",
        " --port PORT",
        " -P PORT Use PORT as the TCP port to communicated with the difxServer.",
        " Default is to use DIFX_CONTROL_PORT environment variable.",
        "",
        " --text",
        " -t Produce only text-based output. The default behavior is to",
        " try to launch a small GUI (text will be used on failure).",
        "",
    ):
        print( helpLine )


#===============================================================================
#  MAIN
#===============================================================================
host = None
port = None

try:
    i = 1
    while i < len( sys.argv ):
        #  Check against legal argument types.  Anything we don't recognize is
        #  treated as a usage error.
        if sys.argv[i] in [ "-h", "--help" ]:
            printHelp()
            sys.exit( 0 )
        elif sys.argv[i] in [ "-H", "--hostname" ]:
            host = sys.argv[i+1]
            i = i + 1
        elif sys.argv[i] in [ "-i", "--identifier" ]:
            identifierRE = re.compile( sys.argv[i+1] )
            i = i + 1
        elif sys.argv[i] in [ "-P", "--port" ]:
            port = int( sys.argv[i+1] )
            i = i + 1
        elif sys.argv[i] in [ "-t", "--text" ]:
            useGUI = False
        elif sys.argv[i] in [ "-k", "--keep" ]:
            keepFailures = True
        elif sys.argv[i] in [ "-l", "--linger" ]:
            linger = True
            #  See if a limit on the number of jobs to be displayed was included.
            if i + 1 < len( sys.argv ) and not sys.argv[i+1].startswith( "-" ):
                lingerLimit = int( sys.argv[i+1] )
                userLingerLimit = True
                i = i + 1
        elif sys.argv[i] in [ "-j", "--jobs" ]:
            showJobs = True
            showProcessors = False
            showMark5s = False
            userShowJobs = True
        elif sys.argv[i] in [ "-p", "--processors" ]:
            showJobs = False
            showProcessors = True
            showMark5s = False
            userShowProcessors = True
        elif sys.argv[i] in [ "-m", "--mark5s" ]:
            showJobs = False
            showProcessors = False
            showMark5s = True
            userShowMark5s = True
        else:
            raise RuntimeError
        i = i + 1
except ( RuntimeError, IndexError, ValueError, re.error ):
    #  Unknown option, missing option value, non-numeric NUM/PORT or a bad
    #  regular expression.
    print( "Usage: %s [options] []" % ( sys.argv[0] ) )
    sys.exit( 0 )

#  Each "show" option turns the others off, so turn back on everything the user
#  asked for explicitly.
if userShowJobs:
    showJobs = True
if userShowProcessors:
    showProcessors = True
if userShowMark5s:
    showMark5s = True

#  The default linger limit applies to GUI mode only - withdraw it if the user
#  asked for text output and did not give a limit.
if not useGUI and not userLingerLimit:
    lingerLimit = None

#  Open a new connection to the difxServer...
difx = DiFXControl.Client()
difx.connect( host = host, port = port )
try:
    if difx.socketOK:
        lock = threading.Lock()
        responder = Responder()
        try:
            if useGUI:
                difx.passiveMonitor()
            else:
                difx.monitor()
            difx.relayPackets()
            #  Collect different messages depending on what the user is
            #  interested in.
            if showProcessors or showMark5s:
                selectionList = ( "DifxStatusMessage", "DifxDiagnosticMessage", "DifxAlertMessage", "DifxLoadMessage", "Mark5StatusMessage" )
            else:
                selectionList = ( "DifxStatusMessage", )
            difx.messageSelection( selectionList )
            difx.addRelayCallback( responder.messageCallback )
            difx.addFailCallback( responder.failCallback )

            #  Run the GUI to display busy results, if requested.
            if useGUI:
                window = fltk.Fl_Window( 500, 2, "DiFX Busy Monitor" )
                dummy = fltk.Fl_Output( 1000, 1000, 2, 2 )  # to swallow the "caret" that appears in first output box
                #  Rows for every job we could ever show are built up front;
                #  the window is resized to expose only those in use.
                for i in range( absoluteLimit + 2 ):
                    titleBox = fltk.Fl_Output( 10, i * 25 + 2, 100, 20 )
                    titleBox.box( fltk.FL_FLAT_BOX )
                    titleBox.color( fltk.FL_LIGHT2 )
                    titleList.append( titleBox )
                    stateBox = fltk.Fl_Output( 115, i * 25 + 2, 100, 20 )
                    stateBox.box( fltk.FL_FLAT_BOX )
                    stateBox.color( fltk.FL_LIGHT2 )
                    stateList.append( stateBox )
                    progressBox = FleWidgets.Progress( 220, i * 25 + 2, 270, 20 )
                    progressBox.box( fltk.FL_THIN_DOWN_FRAME )
                    progressBox.minimum( 0.0 )
                    progressBox.maximum( 100.0 )
                    progressBox.color( fltk.FL_LIGHT2 )
                    progressBox.selection_color( fltk.FL_GREEN )
                    progressList.append( progressBox )
                window.end()
                window.show()
                fltk.Fl.add_fd( difx.sock.fileno(), fdCallback )
                fltk.Fl.run()
            #  Text-based display only - we need an endless loop.
            else:
                try:
                    while responder.keepGoing:
                        time.sleep( .1 )
                except KeyboardInterrupt:
                    responder.keepGoing = False
        finally:
            #  Always stop the linger monitor thread, however we got here.
            responder.keepGoing = False
            time.sleep( .1 )
finally:
    difx.close()