#!/usr/bin/env python ################################################################################ #\defgroup difxbusy DiFXBusy # #\brief Monitors messages that help indicate how "busy" a correlator is. # # Usage: DiFXBusy [options] # # DiFXBusy can be used to monitor jobs that a DiFX correlator is # working on. It provides a continuously-updating output showing which # jobs are running, how far they have to go, and what processors and/or # Mark5 units are being utilized. When jobs complete they "linger" in # the display for a period, but are then removed to avoid clutter. # #
![DiFXBusy (text-based) showing two running jobs \"try1_01\" and \"try1_02\".](DiFXBusy.png)
# # Three # different aspects of correlator operations can be monitored (by default they # all are, but you can shut things off): # #
JOBS
# # List the job(s) the correlator is working on currently, along with their # completion percentage and current "state". Shortly after a job is "complete" it is removed unless # the user requests a "linger" option. Jobs that are doing nothing for a period # of time are also removed. # #
PROCESSORS and MARK5s
# # Processors and Mark5s are listed as long as they are working on a job # currently, along with their state, busy percentage, and any indication we # have of what they are up to (usually listing the job they are working on). # #

GUI Option

# # DiFXBusy has two display options - a text-based display and a limited GUI based # on the pyFltk package. The GUI version is a little more experimental and is # currently limited to monitoring the progress of running jobs. The text version # has more information, arguably too much. # #

Command Line Arguments

# # DiFXBusy has a number of command line arguments for tailoring its # behavior and output. The "-h" argument can be used to get an on-line list. # # #
-h, --help
Print help information and quit. #
-H, --hostname NAME
Use NAME as the host of the difxServer program. # Default is to use the DIFX_CONTROL_HOST environment variable. #
-i, --identifier NAME
Parse only messages with the named identifier. This is (often) # the DiFX job name, so this option can be used to monitor a # job or jobs exclusively. The named identifier is a regular # expression, so multiple jobs can be matched. #
-j, --jobs
Display jobs - by turning this on other items (processors # and Mark5's) will be turned off unless requested # specifically (by default everything is displayed). #
-k, --keep
Keep jobs on the list that are incomplete, even if they have # been idle or they exceed the linger limit. This can be used # to see what happened when jobs fail or stall, but can clutter # the display. By default we don't do this. #
-l, --linger [NUM]
Permanently display jobs - by default they are removed # from the display shortly after they finish or if they don't # do anything for a while. When included, a maximum of NUM # jobs will be displayed at any one time - oldest jobs are # deleted. In GUI mode a NUM of 20 is the default. #
-m, --mark5s
Display Mark5s - by turning this on other items (jobs # and processors) will be turned off unless requested # specifically (by default everything is displayed). #
-p, --processors
Display processors - by turning this on other items (jobs # and Mark5's) will be turned off unless requested # specifically (by default everything is displayed). #
-P, --port PORT
Use PORT as the TCP port to communicate with the difxServer. # Default is to use the DIFX_CONTROL_PORT environment variable. #
-t, --text
#     Produce only text-based output.  The default behavior is to
#     try to launch a small GUI (text will be used on failure).
#
################################################################################
program = 'DiFXBusy'
version = '0.1'
author  = 'John Spitzak'
verdate = '20150602'

import sys
import os
import time
import DiFXControl
try:
    import fltk
    import FleWidgets
    useGUI = True
except Exception:
    # pyFltk is missing or unusable - fall back to the text display.
    useGUI = False
import threading
import xml.parsers.expat
import re

window = None
titleList = []
stateList = []
progressList = []
difx = None
responder = None
box = None
lock = None
showJobs = True
showProcessors = True
showMark5s = True
userShowJobs = False
userShowProcessors = False
userShowMark5s = False
identifierRE = None
# These are the number of seconds jobs will "linger" on the display after they
# finish or fail to do anything for a while.
doneLinger = 20
idleLinger = 100
# Set this to true and no jobs will ever be removed from the display.
linger = False
# Well, okay, if this value is set only this many jobs will be displayed in
# "linger" mode.
lingerLimit = None
if useGUI:
    lingerLimit = 20
absoluteLimit = 100
keepFailures = False

# Identifiers that may be tracked in the jobs table but are never displayed.
hiddenIdentifiers = ( "difxlog", "mk5cp", "mk5daemon", "mk5dir" )
# Job states that count as "finished" for the linger timeout and progress.
doneStates = ( "Done", "MpiDone" )


#===============================================================================
#  Return True when no "-i" filter was given or the identifier matches it.
#===============================================================================
def _matchesIdentifier( identifier ):
    return identifierRE is None or identifierRE.match( identifier ) is not None


#===============================================================================
#  Return an 11-character display column holding a load bar: one "X" per tenth
#  of "maximum" (rounded, clamped to 10).  The column is blank when the value
#  is unknown (None) or no maximum has been seen yet.
#===============================================================================
def _loadBar( value, maximum ):
    bar = ""
    if value is not None and maximum > 0.0:
        bar = "X" * min( 10, int( 10.0 * value / maximum + .5 ) )
    return bar.ljust( 11 )


#===============================================================================
#  Class to monitor callbacks from the Client class and act on them.
#
#  Data kept (all access guarded by the module-level "lock"):
#    jobs          identifier -> ( title, state, progress, idleTicks )
#    jobsNameList  job identifiers in the order they were first seen
#    processors    node -> ( cpu, rx, tx, jobsList )
#    mark5s        node -> ( cpu, rx, tx, state, activeBank, jobsList, mk5Data )
#                  where mk5Data is ( state, scanNumber, scanName )
#  idleTicks is incremented roughly once a second by LingerMonitor and reset
#  to 0 by every status message for the job.
#===============================================================================
class Responder:

    def __init__( self ):
        self.keepGoing = True
        self.jobs = {}
        self.jobsNameList = []
        self.processors = {}
        self.mark5s = {}
        self.maxProcCPU = 0.0
        self.maxMarkCPU = 0.0
        self.maxProcTX = 0.0
        self.maxMarkTX = 0.0
        self.maxProcRX = 0.0
        self.maxMarkRX = 0.0
        self.oldNJobs = 0
        self.displayChange = True
        if not linger or lingerLimit is not None:
            self.lingerMonitor = self.LingerMonitor( self )
            self.lingerMonitor.start()

    #---------------------------------------------------------------------------
    #  Remove a job from the job tables and from every node's job list.  The
    #  node lists are edited in place (list.remove returns None, so it must not
    #  be used to rebuild the node tuples).  Caller must hold "lock".
    #---------------------------------------------------------------------------
    def _forgetJob( self, job ):
        del self.jobs[job]
        self.jobsNameList.remove( job )
        for node in self.processors.values():
            if node[3] is not None and job in node[3]:
                node[3].remove( job )
        for node in self.mark5s.values():
            if node[5] is not None and job in node[5]:
                node[5].remove( job )

    #---------------------------------------------------------------------------
    #  Callback triggered when the monitoring client receives a new DiFX message.
    #  Mangled XML (an ExpatError) is taken to mean a broken connection and
    #  stops monitoring.
    #---------------------------------------------------------------------------
    def messageCallback( self, data ):
        try:
            xmlDat = DiFXControl.parseXML( data )
        except xml.parsers.expat.ExpatError:
            # Expat errors are caused by mangled XML, which probably is the result
            # of a broken connection.
            self.keepGoing = False
            return
        if xmlDat.typeStr == "DifxStatusMessage":
            self._statusMessage( xmlDat )
        elif xmlDat.typeStr in ( "DifxAlertMessage", "DifxDiagnosticMessage" ):
            self._alertMessage( xmlDat )
        elif xmlDat.typeStr == "DifxLoadMessage":
            self._loadMessage( xmlDat )
        elif xmlDat.typeStr == "Mark5StatusMessage":
            self._mark5StatusMessage( xmlDat )

    #---------------------------------------------------------------------------
    #  Compute a job's percent-complete from a status message.  The MJD data
    #  might not be there, so fall back on the job state.  Caller holds "lock".
    #---------------------------------------------------------------------------
    def _progress( self, xmlDat ):
        try:
            return 100.0 * ( float( xmlDat.visibilityMJD ) - float( xmlDat.jobstartMJD ) ) / ( float( xmlDat.jobstopMJD ) - float( xmlDat.jobstartMJD ) )
        except ( AttributeError, TypeError, ValueError, ZeroDivisionError ):
            if xmlDat.state in doneStates:
                return 100
            if xmlDat.state == "Ending" and xmlDat.identifier in self.jobs:
                # Ending state is annoying - use the job's last known progress.
                return self.jobs[xmlDat.identifier][2]
            return 0

    #---------------------------------------------------------------------------
    #  Status message: record the job's state and progress (creating the job if
    #  needed) and reset its idle counter.
    #---------------------------------------------------------------------------
    def _statusMessage( self, xmlDat ):
        if not _matchesIdentifier( xmlDat.identifier ):
            return
        with lock:
            progress = int( self._progress( xmlDat ) )
            if xmlDat.identifier not in self.jobs:
                self.jobsNameList.append( xmlDat.identifier )
            self.jobs[xmlDat.identifier] = ( xmlDat.identifier, xmlDat.state, progress, 0 )
        self.updateDisplay()

    #---------------------------------------------------------------------------
    #  Alert/diagnostic message: these have very little information about a job,
    #  but they DO indicate that the job exists and that the sending node is
    #  involved with it.
    #---------------------------------------------------------------------------
    def _alertMessage( self, xmlDat ):
        # Eliminate "difxlog" as it keeps popping up.
        if xmlDat.identifier.strip() == "difxlog" or not _matchesIdentifier( xmlDat.identifier ):
            return
        jobName = xmlDat.identifier
        doUpdate = False
        with lock:
            if jobName not in self.jobs:
                self.jobs[jobName] = ( jobName, "Initializing", 0, 0 )
                self.jobsNameList.append( jobName )
                doUpdate = True
            # Locate the sending node - Mark5s first, then processors.  An unknown
            # node is guessed to be a processor; a Mark5 status message will move
            # it later if that guess is wrong.
            if xmlDat.fromNode in self.mark5s:
                jobsList = self.mark5s[xmlDat.fromNode][5]
            elif xmlDat.fromNode in self.processors:
                jobsList = self.processors[xmlDat.fromNode][3]
            else:
                jobsList = []
                self.processors[xmlDat.fromNode] = ( None, None, None, jobsList )
            if jobName not in jobsList:
                jobsList.append( jobName )
                doUpdate = True
        if doUpdate:
            self.updateDisplay()

    #---------------------------------------------------------------------------
    #  Load message: store cpu / network load for the sending node (Mark5 if we
    #  already know it as one, otherwise a processor) and track the maxima used
    #  to scale the display bars.  Malformed load values are ignored.
    #---------------------------------------------------------------------------
    def _loadMessage( self, xmlDat ):
        try:
            cpu = float( xmlDat.cpuLoad )
            rx = float( xmlDat.netRXRate )
            tx = float( xmlDat.netTXRate )
        except ( AttributeError, TypeError, ValueError ):
            return
        with lock:
            node = xmlDat.fromNode
            if node in self.mark5s:
                self.maxMarkCPU = max( self.maxMarkCPU, cpu )
                self.maxMarkRX = max( self.maxMarkRX, rx )
                self.maxMarkTX = max( self.maxMarkTX, tx )
                # Keep state, active bank, jobs list and mk5 data.
                self.mark5s[node] = ( cpu, rx, tx ) + self.mark5s[node][3:]
            else:
                self.maxProcCPU = max( self.maxProcCPU, cpu )
                self.maxProcRX = max( self.maxProcRX, rx )
                self.maxProcTX = max( self.maxProcTX, tx )
                jobsList = self.processors[node][3] if node in self.processors else []
                self.processors[node] = ( cpu, rx, tx, jobsList )
            self.displayChange = True

    #---------------------------------------------------------------------------
    #  Mark5 status message: only interesting if one or the other of the bays
    #  has a module in it.  Identifies the node as a Mark5 (moving it out of the
    #  processor list if it was guessed to be a processor).
    #---------------------------------------------------------------------------
    def _mark5StatusMessage( self, xmlDat ):
        if xmlDat.bankAVSN.strip() == "none" and xmlDat.bankBVSN.strip() == "none":
            return
        if xmlDat.activeBank == "A":
            activeBank = xmlDat.bankAVSN
        elif xmlDat.activeBank == "B":
            activeBank = xmlDat.bankBVSN
        else:
            activeBank = "unknown"
        # Save some additional data for mk5 operations.
        mk5Data = ( xmlDat.state, xmlDat.scanNumber, xmlDat.scanName )
        with lock:
            node = xmlDat.fromNode
            if node in self.mark5s:
                old = self.mark5s[node]
                self.mark5s[node] = ( old[0], old[1], old[2], xmlDat.state, activeBank, old[5], mk5Data )
            elif node in self.processors:
                old = self.processors.pop( node )
                jobsList = old[3] if old[3] is not None else []
                self.mark5s[node] = ( old[0], old[1], old[2], xmlDat.state, activeBank, jobsList, mk5Data )
            else:
                self.mark5s[node] = ( None, None, None, xmlDat.state, activeBank, [], mk5Data )
            self.displayChange = True

    #---------------------------------------------------------------------------
    #  Update the existing display (with new data presumably).  Holds "lock"
    #  for the duration, so callers must not hold it.
    #---------------------------------------------------------------------------
    def updateDisplay( self ):
        with lock:
            if useGUI:
                self._updateGUI()
            else:
                self._updateText()

    #---------------------------------------------------------------------------
    #  Fill the pre-built GUI rows with the visible jobs.  Caller holds "lock".
    #  NOTE(review): this can be reached from the LingerMonitor thread, i.e.
    #  FLTK is touched outside the main loop's thread - confirm this is safe.
    #---------------------------------------------------------------------------
    def _updateGUI( self ):
        # The window is assigned only once fully built; updates that arrive
        # earlier are skipped (the next update will draw them).
        if window is None:
            return
        if showJobs:
            visible = [ job for job in sorted( self.jobs.keys() ) if str( job ) not in hiddenIdentifiers ]
            # Only as many jobs as there are pre-built rows can be shown.
            visible = visible[:len( progressList )]
            nJobs = len( visible )
            # If the number of jobs has changed, resize the window to display only
            # active jobs...the remainder are hidden outside the window.
            if nJobs != self.oldNJobs:
                window.resize( window.x(), window.y(), window.w(), 25 * nJobs )
            for row, job in enumerate( visible ):
                titleList[row].value( job.encode() )
                stateList[row].value( self.jobs[job][1].encode() )
                progressList[row].value( float( self.jobs[job][2] ) )
                progressList[row].redraw()
            self.oldNJobs = nJobs
        window.redraw()

    #---------------------------------------------------------------------------
    #  Clear the terminal and print jobs and/or nodes.  Caller holds "lock".
    #---------------------------------------------------------------------------
    def _updateText( self ):
        # This little mess clears the screen (cross-platform I think) and starts
        # subsequent printing at the top.
        os.system( 'cls' if os.name == 'nt' else 'clear' )
        if showJobs:
            self._printJobs()
        if showProcessors or showMark5s:
            self._printNodes()

    #---------------------------------------------------------------------------
    #  Print one line per visible job: name, state, percentage and a 50-column
    #  bar with one "X" per 2% (rounded up, clamped to 0-100%).
    #---------------------------------------------------------------------------
    def _printJobs( self ):
        if not self.jobs:
            return
        # Column widths fit the longest name/state (hidden identifiers included).
        nameWidth = max( [ 12 ] + [ len( job ) for job in self.jobs ] ) + 3
        stateWidth = max( [ len( "Initializing" ) ] + [ len( entry[1] ) for entry in self.jobs.values() ] ) + 3
        for job in sorted( self.jobs.keys() ):
            if str( job ) in hiddenIdentifiers:
                continue
            state = self.jobs[job][1]
            prog = self.jobs[job][2]
            line = str( job ).ljust( nameWidth ) + state.ljust( stateWidth )
            # Right-align the percentage in a three character field.
            if prog < 100:
                line += " "
            if prog < 10:
                line += " "
            line += str( prog ) + "% ["
            clamped = min( 100, max( 0, prog ) )
            line += ( "X" * ( ( clamped + 1 ) // 2 ) ).ljust( 50 ) + "]"
            print( line )

    #---------------------------------------------------------------------------
    #  Print load bars and job lists for every processor / Mark5 that currently
    #  has jobs.  Each node type is scaled by its own maximum loads.
    #---------------------------------------------------------------------------
    def _printNodes( self ):
        # Find out column widths for processors and mark5s, even if we aren't
        # showing both.
        nameWidth = 12
        jobWidth = 10
        for key, node in self.processors.items():
            nameWidth = max( nameWidth, len( key ) )
            for job in node[3] or []:
                jobWidth = max( jobWidth, len( job ) )
        for key, node in self.mark5s.items():
            nameWidth = max( nameWidth, len( key + "()" + node[4] ) )
            for job in node[5] or []:
                jobWidth = max( jobWidth, len( job ) )
        # Header labels line up with the 11-character bar columns.
        header = " " * ( nameWidth + 3 ) + "CPU".ljust( 11 ) + "RX".ljust( 11 ) + "TX"
        headerPrinted = False
        if showProcessors:
            lines = []
            for key in sorted( self.processors.keys() ):
                node = self.processors[key]
                if not node[3]:
                    continue
                line = key.ljust( nameWidth + 3 )
                line += _loadBar( node[0], self.maxProcCPU )
                line += _loadBar( node[1], self.maxProcRX )
                line += _loadBar( node[2], self.maxProcTX )
                line += "".join( job.ljust( jobWidth + 2 ) for job in sorted( node[3] ) )
                lines.append( line )
            headerPrinted = self._printSection( lines, header, headerPrinted )
        if showMark5s:
            lines = []
            for key in sorted( self.mark5s.keys() ):
                node = self.mark5s[key]
                if not node[5]:
                    continue
                line = ( key + "(" + node[4] + ")" ).ljust( nameWidth + 3 )
                line += _loadBar( node[0], self.maxMarkCPU )
                line += _loadBar( node[1], self.maxMarkRX )
                line += _loadBar( node[2], self.maxMarkTX )
                # Show the state, plus the scan name while copying.
                mk5Data = node[6]
                if mk5Data is not None and mk5Data[0] is not None:
                    stateStr = mk5Data[0].ljust( 12 )
                    if mk5Data[0] == "Copy" and mk5Data[2] is not None:
                        stateStr += mk5Data[2]
                    line += stateStr
                lines.append( line )
            self._printSection( lines, header, headerPrinted )

    #---------------------------------------------------------------------------
    #  Print a blank separator, the column header (only once overall) and the
    #  given lines.  Returns True once the header has been printed.
    #---------------------------------------------------------------------------
    @staticmethod
    def _printSection( lines, header, headerPrinted ):
        if not lines:
            return headerPrinted
        print( "" )
        if not headerPrinted:
            print( header )
        for line in lines:
            print( line )
        return True

    #---------------------------------------------------------------------------
    #  Callback triggered when the monitoring client fails (it returns an
    #  identifier so we know what failed).
    #---------------------------------------------------------------------------
    def failCallback( self, failureID ):
        if self.keepGoing:
            if failureID == DiFXControl.Client.BROKEN_SOCKET:
                print( "broken socket" )
            elif failureID == DiFXControl.Client.FAILED_CONNECTION:
                print( "connection failed" )
        self.keepGoing = False
        if window is not None:
            fltk.Fl.remove_fd( difx.sock.fileno() )
            window.hide()

    #---------------------------------------------------------------------------
    #  Threaded class used to monitor the list of jobs, removing those that are
    #  idle and/or complete after a period of time.
    #---------------------------------------------------------------------------
    class LingerMonitor( threading.Thread ):

        def __init__( self, responder ):
            threading.Thread.__init__( self )
            self.responder = responder

        def run( self ):
            owner = self.responder
            while owner.keepGoing:
                time.sleep( 1 )
                with lock:
                    if linger:
                        changed = self._trimToLingerLimit()
                    else:
                        changed = self._expireJobs()
                    # This is slightly more brutal, but necessary - if the number of
                    # jobs approaches the "absolute limit" we wipe out the oldest.
                    # This will keep the display from crashing.
                    while len( owner.jobsNameList ) > absoluteLimit:
                        owner._forgetJob( owner.jobsNameList[0] )
                        changed = True
                if changed or owner.displayChange:
                    # Clear the flag first so a change arriving during the
                    # redraw triggers another one.
                    owner.displayChange = False
                    owner.updateDisplay()

        #-----------------------------------------------------------------------
        #  "linger" mode: drop the oldest jobs beyond lingerLimit.  With
        #  keepFailures only jobs that reached 100% are eligible.  Returns True
        #  if anything was removed.  Caller holds "lock".
        #-----------------------------------------------------------------------
        def _trimToLingerLimit( self ):
            owner = self.responder
            changed = False
            if lingerLimit is None:
                return changed
            while len( owner.jobsNameList ) > lingerLimit:
                if keepFailures:
                    job = next( ( name for name in owner.jobsNameList if owner.jobs[name][2] == 100 ), None )
                    if job is None:
                        break
                else:
                    job = owner.jobsNameList[0]
                owner._forgetJob( job )
                changed = True
            return changed

        #-----------------------------------------------------------------------
        #  Normal mode: age every job by one tick, then remove finished jobs
        #  after doneLinger ticks and (unless keepFailures) any job idle for
        #  idleLinger ticks.  Returns True if anything was removed.  Caller
        #  holds "lock".
        #-----------------------------------------------------------------------
        def _expireJobs( self ):
            owner = self.responder
            changed = False
            for job in list( owner.jobs.keys() ):
                title, state, progress, idle = owner.jobs[job]
                idle += 1
                owner.jobs[job] = ( title, state, progress, idle )
                if ( state in doneStates and idle >= doneLinger ) or ( idle >= idleLinger and not keepFailures ):
                    owner._forgetJob( job )
                    changed = True
            return changed


#===============================================================================
#  Callback function for a "select" hit on the socket connecting to difxServer
#  (only used with the pyFltk windowing system).  All this does is call
#  "monitor" functions to read packet data - existing callbacks for those will
#  process the data (and handle errors in reading).
#===============================================================================
def fdCallback( data ):
    if difx is None or difx.monitorThread is None or responder is None or not responder.keepGoing:
        return
    if difx.monitorThread.status == difx.monitorThread.ok:
        difx.monitorThread.newPacket()


#===============================================================================
#  Print the command line help.
#===============================================================================
def _printHelp():
    print( '\n%s ver %s %s %s' % ( program, version, author, verdate ) )
    print( "A program for monitoring active jobs on a DiFX correlator." )
    print( "Usage: %s [options]" % ( sys.argv[0] ) )
    helpLines = (
        "",
        "Options can include:",
        "",
        "   --help",
        "   -h              Print this help information and quit.",
        "",
        "   --hostname NAME",
        "   -H NAME         Use NAME as the host of the difxServer program.",
        "                   Default is to use the DIFX_CONTROL_HOST environment variable.",
        "",
        "   --identifier NAME",
        "   -i NAME         Parse only messages with the named identifier.  This is (often)",
        "                   the DiFX job name, so this option can be used to monitor a",
        "                   job or jobs exclusively.  The named identifier is a regular",
        "                   expression, so multiple jobs can be matched.",
        "",
        "   --jobs",
        "   -j              Display jobs - by turning this on other items (processors",
        "                   and Mark5's) will be turned off unless requested",
        "                   specifically (by default everything is displayed).",
        "",
        "   --keep",
        "   -k              Keep jobs on the list that are incomplete, even if they have",
        "                   been idle or they exceed the linger limit.  This can be used",
        "                   to see what happened when jobs fail or stall, but can clutter",
        "                   the display.  By default we don't do this.",
        "",
        "   --linger [NUM]",
        "   -l [NUM]        Permanently display jobs - by default they are removed",
        "                   from the display shortly after they finish or if they don't",
        "                   do anything for a while.  When included, a maximum of NUM",
        "                   jobs will be displayed at any one time - oldest jobs are",
        "                   deleted.  In GUI mode a NUM of 20 is the default.",
        "",
        "   --mark5s",
        "   -m              Display Mark5s - by turning this on other items (jobs",
        "                   and processors) will be turned off unless requested",
        "                   specifically (by default everything is displayed).",
        "",
        "   --processors",
        "   -p              Display processors - by turning this on other items (jobs",
        "                   and Mark5's) will be turned off unless requested",
        "                   specifically (by default everything is displayed).",
        "",
        "   --port PORT",
        "   -P PORT         Use PORT as the TCP port to communicate with the difxServer.",
        "                   Default is to use the DIFX_CONTROL_PORT environment variable.",
        "",
        "   --text",
        "   -t              Produce only text-based output.  The default behavior is to",
        "                   try to launch a small GUI (text will be used on failure).",
        "",
    )
    print( "\n".join( helpLines ) )


#===============================================================================
#  MAIN
#===============================================================================
host = None
port = None
try:
    i = 1
    while i < len( sys.argv ):
        arg = sys.argv[i]
        if arg in [ "-h", "--help" ]:
            _printHelp()
            sys.exit( 0 )
        elif arg in [ "-H", "--hostname" ]:
            host = sys.argv[i+1]
            i = i + 1
        elif arg in [ "-i", "--identifier" ]:
            identifierRE = re.compile( sys.argv[i+1] )
            i = i + 1
        elif arg in [ "-P", "--port" ]:
            port = int( sys.argv[i+1] )
            i = i + 1
        elif arg in [ "-t", "--text" ]:
            useGUI = False
        elif arg in [ "-k", "--keep" ]:
            keepFailures = True
        elif arg in [ "-l", "--linger" ]:
            linger = True
            # See if a limit on the number of jobs to be displayed was included.
            if i + 1 < len( sys.argv ) and not sys.argv[i+1].startswith( "-" ):
                lingerLimit = int( sys.argv[i+1] )
                i = i + 1
        elif arg in [ "-j", "--jobs" ]:
            showJobs = True
            showProcessors = False
            showMark5s = False
            userShowJobs = True
        elif arg in [ "-p", "--processors" ]:
            showJobs = False
            showProcessors = True
            showMark5s = False
            userShowProcessors = True
        elif arg in [ "-m", "--mark5s" ]:
            showJobs = False
            showProcessors = False
            showMark5s = True
            userShowMark5s = True
        else:
            raise RuntimeError( "unrecognized argument " + arg )
        i = i + 1
except ( RuntimeError, IndexError, ValueError, re.error ):
    # Unknown option, missing option value, or a malformed number/regex.
    print( "Usage: %s [options]" % ( sys.argv[0] ) )
    sys.exit( 1 )

# Any display the user asked for explicitly stays on, even if a later option
# turned it off.
if userShowJobs:
    showJobs = True
if userShowProcessors:
    showProcessors = True
if userShowMark5s:
    showMark5s = True

# Open a new connection to the difxServer...
difx = DiFXControl.Client()
difx.connect( host = host, port = port )
if difx.socketOK:
    lock = threading.Lock()
    responder = Responder()
    if useGUI:
        difx.passiveMonitor()
    else:
        difx.monitor()
    difx.relayPackets()
    # Collect different messages depending on what the user is interested in.
    if showProcessors or showMark5s:
        selectionList = ( "DifxStatusMessage", "DifxDiagnosticMessage", "DifxAlertMessage", "DifxLoadMessage", "Mark5StatusMessage" )
    else:
        selectionList = ( "DifxStatusMessage", )
    difx.messageSelection( selectionList )
    difx.addRelayCallback( responder.messageCallback )
    difx.addFailCallback( responder.failCallback )

    # Run the GUI to display busy results, if requested.
    if useGUI:
        # Build into a local name; the global "window" is only set once every
        # row exists, so display updates never see a half-built window.
        newWindow = fltk.Fl_Window( 500, 2, "DiFX Busy Monitor" )
        dummy = fltk.Fl_Output( 1000, 1000, 2, 2 )  # to swallow the "caret" that appears in first output box
        for row in range( absoluteLimit + 2 ):
            titleBox = fltk.Fl_Output( 10, row * 25 + 2, 100, 20 )
            titleBox.box( fltk.FL_FLAT_BOX )
            titleBox.color( fltk.FL_LIGHT2 )
            titleList.append( titleBox )
            stateBox = fltk.Fl_Output( 115, row * 25 + 2, 100, 20 )
            stateBox.box( fltk.FL_FLAT_BOX )
            stateBox.color( fltk.FL_LIGHT2 )
            stateList.append( stateBox )
            progressBox = FleWidgets.Progress( 220, row * 25 + 2, 270, 20 )
            progressBox.box( fltk.FL_THIN_DOWN_FRAME )
            progressBox.minimum( 0.0 )
            progressBox.maximum( 100.0 )
            progressBox.color( fltk.FL_LIGHT2 )
            progressBox.selection_color( fltk.FL_GREEN )
            progressList.append( progressBox )
        newWindow.end()
        window = newWindow
        window.show()
        fltk.Fl.add_fd( difx.sock.fileno(), fdCallback )
        fltk.Fl.run()
    # Text-based display only - we need an endless loop.
    else:
        while responder.keepGoing:
            try:
                time.sleep( .1 )
            except KeyboardInterrupt:
                responder.keepGoing = False

if responder is not None:
    responder.keepGoing = False
    time.sleep( .1 )
difx.close()