#!/usr/bin/env python
"""Stage the jobs listed in a DiFX .joblist file into the DIFXQUEUE table.

For every job in <pass>.joblist the script inserts one row into the
DIFXQUEUE Oracle table, copies the job's files into
$DIFX_QUEUE_BASE/<exper> with ``difxcopy`` and runs ``calcif2`` on the
copy.  All inserts are committed together once every job has been handled.

Environment variables read:
    DIFX_VERSION     compared with the DiFX= entry of the joblist header
    DIFX_QUEUE_BASE  directory under which the per-experiment queue lives
    VLBA_DB          connect string handed to cx_Oracle.connect()

Every print() call below takes exactly one argument so that the file
behaves identically under Python 2 (print statement) and Python 3.
"""

from sys import exit, argv, stdout
from os import getenv, mkdir, system
from getopt import getopt, GetoptError
from os.path import isfile, isdir
from subprocess import call
import datetime

# NOTE(review): hard-coded test connect string.  It is kept only so the name
# still exists; the connection itself is now made with $VLBA_DB, which the
# script already required and reported in verbose mode.
dbname = 'vlba/vlba@vlbatest'

program = 'difxqueuetest'
version = '0.0'
author = 'Walter Brisken'
verdate = '2009xxxx'

# Date corresponding to MJD 0; mjd2str() adds a day count to it.
mjd0 = datetime.datetime(1858, 11, 17, 0, 0)

# Value written to the PRIORITY column unless -p/--priority is given.
defaultPriority = 2


def usage(prog, status=0):
    """Print the program banner and command-line help, then exit.

    prog   -- name the program was invoked as (shown in the usage line)
    status -- process exit status; 0 for requested help, non-zero when the
              help is shown because the command line was wrong
    """
    print('\n%s ver %s %s %s' % (program, version, author, verdate))
    print('\nUsage: %s [options] <pass>[.joblist]' % prog)
    print('\noptions can include:')
    print('  -h, --help            print this message and exit')
    print('  -v, --verbose         report what is being done')
    print('  -p, --priority=<n>    queue priority to assign (default %d)'
          % defaultPriority)
    print('  --override-version    proceed despite a DiFX version mismatch')
    exit(status)


def mjd2str(mjd):
    """Return an Oracle to_date(...) expression for the given MJD.

    mjd is a (possibly fractional) day count relative to mjd0.  Sub-second
    precision is dropped because only whole seconds are formatted.
    """
    t = mjd0 + datetime.timedelta(mjd)
    return "to_date('%04d/%02d/%02d:%02d:%02d:%02d', 'yyyy/mm/dd:hh24:mi:ss')" % \
        (t.year, t.month, t.day, t.hour, t.minute, t.second)


def splitobscode(code):
    """Split an observation code into (proposal code, segment).

    Codes longer than three characters that start with letter-letter-digit
    or letter-digit are cut at the first letter following that prefix;
    everything from that letter on is the segment.  Any other code is
    returned unchanged with an empty segment.
    """
    obsCode = code[:]
    if len(obsCode) > 3:
        # The two prefixes are mutually exclusive (second char is either a
        # letter or a digit), so at most one scan start applies.
        if obsCode[0].isalpha() and obsCode[1].isalpha() and obsCode[2].isdigit():
            scanFrom = 3
        elif obsCode[0].isalpha() and obsCode[1].isdigit():
            scanFrom = 2
        else:
            scanFrom = None
        if scanFrom is not None:
            for i in range(scanFrom, len(obsCode)):
                if obsCode[i].isalpha():
                    return obsCode[:i], obsCode[i:]
    return obsCode, ''


def stage(conn, job, dir, header, priority=defaultPriority):
    """Insert one DIFXQUEUE row describing a job.

    conn     -- open database connection; only conn.cursor() is used here,
                committing is left to the caller
    job      -- [name, start MJD, stop MJD, number of antennas] as returned
                by parseJob()
    dir      -- queue directory that will hold <name>.input
    header   -- joblist header dictionary; 'exper' and 'pass' are read
    priority -- value for the PRIORITY column

    The job number is whatever follows the pass name in the job name and
    must parse as an integer (ValueError otherwise).
    """
    prefix = job[0]
    passname = header['pass']
    prop, seg = splitobscode(header['exper'])
    params = {
        'proposal': prop,
        'segment': seg,
        'job_pass': passname,
        'job_number': int(prefix[len(passname):]),
        'job_priority': priority,
        'input_file': '%s/%s.input' % (dir, prefix),
        'job_status': 'QUEUED',
        'num_ant': job[3],
    }
    # Text values travel as bind variables so that quotes in a path or pass
    # name cannot alter the statement.  The two date expressions are built
    # purely from numbers by mjd2str() and are safe to embed.
    cmd = ("insert into DIFXQUEUE (PROPOSAL, SEGMENT, JOB_PASS, JOB_NUMBER, "
           "PRIORITY, JOB_START, JOB_STOP, INPUT_FILE, STATUS, NUM_ANT) "
           "values (:proposal, :segment, :job_pass, :job_number, "
           ":job_priority, %s, %s, :input_file, :job_status, :num_ant)"
           % (mjd2str(job[1]), mjd2str(job[2])))
    print('about to execute:  %s' % cmd)
    print('with bind values:  %s' % params)
    curs = conn.cursor()
    curs.execute(cmd, params)


def parseHeader(hstr):
    """Parse the first joblist line into a dictionary.

    Anything after the first '#' is ignored.  The remainder is split on
    whitespace and every token of the exact form key=value is stored;
    other tokens are skipped.
    """
    h = {}
    content = hstr.strip().split('#')[0]
    for token in content.split():
        kv = token.split('=')
        if len(kv) == 2:
            h[kv[0]] = kv[1]
    return h


def parseJob(jstr):
    """Parse one job line into [name, start MJD, stop MJD, num antennas].

    Anything after the first '#' is ignored, as are fields beyond the
    fourth.  Raises IndexError if fewer than four fields are present and
    ValueError if a numeric field does not parse.
    """
    content = jstr.strip().split('#')[0]
    ss = content.split()
    return [ss[0], float(ss[1]), float(ss[2]), int(ss[3])]


def _run(cmd):
    """Run an external command given as an argument list; True on success.

    Failures are reported as warnings and do not stop the program.
    """
    try:
        status = call(cmd)
    except OSError as e:
        print('Warning: could not run %s: %s' % (cmd[0], e))
        return False
    if status != 0:
        print('Warning: %s exited with status %d' % (' '.join(cmd), status))
        return False
    return True


def main():
    """Command-line entry point: validate the inputs, then queue every job."""
    try:
        optlist, args = getopt(argv[1:], 'hvp:',
                               ['help', 'verbose', 'priority=', 'override-version'])
    except GetoptError as e:
        print('Error: %s' % e)
        usage(argv[0], 1)

    overrideVersion = False
    verbose = False
    priority = defaultPriority
    for o, a in optlist:
        if o in ('-h', '--help'):
            usage(argv[0])
        elif o == '--override-version':
            overrideVersion = True
        elif o in ('-v', '--verbose'):
            verbose = True
        elif o in ('-p', '--priority'):
            try:
                priority = int(a)
            except ValueError:
                print('Error: priority must be an integer, not %s' % a)
                exit(1)

    if len(args) != 1:
        usage(argv[0], 1)

    difxVersion = getenv('DIFX_VERSION')
    if difxVersion is None:
        print('Warning: env var DIFX_VERSION is not set!')
        difxVersion = 'unknown'

    difxQueueBase = getenv('DIFX_QUEUE_BASE')
    if difxQueueBase is None:
        print('Error: env var DIFX_QUEUE_BASE is not set. Cannot proceed.')
        exit(1)

    dbName = getenv('VLBA_DB')
    if dbName is None:
        print('Error: env var VLBA_DB is not set. Cannot proceed.')
        exit(1)

    # Accept either "<pass>" or "<pass>.joblist" on the command line.
    passName = args[0]
    p = passName.find('.joblist')
    if p > 0:
        passName = passName[0:p]
    jobListFile = passName + '.joblist'

    try:
        with open(jobListFile, 'r') as f:
            data = f.readlines()
    except IOError as e:
        print('Error: cannot read %s: %s' % (jobListFile, e))
        exit(1)

    if len(data) < 2:
        print('Error: file with too few lines supplied')
        exit(1)

    header = parseHeader(data[0])
    missing = [k for k in ('exper', 'pass', 'DiFX') if k not in header]
    if missing:
        print('Error: header of %s lacks: %s' % (jobListFile, ', '.join(missing)))
        exit(1)

    if header['DiFX'] != difxVersion and header['DiFX'] != 'unknown':
        if overrideVersion:
            print('Warning: Overriding DiFX Version!')
        else:
            print('Error: DiFX Version mismatch: env=%s file=%s. Stopping'
                  % (difxVersion, header['DiFX']))
            exit(1)

    if passName != header['pass']:
        print('Warning: pass name (%s) does not match provided file name prefix (%s)'
              % (header['pass'], passName))

    jobs = []
    for line in data[1:]:
        if not line.split('#')[0].strip():
            continue  # blank or comment-only line
        try:
            jobs.append(parseJob(line))
        except (IndexError, ValueError):
            print('Error: malformed job line: %s' % line.strip())
            exit(1)
    if not jobs:
        print('Error: no jobs found in %s' % jobListFile)
        exit(1)

    if verbose:
        print('Using database = %s' % dbName)
        print('DiFX Version = %s' % difxVersion)
        print('Pass name = %s' % passName)
        print('Queue base = %s' % difxQueueBase)
        print('Num jobs = %d' % len(jobs))

    nIncomplete = 0
    for j in jobs:
        if not isfile(j[0] + '.input') or not isfile(j[0] + '.calc'):
            print('Error: incomplete file set for job %s' % j[0])
            nIncomplete += 1
    if nIncomplete > 0:
        print('Exiting due to missing files for %d of the %d jobs.'
              % (nIncomplete, len(jobs)))
        exit(1)
    if verbose:
        print('All %d jobs appear to have complete file sets.' % len(jobs))

    # Imported here rather than at file level so the helper functions can be
    # imported without an Oracle client; a missing driver still stops the run
    # before the queue directory or database is touched.
    import cx_Oracle

    queueDir = difxQueueBase + '/' + header['exper']
    if not isdir(queueDir):
        mkdir(queueDir)

    calcOptions = []
    if overrideVersion:
        calcOptions.append('--override-version')

    conn = cx_Oracle.connect(dbName)
    try:
        for j in jobs:
            stage(conn, j, queueDir, header, priority)
            _run(['difxcopy', j[0], queueDir])
            _run(['calcif2'] + calcOptions + ['-f', '%s/%s' % (queueDir, j[0])])
        # Single commit at the end: an exception above leaves nothing queued.
        conn.commit()
    finally:
        conn.close()


if __name__ == '__main__':
    main()