#!/usr/bin/env python
################################################################################
#\defgroup difxmessages DiFXMessages
#
#\brief Echo UDP message traffic from DiFX Processes
#
#Usage: DiFXMessages [options] [list of message types]
#
#  DiFXMessages collects and echoes UDP message traffic intercepted
#  on the DiFX server.  Messages are printed in their raw XML form and/or
#  parsed into component content.  Desired message traffic can be limited to a
#  message type (specified by name or a number), source hostname(s), or
#  message identifier (which is often a DiFX job name).  By default ALL traffic
#  is echoed.
#
#

#  Command Line Arguments
#
#
#  -h, --help
#      Print help information and quit.
#
#  -H, --hostname NAME
#      Use NAME as the host of the DiFX server program.
#      Default is to use DIFX_CONTROL_HOST environment variable.
#
#  -i, --identifier NAME
#      Echo only messages with the named identifier.  This is (often)
#      the DiFX job name, so this option can be used to monitor a
#      job or jobs to some degree.  The named identifier is a regular
#      expression, so multiple jobs can be matched.
#
#  -p, --parse
#      Toggle whether to parse XML messages into a readable format.
#      Default is to do so.
#
#  -P, --port PORT
#      Use PORT as the TCP port to communicate with the DiFX server.
#      Default is to use DIFX_CONTROL_PORT environment variable.
#
#  -r, --raw
#      Toggle whether to echo \"raw\" XML message strings.  Default is
#      to do so.
#
#  -R, --restrict
#      Tell difxServer to restrict messages it sends to those types
#      selected.  This can cut down on socket traffic, however currently
#      not all message types are supported.  As a consequence this
#      feature is off by default.
#
#  -s, --source LIST
#      Echo only messages originating from one of the comma-separated LIST
#      of node names.  Default is to echo all sources.
#
#  -S, --stats
#      Toggle whether to produce a list of statistics (messages received, etc.)
#      when collection is terminated.  Default is not to do so.
#
#  -t, --type LIST
#      Indicate the following items are a list of message types you wish to
#      echo.  This isn't an explicitly required argument, as the list can be
#      included as the final argument(s) - see below.
#
#

#  list of message types
#
#
#  The "list of message types" that follows command line arguments is a
#  comma-separated list of one or more message types, identified either by
#  name or number.  Default behavior (when you list nothing) is to echo all
#  messages.  Legal message numbers and types are:
#
#
#  Type Number    Type Name
#       1         DifxLoadMessage
#       2         DifxAlertMessage
#       3         Mark5StatusMessage
#       4         DifxStatusMessage
#       5         DifxInfoMessage
#       6         DifxDatastreamMessage
#       7         DifxCommand
#       8         DifxParameter
#       9         DifxStart
#      10         DifxStop
#      11         Mark5VersionMessage
#      12         Mark5ConditionMessage
#      13         DifxTransientMessage
#      14         DifxSmartMessage
#      15         Mark5DriveStatsMessage
#      16         DifxDiagnosticMessage
#      17         DifxFileTransfer
#      18         DifxFileOperation
#      19         DifxVex2DifxRun
#      20         DifxMachinesDefinition
#      21         DifxGetDirectory
#      22         DifxMk5Control
#      23         DifxMark5Copy
# ################################################################################ program = 'DiFXMessages' version = '0.1' author = 'John Spitzak' verdate = '20150602' import sys import time import DiFXControl import xml.parsers.expat import re #=============================================================================== # Class to monitor callbacks from the Client class and act on them. #=============================================================================== class Responder: def __init__( self ): self.keepGoing = True self.parseXML = True self.rawXML = True self.echoAllMessages = True self.echoAllSources = True self.identifierRE = None self.messageTypes = [] self.echoSources = [] self.receivedInfo = {} #--------------------------------------------------------------------------- # Callback triggered when the monitoring client receives a new DiFX message. #--------------------------------------------------------------------------- def messageCallback( self, data ): # Parse the message. try: xmlDat = DiFXControl.parseXML( data ) # Add the information about the received message types to the dictionary. # This is rather complex - the dictionary contains a message type as a key, # and then a list of other info, including the total number of messages, # the total number of bytes, then another dictionary keyed by sending # node. # See if this message type exists already in the dictionary. try: messList = self.receivedInfo[ xmlDat.typeStr ] except KeyError: # Create a new list entry messList = [] messList.append( 0 ) messList.append( 0 ) messList.append( {} ) self.receivedInfo[ xmlDat.typeStr ] = messList # Add the information from this type to the list messList[0] += 1 messList[1] += len( data ) # See if this is a message type we are interested in. if self.echoAllMessages or xmlDat.typeStr in self.messageTypes: # See if it is from a source we are interested in. 
if self.echoAllSources or xmlDat.fromNode in self.echoSources: if self.identifierRE == None or self.identifierRE.match( xmlDat.identifier ) != None: if self.rawXML: print data if self.parseXML: print "" print "Message Type: ", xmlDat.typeStr print "From: ", xmlDat.fromNode print "Identifier: ", xmlDat.identifier print "MPI Process ID: ", xmlDat.mpiProcessId print "Sequence Number: ", xmlDat.seqNumber if xmlDat.typeStr == "DifxLoadMessage": print "" print "CPU Load: ", xmlDat.cpuLoad print "Total Memory: ", xmlDat.totalMemory print "Used Memory: ", xmlDat.usedMemory print "Net Receive Rate: ", xmlDat.netRXRate print "Net Transmit Rate: ", xmlDat.netTXRate print "Number of Cores: ", xmlDat.nCore elif xmlDat.typeStr == "DifxAlertMessage": print "" print "Input File: ", xmlDat.inputFile print "Alert Message: ", xmlDat.alertMessage print "Severity: ", xmlDat.severity elif xmlDat.typeStr == "Mark5StatusMessage": print "" print "Input File: ", xmlDat.inputFile print "BankA VSN: ", xmlDat.bankAVSN print "BankB VSN: ", xmlDat.bankBVSN print "Status Word: ", xmlDat.statusWord print "Active Bank: ", xmlDat.activeBank print "State: ", xmlDat.state print "Scan Number: ", xmlDat.scanNumber print "Scan Name: ", xmlDat.scanName print "Position: ", xmlDat.position print "Play Rate: ", xmlDat.playRate elif xmlDat.typeStr == "DifxStatusMessage": print "" print "Input File: ", xmlDat.inputFile print "State: ", xmlDat.state print "Message: ", xmlDat.message print "Visibility MJD: ", xmlDat.visibilityMJD print "Job Start MJD: ", xmlDat.jobstartMJD print "Job Stop MJD: ", xmlDat.jobstopMJD if len( xmlDat.weights ) > 0: print "Weights: " for wt in xmlDat.weights: print " Antenna: ", wt[0] print " Weight: ", wt[1] elif xmlDat.typeStr == "DifxInfoMessage": print "" print "Message: ", xmlDat.message elif xmlDat.typeStr == "DifxCommandMessage": print "" print "Command: ", xmlDat.command elif xmlDat.typeStr == "DifxParameter": print "" print "Target MPI ID: ", xmlDat.targetMpiId print "Name: 
", xmlDat.name print "Value: ", xmlDat.value for k in xmlDat.index.keys(): newStr = "Index" + xmlDat.index[k] + ":" while len( newStr ) < len( " " ): newStr += " " print newStr elif xmlDat.typeStr == "DifxStart": print "" print "input: ", xmlDat.inputFile print "Head Node: ", xmlDat.manager if len( xmlDat.datastream ) > 0: print "Data Streams" for node in xmlDat.datastream: print " " + node if len( xmlDat.process ) > 0: print "Processors Threads" for node in xmlDat.process: newStr = " " + node[0] while len( tmpStr ) < len( "Processors " ): newStr += " " newStr += str( node[1] ) print newStr print "Force: ", xmlDat.force print "DiFX Version: ", xmlDat.difxVersion print "Restart Seconds: ", xmlDat.restartSeconds print "Function: ", xmlDat.function print "Address: ", xmlDat.address print "Port: ", xmlDat.port elif xmlDat.typeStr == "DifxStop": print "Input File: ", xmlDat.inputFile print "MPI Wrapper: ", xmlDat.mpiWrapper print "DiFX Version: ", xmlDat.difxVersion print "DiFX Program: ", xmlDat.difxProgram elif xmlDat.typeStr == "DifxTransientMessage": print "" print "Job ID: ", xmlDat.jobId print "Start MJD: ", xmlDat.startMJD print "Stop MJD: ", xmlDat.stopMJD print "Priority: ", xmlDat.priority print "Destination Directory: ", xmlDat.destDir print "Comment: ", xmlDat.comment print "dm: ", xmlDat.dm elif xmlDat.typeStr == "DifxSmartMessage": print "" print "MJD: ", xmlDat.mjd print "VSN: ", xmlDat.vsn print "Slot: ", xmlDat.slot if len( xmlDat.smarts ) > 0: print "S.M.A.R.T Values:" for smart in xmlDat.smarts: print "S.M.A.R.T ID: ", smart[0] print "Value: ", smart[1] elif xmlDat.typeStr == "DifxDiagnosticMessage": print "" print "Diagnostic Type: ", xmlDat.diagnosticType if xmlDat.diagnosticType == "MemoryUsage": print "Bytes: ", xmlDat.bytes elif xmlDat.diagnosticType == "BufferStatus": print "Thread Id: ", xmlDat.threadId print "#Buffer Elements: ", xmlDat.numBufElements print "Start Buffer Element: ", xmlDat.startBufElement print "Active Buffer Elements:", 
xmlDat.activeBufElements elif xmlDat.diagnosticType == "ProcessingTime": print "Thread ID: ", xmlDat.threadId print "Microsec: ", xmlDat.microsec elif xmlDat.diagnosticType == "DataConsumed": print "Bytes: ", xmlDat.bytes elif xmlDat.diagnosticType == "InputDatarate": print "Bytes per Second: ", xmlDat.bytespersec elif xmlDat.diagnosticType == "NumSubintsLost": print "#Subints Lost: ", xmlDat.numSubintsLost elif xmlDat.typeStr == "Mark5VersionMessage": print "" print "Api Version: ", xmlDat.ApiVer print "Api Date: ", xmlDat.ApiDate print "Firmware Version: ", xmlDat.FirmVer print "Mon Version: ", xmlDat.MonVer print "Xbar Version: ", xmlDat.XbarVer print "Ata Version: ", xmlDat.AtaVer print "UAta Version: ", xmlDat.UAtaVer print "Driver Version: ", xmlDat.DriverVer print "Board Type: ", xmlDat.BoardType print "Serial Number: ", xmlDat.SerialNum if xmlDat.daughterBoards: print "Daughter Boards:" print " PCB Version: ", xmlDat.PCBVer print " PCB Type: ", xmlDat.PCBType print " PCB SubType: ", xmlDat.PCBSubType print " FPGA Config: ", xmlDat.FPGAConfig print " FPGA Conf Version: ", xmlDat.FPGAConfigVer print " Serial Number: ", xmlDat.dbSerialNum print " Num Channels: ", xmlDat.NumChannels elif xmlDat.typeStr == "Mark5DriveStatsMessage": print "" print "Serial Number: ", xmlDat.serialNumber print "Model Number: ", xmlDat.modelNumber print "Size: ", xmlDat.size print "Mmodule VSN: ", xmlDat.moduleVSN print "Module Slot: ", xmlDat.moduleSlot print "Start MJD: ", xmlDat.startMJD print "Stop MJD: ", xmlDat.stopMJD print "Type: ", xmlDat.driveType print "Start Byte: ", xmlDat.startByte for k in xmlDat.bins.keys(): newStr = "Bin" + xmlDat.bins[k] + ":" while len( newStr ) < len( " " ): newStr += " " print newStr elif xmlDat.typeStr == "DifxFileTransfer": print "" print "Origin: ", xmlDat.origin print "Destination: ", xmlDat.destination print "Data Node: ", xmlDat.dataNode print "Address: ", xmlDat.address print "Direction: ", xmlDat.direction print "Port: ", xmlDat.port 
elif xmlDat.typeStr == "DifxFileOperation": print "" print "Path: ", xmlDat.path print "Operation: ", xmlDat.operation print "Data Node: ", xmlDat.dataNode print "Arguments: ", xmlDat.arg print "Address: ", xmlDat.address print "Port: ", xmlDat.port elif xmlDat.typeStr == "DifxVex2difxRun": print "" print "User: ", xmlDat.user print "Head Node: ", xmlDat.headNode print "DiFX Version: ", xmlDat.difxVersion print "Pass Path: ", xmlDat.passPath print "v2d File: ", xmlDat.v2dFile print "Address: ", xmlDat.address print "Port: ", xmlDat.port print "Calcif Only: ", xmlDat.calcifOnly elif xmlDat.typeStr == "DifxMachinesDefinition": print "" print "input: ", xmlDat.inputFile print "Head Node: ", xmlDat.manager if len( xmlDat.datastream ) > 0: print "Data Streams" for node in xmlDat.datastream: print " " + node if len( xmlDat.process ) > 0: print "Processors Threads" for node in xmlDat.process: newStr = " " + node[0] while len( tmpStr ) < len( "Processors " ): newStr += " " newStr += str( node[1] ) print newStr print "DiFX Version: ", xmlDat.difxVersion print "Test Processors: ", xmlDat.testProcessors print "Machines File: ", xmlDat.machinesFile print "Threads File: ", xmlDat.threadsFile print "Address: ", xmlDat.address print "Port: ", xmlDat.port elif xmlDat.typeStr == "DifxGetDirectory": print "" print "DiFX Version: ", xmlDat.difxVersion print "Mark5: ", xmlDat.mark5 print "VSN: ", xmlDat.vsn print "Address: ", xmlDat.address print "Port: ", xmlDat.port print "Generate New: ", xmlDat.generateNew elif xmlDat.typeStr == "DifxMk5Control": print "" print "Command: ", xmlDat.command print "Target Node: ", xmlDat.targetNode print "Address: ", xmlDat.address print "Port: ", xmlDat.port elif xmlDat.typeStr == "DifxMark5Copy": print "" print "DiFX Version: ", xmlDat.difxVersion print "Mark5: ", xmlDat.mark5 print "VSN: ", xmlDat.vsn print "Scans: ", xmlDat.scans print "Destination: ", xmlDat.destination print "Address: ", xmlDat.address print "Port: ", xmlDat.port else: # 
Unknown message format pass if self.rawXML: print "" except xml.parsers.expat.ExpatError as e: self.keepGoing = False #--------------------------------------------------------------------------- # Callback triggered when the monitoring client fails (it returns an # identifier so we know what failed). #--------------------------------------------------------------------------- def failCallback( self, failureID ): if failureID == DiFXControl.Client.BROKEN_SOCKET: print "broken socket" elif failureID == DiFXControl.Client.FAILED_CONNECTION: print "connection failed" self.keepGoing = False #=============================================================================== # MAIN #=============================================================================== host = None port = None printStats = False restrictTypes = False # Create a new instance of the Responder class to receive callbacks from the # DiFX client. responder = Responder() # Command line arguments... try: i = 1 doingSources = False doingTypes = False while i < len( sys.argv ): # Any option should shut off these things... if sys.argv[i].strip()[0] == '-': doingSources = False doingTypes = False # Check against legal argument types. Anything we don't recognize is assumed # to be part of a list of message types. if sys.argv[i] in [ "-h", "--help" ]: print '\n%s ver %s %s %s' % (program, version, author, verdate) print "A program for monitoring DiFX UDP message traffic." print "Usage: %s [options] []" % ( sys.argv[0] ) print "" print "Options can include:" print "" print " --help" print " -h Print this help information and quit." print "" print " --hostname NAME" print " -H NAME Use NAME as the host of the difxServer program." print " Default is to use DIFX_CONTROL_HOST environment variable." print "" print " --identifier NAME" print " -i Echo only messages with the named identifier. This is (often)" print " the DiFX job name, so this option can be used to monitor a" print " job or jobs to some degree. 
The named identifier is a regular" print " expression, so multiple jobs can be matched." print "" print " --parse" print " -p Toggle whether to parse XML messages into a readable format." print " Default is to do so." print "" print " --port PORT" print " -P PORT Use PORT as the TCP port to communicated with the difxServer." print " Default is to use DIFX_CONTROL_PORT environment variable." print "" print " --raw" print " -r Toggle whether to echo \"raw\" XML message strings. Default is" print " to do so." print "" print " --restrict" print " -R Tell difxServer to restrict messages it sends to those types" print " Selected. This can cut down on socket traffic, however currently" print " not all message types are supported. As a consequence this" print " feature is off by default." print "" print " --source LIST" print " -s LIST Echo only messages originating from one of the comma-separated LIST" print " of node names. Default is to echo all sources." print "" print " --stats" print " -S Toggle whether to produce a list of statistics (messages received, etc.)" print " when collection is terminated. Default is not to do so." print "" print " --type LIST" print " -t LIST Indicate the following items are a list of message types you wish to" print " echo. This isn't an explicitly required argument, as the list can be" print " included as the final argument(s) - seem below." print "" print " is a comma-separated list of one or more message" print "types, identified either by name or number. Default behavior is to echo all messages." 
print "Legal message numbers and types are:" print "" print " 1 : DifxLoadMessage" print " 2 : DifxAlertMessage" print " 3 : Mark5StatusMessage" print " 4 : DifxStatusMessage" print " 5 : DifxInfoMessage" print " 6 : DifxDatastreamMessage" print " 7 : DifxCommand" print " 8 : DifxParameter" print " 9 : DifxStart" print " 10 : DifxStop" print " 11 : Mark5VersionMessage" print " 12 : Mark5ConditionMessage" print " 13 : DifxTransientMessage" print " 14 : DifxSmartMessage" print " 15 : Mark5DriveStatsMessage" print " 16 : DifxDiagnosticMessage" print " 17 : DifxFileTransfer" print " 18 : DifxFileOperation" print " 19 : DifxVex2DifxRun" print " 20 : DifxMachinesDefinition" print " 21 : DifxGetDirectory" print " 22 : DifxMk5Control" print " 23 : DifxMark5Copy" print "" exit( 0 ) elif sys.argv[i] in [ "-H", "--hostname" ]: host = sys.argv[i+1] i = i + 1 elif sys.argv[i] in [ "-i", "--identifier" ]: responder.identifierRE = re.compile( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-p", "--parse" ]: if responder.parseXML == True: responder.parseXML = False else: responder.parseXML = True elif sys.argv[i] in [ "-P", "--port" ]: port = int( sys.argv[i+1] ) i = i + 1 elif sys.argv[i] in [ "-r", "--raw" ]: if responder.rawXML: responder.rawXML = False else: responder.rawXML = True elif sys.argv[i] in [ "-R", "--restrict" ]: restrictTypes = True elif sys.argv[i] in [ "-s", "--source" ]: # Indicates a list of sources follows. doingSources = True responder.echoAllSources = False elif sys.argv[i] in [ "-S", "--stats" ]: if printStats: printStats = False else: printStats = True elif sys.argv[i] in [ "-t", "--type" ]: # Indicates a list of types follows. Redundant variable - not really used. doingTypes = True elif doingSources: # Should be a source node. for st in sys.argv[i].strip().split( "," ): responder.echoSources.append( st.strip() ) # If the last character is not a comma, assume the list of sources is # ending. 
if sys.argv[i].strip().endswith( "," ) != True: doingSources = False else: # Presumed to be a message type, or list of message types. No longer # doing the default (all)! if responder.echoAllMessages: responder.echoAllMessages = False # This single argument might be more than one item. Split it up. for st in sys.argv[i].strip().split( "," ): # Translate any (known) integer into a message string. Otherwise just # assume the string is a legal message type. The user won't be warned # of an unrecognized type, but we don't have to keep the list of types # up to date that way - any new message will be recognized immediately. if st.strip() == "1": responder.messageTypes.append( "DifxLoadMessage" ) elif st.strip() == "2": responder.messageTypes.append( "DifxAlertMessage" ) elif st.strip() == "3": responder.messageTypes.append( "Mark5StatusMessage" ) elif st.strip() == "4": responder.messageTypes.append( "DifxStatusMessage" ) elif st.strip() == "5": responder.messageTypes.append( "DifxInfoMessage" ) elif st.strip() == "6": responder.messageTypes.append( "DifxDatastreamMessage" ) elif st.strip() == "7": responder.messageTypes.append( "DifxCommand" ) elif st.strip() == "8": responder.messageTypes.append( "DifxParameter" ) elif st.strip() == "9": responder.messageTypes.append( "DifxStart" ) elif st.strip() == "10": responder.messageTypes.append( "DifxStop" ) elif st.strip() == "11": responder.messageTypes.append( "Mark5VersionMessage" ) elif st.strip() == "12": responder.messageTypes.append( "Mark5ConditionMessage" ) elif st.strip() == "13": responder.messageTypes.append( "DifxTransientMessage" ) elif st.strip() == "14": responder.messageTypes.append( "DifxSmartMessage" ) elif st.strip() == "15": responder.messageTypes.append( "Mark5DriveStatsMessage" ) elif st.strip() == "16": responder.messageTypes.append( "DifxDiagnosticMessage" ) elif st.strip() == "17": responder.messageTypes.append( "DifxFileTransfer" ) elif st.strip() == "18": responder.messageTypes.append( 
"DifxFileOperation" ) elif st.strip() == "19": responder.messageTypes.append( "DifxVex2DifxRun" ) elif st.strip() == "20": responder.messageTypes.append( "DifxMachinesDefinition" ) elif st.strip() == "21": responder.messageTypes.append( "DifxGetDirectory" ) elif st.strip() == "22": responder.messageTypes.append( "DifxMk5Control" ) elif st.strip() == "23": responder.messageTypes.append( "DifxMark5Copy" ) else: responder.messageTypes.append( st.strip() ) i = i + 1 except RuntimeError: print "Usage: %s [options] []" % ( sys.argv[0] ) exit( 0 ) # Open a new connection to the difxServer... difx = DiFXControl.Client() difx.connect( host = host, port = port ) if difx.socketOK: difx.monitor() difx.relayPackets() if restrictTypes: difx.messageSelection( responder.messageTypes ) difx.addRelayCallback( responder.messageCallback ) difx.addFailCallback( responder.failCallback ) while responder.keepGoing: try: time.sleep( .1 ) except KeyboardInterrupt: responder.keepGoing = False difx.close() if printStats and difx.bytesReceived > 0: print "" print "Total bytes received:", difx.bytesReceived if len( responder.receivedInfo.keys() ): print "" print "Message Type Num Recieved (dropped) Bytes Received" for mess in responder.receivedInfo.keys(): outStr = mess while len( outStr ) < len( "Message Type " ): outStr += " " if not responder.echoAllMessages and not mess in responder.messageTypes: outStr += "(" outStr += str( responder.receivedInfo[ mess ][0] ) if not responder.echoAllMessages and not mess in responder.messageTypes: outStr += ")" while len( outStr ) < len( "Message Type Num Recieved (dropped) " ): outStr += " " outStr += str( responder.receivedInfo[ mess ][1] ) print outStr