#!/usr/bin/env python

#**************************************************************************
#   Copyright (C) 2011-2013 by Walter Brisken                             *
#                                                                         *
#   This program is free software; you can redistribute it and/or modify  *
#   it under the terms of the GNU General Public License as published by  *
#   the Free Software Foundation; either version 3 of the License, or     *
#   (at your option) any later version.                                   *
#                                                                         *
#   This program is distributed in the hope that it will be useful,       *
#   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          *
#   GNU General Public License for more details.                          *
#                                                                         *
#   You should have received a copy of the GNU General Public License     *
#   along with this program; if not, write to the                         *
#   Free Software Foundation, Inc.,                                       *
#   59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.              *
#**************************************************************************
#===========================================================================
# SVN properties (DO NOT CHANGE)
#
# $Id$
# $HeadURL: $
# $LastChangedRevision$
# $Author$
# $LastChangedDate$
#
#============================================================================

"""Listen on the DiFX message multicast group and print status messages.

Every received datagram that looks like XML is parsed; if it contains a
<difxStatus> element, one summary line is printed to stdout.
"""

try:
    # These module-level string functions exist only on Python 2.  The code
    # below uses str methods instead, so a failed import is harmless.
    from string import split, strip, find, upper, lower
except ImportError:
    pass
from sys import argv, exit, stdout
from os import popen, getenv
from glob import glob
import socket
import struct
from xml.parsers import expat
from copy import deepcopy
from time import asctime

program = 'statemon'
author = 'Walter Brisken'
version = '0.4'
verdate = '20130508'


def usage(prog):
    """Print the version banner and command-line help.

    prog -- the program name shown in the "Usage:" line.
    """
    print('%s ver. %s %s %s\n' % (program, version, author, verdate))
    print('Usage: %s [options]\n' % prog)
    print('options can include:')
    print(' --help')
    print(' -h print help information and quit\n')


class Parser:
    """Expat-driven extractor for the fields of one XML status message.

    Feed the raw message with feed(), read the summary with getinfo(), then
    call close().  The `ok` attribute is True only if a <difxStatus> element
    was seen.
    """

    def __init__(self):
        self._parser = expat.ParserCreate()
        self._parser.StartElementHandler = self.start
        self._parser.EndElementHandler = self.end
        self._parser.CharacterDataHandler = self.data
        self.message = ''     # text of <message> (only stored once ok is set)
        self.mjd = 0.0        # value of <visibilityMJD>
        self.mjdStart = 0.0   # value of <jobstartMJD>
        self.mjdStop = 0.0    # value of <jobstopMJD>
        self.state = ''       # text of <state>
        self.tmp = ''         # character data gathered since the last start tag
        self.weight = {}      # 'ant' attribute (int) -> 'wt' attribute (float)
        self.maxant = 0       # largest 'ant' index seen in a <weight> element
        self.ok = False       # True once <difxStatus> has been seen
        self.unit = ''        # lower-cased text of <from>
        self.mpiid = -1       # value of <mpiProcessId>
        self.id = ''          # text of <identifier>
        self.tag = ''         # name of the most recently opened element

    def feed(self, data):
        """Pass a piece of the document to expat (not marked as final)."""
        self._parser.Parse(data, 0)

    def close(self):
        """Signal end of input to expat and drop the parser object."""
        self._parser.Parse("", 1)  # end of data
        del self._parser  # get rid of circular references

    def start(self, tag, attrs):
        """Start-element handler: reset the text buffer, record weights."""
        self.tag = tag
        self.tmp = ''
        if tag == 'difxStatus':
            self.ok = True
            self.weight = {}
            self.maxant = 0
            self.mjdStart = 0.0
            self.mjdStop = 0.0
        elif tag == 'weight':
            ant = int(attrs['ant'])
            self.weight[ant] = float(attrs['wt'])
            if ant > self.maxant:
                self.maxant = ant

    def end(self, tag):
        """End-element handler: store the buffered text in its field."""
        if tag == 'message' and self.ok:
            self.message = self.tmp
        elif tag == 'state':
            self.state = self.tmp
        elif tag == 'visibilityMJD':
            self.mjd = float(self.tmp)
        elif tag == 'jobstartMJD':
            self.mjdStart = float(self.tmp)
        elif tag == 'jobstopMJD':
            self.mjdStop = float(self.tmp)
        elif tag == 'from':
            self.unit = self.tmp.lower()
        elif tag == 'identifier':
            self.id = self.tmp
        elif tag == 'mpiProcessId':
            self.mpiid = int(self.tmp)

    def data(self, data):
        """Character-data handler: append the chunk to the text buffer.

        Expat may report one run of text through several calls (for example
        around entity references or line endings), so chunks are always
        concatenated.  start() clears the buffer at every opening tag, which
        keeps text of different elements from mixing.
        """
        self.tmp = self.tmp + data

    def getinfo(self):
        """Return the one-line summary, or '' if no <difxStatus> was seen.

        While the state is 'Running' the line ends with one weight column
        per antenna index 0..maxant; missing or near-zero weights are shown
        as ".000".  If in addition jobstart > 50000 and jobstop > jobstart,
        the state column shows the percentage of the job interval covered by
        visibilityMJD instead of the state name.
        """
        if not self.ok:
            return ''

        running = (self.state == 'Running')

        wtstr = ''
        if running:
            columns = []
            for ant in range(self.maxant + 1):
                wt = self.weight.get(ant, 0.0)
                if wt <= 0.0001:
                    columns.append(" .000")
                else:
                    columns.append(" %5.3f" % wt)
            wtstr = ''.join(columns)

        if running and self.mjdStart > 50000.0 and self.mjdStop > self.mjdStart:
            span = self.mjdStop - self.mjdStart
            statestr = '%5.2f%%' % (100.0*(self.mjd - self.mjdStart)/span)
        else:
            statestr = self.state

        return 'MPI[%2d] %-9s %-12s %-7s %s%s' % (
            self.mpiid, self.unit, self.id, statestr, self.message, wtstr)


def run():
    """Join the multicast group and print status lines until interrupted.

    The UDP port and multicast group address are read from the environment
    variables DIFX_MESSAGE_PORT and DIFX_MESSAGE_GROUP.  The process exits
    with status 1 if either is unset or if a received XML message cannot be
    parsed.  Ctrl-C ends the loop quietly.
    """
    port = getenv('DIFX_MESSAGE_PORT')
    if port is None:
        print('DIFX_MESSAGE_PORT needs to be defined')
        exit(1)
    port = int(port)

    group = getenv('DIFX_MESSAGE_GROUP')
    if group is None:
        print('DIFX_MESSAGE_GROUP needs to be defined')
        exit(1)

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 768000)
        s.bind(('', port))
        # '=' selects standard sizes without padding, giving the 8-byte
        # (group address, interface address) pair; native "4sl" would be
        # 16 bytes on LP64 platforms.
        mreq = struct.pack("=4sl", socket.inet_aton(group), socket.INADDR_ANY)
        s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        # No timeout is configured, so recv() blocks; the socket.timeout
        # handler below only matters if the next line is enabled.
        #s.settimeout(dt)

        try:
            while 1:
                try:
                    message = s.recv(1500)
                    # Slice (not index) so the test works for both Python 2
                    # str and Python 3 bytes.
                    if len(message) > 0 and message[:1] == b'<':
                        p = Parser()
                        p.feed(message)
                        info = p.getinfo()
                        p.close()
                        if p.ok:
                            print('%s %s' % (asctime(), info))
                            stdout.flush()
                except socket.timeout:
                    pass
                except expat.ExpatError:
                    print('%s *** Unparsable message received ***' % asctime())
                    print(message)
                    exit(1)
        except KeyboardInterrupt:
            pass
    finally:
        s.close()


if __name__ == '__main__':
    if len(argv) > 1:
        if argv[1] in ['-h', '--help']:
            usage(argv[0])
            exit(0)

    run()