#!/usr/bin/env python

#**************************************************************************
#   Copyright (C) 2009-2014 by Walter Brisken                             *
#                                                                         *
#   This program is free software; you can redistribute it and/or modify  *
#   it under the terms of the GNU General Public License as published by  *
#   the Free Software Foundation; either version 3 of the License, or     *
#   (at your option) any later version.                                   *
#                                                                         *
#   This program is distributed in the hope that it will be useful,       *
#   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
#   GNU General Public License for more details.                          *
#                                                                         *
#   You should have received a copy of the GNU General Public License     *
#   along with this program; if not, write to the                         *
#   Free Software Foundation, Inc.,                                       *
#   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
#**************************************************************************
#===========================================================================
# SVN properties (DO NOT CHANGE)
#
# $Id$
# $HeadURL$
# $LastChangedRevision$
# $Author$
# $LastChangedDate$
#
#============================================================================

from sys import argv, exit
try:
    # Python 2 only; the code below uses str methods instead.  The import is
    # retained (guarded) so these names stay available where they exist.
    from string import split, strip
except ImportError:
    pass
from glob import glob
from os import getenv, chdir, system, getcwd, rmdir, umask
from os.path import isfile, isdir, getsize
from time import gmtime, time

program = 'makefits'
version = '1.0'
author = 'Walter Brisken'
verdate = '20140213'

# Shell patterns of calibration files copied into (and later removed from)
# the temporary working directory used while difx2fits runs.
calfiles = 'tsys flag pcal weather *.*.flag *.*.weather *.*.pcal *.*.tsys *.*.cablecal *.*.gain'


def usage(prog):
    """Print the command-line help text and exit with status 0."""
    print('\n%s ver %s %s %s' % (program, version, author, verdate))
    print('\nUsage: %s [options] ' % prog)
    print('\nOptions can include:')
    print(' --verbose')
    print(' -v Send more output to the screen\n')
    print(' --help')
    print(' -h Print this help information and quit\n')
    print(' --override-version')
    print(' To force operation with mixed DiFX versions\n')
    print(' --allow-partial')
    print(' Bypass check for complete set of correlated\n')
    print(' --cwd')
    print(' Look for files in the current working directory\n')
    exit(0)


def _execute(cmd, verbose):
    """Run a shell command via os.system, echoing it first when verbose."""
    if verbose:
        print('Executing: %s' % cmd)
    system(cmd)


def _chgrp(groupId, target, verbose):
    """Hand `target` to group `groupId` (no-op when groupId is None).

    Used so that anyone in the difx group can remove the file later.
    """
    if groupId is not None:
        _execute('chown -R :%s %s' % (groupId, target), verbose)


def testgainfiles():
    """Return the entries under $GAIN_CURVE_PATH that are not regular files.

    A warning is printed for each such entry.  Returns an empty list when
    GAIN_CURVE_PATH is not set.
    """
    gcPath = getenv("GAIN_CURVE_PATH")
    if gcPath is None:
        return []
    missing = []
    for f in glob(gcPath + '/*'):
        if not isfile(f):
            if len(missing) == 0:
                print('')
            print('Warning: symlink %s is broken' % f)
            missing.append(f)
    if len(missing) > 0:
        print('')
    return missing


def parsekv(str):
    """Parse a whitespace separated list of key=value tokens into a dict.

    Returns an empty dict (after printing an error) if any token does not
    contain exactly one '='.
    """
    kv = {}
    for s in str.split():
        p = s.split('=')
        if len(p) != 2:
            print('Error parsing key=value statement: %s' % s)
            return {}
        kv[p[0]] = p[1]
    return kv


# turn unix time into mjd.  Should only be used when precision is not required
def getmjd(t):
    """Convert unix time `t` (seconds) to a Modified Julian Date."""
    return 40587.0 + t/86400.0


def countDigits(n):
    """Return the number of decimal digits of integer `n` (at least 1).

    The sign of `n` is ignored.
    """
    n = abs(n)
    nDig = 1
    while True:
        # floor division keeps this integer arithmetic on Python 2 and 3
        n //= 10
        if n == 0:
            break
        nDig += 1
    return nDig


def mergesnifferfiles(expt, list, passname, ext, cwd, verbose, hasHeaderLine):
    """Concatenate per-FITS-file sniffer outputs into <cwd>/sniffer.

    expt          -- experiment prefix of the input file names
    list          -- iterable of [job index, bin, source] triples
    passname      -- pass name used in the output file name
    ext           -- sniffer file extension (e.g. 'apd')
    cwd           -- directory under which the 'sniffer' directory lives
    verbose       -- echo progress when true
    hasHeaderLine -- when true only the first input's first line is kept

    Input files are looked up in the current directory and removed after
    being merged.  An existing output file is renamed to '<name>.old'.
    Returns the output file name, or None if no input files were found.
    """
    outDir = cwd + '/sniffer'
    if not isdir(outDir):
        _execute('mkdir -p %s' % outDir, verbose)
    outFile = '%s/%s.%s.%s' % (outDir, expt, passname, ext)
    if isfile(outFile):
        _execute('mv %s %s.old' % (outFile, outFile), verbose)

    files = []
    for i in list:
        infile = '%s.%d.bin%04d.source%04d.%s' % (expt, i[0], i[1], i[2], ext)
        if isfile(infile):
            files.append(infile)
        else:
            print('Warning: %s not found' % infile)
    if len(files) == 0:
        print('No %s files to merge!' % ext)
        return None

    skipFirst = False
    with open(outFile, 'w') as out:
        if verbose:
            print('\nMaking sniffer file %s from:' % outFile)
        for infile in files:
            if verbose:
                print(' %s' % infile)
            with open(infile, 'r') as f:
                data = f.readlines()
            if len(data) > 0:
                if not skipFirst:
                    # Once a header has been written, later first lines are
                    # dropped; without a header every first line is data.
                    skipFirst = hasHeaderLine
                    out.write(data[0])
                out.writelines(data[1:])
            system('rm %s' % infile)
    return outFile


def genDateStr(t):
    """Format unix time `t` as UTC 'YYMMDDTHHMMSS'."""
    d = gmtime(t)
    return '%02d%02d%02dT%02d%02d%02d' % \
        (d.tm_year % 100, d.tm_mon, d.tm_mday,
         d.tm_hour, d.tm_min, d.tm_sec)


class Job:
    """One job line of a .joblist file.

    `name` is the empty string when the line could not be parsed; the other
    attributes are only set for successfully parsed lines.
    """

    def __init__(self, str):
        s = str.strip().split()
        self.name = ''
        if len(s) >= 6:
            try:
                mjdStart = float(s[1])
                mjdStop = float(s[2])
                nAnt = int(s[3])
                nPulsarBins = int(s[4])
                nSourceFiles = int(s[5])
            except ValueError:
                # Non-numeric field: leave name empty so the caller reports
                # the line as malformed.
                return
            self.name = s[0]
            self.mjdStart = mjdStart
            self.mjdStop = mjdStop
            self.nAnt = nAnt
            self.nPulsarBins = nPulsarBins
            self.nSourceFiles = nSourceFiles

    def show(self, indent=0):
        """Print a one-line summary of the job, indented by `indent`."""
        print('%sJob %s: start=%f stop=%f nPsrBins=%d nSrcFiles=%d nAnt=%d' %
              (' '*indent, self.name, self.mjdStart, self.mjdStop,
               self.nPulsarBins, self.nSourceFiles, self.nAnt))

    def verify(self, dir):
        """Return True if all expected files of this job exist in `dir`."""
        for suffix in ['.calc', '.input', '.flag', '.im']:
            path = dir + '/' + self.name + suffix
            if not isfile(path):
                print('%s not found' % path)
                return False
        ddir = dir + '/' + self.name + '.difx'
        if not isdir(ddir):
            print('difx output dir %s not found' % ddir)
            return False
        return True


class JobList:
    """Contents of a .joblist file: a key=value header line plus jobs."""

    def __init__(self, filename):
        """Read and parse `filename`; prints an error and exits if malformed."""
        self.filename = filename
        with open(filename) as f:
            data = f.readlines()
        self.jobs = []
        self.npsrbins = 0
        self.nsrcfiles = 1
        if len(data) < 2:
            print('Malformed .joblist file %s' % filename)
            exit(0)
        self.kv = parsekv(data[0])
        if len(self.kv) < 1:
            print('Malformed .joblist file %s line 1' % filename)
            exit(0)
        for lineNum, line in enumerate(data[1:], 2):
            d = line.split('#')[0]      # strip trailing comment
            if len(d) < 2:
                continue
            j = Job(d)
            if j.name == '':
                print('Malformed line %d in %s' % (lineNum, filename))
                exit(0)
            self.jobs.append(j)
            if j.nPulsarBins > self.npsrbins:
                self.npsrbins = j.nPulsarBins
            if j.nSourceFiles > self.nsrcfiles:
                self.nsrcfiles = j.nSourceFiles

    def testversion(self, override):
        """Check DIFX_VERSION / DIFX_LABEL against the joblist header.

        On a mismatch the program exits unless `override` is true, in which
        case a message is printed and the 'label' entry of the header is
        brought in line with the environment.
        """
        difxVersion = getenv('DIFX_VERSION')
        if difxVersion is None:
            print('Warning: env var DIFX_VERSION is not set!')
            if not override:
                exit(0)
        elif difxVersion != self.kv['DiFX']:
            if override:
                print('Overriding version mismatch: %s != %s' % (difxVersion, self.kv['DiFX']))
            else:
                print('Error: file DiFX version = %s' % self.kv['DiFX'])
                print('and environment DiFX version = %s' % difxVersion)
                exit(0)

        difxLabel = getenv('DIFX_LABEL')
        if difxLabel is not None:
            if 'label' not in self.kv:
                if override:
                    print('Override causing DiFX label to be set to %s' % difxLabel)
                    self.kv['label'] = difxLabel
                else:
                    print('Error: file DiFX label is not set but environment DiFX label is %s' % difxLabel)
                    exit(0)
            if difxLabel != self.kv['label']:
                # NOTE(review): without override a label mismatch is accepted
                # silently, unlike the other checks here -- confirm intent.
                if override:
                    print('Overriding DiFX label mismatch: %s != %s' % (difxLabel, self.kv['label']))
                    self.kv['label'] = difxLabel
        elif 'label' in self.kv:
            if override:
                print('Override causing DiFX label to be unset (it was %s)' % self.kv['label'])
                del self.kv['label']
            else:
                print('Error: file DiFX label is set to %s but environment DiFX label not set' % self.kv['label'])
                exit(0)

    def makefits(self, queuedir, verbose, sniff, usecwd, options=''):
        """Run difx2fits for every bin/phase centre and archive the results.

        queuedir -- base queue directory (used unless `usecwd`)
        verbose  -- echo the shell commands being run
        sniff    -- merge sniffer output files when true
        usecwd   -- work in the current directory instead of the queue
        options  -- extra command-line options passed to difx2fits

        Work happens in a 'makefits.tmp' directory that must not already
        exist.  A <pass>.fitslist file, the merged .difxlog and the sniffer
        files are written relative to the starting directory.  Returns False
        if any FITS file has a size that is not a multiple of 2880 bytes,
        True otherwise; exits the program on fatal errors.
        """
        nSizeError = 0
        groupId = getenv('DIFX_GROUP_ID')
        cwd = getcwd()
        if usecwd:
            outDir = cwd
        else:
            outDir = queuedir + '/' + self.kv['exper']
        t = time()
        datestr = genDateStr(t)

        tmp = outDir + '/' + 'makefits.tmp'
        if isdir(tmp):
            print('\nError: temporary directory %s already exists. Aborting.' % tmp)
            exit(0)
        _execute('mkdir -p %s' % tmp, verbose)
        _execute('cp %s %s' % (calfiles, tmp), verbose)
        if verbose:
            print('Changing to directory: %s' % tmp)
        chdir(tmp)

        # one difx2fits run per (pulsar bin, phase centre) combination
        binrange = max(1, self.npsrbins)
        jobArgs = ''.join(['../%s ' % j.name for j in self.jobs])
        for b in range(binrange):
            for s in range(self.nsrcfiles):
                cmd = 'difx2fits -B %d --phasecentre %d %s ' % (b, s, options)
                _execute(cmd + jobArgs, verbose)

        files = glob('*.*.*.*.FITS')
        if len(files) == 0:
            print('Error: No fits files made')
            print('Please clean remove %s when you are done picking up the pieces.' % tmp)
            exit(0)
        if verbose:
            print('Made %d FITS files' % len(files))

        # File names look like <expt>.<n>.bin<NNNN>.source<NNNN>.FITS
        expt = files[0].split('.')[0]
        indices = []
        for f in files:
            s = f.split('.')
            if s[0] != expt:
                print('Error: Unexpected variety of FITS files found: %s' % (files,))
                print('Please clean remove %s when you are done picking up the pieces.' % tmp)
                exit(0)
            indices.append([int(s[1]), int(s[2][3:]), int(s[3][6:])])
        indices.sort()

        fitslistfile = '%s/%s.fitslist' % (cwd, self.kv['pass'])
        if isfile(fitslistfile):
            print("FITS listing file " + fitslistfile + " already exists; not proceeding!")
            print("Remove this file if the FITS files listed within have already been")
            print("archived or are not required")
            exit(0)

        nDigit = countDigits(indices[-1][0])
        nBinDigits = countDigits(binrange)
        nSrcDigits = countDigits(self.nsrcfiles)

        with open(fitslistfile, 'w') as out:
            out.write('exper=%s pass=%s jobs=%s mjd=%9.7f DiFX=%s difx2fits=0' %
                      (self.kv['exper'], self.kv['pass'], self.filename, getmjd(t), self.kv['DiFX']))
            if 'label' in self.kv:
                out.write(' label=%s' % self.kv['label'])
            out.write('\n')

            nameFormat = 'VLBA_%%s_%%s_BIN%%0%dd_SRC%%0%dd_%%0%dd_%%s.idifits' % (nBinDigits, nSrcDigits, nDigit)
            for i in indices:
                fileName = '%s.%d.bin%04d.source%04d.FITS' % (expt, i[0], i[1], i[2])
                archiveFile = nameFormat % (self.kv['exper'], self.kv['pass'], i[1], i[2], i[0], datestr)
                size = getsize(fileName)
                if size % 2880 != 0:
                    print('ERROR!!! %s is not a multiple of 2880 bytes!' % archiveFile)
                    nSizeError += 1
                out.write('%s %4.2f %s\n' % (archiveFile, size/1000000.0, fileName))
                _execute('mv %s %s/%s' % (fileName, outDir, archiveFile), verbose)
                _chgrp(groupId, '%s/%s' % (outDir, archiveFile), verbose)

        nameFormat = 'VLBA_%%s_%%s_BIN%%0%dd_SRC%%0%dd_%%0%dd_%%s.jobmatrix.txt' % (nBinDigits, nSrcDigits, nDigit)
        for i in indices:
            fileName = '%s.%d.bin%04d.source%04d.jobmatrix' % (expt, i[0], i[1], i[2])
            if isfile(fileName):
                archiveFile = nameFormat % (self.kv['exper'], self.kv['pass'], i[1], i[2], i[0], datestr)
                _execute('mv %s %s/%s' % (fileName, outDir, archiveFile), verbose)
                _chgrp(groupId, '%s/%s' % (outDir, archiveFile), verbose)

        # merge and gzip difxlog files
        logFile = '%s/%s.difxlog' % (cwd, self.kv['pass'])
        logInputs = []
        for j in self.jobs:
            fileName = outDir + '/' + j.name + '.difxlog'
            if isfile(fileName):
                logInputs.append(fileName)
        if len(logInputs) > 0:
            _execute('cat ' + ' '.join(logInputs) + ' > ' + logFile, verbose)
            _execute('gzip -f %s' % logFile, verbose)
            _chgrp(groupId, '%s.gz' % logFile, verbose)
        elif verbose:
            print('No .difxlog files to merge')

        # merge sniffer files
        sniffFiles = ''
        if sniff:
            for ext, hasHeader in [['acb', False], ['apd', True], ['apc', True],
                                   ['log', False], ['wts', True], ['xcb', False]]:
                sniffFile = mergesnifferfiles(expt, indices, self.kv['pass'], ext, cwd, verbose, hasHeader)
                if sniffFile is not None:
                    sniffFiles += ' ' + sniffFile
        if sniffFiles != '':
            _chgrp(groupId, sniffFiles, verbose)

        # remove the calibration file copies, then the (hopefully empty) tmp dir
        _execute('rm %s' % calfiles, verbose)
        chdir(cwd)
        if len(glob('%s/*' % tmp)) == 0:
            rmdir(tmp)
        if isdir(tmp):
            print('Warning: unexpected files left in %s' % tmp)

        return nSizeError == 0

    def verify(self, queuedir, allowPartial, usecwd):
        """Check that the jobs' files exist and the job set is consistent.

        With `allowPartial`, incomplete jobs are dropped from self.jobs and
        True is returned as long as at least one job is complete.
        """
        if usecwd:
            jobDir = getcwd()
        else:
            jobDir = queuedir + '/' + self.kv['exper']
        completeJobs = []
        OK = True
        for j in self.jobs:
            if self.npsrbins != j.nPulsarBins:
                print("Jobs have inconsistent number of bins (%d,%d)" %
                      (self.npsrbins, j.nPulsarBins))
            if j.verify(jobDir):
                completeJobs.append(j)
            else:
                OK = False
        if self.npsrbins > 0 and self.nsrcfiles > 1:
            print("Cannot build multiple-field and pulsar files simultaneously!")
            return False
        if not allowPartial:
            return OK
        nFail = len(self.jobs) - len(completeJobs)
        if nFail == 0:
            return True
        if len(completeJobs) == 0:
            print('No jobs are complete. Cannot continue')
            return False
        print('%d of %d jobs were not complete. Continuing anyway...' %
              (nFail, len(self.jobs)))
        self.jobs = completeJobs
        return True

    def show(self, indent=0):
        """Print the header key/values and all jobs, indented by `indent`."""
        pad = ' '*indent
        print('%sJobList: %s' % (pad, self.filename))
        pad = ' '*(indent+2)
        for key in self.kv:
            print('%sKV: %s = %s' % (pad, key, self.kv[key]))
        for j in self.jobs:
            j.show(indent+2)


# main below here

def main():
    """Parse the command line and build FITS files for each pass given."""
    umask(0o02)

    difxQueueBase = getenv('DIFX_QUEUE_BASE')
    if difxQueueBase is None:
        print('Error: env var DIFX_QUEUE_BASE is not set. Cannot proceed.')
        exit(0)

    overrideVersion = False
    allowPartial = False
    verbose = False
    sniff = True
    usecwd = False
    opts = ''       # options forwarded to difx2fits
    args = []       # pass names / joblist files

    for a in argv[1:]:
        if a.startswith('-'):
            if a == '--override-version':
                overrideVersion = True
                opts = opts + ' ' + '--override-version'
            elif a == '--allow-partial':
                allowPartial = True
            elif a == '--cwd':
                usecwd = True
            elif a in ('-h', '--help'):
                usage(argv[0])      # does not return
            else:
                opts = opts + ' ' + a
                if a in ('-v', '--verbose'):
                    verbose = True
                if a in ('-x', '--dont-sniff'):
                    sniff = False
        elif isfile(a) or isfile(a + '.joblist'):
            args.append(a)
        else:
            opts = opts + ' ' + a

    if opts == '':
        opts = '-v'

    if len(args) < 1:
        print('No pass names on command line. Run with -h for help.')
        exit(0)

    print('options: %s' % opts)

    gcFilesMissing = testgainfiles()

    for arg in args:
        if arg[-8:] == '.joblist':
            joblistfile = arg
        else:
            joblistfile = arg + '.joblist'
        if not isfile(joblistfile):
            print('Error: file %s not found' % joblistfile)
            exit(0)
        jl = JobList(joblistfile)
        jl.testversion(overrideVersion)
        if verbose:
            jl.show()
        OK = jl.verify(difxQueueBase, allowPartial, usecwd)
        if not OK:
            if not allowPartial:
                print('\nJob list verification failed!')
                print('Run with --allow-partial to process a subset of data.\n')
            exit(0)
        OK = jl.makefits(difxQueueBase, verbose, sniff, usecwd, opts)
        if not OK:
            print('Serious errors were detected!')

    if len(gcFilesMissing) > 0:
        print('\nWarning: some gain curve files were not found:')
        for f in gcFilesMissing:
            print(' %s' % f)
        print('')


if __name__ == '__main__':
    main()