#!/usr/bin/env python
################################################################################
#\defgroup difxmessages DiFXMessages
#
#\brief Echo UDP message traffic from DiFX Processes
#
#Usage: DiFXMessages [options] [list of message types]
#
# DiFXMessages collects and echoes UDP message traffic intercepted
# on the DiFX server. Messages are printed in their raw XML form and/or
# parsed into component content. Desired message traffic can be limited to a
# message type (specified by name or a number), source hostname(s), or
# message identifier (which is often a DiFX job name). By default ALL traffic is echoed.
#
#

Command Line Arguments

# # #
-h, --help
Print help information and quit. #
-H, --hostname NAME
Use NAME as the host of the DiFX server program. # Default is to use DIFX_CONTROL_HOST environment variable. #
-i, --identifier NAME
Echo only messages with the named identifier. This is (often) # the DiFX job name, so this option can be used to monitor a # job or jobs to some degree. The named identifier is a regular # expression, so multiple jobs can be matched. #
-p, --parse
Toggle whether to parse XML messages into a readable format. # Default is to do so. #
-P, --port PORT
Use PORT as the TCP port to communicate with the DiFX server. # Default is to use DIFX_CONTROL_PORT environment variable. #
-r, --raw
Toggle whether to echo \"raw\" XML message strings. Default is # to do so. #
-R, --restrict
Tell difxServer to restrict messages it sends to those types # Selected. This can cut down on socket traffic, however currently # not all message types are supported. As a consequence this # feature is off by default. #
-s, --source LIST
Echo only messages originating from one of the comma-separated LIST # of node names. Default is to echo all sources. #
-S, --stats
Toggle whether to produce a list of statistics (messages received, etc.) # when collection is terminated. Default is not to do so. #
-t, --type LIST
Indicate the following items are a list of message types you wish to # echo. This isn't an explicitly required argument, as the list can be # included as the final argument(s) - see below. #
# #

list of message types

# # The "list of message types" that follows command line arguments is a comma-separated list of one or more message # types, identified either by name or number. Default behavior (when you list nothing) is to echo all messages. # Legal message numbers and types are: # # #
Type Number    Type Name #
1 DifxLoadMessage #
2 DifxAlertMessage #
3 Mark5StatusMessage #
4 DifxStatusMessage #
5 DifxInfoMessage #
6 DifxDatastreamMessage #
7 DifxCommand #
8 DifxParameter #
9 DifxStart #
10 DifxStop #
11 Mark5VersionMessage #
12 Mark5ConditionMessage #
13 DifxTransientMessage #
14 DifxSmartMessage #
15 Mark5DriveStatsMessage #
16 DifxDiagnosticMessage #
17 DifxFileTransfer #
18 DifxFileOperation #
19 DifxVex2DifxRun #
20 DifxMachinesDefinition #
21 DifxGetDirectory #
22 DifxMk5Control #
23 DifxMark5Copy #
#
################################################################################
program = 'DiFXMessages'
version = '0.1'
author = 'John Spitzak'
verdate = '20150602'

import sys
import time
import DiFXControl
import xml.parsers.expat
import re

#===============================================================================
#  Message type names, in command-line number order (number 1 is the first
#  entry).  Used both for the help listing and to translate numbers given on
#  the command line into names.
#===============================================================================
MESSAGE_TYPES = (
    "DifxLoadMessage",
    "DifxAlertMessage",
    "Mark5StatusMessage",
    "DifxStatusMessage",
    "DifxInfoMessage",
    "DifxDatastreamMessage",
    "DifxCommand",
    "DifxParameter",
    "DifxStart",
    "DifxStop",
    "Mark5VersionMessage",
    "Mark5ConditionMessage",
    "DifxTransientMessage",
    "DifxSmartMessage",
    "Mark5DriveStatsMessage",
    "DifxDiagnosticMessage",
    "DifxFileTransfer",
    "DifxFileOperation",
    "DifxVex2DifxRun",
    "DifxMachinesDefinition",
    "DifxGetDirectory",
    "DifxMk5Control",
    "DifxMark5Copy",
)

#  Maps the number as typed on the command line ("1" ... "23") to a type name.
MESSAGE_TYPE_BY_NUMBER = dict(
    (str(number), name) for number, name in enumerate(MESSAGE_TYPES, 1)
)

#  Fields printed for every parsed message, as (label, attribute name) pairs.
_HEADER_FIELDS = (
    ("Message Type: ", "typeStr"),
    ("From: ", "fromNode"),
    ("Identifier: ", "identifier"),
    ("MPI Process ID: ", "mpiProcessId"),
    ("Sequence Number: ", "seqNumber"),
)

#  Message types whose parsed form is just a blank line followed by a fixed
#  list of (label, attribute name) pairs.
_SIMPLE_FIELDS = {
    "DifxLoadMessage": (
        ("CPU Load: ", "cpuLoad"),
        ("Total Memory: ", "totalMemory"),
        ("Used Memory: ", "usedMemory"),
        ("Net Receive Rate: ", "netRXRate"),
        ("Net Transmit Rate: ", "netTXRate"),
        ("Number of Cores: ", "nCore"),
    ),
    "DifxAlertMessage": (
        ("Input File: ", "inputFile"),
        ("Alert Message: ", "alertMessage"),
        ("Severity: ", "severity"),
    ),
    "Mark5StatusMessage": (
        ("Input File: ", "inputFile"),
        ("BankA VSN: ", "bankAVSN"),
        ("BankB VSN: ", "bankBVSN"),
        ("Status Word: ", "statusWord"),
        ("Active Bank: ", "activeBank"),
        ("State: ", "state"),
        ("Scan Number: ", "scanNumber"),
        ("Scan Name: ", "scanName"),
        ("Position: ", "position"),
        ("Play Rate: ", "playRate"),
    ),
    "DifxInfoMessage": (
        ("Message: ", "message"),
    ),
    "DifxCommandMessage": (
        ("Command: ", "command"),
    ),
    "DifxTransientMessage": (
        ("Job ID: ", "jobId"),
        ("Start MJD: ", "startMJD"),
        ("Stop MJD: ", "stopMJD"),
        ("Priority: ", "priority"),
        ("Destination Directory: ", "destDir"),
        ("Comment: ", "comment"),
        ("dm: ", "dm"),
    ),
    "DifxFileTransfer": (
        ("Origin: ", "origin"),
        ("Destination: ", "destination"),
        ("Data Node: ", "dataNode"),
        ("Address: ", "address"),
        ("Direction: ", "direction"),
        ("Port: ", "port"),
    ),
    "DifxFileOperation": (
        ("Path: ", "path"),
        ("Operation: ", "operation"),
        ("Data Node: ", "dataNode"),
        ("Arguments: ", "arg"),
        ("Address: ", "address"),
        ("Port: ", "port"),
    ),
    "DifxVex2difxRun": (
        ("User: ", "user"),
        ("Head Node: ", "headNode"),
        ("DiFX Version: ", "difxVersion"),
        ("Pass Path: ", "passPath"),
        ("v2d File: ", "v2dFile"),
        ("Address: ", "address"),
        ("Port: ", "port"),
        ("Calcif Only: ", "calcifOnly"),
    ),
    "DifxGetDirectory": (
        ("DiFX Version: ", "difxVersion"),
        ("Mark5: ", "mark5"),
        ("VSN: ", "vsn"),
        ("Address: ", "address"),
        ("Port: ", "port"),
        ("Generate New: ", "generateNew"),
    ),
    "DifxMk5Control": (
        ("Command: ", "command"),
        ("Target Node: ", "targetNode"),
        ("Address: ", "address"),
        ("Port: ", "port"),
    ),
    "DifxMark5Copy": (
        ("DiFX Version: ", "difxVersion"),
        ("Mark5: ", "mark5"),
        ("VSN: ", "vsn"),
        ("Scans: ", "scans"),
        ("Destination: ", "destination"),
        ("Address: ", "address"),
        ("Port: ", "port"),
    ),
}
#  The type list above spells these two names differently from the parse
#  branches.  Accept both spellings so the parsed output appears whichever one
#  the XML parser reports.  NOTE(review): confirm which spelling
#  DiFXControl.parseXML actually produces.
_SIMPLE_FIELDS["DifxCommand"] = _SIMPLE_FIELDS["DifxCommandMessage"]
_SIMPLE_FIELDS["DifxVex2DifxRun"] = _SIMPLE_FIELDS["DifxVex2difxRun"]

#  Extra fields of a DifxDiagnosticMessage, keyed by its diagnosticType.
_DIAGNOSTIC_FIELDS = {
    "MemoryUsage": (
        ("Bytes: ", "bytes"),
    ),
    "BufferStatus": (
        ("Thread Id: ", "threadId"),
        ("#Buffer Elements: ", "numBufElements"),
        ("Start Buffer Element: ", "startBufElement"),
        ("Active Buffer Elements:", "activeBufElements"),
    ),
    "ProcessingTime": (
        ("Thread ID: ", "threadId"),
        ("Microsec: ", "microsec"),
    ),
    "DataConsumed": (
        ("Bytes: ", "bytes"),
    ),
    "InputDatarate": (
        ("Bytes per Second: ", "bytespersec"),
    ),
    "NumSubintsLost": (
        ("#Subints Lost: ", "numSubintsLost"),
    ),
}


def _printFields(xmlDat, fields):
    """Print one "label value" line per (label, attribute name) pair."""
    for label, attribute in fields:
        print(label, getattr(xmlDat, attribute))


def _printNodeLists(xmlDat):
    """Print the datastream and processor lists of a start/machines message."""
    if len(xmlDat.datastream) > 0:
        print("Data Streams")
        for node in xmlDat.datastream:
            print(" " + node)
    if len(xmlDat.process) > 0:
        print("Processors Threads")
        for node in xmlDat.process:
            #  Pad the node name out to the width of the column heading, then
            #  append the second item of the entry.
            print((" " + node[0]).ljust(len("Processors ")) + str(node[1]))


def _splitList(text):
    """Split a comma-separated argument into stripped, non-empty items."""
    return [item.strip() for item in text.split(",") if item.strip()]


def _printUsage():
    """Print the one-line usage summary."""
    print("Usage: %s [options] [<list of message types>]" % (sys.argv[0]))


def _printHelp():
    """Print the full help text for the -h/--help option."""
    print('\n%s ver %s %s %s' % (program, version, author, verdate))
    print("A program for monitoring DiFX UDP message traffic.")
    _printUsage()
    print("")
    print("Options can include:")
    print("")
    print(" --help")
    print(" -h Print this help information and quit.")
    print("")
    print(" --hostname NAME")
    print(" -H NAME Use NAME as the host of the difxServer program.")
    print(" Default is to use DIFX_CONTROL_HOST environment variable.")
    print("")
    print(" --identifier NAME")
    print(" -i Echo only messages with the named identifier. This is (often)")
    print(" the DiFX job name, so this option can be used to monitor a")
    print(" job or jobs to some degree. The named identifier is a regular")
    print(" expression, so multiple jobs can be matched.")
    print("")
    print(" --parse")
    print(" -p Toggle whether to parse XML messages into a readable format.")
    print(" Default is to do so.")
    print("")
    print(" --port PORT")
    print(" -P PORT Use PORT as the TCP port to communicated with the difxServer.")
    print(" Default is to use DIFX_CONTROL_PORT environment variable.")
    print("")
    print(" --raw")
    print(" -r Toggle whether to echo \"raw\" XML message strings. Default is")
    print(" to do so.")
    print("")
    print(" --restrict")
    print(" -R Tell difxServer to restrict messages it sends to those types")
    print(" Selected. This can cut down on socket traffic, however currently")
    print(" not all message types are supported. As a consequence this")
    print(" feature is off by default.")
    print("")
    print(" --source LIST")
    print(" -s LIST Echo only messages originating from one of the comma-separated LIST")
    print(" of node names. Default is to echo all sources.")
    print("")
    print(" --stats")
    print(" -S Toggle whether to produce a list of statistics (messages received, etc.)")
    print(" when collection is terminated. Default is not to do so.")
    print("")
    print(" --type LIST")
    print(" -t LIST Indicate the following items are a list of message types you wish to")
    print(" echo. This isn't an explicitly required argument, as the list can be")
    print(" included as the final argument(s) - seem below.")
    print("")
    print("<list of message types> is a comma-separated list of one or more message")
    print("types, identified either by name or number. Default behavior is to echo all messages.")
    print("Legal message numbers and types are:")
    print("")
    for number, name in enumerate(MESSAGE_TYPES, 1):
        print(" %d : %s" % (number, name))
    print("")


#===============================================================================
#  Class to monitor callbacks from the Client class and act on them.
#===============================================================================
class Responder:
    """Receives client callbacks, keeps statistics and echoes messages.

    Attributes (all set in __init__ and adjusted by the command-line parser):
        keepGoing        cleared to make the main loop exit
        parseXML         echo a parsed, labelled form of each message
        rawXML           echo the raw message string
        echoAllMessages  ignore messageTypes and echo every type
        echoAllSources   ignore echoSources and echo every sending node
        identifierRE     compiled regular expression the message identifier
                         must match, or None for no restriction
        messageTypes     type names to echo when echoAllMessages is False
        echoSources      node names to echo when echoAllSources is False
        receivedInfo     per-type statistics: type name ->
                         [message count, byte count, {}]
    """

    def __init__( self ):
        self.keepGoing = True
        self.parseXML = True
        self.rawXML = True
        self.echoAllMessages = True
        self.echoAllSources = True
        self.identifierRE = None
        self.messageTypes = []
        self.echoSources = []
        self.receivedInfo = {}

    #---------------------------------------------------------------------------
    #  Callback triggered when the monitoring client receives a new DiFX message.
    #---------------------------------------------------------------------------
    def messageCallback( self, data ):
        """Record statistics for one message and echo it if it passes the filters.

        data is the message as received; it is handed to DiFXControl.parseXML.
        A message that fails to parse (ExpatError) is reported on stderr and
        stops the program by clearing keepGoing.
        """
        try:
            xmlDat = DiFXControl.parseXML( data )
        except xml.parsers.expat.ExpatError as e:
            sys.stderr.write( "%s: unable to parse message (%s) - stopping\n" % ( program, e ) )
            self.keepGoing = False
            return

        #  Statistics are kept for every message, echoed or not.  Each entry is
        #  [message count, byte count, dictionary]; the dictionary is created
        #  here but not filled in by this class.
        messList = self.receivedInfo.setdefault( xmlDat.typeStr, [ 0, 0, {} ] )
        messList[0] += 1
        messList[1] += len( data )

        #  Apply the type, source and identifier filters in turn.
        if not ( self.echoAllMessages or xmlDat.typeStr in self.messageTypes ):
            return
        if not ( self.echoAllSources or xmlDat.fromNode in self.echoSources ):
            return
        if self.identifierRE is not None and self.identifierRE.match( xmlDat.identifier ) is None:
            return

        if self.rawXML:
            print(data)
        if self.parseXML:
            self._printParsed( xmlDat )
        #  A blank line separates raw messages from whatever follows.
        if self.rawXML:
            print("")

    #---------------------------------------------------------------------------
    #  Print the parsed content of a message.
    #---------------------------------------------------------------------------
    def _printParsed( self, xmlDat ):
        """Print the common header and the type-specific fields of a message.

        Unknown message types get the common header only.
        """
        print("")
        _printFields( xmlDat, _HEADER_FIELDS )
        typeStr = xmlDat.typeStr
        if typeStr in _SIMPLE_FIELDS:
            print("")
            _printFields( xmlDat, _SIMPLE_FIELDS[ typeStr ] )
        elif typeStr == "DifxStatusMessage":
            print("")
            _printFields( xmlDat, (
                ("Input File: ", "inputFile"),
                ("State: ", "state"),
                ("Message: ", "message"),
                ("Visibility MJD: ", "visibilityMJD"),
                ("Job Start MJD: ", "jobstartMJD"),
                ("Job Stop MJD: ", "jobstopMJD"),
            ) )
            if len( xmlDat.weights ) > 0:
                print("Weights: ")
                for wt in xmlDat.weights:
                    print(" Antenna: ", wt[0])
                    print(" Weight: ", wt[1])
        elif typeStr == "DifxParameter":
            print("")
            _printFields( xmlDat, (
                ("Target MPI ID: ", "targetMpiId"),
                ("Name: ", "name"),
                ("Value: ", "value"),
            ) )
            for k in xmlDat.index.keys():
                print("Index" + xmlDat.index[k] + ":")
        elif typeStr == "DifxStart":
            print("")
            _printFields( xmlDat, (
                ("input: ", "inputFile"),
                ("Head Node: ", "manager"),
            ) )
            _printNodeLists( xmlDat )
            _printFields( xmlDat, (
                ("Force: ", "force"),
                ("DiFX Version: ", "difxVersion"),
                ("Restart Seconds: ", "restartSeconds"),
                ("Function: ", "function"),
                ("Address: ", "address"),
                ("Port: ", "port"),
            ) )
        elif typeStr == "DifxStop":
            #  Unlike the other types, no blank line precedes these fields.
            _printFields( xmlDat, (
                ("Input File: ", "inputFile"),
                ("MPI Wrapper: ", "mpiWrapper"),
                ("DiFX Version: ", "difxVersion"),
                ("DiFX Program: ", "difxProgram"),
            ) )
        elif typeStr == "DifxSmartMessage":
            print("")
            _printFields( xmlDat, (
                ("MJD: ", "mjd"),
                ("VSN: ", "vsn"),
                ("Slot: ", "slot"),
            ) )
            if len( xmlDat.smarts ) > 0:
                print("S.M.A.R.T Values:")
                for smart in xmlDat.smarts:
                    print("S.M.A.R.T ID: ", smart[0])
                    print("Value: ", smart[1])
        elif typeStr == "DifxDiagnosticMessage":
            print("")
            print("Diagnostic Type: ", xmlDat.diagnosticType)
            #  Unrecognized diagnostic types print nothing further.
            _printFields( xmlDat, _DIAGNOSTIC_FIELDS.get( xmlDat.diagnosticType, () ) )
        elif typeStr == "Mark5VersionMessage":
            print("")
            _printFields( xmlDat, (
                ("Api Version: ", "ApiVer"),
                ("Api Date: ", "ApiDate"),
                ("Firmware Version: ", "FirmVer"),
                ("Mon Version: ", "MonVer"),
                ("Xbar Version: ", "XbarVer"),
                ("Ata Version: ", "AtaVer"),
                ("UAta Version: ", "UAtaVer"),
                ("Driver Version: ", "DriverVer"),
                ("Board Type: ", "BoardType"),
                ("Serial Number: ", "SerialNum"),
            ) )
            if xmlDat.daughterBoards:
                print("Daughter Boards:")
                _printFields( xmlDat, (
                    (" PCB Version: ", "PCBVer"),
                    (" PCB Type: ", "PCBType"),
                    (" PCB SubType: ", "PCBSubType"),
                    (" FPGA Config: ", "FPGAConfig"),
                    (" FPGA Conf Version: ", "FPGAConfigVer"),
                    (" Serial Number: ", "dbSerialNum"),
                    (" Num Channels: ", "NumChannels"),
                ) )
        elif typeStr == "Mark5DriveStatsMessage":
            print("")
            _printFields( xmlDat, (
                ("Serial Number: ", "serialNumber"),
                ("Model Number: ", "modelNumber"),
                ("Size: ", "size"),
                ("Mmodule VSN: ", "moduleVSN"),
                ("Module Slot: ", "moduleSlot"),
                ("Start MJD: ", "startMJD"),
                ("Stop MJD: ", "stopMJD"),
                ("Type: ", "driveType"),
                ("Start Byte: ", "startByte"),
            ) )
            for k in xmlDat.bins.keys():
                print("Bin" + xmlDat.bins[k] + ":")
        elif typeStr == "DifxMachinesDefinition":
            print("")
            _printFields( xmlDat, (
                ("input: ", "inputFile"),
                ("Head Node: ", "manager"),
            ) )
            _printNodeLists( xmlDat )
            _printFields( xmlDat, (
                ("DiFX Version: ", "difxVersion"),
                ("Test Processors: ", "testProcessors"),
                ("Machines File: ", "machinesFile"),
                ("Threads File: ", "threadsFile"),
                ("Address: ", "address"),
                ("Port: ", "port"),
            ) )

    #---------------------------------------------------------------------------
    #  Callback triggered when the monitoring client fails (it returns an
    #  identifier so we know what failed).
    #---------------------------------------------------------------------------
    def failCallback( self, failureID ):
        """Report a client failure and stop the main loop.

        failureID is compared against DiFXControl.Client.BROKEN_SOCKET and
        DiFXControl.Client.FAILED_CONNECTION; any other value stops the loop
        without a message.
        """
        if failureID == DiFXControl.Client.BROKEN_SOCKET:
            print("broken socket")
        elif failureID == DiFXControl.Client.FAILED_CONNECTION:
            print("connection failed")
        self.keepGoing = False


#===============================================================================
#  MAIN
#===============================================================================
host = None
port = None
printStats = False
restrictTypes = False

#  Create a new instance of the Responder class to receive callbacks from the
#  DiFX client.
responder = Responder()

#  Command line arguments...
try:
    i = 1
    doingSources = False
    #  Set by -t/--type but never consulted: anything that is not an option or
    #  a source is treated as a message type regardless.
    doingTypes = False
    while i < len( sys.argv ):
        arg = sys.argv[i]
        #  Any option should shut off these things...
        if arg.strip().startswith( '-' ):
            doingSources = False
            doingTypes = False
        #  Check against legal argument types.  Anything we don't recognize
        #  (including unknown options) is assumed to be part of a list of
        #  message types.
        if arg in ( "-h", "--help" ):
            _printHelp()
            sys.exit( 0 )
        elif arg in ( "-H", "--hostname" ):
            host = sys.argv[i + 1]
            i = i + 1
        elif arg in ( "-i", "--identifier" ):
            responder.identifierRE = re.compile( sys.argv[i + 1] )
            i = i + 1
        elif arg in ( "-p", "--parse" ):
            responder.parseXML = not responder.parseXML
        elif arg in ( "-P", "--port" ):
            port = int( sys.argv[i + 1] )
            i = i + 1
        elif arg in ( "-r", "--raw" ):
            responder.rawXML = not responder.rawXML
        elif arg in ( "-R", "--restrict" ):
            restrictTypes = True
        elif arg in ( "-s", "--source" ):
            #  Indicates a list of sources follows.
            doingSources = True
            responder.echoAllSources = False
        elif arg in ( "-S", "--stats" ):
            printStats = not printStats
        elif arg in ( "-t", "--type" ):
            #  Indicates a list of types follows.
            doingTypes = True
        elif doingSources:
            #  Should be a source node, or several separated by commas.
            responder.echoSources.extend( _splitList( arg ) )
            #  If the last character is not a comma, assume the list of
            #  sources is ending.
            if not arg.strip().endswith( "," ):
                doingSources = False
        else:
            #  Presumed to be a message type, or list of message types.  Known
            #  numbers are translated into names; anything else is taken as a
            #  type name without checking, so new message types work without
            #  updating the table.
            typeNames = _splitList( arg )
            if typeNames:
                #  No longer doing the default (all)!
                responder.echoAllMessages = False
                for typeName in typeNames:
                    responder.messageTypes.append( MESSAGE_TYPE_BY_NUMBER.get( typeName, typeName ) )
        i = i + 1
except ( IndexError, ValueError, re.error, RuntimeError ) as e:
    #  IndexError: an option is missing its value.  ValueError: the port is not
    #  an integer.  re.error: the identifier is not a valid regular expression.
    sys.stderr.write( "%s: bad command line (%s)\n" % ( program, e ) )
    _printUsage()
    sys.exit( 1 )

#  Open a new connection to the difxServer...
difx = DiFXControl.Client()
difx.connect( host = host, port = port )
if difx.socketOK:
    difx.monitor()
    difx.relayPackets()
    if restrictTypes:
        difx.messageSelection( responder.messageTypes )
    difx.addRelayCallback( responder.messageCallback )
    difx.addFailCallback( responder.failCallback )
    #  Idle until a callback clears keepGoing or the user interrupts.
    try:
        while responder.keepGoing:
            time.sleep( .1 )
    except KeyboardInterrupt:
        responder.keepGoing = False
difx.close()

if printStats and difx.bytesReceived > 0:
    print("")
    print("Total bytes received:", difx.bytesReceived)
    if responder.receivedInfo:
        print("")
        print("Message Type Num Recieved (dropped) Bytes Received")
        for mess, info in responder.receivedInfo.items():
            #  Counts for types that were received but not echoed are shown in
            #  parentheses.
            dropped = not responder.echoAllMessages and mess not in responder.messageTypes
            outStr = mess.ljust( len( "Message Type " ) )
            if dropped:
                outStr += "(" + str( info[0] ) + ")"
            else:
                outStr += str( info[0] )
            outStr = outStr.ljust( len( "Message Type Num Recieved (dropped) " ) )
            outStr += str( info[1] )
            print(outStr)