#!/usr/bin/env python
#===========================================================================
# SVN properties (DO NOT CHANGE)
#
# $Id$
# $HeadURL$
# $LastChangedRevision$
# $Author$
# $LastChangedDate$
#
#============================================================================
"""Watchdog that ends difx jobs which have stopped making progress.

The program listens to the multicast state messages, remembers the time of
the last "Starting"/"Running" message of every job and, once a job has been
silent for longer than ``maxIdleSec`` seconds, sends a ``kill`` to the
``mpirun`` and ``difxlog`` processes whose command line contains the job
identifier.

Only syntax and library calls that are valid on both Python 2 (2.6+) and
Python 3 are used.
"""

from sys import argv, exit, stdout
from glob import glob
from optparse import OptionParser
from xml.parsers import expat
from copy import deepcopy
from time import asctime, time
import errno
import subprocess
import logging
import logging.handlers
import socket
import struct
import os
import signal

try:
    # These names only exist on Python 2.  Nothing in this file uses them any
    # more; the imports are kept so that Python 2 installations see exactly
    # the same module namespace as before.
    from string import split, strip, find, upper, lower
    import commands
except ImportError:
    pass

__author__="Helge Rottmann "
__prog__ = os.path.basename(__file__)
__build__= "$Revision$"
__date__ ="$Date$"
__lastAuthor__="$Author$"

##############################################################################
# Parameters
##############################################################################
logpath = '/tmp'            # logfile path
logname = 'difxwatch.log'   # name of logfile
logcount = 100              # Number of log files to keep
stdout_level = logging.INFO
logfile_level = logging.DEBUG
maxIdleSec = 300            # number of seconds a job is allowed to be idle before killing it
checkInterval = 5           # number of seconds between checking for idle jobs

runJobs = {}                # job identifier -> time() of the last observed activity
delJobs = []
pidfile = ""                # path of the lock file; empty until this process owns it

# Created at import time so that Parser and run() can log even when the
# module is imported instead of being executed as a script.  Handlers and
# levels are attached by _setup_logging().
logger = logging.getLogger('difxwatch')


def getUsage():
    """Return the usage/help text shown by the option parser."""
    usage = ""
    usage += "%s %s %s (last changes by %s) \n" % (__prog__, __build__, __author__, __lastAuthor__)
    usage += "A watchdog program to monitor progress of difx jobs and to automatically kill hanging jobs.\n"
    usage += "A job that has not made any progress for more than %ss (default can be overridden; see -i option below)\n" % maxIdleSec
    usage += "is assumed to be hanging and will be ended. In addition any associated difxlog\n"
    usage += "will also be stopped. Difxwatch parses multicast state messages to determine start\n"
    usage += "and progress of difx jobs.\n"
    usage += "Output is written to %s\n\n" % os.path.join(logpath, logname)
    usage += "Usage: %s [options] \n\n" % __prog__
    # The variable names must match the ones read in _multicast_settings().
    usage += "NOTE: %s requires DIFX_MESSAGE_GROUP and DIFX_MESSAGE_PORT environment variables to be defined" % __prog__
    return(usage)


def _create_pidfile(path):
    """Atomically create the lock file *path* containing our PID.

    Returns True when the file was created and False when it already
    exists.  O_EXCL makes "check and create" a single step, so two
    instances started at the same time cannot both succeed.
    """
    try:
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o666)
    except OSError as err:
        if err.errno == errno.EEXIST:
            return False
        raise
    with os.fdopen(fd, 'w') as handle:
        handle.write(str(os.getpid()))
    return True


def _remove_pidfile():
    """Delete the lock file if this process owns one; never raises."""
    if not pidfile:
        return
    try:
        os.unlink(pidfile)
    except OSError as err:
        # Already gone is fine (the signal handler and main() may both try).
        if err.errno != errno.ENOENT:
            logger.warning("Could not remove lockfile %s: %s" % (pidfile, err))


def termination_handler (signum,frame):
    """Signal handler: release the lock file and leave the program."""
    print('You have requested to terminate the application...')
    stdout.flush()
    _remove_pidfile()
    exit()


class Parser:
    """Expat based parser for a single multicast XML message.

    After feed() the values found in the message are available as
    attributes (state, id, unit, mpiid, mjd, mjdStart, mjdStop, message,
    weight, maxant).  getinfo() transfers the job state into ``runJobs``.
    """

    def __init__(self):
        self._parser = expat.ParserCreate()
        self._parser.StartElementHandler = self.start
        self._parser.EndElementHandler = self.end
        self._parser.CharacterDataHandler = self.data
        self.message = ''
        self.mjd = 0.0
        self.mjdStart = 0.0
        self.mjdStop = 0.0
        self.state = ''
        self.tmp = ''       # text collected for the element currently open
        self.weight = {}    # antenna number -> weight
        self.maxant = 0     # highest antenna number seen in a weight element
        self.ok = False     # True once a difxStatus element was seen
        self.unit = ''
        self.mpiid = -1
        self.id = ''
        self.tag = ''

    def feed(self, data):
        """Parse *data* (one complete or partial message)."""
        self._parser.Parse(data, 0)

    def close(self):
        """Signal end of data to expat and drop the parser."""
        self._parser.Parse("", 1) # end of data
        del self._parser # get rid of circular references

    def start(self, tag, attrs):
        """Start-element handler: reset the text buffer, read attributes."""
        self.tag = tag
        self.tmp = ''
        if tag == 'difxStatus':
            self.ok = True
            self.weight = {}
            self.maxant = 0
            self.mjdStart = 0.0
            self.mjdStop = 0.0
        elif tag == 'weight':
            ant = int(attrs['ant'])
            self.weight[ant] = float(attrs['wt'])
            if ant > self.maxant:
                self.maxant = ant

    def end(self, tag):
        """End-element handler: store the collected text of known elements."""
        if tag == 'message' and self.ok:
            self.message = self.tmp
        elif tag == 'state':
            self.state = self.tmp
        elif tag == 'visibilityMJD':
            self.mjd = float(self.tmp)
        elif tag == 'jobstartMJD':
            self.mjdStart = float(self.tmp)
        elif tag == 'jobstopMJD':
            self.mjdStop = float(self.tmp)
        elif tag == 'from':
            self.unit = self.tmp.lower()
        elif tag == 'identifier':
            self.id = self.tmp
        elif tag == 'mpiProcessId':
            self.mpiid = int(self.tmp)

    def data(self, data):
        """Character-data handler.

        Expat may deliver the text of one element in several pieces (for
        example around entity references), so the pieces are always
        appended; start() empties the buffer for every new element.
        """
        self.tmp = self.tmp + data

    def getinfo(self):
        """Record the activity of the parsed job in ``runJobs``.

        "Starting" registers the job, "Running" refreshes the time stamp of
        an already registered job.  Messages without identifier are ignored:
        an empty identifier would later match every line of the ps output.
        """
        if not self.id:
            return
        if self.state == 'Starting':
            # add job to list of running jobs
            runJobs[self.id] = time()
            logger.info("Job start: %s" %(self.id))
        elif self.state == "Running" and self.id in runJobs:
            # update the last activity time of the job
            runJobs[self.id] = time()


def _multicast_settings():
    """Return (group, port) from the environment; exit(1) if unusable."""
    port = os.getenv('DIFX_MESSAGE_PORT')
    if port is None:
        print('DIFX_MESSAGE_PORT needs to be defined')
        exit(1)
    try:
        port = int(port)
    except ValueError:
        print('DIFX_MESSAGE_PORT must be an integer (got: %s)' % port)
        exit(1)

    group = os.getenv('DIFX_MESSAGE_GROUP')
    if group is None:
        print('DIFX_MESSAGE_GROUP needs to be defined')
        exit(1)
    return group, port


def _open_multicast_socket(group, port):
    """Return a UDP socket joined to *group* that times out after checkInterval."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 768000)
    s.bind(('', port))
    # '=' selects the standard 4-byte size for 'l', so the request is always
    # the 8-byte pair (group address, interface address) and does not depend
    # on the platform's native long size or padding.
    mreq = struct.pack("=4sl", socket.inet_aton(group), socket.INADDR_ANY)
    s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    # The timeout makes recv() return regularly so idle jobs get checked
    # even when no messages arrive.
    s.settimeout(checkInterval)
    return s


def _handle_message(message):
    """Parse one received datagram and update ``runJobs``.

    A damaged message is logged and dropped; it must not stop the watchdog.
    ValueError/KeyError cover non-numeric values and missing attributes
    raised by the Parser handlers.
    """
    p = Parser()
    try:
        p.feed(message)
        p.getinfo()
        p.close()
    except (expat.ExpatError, ValueError, KeyError):
        logger.error("Unparsable message received: %s" % (message,))


def _kill_job_processes(command, jobId):
    """Send ``kill`` to every *command* process whose ps line contains *jobId*.

    NOTE(review): the job is matched as a plain substring of the ps line, so
    an identifier that is a prefix of another one matches both - confirm
    whether the command line format allows a stricter comparison.
    """
    try:
        proc = subprocess.Popen(['ps', '-C', command, '-o', 'pid,cmd'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                universal_newlines=True)
        out = proc.communicate()[0]
    except OSError as err:
        logger.error("Unable to run ps: %s" % err)
        return

    for line in out.splitlines():
        if jobId not in line:
            continue
        fields = line.split()
        # Skip the ps header and anything else not starting with a PID.
        if not fields or not fields[0].isdigit():
            continue
        pid = fields[0]
        logger.warning("Ending job %s (pid=%s) because it was idle for more than %s seconds" % (jobId, pid, maxIdleSec))
        subprocess.call(['kill', pid])


def _end_idle_jobs():
    """Kill and forget every job that was silent for more than maxIdleSec."""
    idleJobs = [jobId for jobId, lastActivity in runJobs.items()
                if time() - lastActivity > maxIdleSec]

    for jobId in idleJobs:
        _kill_job_processes('mpirun', jobId)
        _kill_job_processes('difxlog', jobId)

    for job in idleJobs:
        logger.debug("Removing job %s from list of running jobs" % (job))
        del runJobs[job]


def run():
    """Receive state messages forever and end jobs that stopped progressing.

    Requires the DIFX_MESSAGE_PORT and DIFX_MESSAGE_GROUP environment
    variables; exits with status 1 when they are missing.  Returns on
    KeyboardInterrupt.
    """
    lastCheckTime = 0

    group, port = _multicast_settings()
    logger.debug("Listening to multicast group=%s port=%s" % (group, port))

    s = _open_multicast_socket(group, port)
    try:
        while True:
            try:
                message = s.recv(1500)
            except socket.timeout:
                message = b''

            # Only datagrams that look like XML are handed to the parser.
            if message[:1] == b'<':
                _handle_message(message)

            # check activity of the running jobs
            if time() - lastCheckTime > checkInterval:
                lastCheckTime = time()
                _end_idle_jobs()
    except KeyboardInterrupt:
        pass
    finally:
        s.close()


def _setup_logging():
    """Attach the console handler and the rotating file handler to ``logger``."""
    logger.setLevel(logging.DEBUG)

    # create console handler
    ch = logging.StreamHandler()
    ch.setLevel(stdout_level)

    # create rotating file handler; the rollover starts a fresh logfile
    # for every program start and keeps the previous ones as backups
    fh = logging.handlers.RotatingFileHandler(os.path.join(logpath, logname), backupCount = logcount)
    fh.doRollover()
    fh.setLevel(logfile_level)

    # create formatter and add it to the handlers
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    # add the handlers to the logger
    logger.addHandler(ch)
    logger.addHandler(fh)


def main():
    """Parse the command line, take the lock file and start the watchdog."""
    global pidfile, maxIdleSec

    # SIGTERM is handled like Ctrl-C so that a plain "kill" of the watchdog
    # does not leave a stale lock file behind.
    signal.signal(signal.SIGINT,termination_handler)
    signal.signal(signal.SIGTERM,termination_handler)

    usage = getUsage()
    parser = OptionParser(version="%prog " + __build__, usage=usage)
    parser.add_option("-i", "--idle-time", dest="maxIdleSec", type="int" ,action="store", help="Maximum number of seconds a job is allowed to be idle before it will be killed.")

    # parse the command line. Options will be stored in the options list. Leftover arguments will be stored in the args list
    (options, args) = parser.parse_args()

    # check that difxwatch is not running yet
    lockpath = '/tmp/'+os.path.basename(__file__).split('.')[0]+'.pid'
    if not _create_pidfile(lockpath):
        print("Error: difxwatch seems to be running already")
        print("If you are sure that it doesn't remove the lockfile: %s" % lockpath)
        exit(1)
    # From here on the lock file belongs to this process and must be removed
    # on every way out (normal return, exit() or an unexpected exception).
    pidfile = lockpath

    try:
        if options.maxIdleSec is not None:
            maxIdleSec = options.maxIdleSec

        _setup_logging()

        logger.info("Starting difxwatch")
        logger.info("Starting logfile: %s" % os.path.join(logpath, logname))
        logger.info("Using max. idle time of: %s seconds" % maxIdleSec)

        run()
    finally:
        _remove_pidfile()


###############
# start of main
###############
if __name__ == "__main__":
    main()