#!/usr/bin/env python #************************************************************************** # Copyright (C) 2008-2015 by Walter Brisken and Helge Rottmann * # * # This program is free software; you can redistribute it and/or modify * # it under the terms of the GNU General Public License as published by * # the Free Software Foundation; either version 3 of the License, or * # (at your option) any later version. * # * # This program is distributed in the hope that it will be useful, * # but WITHOUT ANY WARRANTY; without even the implied warranty of * # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * # GNU General Public License for more details. * # * # You should have received a copy of the GNU General Public License * # along with this program; if not, write to the * # Free Software Foundation, Inc., * # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * #************************************************************************** #=========================================================================== # SVN properties (DO NOT CHANGE) # # $Id$ # $HeadURL: $ # $LastChangedRevision$ # $Author$ # $LastChangedDate$ # #============================================================================ PROGRAM = 'startdifx' VERSION = '2.3.1' VERDATE = '20150702' AUTHOR = 'Walter Brisken and Helge Rottmann' defaultgroup = "224.2.2.1" defaultport = 50200 defaultttl = 3 maxGenmachineFail = -1 genmachines = 'genmachines -v' calcif = 'calcif2' difx2fits = 'difx2fits -v' mpiOptions = '--mca mpi_yield_when_idle 1 --mca rmaps seq' from sys import argv, exit, stdout from os import popen, getcwd, system, getenv, getpid, environ from os.path import isfile, isdir, isfile from string import split, strip, find, rfind, upper, lower from time import time, asctime from glob import glob from copy import deepcopy from xml.parsers import expat from optparse import OptionParser import socket import struct import signal import subprocess verbose = 0 def getUsage(): usage = '\n%s ver. %s %s %s\n' % (PROGRAM, VERSION, VERDATE, AUTHOR) usage +='A program to simplify the launching of mpifxcorr' usage += 'It can also cause model and FITS to be made.\n\n' usage += 'Usage: startdifx [options] [] [ [ ... ] ]\n' usage += 'or: startdifx [options] [] \n\n' usage += ' is an optional delay (seconds) to add to the job start time\n' usage += ' is the file prefix for a DiFX input file (possibly including .input)\n' usage += ' is a joblist file as created by vex2difx (must have .joblist extension)\n' usage += 'Environment variables DIFX_MESSAGE_GROUP and DIFX_MESSAGE_PORT ' usage += 'can be used to override the default group/port of %s/%d\n' % (defaultgroup, defaultport) usage += 'If using the -m option the DIFX_HEAD_NODE environment must name the correlation head node.\n' return(usage) class Parser: def __init__(self): self._parser = expat.ParserCreate() self._parser.StartElementHandler = self.start self._parser.EndElementHandler = self.end self._parser.CharacterDataHandler = self.data self.message = '' self.mjd = 0.0 self.state = '' self.tmp = '' self.ok = False self.unit = '' self.mpiId = -1 self.id = '' self.tag = '' def feed(self, data): self._parser.Parse(data, 0) def close(self): self._parser.Parse("", 1) # end of data del self._parser # get rid of circular references def start(self, tag, attrs): self.tag = tag self.tmp = '' if tag == 'difxStatus': self.ok = True def end(self, tag): if tag == 'message' and self.ok: self.message = self.tmp elif tag == 'state': self.state = self.tmp elif tag == 'visibilityMJD': self.mjd = float(self.tmp) elif tag == 'from': self.unit = lower(self.tmp) elif tag == 'identifier': self.id = self.tmp elif tag == 'mpiProcessId': self.mpiId = int(self.tmp) def data(self, data): if self.tag == 'message': self.tmp = self.tmp + data else: self.tmp = data def getInfo(self): if self.ok: return 'MPI[%2d] %-9s %-12s %-7s %s' % (self.mpiId, self.unit, self.id, self.state, self.message) else: return '' def watchJob(jobId): if verbose > 1: print 'Watching %s:' % jobId port = getenv('DIFX_MESSAGE_PORT') if port == None: print 'DIFX_MESSAGE_PORT needs to be defined' exit(0) else: port = int(port) group = getenv('DIFX_MESSAGE_GROUP') if group == None: print 'DIFX_MESSAGE_GROUP needs to be defined' exit(0) # FIXME: eventually migrate to IPv6 compliant multicast receive s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('', port)) mreq = struct.pack("4sl", socket.inet_aton(group), socket.INADDR_ANY) s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) try: while 1: try: message = s.recv(1500) if len(message) > 0 and message[0] == '<': p = Parser() p.feed(message) info = p.getInfo() p.close() now = time() # print now if p.ok and p.id == jobId: if p.state == 'Running': if verbose > 0: stdout.write('.') stdout.flush() else: if verbose > 0: stdout.write("<%s>" % p.state) stdout.flush() if p.state == 'MpiDone' or p.state == 'Crashed': return except socket.timeout: pass except KeyboardInterrupt: pass if verbose > 1: print 'Watching %s:' % jobId port = getenv('DIFX_MESSAGE_PORT') if port == None: print 'DIFX_MESSAGE_PORT needs to be defined' exit(0) else: port = int(port) group = getenv('DIFX_MESSAGE_GROUP') if group == None: print 'DIFX_MESSAGE_GROUP needs to be defined' exit(0) # FIXME: eventually migrate to IPv6 compliant multicast receive s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('', port)) mreq = struct.pack("4sl", socket.inet_aton(group), socket.INADDR_ANY) s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) try: while 1: try: message = s.recv(1500) if len(message) > 0 and message[0] == '<': p = Parser() p.feed(message) info = p.getInfo() p.close() now = time() # print now if p.ok and p.id == jobId: if p.state == 'Running': if verbose > 0: stdout.write('.') stdout.flush() else: if verbose > 0: stdout.write("<%s>" % p.state) stdout.flush() if p.state == 'MpiDone' or p.state == 'Crashed': return except socket.timeout: pass except KeyboardInterrupt: pass def updateMpiOptions(): global mpiOptions opt = getenv('DIFX_MPIRUNOPTIONS') if opt is not None: mpiOptions = opt def updateEnvironment(filename): savedEnvironment = [] if isfile(filename): lines = open(filename).readlines() for l in lines: if l[0] == '#': continue e = split(strip(l)) if len(e) == 2: savedEnvironment.append([e[0], getenv(e[0])]) environ[e[0]] = e[1] return savedEnvironment def restoreEnvironment(savedEnvironment): for e in savedEnvironment: if e[1] == None: del environ[e[0]] else: environ[e[0]] = e[1] def getjobdifxversion(fileBase): version = '' label = '' files = glob(fileBase+'.calc') if len(files) < 1: print 'Can\'t find file %s.calc. Quitting.' % fileBase exit(0); d = open(files[0]).readlines() for l in d: if l[:12] == 'DIFX VERSION': version = strip(l[20:]) if l[:12] == 'DIFX LABEL': label = strip(l[20:]) return version, label def testDifxVersion(fileBase, override): jobversion, joblabel = getjobdifxversion(fileBase) version = getenv('DIFX_VERSION') label = getenv('DIFX_LABEL') if label == None: label = '' if version == None: print 'Warning: env. var. DIFX_VERSION not defined!' if jobversion == None: print 'Warning: %s.calc does not contain version info' %fileBase if version != None and jobversion != None and version != jobversion: print 'Warning: $DIFX_VERSION and job version (in .calc file) mismatch' print ' <%s> != <%s>' % (version, jobversion) if override: print 'Continuing anyway due to --override-version' else: print 'Quitting since --override-version not specified' exit(0) if label != '' and joblabel != '' and label != joblabel: print 'Warning: $DIFX_LABEL and job label (in .calc file) mismatch' print ' <%s> != <%s>' % (ver, jobver) if label == '': label = version return version, label def sendMessage(fileBase, state, statusmessage): group = getenv('DIFX_MESSAGE_GROUP') if group == None: group = defaultgroup port = getenv('DIFX_MESSAGE_PORT') if port == None: port = defaultport else: port = int(port) identifier = split(fileBase, '/')[-1] sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, defaultttl) message = \ "\n" \ "" \ "
" \ "%s" \ "-1" \ "%s" \ "DifxStatusMessage" \ "
" \ "" \ "-1" \ "" \ "%s" \ "%s" \ "0" \ "" \ "" \ "
\n" % \ (socket.gethostname(), identifier, state, statusmessage) sock.sendto(message, (group, port) ) # getNodes depends on the particular assumptions made by genmachines def getNodes(fileBase): threads = [] machines = [] datastreams = [] processors = {} threadData = open(fileBase + '.threads').readlines() for t in threadData[1:]: threads.append(int(t)) machineData = open(fileBase + '.machines').readlines() nDatastream = len(machineData) - len(threads) - 1 for m in range(1, nDatastream+1): datastreams.append(split(strip(machineData[m]))[0]) for m in range(1+nDatastream, len(machineData)): i = m - 1 - nDatastream t = threads[i] if processors.has_key(t): processors[t].append(split(strip(machineData[m]))[0]) else: processors[t] = [(split(strip(machineData[m]))[0])] return datastreams, processors def sendStartMessage(fileBase, difxVersion, localHeadNode, restartSeconds): inputFile = fileBase + '.input' group = getenv('DIFX_MESSAGE_GROUP') if group == None: group = defaultgroup port = getenv('DIFX_MESSAGE_PORT') if port == None: port = defaultport else: port = int(port) headNode = getenv('DIFX_HEAD_NODE') # -l option overrides DIFX_HEAD_NODE if localHeadNode: headNode = socket.gethostname() corrHeadNode = headNode identifier = split(fileBase, '/')[-1] sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, defaultttl) dsList, pList = getNodes(fileBase) system("rm -f %s.machines" % fileBase) system("rm -f %s.threads" % fileBase) dataStreams = "" for d in dsList: dataStreams = dataStreams + d + " " dataStreams = dataStreams.strip() processors = "" pKeys = pList.keys() for p in pKeys: processors = processors + "" if restartSeconds > 0: restartOption = '%f' % restartSeconds else: restartOption = '' message = \ "\n" \ "" \ "
" \ "startdifx" \ "%s" \ "-1" \ "%s" \ "DifxStart" \ "
" \ "" \ "" \ "%s" \ "" \ "" \ "%s" \ "%s" \ "%s" \ "" \ "" \ "
\n" % \ (headNode, identifier, inputFile, corrHeadNode, dataStreams, processors, difxVersion, restartOption) if verbose > 0: print "Sending: ", message sock.sendto(message, (group, port) ) watchJob(split(fileBase, '/')[-1]) print "\n" # verify presence of .machines file def doMachines(fileBase, machinesPolicy, cache): machinesFile = fileBase + '.machines' cmd = 'grep "FILE " %s.input' % fileBase files = popen(cmd, 'r').readlines() cmd = 'grep "CORE CONF FILENAME" %s.input' % fileBase f = popen(cmd, 'r').readlines() if len(f) == 1: threadsFile = strip(split(f[0], ':')[-1]) else: threadsFile = fileBase + '.threads' if cache[2] == files: if verbose > 0: print 'Using cached machines/threads info' cmd = 'cp %s %s' % (cache[0], machinesFile); if verbose > 1: print 'Executing: %s' % cmd system(cmd) cmd = 'cp %s %s' % (cache[1], threadsFile); if verbose > 1: print 'Executing: %s' % cmd system(cmd) else: if machinesPolicy == 2: system('rm -f %s' % machinesFile) nFail = 0 while not isfile(machinesFile): if machinesPolicy > 0: localHeadString = "" extraOpts = "" if options.difxdb: extraOpts += " -d " if options.machinesFile != "": extraOpts += " -m %s " % (options.machinesFile) cmd = '%s %s %s.input' % (genmachines, extraOpts, fileBase) if verbose > 1: print 'Executing: %s' % cmd v = -1 while v !=0 : v = subprocess.call(cmd, shell=True) # CTRL+C if v==8: exit(0) if not isfile(machinesFile): nFail += 1 if nFail > maxGenmachineFail and maxGenmachineFail > 0: return 0 else: return 0 cache[0] = machinesFile cache[1] = threadsFile cache[2] = files return len(open(machinesFile).readlines()) def handler(signum, frame): print 'Signal handler called with signal', signum exit(0) # Start difx with DifxStartMessage def runMessage(fileBase, machinesPolicy, deletePolicy, makeModel, override, useLocalHead, machinesCache, restartSeconds): difxVersion, difxLabel = testDifxVersion(fileBase, override) identifier = split(fileBase, '/')[-1] if not isfile(fileBase+'.input'): return 'Error: input file %s.input not found' % fileBase if not isfile(fileBase+'.im'): if makeModel and isfile(fileBase+'.calc'): calcOptions = "" if override: calcOptions += "--override-version " for i in range(verbose): calcOptions += "-v " system('%s %s %s.calc' % (calcif, calcOptions, fileBase)) else: return 'Error: model not available for %s' % fileBase if restartSeconds > 0: if not isdir(fileBase+'.difx'): print 'Warning: restartSeconds = %d and no existing output!' % restartSeconds elif isdir(fileBase+'.difx') or isfile(fileBase+'.difx'): if deletePolicy == 1: if verbose > 0: print 'Removing %s.difx' % fileBase sendMessage(fileBase, 'Info', 'Deleting %s.difx' % fileBase) system('rm -rf %s.difx' % fileBase) else: print 'Warning: output file %s.difx exists' % fileBase return np = doMachines(fileBase, machinesPolicy, machinesCache) if np <= 0: return 'Error: %s.machines not found' % fileBase sendStartMessage(fileBase, difxLabel, useLocalHead, restartSeconds) return None # Start difx directly with mpirun def runDirect(fileBase, machinesPolicy, deletePolicy, makeModel, override, useLocalHead, machinesCache, restartSeconds): difxVersion, difxLabel = testDifxVersion(fileBase, override) identifier = split(fileBase, '/')[-1] out = popen('which mpifxcorr').readlines() if len(out) != 1: return 'Error: mpifxcorr not found' pgm = strip(out[0]) if not isfile(fileBase+'.input'): return 'Error: input file %s.input not found' % fileBase if not isfile(fileBase+'.im'): if makeModel and isfile(fileBase+'.calc'): calcOptions = "" if override: calcOptions += "--override-version " for i in range(verbose): calcOptions += "-v " system('%s %s %s.calc' % (calcif, calcOptions, fileBase)) else: return 'Error: model not available for %s' % fileBase if restartSeconds > 0: if not isdir(fileBase+'.difx'): print 'Warning: restartSeconds = %d and no existing output!' % restartSeconds elif isdir(fileBase+'.difx') or isfile(fileBase+'.difx'): if deletePolicy == 1: if verbose > 0: print 'Removing %s.difx' % fileBase sendMessage(fileBase, 'Info', 'Deleting %s.difx' % fileBase) system('rm -rf %s.difx' % fileBase) else: print 'Warning: output file %s.difx exists' % fileBase return np = doMachines(fileBase, machinesPolicy, machinesCache) if np <= 0: return 'Error: %s.machines not found' % fileBase # spawn a logger process that will quit once this script ends # this will quietly fail on systems without difxlog installed cmd = 'difxlog %s %s.difxlog 4 %d &' % (identifier, fileBase, getpid()) if verbose > 1: print 'Executing: %s' % cmd system(cmd); if difxLabel == None or difxLabel == '': difxProgram = 'mpifxcorr' else: difxProgram = 'runmpifxcorr.' + difxLabel if restartSeconds > 0.0: restartOption = ' -r%f' % restartSeconds else: restartOption = '' cmd = 'mpirun -np %d --hostfile %s.machines %s %s %s.input%s' % (np, fileBase, mpiOptions, difxProgram, fileBase, restartOption) sendMessage(fileBase, 'Spawning', 'Spawning %d processes' % np) if verbose > 0: print 'Executing: ', cmd t0 = time() system(cmd) t1 = time() if verbose > 0: print 'Elapsed time (s) =', t1-t0 groupId = getenv('DIFX_GROUP_ID') if groupId != None: cmd = 'chown :%s %s.difx/*' % (groupId, fileBase) if verbose > 1: print 'Executing: %s' % cmd system(cmd) cmd = 'chmod g+w %s.difx/*' % fileBase if verbose > 1: print 'Executing: %s' % cmd system(cmd) sendMessage(fileBase, 'MpiDone', '') return None def run(fileBase, machinesPolicy, deletePolicy, makeModel, override, useStartMessage, useLocalHead, machinesCache, restartSeconds): savedEnvironment = updateEnvironment(fileBase+'.input.env') if useStartMessage: rv = runMessage(fileBase, machinesPolicy, deletePolicy, makeModel, override, useLocalHead, machinesCache, restartSeconds) else: rv = runDirect(fileBase, machinesPolicy, deletePolicy, makeModel, override, useLocalHead, machinesCache, restartSeconds) restoreEnvironment(savedEnvironment) return rv #------------------ # main starts here #------------------ restartSeconds = 0.0 fileBaseList = [] cwd = getcwd() machinesCache = ['','',['Nothing to see here']] updateMpiOptions() signal.signal(signal.SIGINT, handler) usage = getUsage() optParser = OptionParser(version="%prog " + VERSION , usage=usage) optParser.set_defaults(machinesPolicy=2, useStartMessage=False, deletePolicy=0, makeModel=True, difxdb=False, makeFits=False, verbose=0, quiet=0, useLocalHead=False, override=False, machinesFile="") optParser.add_option("-g", "--genmachines", dest="machinesPolicy", action="store_const", const=2, help="will run genmachines even if not needed [default]") optParser.add_option("-a", "--automachines", dest="machinesPolicy", action="store_const", const=1, help="will run genmachines if needed") optParser.add_option("-n", "--nomachines", dest="machinesPolicy", action="store_const", const=0, help="will not run genmachines, even if needed") optParser.add_option("-M", "--machines-file", dest="machinesFile", action="store", type="string", help="start difx via DifxStartMessage") optParser.add_option("-m", "--message", dest="useStartMessage", action="store_true", help="start difx via DifxStartMessage") optParser.add_option("-f", "--force", dest="deletePolicy", action="store_const", const=1, help="force running even if output file exists") optParser.add_option("-d", "--dont-calc", dest="makeModel", action="store_false", help="will not run calcif, even if needed") optParser.add_option("-D", "--difxdb", dest="difxdb", action="store_true", help="make use of difxdb to obtain module location") optParser.add_option("-F", "--fits", dest="makeFits", action="store_true", help="generate 1 fits file per job at end of each job") optParser.add_option("-v", "--verbose", dest="verbose", action="count", help="send more output to the screen (use -v -v for extra info)") optParser.add_option("-q", "--quiet", dest="quiet", action="count", help="be quieter") optParser.add_option("-l", "--localhead", dest="useLocalHead", action="store_true", help="use the current host as the head node. Overrides DIFX_HEAD_NODE.") optParser.add_option("--override-version", dest="override", action="store_true", help="ignore difx version differences") # parse the command line. Options will be stored in the options list. Leftover arguments will be stored in the args list (options, args) = optParser.parse_args() verbose = options.verbose - options.quiet if len(args) < 1: optParser.error("at least one .input or .joblist file must be given.") # parse arguments try: # check if start delay parameter was given restartSeconds = float(args[0]) start = 1 except: start = 0 # loop over all arguments fb = None for a in args[start:]: # check for extension if a[-6:] == '.input': fb = a[:-6] elif a[-8:] == '.joblist': d = open(a).readlines() for l in d: parts = l.split() if len(parts) == 0: job = strip(l) else: job = strip(parts[0]) if job.startswith("#") or job.startswith("exper"): continue if parts[0] == '/': fileBaseList.append(parts[0]) else: fileBaseList.append(cwd + '/' + parts[0]) else: fb = a if fb != None: if fb[0] == '/': fileBaseList.append(a) else: fileBaseList.append(cwd + '/' + fb) # verify that joblist is not empty if len(fileBaseList) < 1: optParser.error("ERROR: At least one .input or .joblist file must be given.") # verify that machinesfile exists if options.machinesFile != "" and isfile(options.machinesFile) == False: print "ERROR: The machines file (%s) does not exist" % (options.machinesFile) exit(1) headNode = getenv('DIFX_HEAD_NODE') if options.useStartMessage and options.useLocalHead == False and headNode == None: print "ERROR: Environment variable DIFX_HEAD_NODE must be defined." print "Alternatively you can use the -l option in case mk5daemon -H is running locally on this host." exit(1) nBad = 0 for fileBase in fileBaseList: if not isfile(fileBase + '.input'): print 'Error: %s.input not found (or not a regular file)' % fileBase nBad += 1 if nBad > 0: exit(1) for fileBase in fileBaseList: a = rfind(fileBase, "/") jobname = fileBase[a+1:] if len(jobname) > 31: sendMessage(fileBase, 'ABORTED', "Did not start job because jobname is too long!") print "Job name %s is too long and will be skipped!" % fileBase exit(1) v = run(fileBase, options.machinesPolicy, options.deletePolicy, options.makeModel, options.override, options.useStartMessage, options.useLocalHead, machinesCache, restartSeconds) if v != None: sendMessage(fileBase, 'ABORTED', v) print v exit(1) elif options.makeFits == True: system('%s %s %s.FITS' % (difx2fits, fileBase, fileBase))