#!/usr/bin/env python
#===========================================================================
# SVN properties (DO NOT CHANGE)
#
# $Id: statemon 3797 2011-09-12 15:23:28Z WalterBrisken $
# $HeadURL: $
# $LastChangedRevision: 3797 $
# $Author: WalterBrisken $
# $LastChangedDate: 2011-09-12 17:23:28 +0200 (Mon, 12 Sep 2011) $
#
#============================================================================

"""Print multicast diagnostic messages emitted by mpifxcorr.

The script joins the multicast group named by the DIFX_MESSAGE_GROUP and
DIFX_MESSAGE_PORT environment variables, parses every XML datagram it
receives and prints a one-line summary of each diagnostic message.

All print calls use the single-argument parenthesised form so that the file
runs unchanged under both Python 2 and Python 3.
"""

try:
    # These helper functions only exist in the Python 2 `string` module.
    # The import is kept for backward compatibility but must not be fatal
    # on Python 3, where str methods are used instead.
    from string import split, strip, find, upper, lower
except ImportError:
    pass
from sys import argv, exit
from os import popen, getenv
from glob import glob
import socket
import struct
from xml.parsers import expat
from copy import deepcopy
from time import asctime

program = 'difxdiagnosticmon'
version = '0.4'
verdate = '20151122'


def usage(pgm):
    """Print the help text; pgm is the program name shown in the usage line."""
    print('\n%s ver %s %s\n' % (program, version, verdate))
    print('Program to print multicast diagnostic data from mpifxcorr.\n')
    print('Usage: %s [options]\n' % pgm)
    print('options can include:')
    print(' --help')
    print(' -h print help information and quit\n')


class Parser:
    """Expat-driven parser for a single diagnostic XML message.

    Feed the raw message with feed(), then call getinfo() for a printable
    summary.  The `ok` attribute becomes True once a <diagnosticType>
    element has been seen; messages without one yield an empty summary.
    """

    def __init__(self):
        self._parser = expat.ParserCreate()
        self._parser.StartElementHandler = self.start
        self._parser.EndElementHandler = self.end
        self._parser.CharacterDataHandler = self.data
        self.message = ''
        self.ok = False
        self.diagnosticType = 0
        self.threadid = 0
        self.bytes = 0
        self.counter = 0
        self.microsec = 0
        self.rateMbps = 0
        # [numBufElements, startBufElement, activeBufElements]
        self.bufferstatus = [0, 0, 0]
        self.mpiid = -1
        self.source = ''
        self.id = ''
        self.tag = ''
        # Text collected for the element currently being parsed.
        self.tmp = ''

    def feed(self, data):
        """Parse a chunk of XML; more data may follow."""
        self._parser.Parse(data, 0)

    def close(self):
        """Signal end of input and release the underlying expat parser."""
        self._parser.Parse("", 1)  # end of data
        del self._parser  # get rid of circular references

    def start(self, tag, attrs):
        """Start-element handler: remember the tag and reset the text buffer."""
        self.tag = tag
        self.tmp = ''
        if tag == 'diagnosticType':
            self.ok = True

    def end(self, tag):
        """End-element handler: store the collected text in the matching field.

        Numeric fields are converted with int()/float(), so malformed
        content raises ValueError out of feed().
        """
        text = self.tmp
        if tag == 'message' and self.ok:
            self.message = text
        elif tag == 'diagnosticType':
            self.diagnosticType = text
        elif tag == 'microsec':
            self.microsec = float(text)
        elif tag == 'bytes':
            self.bytes = int(text)
        elif tag == 'bytespersec':
            # bytes per second -> megabits per second
            self.rateMbps = 8.0 * float(text) / 1.0e6
        elif tag == "numSubintsLost":
            # Fixed: this used to be a comparison (==), so the counter was
            # never updated and the report always showed 0.
            self.counter = int(text)
        elif tag == "activeBufElements":
            self.bufferstatus[2] = int(text)
        elif tag == "startBufElement":
            self.bufferstatus[1] = int(text)
        elif tag == "numBufElements":
            self.bufferstatus[0] = int(text)
        elif tag == "threadId":
            self.threadid = int(text)
        elif tag == 'from':
            self.source = text.lower()
        elif tag == 'identifier':
            self.id = text
        elif tag == 'mpiProcessId':
            self.mpiid = int(text)

    def data(self, data):
        """Character-data handler: append text to the current element buffer.

        Expat may deliver the text of one element in several pieces, so the
        pieces are always concatenated; start() clears the buffer at the
        beginning of every element.
        """
        self.tmp = self.tmp + data

    def getinfo(self):
        """Return a one-line summary of the message, or '' if not a diagnostic."""
        if not self.ok:
            return ''
        kind = self.diagnosticType
        if kind == 'BufferStatus':
            diagstr = '%d/%d buffer slots filled, at slot %d' % (
                self.bufferstatus[2], self.bufferstatus[0], self.bufferstatus[1])
        elif kind == 'MemoryUsage':
            diagstr = 'Memory Usage %.1f MB' % (float(self.bytes) / 1.0e6)
        elif kind == 'ProcessingTime':
            diagstr = '%.1f microsec spent processing last subint' % (self.microsec)
        elif kind == 'DataConsumed':
            # Fixed: stray trailing "I" removed from the message text.
            diagstr = '%.2f MB of data consumed' % (self.bytes / 1e6)
        elif kind == 'InputDatarate':
            diagstr = 'Data rate in last second: %.2f Mbps' % (self.rateMbps)
        elif kind == 'NumSubintsLost':
            diagstr = 'Now %d subints lost in total' % (self.counter)
        else:
            diagstr = "Unknown diagnostic message of type %s received" % (kind)
        return 'MPI[%2d] %-9s %-12s %s' % (self.mpiid, self.source, self.id, diagstr)


def run():
    """Join the multicast group and print diagnostics until interrupted.

    Reads the port and group from DIFX_MESSAGE_PORT and DIFX_MESSAGE_GROUP;
    if either is unset a notice is printed and the program exits.  The loop
    ends on KeyboardInterrupt and the socket is always closed.
    """
    port = getenv('DIFX_MESSAGE_PORT')
    if port is None:
        print('DIFX_MESSAGE_PORT needs to be defined')
        exit(0)
    port = int(port)

    group = getenv('DIFX_MESSAGE_GROUP')
    if group is None:
        print('DIFX_MESSAGE_GROUP needs to be defined')
        exit(0)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', port))
        mreq = struct.pack("4sl", socket.inet_aton(group), socket.INADDR_ANY)
        s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        # No timeout is configured, so recv() blocks; the socket.timeout
        # handler below only matters if a timeout is set on the socket.
        try:
            while True:
                try:
                    message = s.recv(1500)
                    # Slice (not index) so the test works for both Python 2
                    # str and Python 3 bytes, and for empty datagrams.
                    if message[:1] == b'<':
                        p = Parser()
                        p.feed(message)
                        info = p.getinfo()
                        p.close()
                        if p.ok:
                            print('%s %s' % (asctime(), info))
                except socket.timeout:
                    pass
                except (expat.ExpatError, ValueError):
                    # ValueError: a numeric field held non-numeric text.
                    print('%s %s' % (asctime(), '*** Unparsable message received ***'))
        except KeyboardInterrupt:
            pass
    finally:
        s.close()


def main():
    """Handle command-line options, then start the monitor."""
    for a in argv[1:]:
        if a in ['-h', '--help']:
            usage(argv[0])
            exit(0)
        else:
            print('Unknown option: %s. Run with --help for help.' % a)
            exit(0)
    run()


# Guarded so that importing this module does not open sockets or exit.
if __name__ == '__main__':
    main()