#!/usr/bin/env python PROGRAM = 'startdifx' VERSION = '0.4' VERDATE = '20100516' AUTHOR = 'Walter Brisken' defaultgroup = "224.2.2.1" defaultport = 50200 defaultttl = 3 genmachines = 'genmachines -v' calcif = 'calcif2' difx2fits = 'difx2fits -v' #mpiOptions = '--mca btl ^udapl,openib --mca mpi_yield_when_idle 1 --mca rmaps seq' #mpiOptions = '--mca btl_tcp_if_exclude eth0 --mca mpi_yield_when_idle 1 --mca rmaps seq' mpiOptions = '--mca mpi_yield_when_idle 1 --mca rmaps seq' from sys import argv, exit from os import popen, getcwd, system, getenv, getpid from os.path import isfile, isdir from string import split, strip, rfind from time import time from glob import glob import socket import signal import sys def usage(): print '\n%s ver. %s %s %s\n' % (PROGRAM, VERSION, VERDATE, AUTHOR) print 'A program to simplify the launching of mpifxcorr, specialy tuned' print 'for NRAO-DiFX usage. It can also cause model and FITS to be made.\n' print 'Usage: startdifx [options] [ [ ... ] ]\n' print 'options can include:\n' print ' -h or --help' print ' print this usage info and exit\n' print ' -f or --force' print ' force running even if output file exists\n' print ' -a or --automachines' print ' will run genmachines if needed\n' print ' -g or --genmachines' print ' will run genmachines even if not needed [default]\n' print ' -n or --nomachines' print ' will not run genmachines, even if needed\n' print ' -d or --dont-calc' print ' will not run calcif, even if needed\n' print ' -F or --fits' print ' generate 1 fits file per job at end of each\n' print ' --override-version' print ' ignore difx version differences\n' print ' is the file prefix for a DiFX input file (possibly including .input)\n' print 'Environment variables DIFX_MESSAGE_GROUP and DIFX_MESSAGE_PORT' print 'can be used to override the default group/port of %s/%d\n' % \ (defaultgroup, defaultport) exit(0) def signal_handler(signal, frame): print 'You pressed Ctrl+C!' sys.exit(0) def getjobdifxversion(filebase): files = glob(filebase+'.calc') if len(files) < 1: return None d = open(files[0]).readlines() for l in d: if l[:12] == 'DIFX VERSION': return strip(l[20:]) def testdifxversion(filebase, override): jobver = getjobdifxversion(filebase) ver = getenv('DIFX_VERSION') if ver == None: print 'Warning: env. var. DIFX_VERSION not defined!' if jobver == None: print 'Warning: %s.calc does not contain version info' %filebase if ver != None and jobver != None and ver != jobver: print 'Warning: $DIFX_VERSION and job version mismatch' print ' <%s> != <%s>' % (ver, jobver) if override: print 'Continuing anyway due to --override-version' else: print 'Quitting since --override-version not specified' exit(0) return ver def sendmessage(filebase, state, statusmessage): group = getenv('DIFX_MESSAGE_GROUP') if group == None: group = defaultgroup port = getenv('DIFX_MESSAGE_PORT') if port == None: port = defaultport else: port = int(port) identifier = split(filebase, '/')[-1] sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, defaultttl) message = \ "\n" \ "" \ "
" \ "%s" \ "-1" \ "%s" \ "DifxStatusMessage" \ "
" \ "" \ "-1" \ "" \ "%s" \ "%s" \ "0" \ "" \ "" \ "
\n" % \ (socket.gethostname(), identifier, state, statusmessage) sock.sendto(message, (group, port) ) # verify presense of .machines file def domachines(filebase, machinespolicy): fn = filebase + '.machines' if machinespolicy == 2: system('rm -f %s' % fn) if not isfile(fn): if machinespolicy > 0: errcount = 0 while errcount < 2: v = system('%s %s.input' % (genmachines, filebase)) if v == 0: break else: errcount += 1 if not isfile(fn): return 0 else: return 0 return len(open(fn).readlines()) def run(filebase, machinespolicy, deletepolicy, makeModel, override): difxVersion = testdifxversion(filebase, override) identifier = split(filebase, '/')[-1] out = popen('which mpifxcorr').readlines() if len(out) != 1: return 'Error: mpifxcorr not found' pgm = strip(out[0]) if not isfile(filebase+'.input'): return 'Error: input file %s.input not found' % filebase if not isfile(filebase+'.im'): if makeModel and isfile(filebase+'.calc'): system('%s %s.calc' % (calcif, filebase)) else: return 'Error: model not available for %s' % filebase if isdir(filebase+'.difx') or isfile(filebase+'.difx'): if deletepolicy == 1: print 'Removing %s.difx' % filebase sendmessage(filebase, 'Info', 'Deleting %s.difx' % filebase) system('rm -rf %s.difx' % filebase) else: return 'Error: output file %s.difx exists' % filebase np = domachines(filebase, machinespolicy) if np <= 0: return 'Error: %s.machines not found' % filebase # spawn a logger process that will quit once this script ends # this will quietly fail on systems without difxlog installed cmd = 'difxlog %s %s.difxlog 4 %d &' % (identifier, filebase, getpid()) system(cmd); if difxVersion == None: difxProgram = 'mpifxcorr' else: difxProgram = 'runmpifxcorr.' + difxVersion cmd = 'mpirun -np %d --bynode --hostfile %s.machines %s %s %s.input' % (np, filebase, mpiOptions, difxProgram, filebase) sendmessage(filebase, 'Spawning', 'Spawning %d processes' % np) print 'Executing: ', cmd t0 = time() # system(cmd) t1 = time() print 'Elapsed time (s) =', t1-t0 groupId = getenv('DIFX_GROUP_ID') if groupId != None: cmd = 'chown :%s %s.difx/*' % (groupId, filebase) system(cmd) cmd = 'chmod g+w %s.difx/*' % filebase system(cmd) sendmessage(filebase, 'MpiDone', '') return None if len(argv) < 2: usage() signal.signal(signal.SIGINT, signal_handler) machinespolicy = 2 deletepolicy = 0 makeModel = True makefits = False filebaselist = [] cwd = getcwd() override = False for a in argv[1:]: fb = None if a[0] == '-': if a == '-h' or a == '--help': usage() elif a == '-g' or a == '--genmachines': machinespolicy = 2 elif a == '-a' or a == '--automachines': machinespolicy = 1 elif a == '-n' or a == '--nomachines': machinespolicy = 0 elif a == '-f' or a == '--force': deletepolicy = 1 elif a == '-d' or a == '--dont-calc': makeModel = False elif a == '-F' or a == '--fits': makefits = True elif a == '--override-version': override = True else: if a[-6:] == '.input': fb = a[:-6] elif a[-8:] == '.joblist': d = open(a).readlines() for l in d: parts = l.split(" ") job = strip(parts[0]) if job.startswith("#"): continue if parts[0] == '/': filebaselist.append(parts[0]) else: filebaselist.append(cwd + '/' + parts[0]) else: fb = a if fb != None: if fb[0] == '/': filebaselist.append(a) else: filebaselist.append(cwd + '/' + fb) if len(filebaselist) < 1: usage() for filebase in filebaselist: a = rfind(filebase, "/") jobname = filebase[a+1:] if len(jobname) > 31: sendmessage(filebase, 'ABORTED', "Did not start job because jobname is too long!") print "Job name %s is too long and will be skipped!" % filebase exit(1) v = run(filebase, machinespolicy, deletepolicy, makeModel, override) if v != None: sendmessage(filebase, 'ABORTED', v) print v continue #exit(1) elif makefits == True: system('%s %s %s.FITS' % (difx2fits, filebase, filebase))