#!/usr/bin/env python from sys import exit, argv, stdout, stdin from os import getenv, mkdir, system, umask, getcwd from string import split, strip, rfind, upper from getopt import getopt from os.path import isfile, isdir from glob import glob import datetime import cx_Oracle import popen2 from time import localtime, asctime dbname = getenv("VLBA_DB") program = 'difxqueue' version = '0.8' author = 'Walter Brisken' verdate = '20110109' clusterHorsepower = 0.24 # Antennas considered not foreign vlbaAntennas = ['BR', 'FD', 'HN', 'KP', 'LA', 'MK', 'NL', 'OV', 'PT', 'SC'] def usage(prog): print '\n%s ver %s %s %s' % (program, version, author, verdate) print 'A program to send DiFX jobs to the correlator queue and update the database.' print 'Functionality to examine and modify the contents of the queue are also provided.\n' print '\nUsage: %s [options] []' % prog print """ Options can include: --verbose -v Send more output to the screen (use -v -v for extra info) --help -h Print this help information and quit --override-version Force operation with mixed DiFX versions --priority -p Set the priority of the job to when adding to the queue --db-only -d Operate only on database (some commands only) --queuedir -q Specify the directory to use as the correlation base Command must be one of the following. Each command takes its own set of arguments. See examples for use cases: add Add job(s) to the queue arguments = [list of job numbers (ranges OK)] example 1: difxqueue add clock example 2: difxqueue add geodesy 3 4 5 8-12 bump Increase the priority of job(s) arguments = [list of job numbers] example 1: difxqueue bump clock del Remove job(s) from the queue arguments = [list of job numbers (ranges OK)] example 1: difxqueue del clock example 2: difxqueue del geodesy 3 4 5 11-32 list List all jobs within a pass arguments = example 1: difxqueue list clock listall List all incomplete jobs in the queue. Note that if one or more projects is specified, all jobs, complete or not, for those projects will be listed. If no segment code is appended to a project name, then all matching proposal codes will be listed. arguments = [ ... ] example 1: difxqueue listall example 2: difxqueue listall BX123 BY321 example 3: difxqueue listall BR138A log Print correlation history arguments = example: difxqueue log BX123 prod Print production correlator queue argumements = [] If no arguments passed, will print to stdout example 1: difxqueue prod example 2: difxqueue prod prodList.txt set Set the state of a job arguments = [list of job numbers] should be one of: COMPLETE, QUEUED or FAILED, but can be anything. example 1: difxqueue set tc015d COMPLETE example 1: difxqueue set tc015d QUEUED 2 slide Decrease the priority of job(s) arguments = [list of job numbers] example 1: difxqueue slide geodesy 3 5 """ exit(0) def getnumbers(stringlist): numbers = [] for s in stringlist: sp = split(s, '-') if len(sp) == 1: a = b = int(sp[0]) if len(sp) == 2: a = int(sp[0]) b = int(sp[1]) if b < a: print 'Range specifier "%s" is not conformant' % s exit(0); for i in range(a, b+1): if not i in numbers: numbers.append(i) numbers.sort() return numbers def parsekv(str): kv = {} ss = split(str) for s in ss: p = split(s, '=') if len(p) != 2: print 'Error parsing key=value statement: %s', s return {} kv[p[0]] = p[1] return kv # Splits a combined obs code into its proposal and segment portions def splitobscode(exper): obsSeg = '' proposal = exper[:] if len(proposal) > 3: if proposal[0].isalpha() and proposal[1].isalpha() and proposal[2].isdigit(): for i in range(3, len(proposal)): if proposal[i].isalpha(): obsSeg = proposal[i:] proposal = proposal[0:i] break if proposal[0].isalpha() and proposal[1].isdigit(): for i in range(2, len(proposal)): if proposal[i].isalpha(): obsSeg = proposal[i:] proposal = proposal[0:i] break return proposal, obsSeg def makeobscode(proposal, segment): if segment == None: return proposal else: return proposal+segment # Time conversion functions mjd0 = datetime.datetime(1858, 11, 17, 0, 0) def calcMJD(t): dt = t - mjd0 return dt.days + dt.seconds/86400.0 def mjd2str(mjd): if mjd < 50001 or mjd > 99999: return '' d = int(mjd) s = int((mjd - d)*86400.0 + 0.5) dt = datetime.timedelta(d, s) t = mjd0 + dt yr = t.year%100 return '%02d_%02d_%02d %02d:%02d' % (yr, t.month, t.day, t.hour, t.minute) def mjd2vex(mjd, dateonly=False): if mjd < 50001 or mjd > 99999: return '' md = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334] d = int(mjd) s = int((mjd - d)*86400.0 + 0.5) dt = datetime.timedelta(d, s) t = mjd0 + dt d = t.day + md[t.month-1] if t.year % 4 == 0 and t.month > 2: d += 1 if dateonly: return '%dy%03dd' % \ (t.year, d) else: return '%dy%03dd%02dh%02dm%02ds' % \ (t.year, d, t.hour, t.minute, t.second) def mjd2db(mjd): t = mjd0 + datetime.timedelta(mjd) datestr = '%04d/%02d/%02d' % (t.year, t.month, t.day) timestr = '%02d:%02d:%02d' % (t.hour, t.minute, t.second) return "to_date('%s:%s', 'yyyy/mm/dd:hh24:mi:ss')" % (datestr, timestr) class DatabaseJob: def __init__(self, list): self.proposal = list[0] self.segment = list[1] self.passName = list[2] self.number = int(list[3]) self.priority = int(list[4]) self.jobStart = calcMJD(list[5]) self.jobStop = calcMJD(list[6]) self.speedUp = float(list[7]) self.inputFile = list[8] self.status = list[9] self.nAntenna = int(list[10]) if len(list) > 11: self.corrType = list[11] self.corrVersion = list[12] try: self.nForeign = int(list[13]) except TypeError: self.nForeign = 0 try: self.outSize = int(float(list[14])) except TypeError: self.outSize = 0 else: self.corrType = '' self.corrVersion = '' self.nForeign = 0 self.outSize = 0 p = rfind(self.inputFile, '/') if p < 1 or self.inputFile[-6:] != '.input': print 'Malformed full input filename: %s' % self.inputFile exit(0) self.queueDir = self.inputFile[0:p] def duration(self): return int(86400.0*(self.jobStop-self.jobStart)) def runtime(self): return int(86400.0*(self.jobStop-self.jobStart)/self.speedUp) def printHeader(self, printTotal, verbose=0): # note: this is a static function if verbose > 0: extra = "Start Stop " else: extra = "Dur(sec) " header = '\nExper Pass Num Pri. %sSpeedUp Status nAntenna' % extra if printTotal: header += ' Total(hrs)' print header def printJob(self, total=0.0, verbose=0): if verbose > 0: extra = "%-21s%-21s" % \ (mjd2vex(self.jobStart), mjd2vex(self.jobStop)) else: extra = "%-9d" % self.duration() line = '%-10s%-10s%-5d%-5d%s%-8.1f%-11s%4d/%-4d' % \ (makeobscode(self.proposal, self.segment), self.passName, self.number, \ self.priority, extra, self.speedUp, self.status, self.nAntenna, self.nForeign); if total > 0.0: line += '%-10.2f' % total print line if verbose > 1: print ' %s' % self.inputFile def printHeader2(self, fp, first): if first: fp.write('\n') fp.write('VLBA-DiFX Correlator Production Queue [ Created %s ]\n\n' % asctime(localtime()) ) fp.write('Job Proj Pri ObsStart[UT] ObsL CorL DSMB Rate SU Version #S FS Hours\n') else: fp.write('\n') fp.write('========== ======= === ============== ==== ===== ==== ==== === ============ == == =====\n') def printTrailer2(self, fp): fp.write('\n') fp.write('------------------------------------- Column Description ------------------------------------\n') fp.write('Job : job name\n') fp.write('Proj : name of project (proposal/segment)\n') fp.write('ObsStart : start date/time of observation (UTC)\n') fp.write('ObsL : the observation length (in hours)\n') fp.write('CorL : the estimated correlation time (HH:mm)\n') fp.write('DSMB : estimated disk size (in Megabytes)\n') fp.write('Rate : estimated data rate (in Kilobytes/second)\n') fp.write('SU : estimated speed-up factor\n') fp.write('Version : version of DiFX to use\n') fp.write('#S : total number of stations in job\n') fp.write('FS : number of foreign stations\n') fp.write('Hours : running total of estiamted correlation time (in hours)\n') def printJob2(self, fp, total): jobName = '%s_%d' % (self.passName, self.number) obsStart = mjd2str(self.jobStart) obsLength = (self.jobStop-self.jobStart)*24.0 corLength = obsLength/self.speedUp hh = int(corLength) mm = int(60*(corLength-hh) + 0.5) cl = '%02d:%02d' % (hh, mm) if self.speedUp < 9.95: su = '%3.1f' % self.speedUp; else: su = '%3.0f' % self.speedUp; rate = int((self.outSize*1000.0)/(3600.0*corLength)) hours = total + corLength fp.write('%-12s [ ] %-7s %-3d %-14s %4.1f %5s %4d %4d %3s %-12s %2d %2d %5.1f\n' % ( \ jobName, \ makeobscode(self.proposal, self.segment), \ self.priority, \ obsStart, \ obsLength, \ cl, \ self.outSize, \ rate, \ su, \ self.corrVersion, \ self.nAntenna, \ self.nForeign, \ hours \ )) return hours class DatabaseLog: def __init__(self, list): self.proposal = list[0] self.segment = list[1] self.passName = list[2] self.number = int(list[3]) if list[4] != None: self.corrStart = calcMJD(list[4]) else: self.corrStart = 50000 if list[5] != None: self.corrStop = calcMJD(list[5]) else: self.corrStop = 100000 if list[6] != None: self.speedUp = list[6] else: self.speedUp = 0.0 self.inputFile = list[7] self.outputFile = list[8] if list[9] != None: self.outputSize = list[9] else: self.outputSize = 0.0 if list[10] != None: self.status = list[10] else: self.status = "Unknown" def printHeader(self, verbose=0): # note: this is a static function if verbose > 0: extra = "Start Stop " else: extra = "Dur(sec) " header = '\nExper Pass Num %sSpeedUp Status Size' % extra print header def runtime(self): rt = int(86400.0*(self.corrStop-self.corrStart)) if rt > 1000000: return 0 else: return rt def printLog(self, verbose=0): if verbose > 0: extra = "%-21s%-21s" % \ (mjd2vex(self.corrStart), mjd2vex(self.corrStop)) else: extra = "%-9d" % self.runtime() line = '%-10s%-10s%-5d%s%-8.1f%-11s%-6.2f' % \ (makeobscode(self.proposal, self.segment), self.passName, self.number, \ extra, self.speedUp, self.status, self.outputSize); print line # Database access functions def getPassJobsFromDatabase(database, proposal, segment, passName, verbose=0): if len(segment) > 0: segString = "and SEGMENT='%s' " % segment else: segString = "" query = "select * from DIFXQUEUE where PROPOSAL='%s' %sand JOB_PASS='%s' order by JOB_NUMBER" % (proposal, segString, passName) if verbose > 1: print 'Executing database query: %s' % query cursor = database.cursor() cursor.execute(query) D = [] for j in cursor.fetchall(): D.append(DatabaseJob(j)) return D def getActiveJobsFromDatabase(database, projects, verbose=0): D = [] cursor = database.cursor() if len(projects) == 0: query = "select * from DIFXQUEUE where STATUS!='COMPLETE' order by JOB_START" if verbose > 1: print 'Executing database query: %s' % query cursor.execute(query) for j in cursor.fetchall(): D.append(DatabaseJob(j)) else: for p in projects: prop, seg = splitobscode(p) if len(seg) > 0: condition = "where PROPOSAL='%s' and SEGMENT='%s'" % (prop, seg) else: condition = "where PROPOSAL='%s'" % prop query = "select * from DIFXQUEUE %s order by JOB_START" % condition if verbose > 1: print 'Executing database query: %s' % query cursor.execute(query) for j in cursor.fetchall(): D.append(DatabaseJob(j)) return D def getDifxLogsFromDatabase(database, project, verbose=0): D = [] cursor = database.cursor() prop, seg = splitobscode(project) if len(seg) > 0: condition = "where PROPOSAL='%s' and SEGMENT='%s'" % (prop, seg) else: condition = "where PROPOSAL='%s'" % prop query = "select * from DIFXLOG %s order by CORR_START" % condition if verbose > 1: print 'Executing database query: %s' % query cursor.execute(query) for j in cursor.fetchall(): D.append(DatabaseLog(j)) return D def changeDatabaseStatus(database, dbJob, column, newValue, verbose=0): if isinstance(newValue, int): update = "%d" % newValue elif isinstance(newValue, str): update = "'%s'" % newValue else: print "Error: changeDatabaseStatus: don't know type of:", newValue exit(0) cmd = "update DIFXQUEUE set %s=%s where INPUT_FILE='%s'" % \ (column, update, dbJob.inputFile) if verbose > 1: print 'Executing database update: %s' % cmd cursor = database.cursor() cursor.execute(cmd) database.commit() def deleteDatabaseEntry(database, dbJob, verbose=0): cmd = "delete from DIFXQUEUE where INPUT_FILE='%s'" % dbJob.inputFile if verbose > 1: print 'Executing database update: %s' % cmd cursor = database.cursor() cursor.execute(cmd) database.commit() def addDatabaseEntry(database, passData, job, dir, priority, corrType, corrVersion, verbose=0): prefix = job.name inputFile = '%s/%s.input' % (dir, prefix) # make sure inputFile isn't already queued for j in passData.dbJobs: if j.inputFile == inputFile: print 'Error: %s already in the queue' % inputFile return status = 'QUEUED' dur = (job.mjdStop-job.mjdStart)*86400.0 speedUp = dur/(job.computationalLoad/clusterHorsepower) cmd = "insert into DIFXQUEUE (PROPOSAL, SEGMENT, JOB_PASS, JOB_NUMBER, PRIORITY, JOB_START, JOB_STOP, SPEEDUP, INPUT_FILE, STATUS, NUM_ANT, CORR_TYPE, CORR_VERSION, NUM_FOREIGN, OUTPUT_SIZE_EST) values ('%s', '%s', '%s', %d, %d, %s, %s, %5f, '%s', '%s', %d, '%s', '%s', %d, %d)" % ( \ passData.proposal, passData.segment, passData.passName, job.number, priority, mjd2db(job.mjdStart), mjd2db(job.mjdStop), speedUp, inputFile, status, job.nAnt, corrType, corrVersion, job.nForeign, job.outSize) if verbose > 1: print 'Executing database insert: %s' % cmd cursor = database.cursor() cursor.execute(cmd) database.commit() def printDatabaseJobs(dbJobs, verbose=0): dbJobs[0].printHeader(True, verbose) totalDur = 0.0 remainDur = 0.0 totalCor = 0.0 remainCor = 0.0 for j in dbJobs: totalDur += j.duration() totalCor += j.duration()/j.speedUp if j.status != 'COMPLETE': remainDur += j.duration() remainCor += j.duration()/j.speedUp j.printJob(totalCor/3600.0, verbose) print '' if totalDur != remainDur: print 'Total observe time = %4.2f hours' % \ (totalDur / 3600.0) print 'Observe time remaining = %4.2f hours' % \ (remainDur/3600.0) if totalCor != remainCor: print 'Total correlator time = %4.2f hours' % \ (totalCor / 3600.0) print 'Correlator time remaining = %4.2f hours' % \ (remainCor/3600.0) print '' def printProductionQueue(dbJobs, fp): first = True lastProject = '' total = 0.0 for j in dbJobs: prj = makeobscode(j.proposal, j.segment) if lastProject != prj: j.printHeader2(fp, first) first = False lastProject = prj total = j.printJob2(fp, total) j.printTrailer2(fp) def printDifxLogs(dbLogs, verbose=0): dbLogs[0].printHeader(verbose) for l in dbLogs: l.printLog(verbose) print '' def countForeign(str): ants = split(strip(str)) f = 0 for a in ants: if upper(a) not in vlbaAntennas: f += 1 return f def inputSanityCheck(inputFile): data = open(inputFile, 'r').readlines() OK = True # check for proper mark5 modules l = 1; for d in data: s = split(strip(d)) if len(s) == 3: if s[0] == 'FILE' and upper(s[2]) == 'NONE': print 'Error: %s line %d: Cannot have VSN or filename of "%s"' \ % (inputFile, l, s[2]) OK = False l += 1 return OK # Class to store information about a job that is contained in a joblist file class Job: def __init__(self, str, nForeign=0): s = split(strip(str)) if len(s) >= 5: self.name = s[0] self.mjdStart = float(s[1]) self.mjdStop = float(s[2]) self.nAnt = int(s[3]) self.nPulsarBins = int(s[4]) self.nSourceFiles = int(s[5]) self.nForeign = nForeign self.computationalLoad = float(s[6]) if len(s) > 5: self.outSize = int(float(s[7])) else: self.outSize = 0 s = split(self.name, '_') if len(s) != 2: print 'Malformed job name "%s" in .joblist file should be of form name_number' % self.name exit(0) self.number = int(s[1]) else: self.name = '' def show(self, indent=0): print '%sJob %s: start=%f stop=%f nPsrBin=%d nSrcFiles=%d nAnt=%d' % \ (' '*indent, self.name, self.mjdStart, self.mjdStop, self.numPulsarBins, self.numSourceFiles, self.nAnt) def verify(self, dir, preCorr=False): if self.nPulsarBins > 1 and self.nSourceFiles > 1: print "Can't do multiple phase centres and pulsar mode in one pass!" return False files = ['.calc', '.input'] for f in files: file = dir + '/' + self.name + f if not isfile(file): print 'Error : %s not found' % file return False if preCorr: if not inputSanityCheck(dir + '/' + self.name + '.input'): return False else: ddir = dir + '/' + self.name + '.difx' if not isdir(ddir): print 'Error: difx output dir %s not found' % ddir return False return True class JobList: def __init__(self, filename): self.filename = filename self.overrideVersion = False data = open(filename).readlines() self.jobs = [] if len(data) < 2: print 'Malformed .joblist file %s' % filename exit(0) self.kv = parsekv(data[0]) if len(self.kv) < 1: print 'Malformed .joblist file %s line 1' % filename exit(0) for n in range(1, len(data)): A = split(data[n], '#') d = A[0] if len(d) < 2: continue if len(A) > 1: nForeign = countForeign(A[1]) else: nForeign = 0 j = Job(d, nForeign) if j.name == '': print 'Malformed line %d in %s' % (n+1, filename) exit(0) self.jobs.append(j) def testversion(self, override): difxVersion = getenv('DIFX_VERSION') if difxVersion == None: print 'Warning: env var DIFX_VERSION is not set!' if override: difxVersion = 'unknown' else: exit(0) elif difxVersion != self.kv['DiFX']: if override: print 'Overriding version mismatch: %s != %s' % \ (difxVersion, self.kv['DiFX']) difxVersion = 'unknown' self.overrideVersion = True else: print 'Error: file DiFX version = %s' % self.kv['DiFX'] print 'and current DiFX version = %s' % difxVersion exit(0) self.kv['DiFX'] = difxVersion return difxVersion def verify(self, queuedir): dir = queuedir + '/' + self.kv['exper'] OK = True for j in self.jobs: if j.verify(dir) == False: OK = False return OK def show(self, indent=0): id = ' '*indent print '%sJobList: %s' % (id, self.filename) id = ' '*(indent+2) for key in self.kv.keys(): print '%sKV: %s = %s' % (id, key, self.kv[key]) for j in self.jobs: j.show(indent+2) # Container object for all information about a full correlator pass, i.e., 1 or # more jobs: used for most modes of this program class Pass: def __init__(self, jobListFile, database, verbose=0): self.jobList = JobList(jobListFile) self.proposal, self.segment = splitobscode(self.jobList.kv['exper']) self.passName = self.jobList.kv['pass'] self.dbJobs = getPassJobsFromDatabase(database, \ self.proposal, self.segment, self.passName, verbose) self.database = database self.corrType = '' def getDatabaseJobsByNumbers(self, numList): jobs = [] if len(numList) == 0: for j in self.dbJobs: jobs.append(j) else: badList = [] for i in numList: OK = False for j in self.dbJobs: if j.number == i: OK = True jobs.append(j) break if not OK: badList.append(i) if len(badList) > 0: print 'Some requested jobs are not in the queue:', badList return [] return jobs def getJoblistJobsByNumbers(self, numList): jobs = [] if len(numList) == 0: for j in self.jobList.jobs: jobs.append(j) else: badList = [] for i in numList: OK = False for j in self.jobList.jobs: if j.number == i: OK = True jobs.append(j) break if not OK: badList.append(i) if len(badList) > 0: print 'Some requested jobs are not in the .jobList file:', badList return [] return jobs def list(self, verbose=0): # send to stdout the list of jobs in the database if len(self.dbJobs) == 0: print 'No incomplete jobs in queue' else: printDatabaseJobs(self.dbJobs, verbose); def updateStatus(self, numList, newStatus, verbose=0): if len(self.dbJobs) == 0: print 'No jobs to change status' jobs = self.getDatabaseJobsByNumbers(numList) if len(jobs) == 0: return for j in jobs: changeDatabaseStatus(self.database, j, "STATUS", newStatus, verbose) j.status = newStatus def upPriority(self, numList, verbose=0): if len(self.dbJobs) == 0: print 'No jobs to change priority' jobs = self.getDatabaseJobsByNumbers(numList) if len(jobs) == 0: return for j in jobs: changeDatabaseStatus(self.database, j, "Priority", j.priority-1, verbose) j.priority -= 1 def downPriority(self, numList, verbose=0): if len(self.dbJobs) == 0: print 'No jobs to change priority' jobs = self.getDatabaseJobsByNumbers(numList) if len(jobs) == 0: return for j in jobs: changeDatabaseStatus(self.database, j, "Priority", j.priority+1, verbose) j.priority += 1 def delete(self, numList, dbOnly, verbose=0): if len(self.dbJobs) == 0: print 'No jobs to change status' jobs = self.getDatabaseJobsByNumbers(numList) if len(jobs) == 0: return dontAsk = False for j in jobs: if dbOnly: print 'Removing job %d of pass %s from database only' % \ (j.number, j.passName) else: print 'Removing job %d of pass %s' % \ (j.number, j.passName) # need safety check here if isdir(j.inputFile[0:-6]+'.difx'): if not dbOnly: print 'Correlator output for job %d of pass %s exists' % \ (j.number, j.passName) if not dontAsk and not dbOnly: print 'Continue anyway, destroying this data? [y/N/c/a]' a = stdin.readline() else: a = 'y' if strip(a) in ['a', 'A']: dontAsk = True print 'Assuming yes for all remaining' a = 'y' if not dbOnly: if strip(a) in ['y', 'Y']: if not dontAsk: print 'OK, proceeding\n' else: print 'Continuing anyway\n' cmd = 'rm -r %s.difx' % j.inputFile[0:-6] if verbose: print 'Executing: %s' % cmd system(cmd) elif strip(a) in ['c', 'C']: print 'Cancelling\n' return else: print 'Not continuing.\n' continue; deleteDatabaseEntry(self.database, j, verbose) if not dbOnly: cmd = 'rm %s.*' % j.inputFile[0:-6] if verbose: print 'Executing: %s' % cmd system(cmd) if len(glob('%s/*' % j.queueDir)) == 0: cmd = 'rmdir %s' % j.queueDir if verbose: print 'Executing: %s' % cmd system(cmd) print '' def queueJobsDBOnly(self, numbers, queueDir, priority, verbose=0): jobs = self.getJoblistJobsByNumbers(numbers) for j in jobs: print 'Adding %s to the queue (database only)' % j.name addDatabaseEntry(self.database, passData, j, queueDir, priority, self.corrType, self.jobList.kv['DiFX'], verbose) def queueJobs(self, numbers, queueDir, priority, verbose=0): groupId = getenv('DIFX_GROUP_ID') if not isdir(queueDir): cmd = 'mkdir -p %s' % queueDir if verbose > 1: print 'Executing: %s' % cmd system(cmd) cmd = 'cp tsys weather flag pcal %s' % queueDir if verbose > 1: print 'Executing: %s' % cmd system(cmd) if groupId != None: cmd = 'chown :%s %s' % (groupId, queueDir) if verbose > 1: print 'Executing: %s' % cmd system(cmd) cmd = 'chown :%s %s/{flag,tsys,weather,pcal}' % (groupId, queueDir) if verbose > 1: print 'Executing: %s' % cmd system(cmd) jobs = self.getJoblistJobsByNumbers(numbers) if len(jobs) == 0: return # check each against database badList = [] for j in jobs: inputFile = queueDir + '/' + j.name + '.input' for k in self.dbJobs: if k.inputFile == inputFile: badList.append(j) break if len(badList) > 0: print '\nThe following jobs are already in the queue:' for j in badList: print ' %s' % j.name print 'Please rerun, including only those jobs not already queued.\n' return options = '' if verbose > 1: options += ' -v' if self.jobList.overrideVersion: options += ' --override-version' for j in jobs: print 'Adding %s to the queue' % j.name addDatabaseEntry(self.database, passData, j, queueDir, priority, self.corrType, self.jobList.kv['DiFX'], verbose) if(queueDir != getcwd()): cmd = 'difxcopy %s %s' % (j.name, queueDir) if verbose > 1: print 'Executing: %s' % cmd system(cmd) elif verbose > 1: print 'Not copying, using local directory' cmd = 'calcif2 %s -f %s/%s' % \ (options, queueDir, j.name) if verbose > 1: print 'Executing: %s' % cmd system(cmd) if groupId != None: cmd = 'chown -R :%s %s/%s.*' % (groupId, queueDir, j.name) if verbose > 1: print 'Executing: %s' % cmd system(cmd) # use checkmpifxcorr to verify files nGood = 0 nBad = 0 for j in jobs: print 'Checking %s' % j.name cmd = 'checkmpifxcorr %s/%s.input' % (queueDir, j.name) p, q = popen2.popen4(cmd) output = p.readlines() p.close() q.close() if output[-1][0:9] == 'No errors': print 'Looks good' nGood += 1 else: print 'One or more errors with input files:' for s in output: print ' %s' % strip(s) nBad += 1 print '%d/%d jobs passed sanity check' % (nGood, nGood+nBad) if nBad > 0: print 'You might want to dequeue these jobs and fix them!' def verify(action): print '\nThe %s action is about to be applied to all matching jobs' %\ action print '\nAre you sure you want to proceed? [y/N]' a = stdin.readline() if strip(a) in ['y', 'Y']: print 'OK, proceeding\n' else: print 'Not continuing.\n' exit(0) # "main" starts here umask(02) optlist, args = getopt(argv[1:], 'hvfdq:p:', ['help', 'verbose', 'force', 'db-only', 'queuedir=', 'priority=', \ 'override-version']) overrideVersion = False force = False verbose = 0 priority = 2 queueDir = '' dbOnly = False for o, a in optlist: if o == '--override-version': overrideVersion = True elif o in ('-v', '--verbose'): verbose += 1 elif o in ('-p', '--priority'): priority = int(a) elif o in ('-d', '--db-only'): dbOnly = True elif o in ('-q', '--queue'): queueDir = a if len(args) < 1: usage(argv[0]) action = args[0] database = cx_Oracle.connect(dbname) if action == 'listall': projects = [] for a in args[1:]: projects.append(a) dbJobs = getActiveJobsFromDatabase(database, projects, verbose) if len(dbJobs) == 0: if len(projects) == 0: print 'No incomplete jobs in queue' else: print 'No matching jobs in queue' else: printDatabaseJobs(dbJobs, verbose) elif action == 'prod': if len(args) > 1: fp = open(args[1], 'w') else: fp = stdout dbJobs = getActiveJobsFromDatabase(database, [], verbose) if len(dbJobs) == 0: print 'No incomplete jobs in queue' else: printProductionQueue(dbJobs, fp) if len(args) > 2: fp.close() elif action == 'log': if len(args) != 2: print 'Error: a project name is required. Aborting.' exit(0) dbLogs = getDifxLogsFromDatabase(database, args[1], verbose) if len(dbLogs) == 0: print 'No jobs have run for this experiment.' else: printDifxLogs(dbLogs, verbose) else: if len(args) < 2: usage(argv[0]) if args[1][-8:] == '.joblist': jobListFile = args[1] else: jobListFile = args[1] + '.joblist' if not isfile(jobListFile): print 'Error: file %s not found. Aborting.' % jobListFile exit(0) passData = Pass(jobListFile, database, verbose) passData.jobList.testversion(overrideVersion) if action == 'list': passData.list(verbose) elif action == 'del': numbers = getnumbers(args[2:]) if len(numbers) == 0 and not force: verify(action) passData.delete(numbers, dbOnly, verbose) elif action == 'bump': print 'Y', verbose numbers = getnumbers(args[2:]) if len(numbers) == 0 and not force: verify(action) passData.upPriority(numbers, verbose) elif action == 'slide': numbers = getnumbers(args[2:]) if len(numbers) == 0 and not force: verify(action) passData.downPriority(numbers, verbose) elif action == 'set': newStatus = upper(args[2]) numbers = getnumbers(args[3:]) if len(numbers) == 0 and not force: verify(action) passData.updateStatus(numbers, newStatus, verbose) elif action == 'add': if queueDir == '': difxQueueBase = getenv('DIFX_QUEUE_BASE') if difxQueueBase == None: print 'Error: env var DIFX_QUEUE_BASE is not set. Cannot proceed.' exit(0) queueDir = difxQueueBase + '/' + \ passData.proposal + passData.segment OK = True numbers = getnumbers(args[2:]) for j in passData.jobList.jobs: if len(numbers) == 0 or j.number in numbers: OK &= j.verify(getcwd(), True) if not OK: print 'Error: one or more jobs failed validation. Stopping.' exit(0) if dbOnly: passData.queueJobsDBOnly(numbers, queueDir, priority, verbose) else: passData.queueJobs(numbers, queueDir, priority, verbose) else: print 'Unknown action "%s". Run with -h to get help.' % action exit(0) database.close()