#!/usr/bin/env python3 #************************************************************************** # Copyright (C) 2009-2022 by Walter Brisken * # * # This program is free software; you can redistribute it and/or modify * # it under the terms of the GNU General Public License as published by * # the Free Software Foundation; either version 3 of the License, or * # (at your option) any later version. * # * # This program is distributed in the hope that it will be useful, * # but WITHOUT ANY WARRANTY; without even the implied warranty of * # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * # GNU General Public License for more details. * # * # You should have received a copy of the GNU General Public License * # along with this program; if not, write to the * # Free Software Foundation, Inc., * # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * #************************************************************************** #=========================================================================== # SVN properties (DO NOT CHANGE) # # $Id$ # $HeadURL$ # $LastChangedRevision$ # $Author$ # $LastChangedDate$ # #============================================================================ # Note: this utility can run under python2.7 or python3 from sys import argv, exit from datetime import datetime from glob import glob from os import getenv, chdir, system, getcwd, rmdir, umask from os.path import isfile, isdir, getsize from time import gmtime, time program = 'makefits' version = '1.1' author = 'Walter Brisken' verdate = '20221203' calfiles = 'tsys flag pcal weather *.*.flag *.*.weather *.*.pcal *.*.tsys *.*.cablecal *.*.gain' def usage(prog): print('\n%s ver %s %s %s' % (program, version, author, verdate)) print('\nUsage: %s [options] ' % prog) print('\nOptions can include:') print(' --verbose') print(' -v Send more output to the screen\n') print(' --help') print(' -h Print this help information and quit\n') print(' --override-version') print(' To force operation with mixed DiFX versions\n') print(' --allow-partial') print(' Bypass check for complete set of correlated\n') print(' --cwd') print(' Look for files in the current working directory\n') print(' --profilemode') print(' Pass the profilemode switch to difx2fits pulsar autocorrelations\n') exit(0) def testgainfiles(): gcPath = getenv("GAIN_CURVE_PATH") if gcPath == None: return [] gcFiles = glob(gcPath + '/*') missing = [] for f in gcFiles: if not isfile(f): if len(missing) == 0: print('') print('Warning: symlink %s is broken' % f) missing.append(f) if len(missing) > 0: print('') return missing def parsekv(strg): kv = {} ss = strg.split() for s in ss: p = s.split('=') if len(p) != 2: print('Error parsing key=value statement: %s', s) return {} kv[p[0]] = p[1] return kv # turn unix time into mjd. Should only be used when precision is not required def getmjd(t): return 40587.0 + t/86400.0 def countDigits(n): nDig = 1 while True: n //= 10 if n == 0: break; nDig += 1 return nDig def mergesnifferfiles(expt, list, passname, ext, cwd, verbose, hasHeaderLine): dir = cwd + '/sniffer' if not isdir(dir): cmd = 'mkdir -p %s' % dir if verbose: print('Executing: %s' % cmd) system(cmd) outFile = '%s/%s.%s.%s' % (dir, expt, passname, ext) if isfile(outFile): cmd = 'mv %s %s.old' % (outFile, outFile) if verbose: print('Executing: %s' % cmd) system(cmd) skipFirst = False files = [] for i in list: infile = '%s.%d.bin%04d.source%04d.%s' % (expt, i[0], i[1], i[2], ext) if isfile(infile): files.append(infile) else: print('Warning: %s not found' % infile) if len(files) == 0: print('No %s files to merge!' % ext) return None out = open(outFile, 'w') if verbose: print('\nMaking sniffer file %s from:' % outFile) for infile in files: if verbose: print(' %s' % infile) data = open(infile, 'r').readlines() if len(data) > 0: if skipFirst == False: skipFirst = hasHeaderLine out.write(data[0]) for d in data[1:]: out.write(d) system('rm %s' % infile) out.close() return outFile def genDateStr(t): d = gmtime(t) return '%02d%02d%02dT%02d%02d%02d' % \ (d.tm_year % 100, d.tm_mon, d.tm_mday, \ d.tm_hour, d.tm_min, d.tm_sec) class Job: def __init__(self, strg): s = strg.strip().split() if len(s) >= 6: self.name = s[0] self.mjdStart = float(s[1]) self.mjdStop = float(s[2]) self.nAnt = int(s[3]) self.nPulsarBins = int(s[4]) self.nSourceFiles = int(s[5]) else: self.name = '' def show(self, indent=0): print('%sJob %s: start=%f stop=%f nPsrBins=%d nSrcFiles=%d nAnt=%d' % \ (' '*indent, self.name, self.mjdStart, self.mjdStop, self.nPulsarBins, self.nSourceFiles, self.nAnt)) def verify(self, dir): files = ['.calc', '.input', '.flag', '.im'] for f in files: file = dir + '/' + self.name + f if not isfile(file): print('%s not found' % file) return False ddir = dir + '/' + self.name + '.difx' if not isdir(ddir): print('difx output dir %s not found' % ddir) return False return True class JobList: def __init__(self, filename): self.filename = filename data = open(filename).readlines() self.jobs = [] self.npsrbins = 0 self.nsrcfiles = 1 if len(data) < 2: print('Malformed .joblist file %s' % filename) exit(0) self.kv = parsekv(data[0]) if len(self.kv) < 1: print('Malformed .joblist file %s line 1' % filename) exit(0) for n in range(1, len(data)): d = data[n].split('#')[0] if len(d) < 2: continue j = Job(d) if j.name == '': print('Malformed line %d in %s' % (n+1, filename)) exit(0) self.jobs.append(j) if self.jobs[-1].nPulsarBins > self.npsrbins: self.npsrbins = self.jobs[-1].nPulsarBins if self.jobs[-1].nSourceFiles > self.nsrcfiles: self.nsrcfiles = self.jobs[-1].nSourceFiles def testversion(self, override): difxVersion = getenv('DIFX_VERSION') if difxVersion == None: print('Warning: env var DIFX_VERSION is not set!') if override: difxVersion = 'unknown' else: exit(0) else: if difxVersion != self.kv['DiFX']: if override: print('Overriding version mismatch: %s != %s' % (difxVersion, self.kv['DiFX'])) difxVersion = 'unknown' else: print('Error: file DiFX version = %s' % self.kv['DiFX']) print('and environment DiFX version = %s' % difxVersion) exit(0) difxLabel = getenv('DIFX_LABEL') if difxLabel != None: if not 'label' in self.kv: if override: print('Override causing DiFX label to be set to %s' % difxLabel) self.kv['label'] = difxLabel difxVersion = 'unknown' else: print('Error: file DiFX label is not set but environment DiFX label is %s' % difxLabel) exit(0) if difxLabel != self.kv['label']: if override: print('Overriding DiFX label mismatch: %s != %s' % (difxLabel, self.kv['label'])) difxVersion = 'unknown' self.kv['label'] = difxLabel elif 'label' in self.kv: if override: print('Override causing DiFX label to be unset (it was %s)' % self.kv['label']) del self.kv['label'] else: print('Error: file DiFX label is set to %s but environment DiFX label not set' % self.kv['label']) exit(0) def makefits(self, queuedir, verbose, sniff, usecwd, options=''): nSizeError = 0 groupId = getenv('DIFX_GROUP_ID') cwd = getcwd() if usecwd: dir = cwd else: dir = queuedir + '/' + self.kv['exper'] t = time() datestr = genDateStr(t) tmp = dir + '/' + 'makefits.tmp.' + self.kv['pass'] if isdir(tmp): print('\nError: temporary directory %s already exists. Aborting.' % tmp) exit(0) cmd = 'mkdir -p %s' % tmp if verbose: print('Executing: %s' % cmd) system(cmd) cmd = 'cp %s %s' % (calfiles, tmp) if verbose: print('Executing: %s' % cmd) system(cmd) if verbose: print('Changing to directory: %s' % tmp) chdir(tmp) binrange = 1 if self.npsrbins > binrange: binrange = self.npsrbins for b in range(binrange): for s in range(self.nsrcfiles): cmd = 'difx2fits -B %d --phasecentre %d %s ' % (b, s, options) if profilemode: cmd += '--profilemode ' for j in self.jobs: cmd += ('../%s ' % j.name) if verbose: print('Executing: %s' % cmd) system(cmd) files = glob('*.*.*.*.FITS') if len(files) == 0: print('Error: No fits files made') print('Please clean up and remove %s when you are done picking up the pieces.' % tmp) exit(0) if verbose: print('Made %d FITS files' % len(files)) expt = files[0].split('.')[0] indices = [] for f in files: s = f.split('.') if s[0] != expt: print('Error: Unexpected variety of FITS files found:', files) print('Please clean up and remove %s when you are done picking up the pieces.' % tmp) exit(0) indices.append([int(s[1]), int(s[2][3:]), int(s[3][6:])]) indices.sort() j = 1 fitslistfile = '%s/%s.fitslist' % (cwd, self.kv['pass']) if isfile(fitslistfile): ts = f"{datetime.now():%Y-%m-%d.%H:%M:%S}" fitslistfilets = fitslistfile + '.' + ts cmd = 'mv %s %s' % (fitslistfile, fitslistfilets) print("FITS listing file " + fitslistfile + " already exists") print("Renaming to %s" % fitslistfilets) print("Executing: %s" % cmd) system(cmd) if isfile(fitslistfile): printf("File rename failed. Quitting.") exit(0) out = open(fitslistfile, 'w') out.write('exper=%s pass=%s jobs=%s mjd=%9.7f DiFX=%s difx2fits=0' % \ (self.kv['exper'], self.kv['pass'], self.filename, getmjd(t), self.kv['DiFX'])) if 'label' in self.kv: out.write(' label=%s' % self.kv['label']) out.write('\n') nDigit = countDigits(indices[-1][0]) nBinDigits = countDigits(binrange) nSrcDigits = countDigits(self.nsrcfiles) format = 'VLBA_%%s_%%s_BIN%%0%dd_SRC%%0%dd_%%0%dd_%%s.idifits' % (nBinDigits, nSrcDigits, nDigit) for i in indices: fileName = '%s.%d.bin%04d.source%04d.FITS' % (expt, i[0], i[1], i[2]) archiveFile = format % (self.kv['exper'], self.kv['pass'], i[1], i[2], i[0], datestr) size = getsize(fileName) if size % 2880 != 0: print('ERROR!!! %s is not a multiple of 2880 bytes!' % archiveFile) nSizeError += 1 out.write('%s %4.2f %s\n' % (archiveFile, size/1000000.0, fileName)) cmd = 'mv %s %s/%s' % (fileName, dir, archiveFile) if verbose: print('Executing: %s' % cmd) system(cmd) if groupId != None: # make sure anyone in the difx group can remove the file later cmd = 'chown -R :%s %s/%s' % (groupId, dir, archiveFile) if verbose: print('Executing: %s' % cmd) system(cmd) j += 1 out.close() j = 1 format = 'VLBA_%%s_%%s_BIN%%0%dd_SRC%%0%dd_%%0%dd_%%s.jobmatrix.txt' % (nBinDigits, nSrcDigits, nDigit) for i in indices: fileName = '%s.%d.bin%04d.source%04d.jobmatrix' % (expt, i[0], i[1], i[2]) if isfile(fileName): archiveFile = format % (self.kv['exper'], self.kv['pass'], i[1], i[2], i[0], datestr) cmd = 'mv %s %s/%s' % (fileName, dir, archiveFile) if verbose: print('Executing: %s' % cmd) system(cmd) if groupId != None: # make sure anyone in the difx group can remove the file later cmd = 'chown -R :%s %s/%s' % (groupId, dir, archiveFile) if verbose: print('Executing: %s' % cmd) system(cmd) j += 1 # merge and gzip difxlog files logFile = '%s/%s.difxlog' % (cwd, self.kv['pass']) cmd = 'cat' nLog = 0 for j in self.jobs: fileName = dir + '/' + j.name + '.difxlog' if isfile(fileName): cmd = cmd + ' ' + fileName nLog += 1 if nLog > 0: cmd = cmd + ' > ' + logFile if verbose: print('Executing: %s' % cmd) system(cmd) cmd = 'gzip -f %s' % logFile if verbose: print('Executing: %s' % cmd) system(cmd) if groupId != None: cmd = 'chown -R :%s %s.gz' % (groupId, logFile) if verbose: print('Executing: %s' % cmd) system(cmd) else: if verbose: print('No .difxlog files to merge') # merge sniffer files sniffFiles = '' if sniff: for x in [['acb', False] , ['apd', True], ['apc', True], ['log', False], ['wts', True], ['xcb', False]]: sniffFile = mergesnifferfiles(expt, indices, self.kv['pass'], x[0], cwd, verbose, x[1]) if sniffFile != None: sniffFiles += ' ' + sniffFile if sniffFiles != '': if groupId != None: cmd = 'chown -R :%s %s' % (groupId, sniffFiles) if verbose: print('Executing: %s' % cmd) system(cmd) cmd = 'rm %s' % calfiles if verbose: print('Executing: %s' % cmd) system(cmd) chdir(cwd) unexpectedFileCount = 0 cpolCount = 0 jobmatrixCount = 0 G = glob('%s/*' % tmp) for g in G: if g[-5:] == '.cpol': cpolCount += 1 continue if g[-10:] == '.jobmatrix': jobmatrixCount += 1 continue unexpectedFileCount += 1 if unexpectedFileCount == 0: cmd = 'rm --force %s/*.cpol %s/*.jobmatrix' % (tmp, tmp) if verbose: print('Executing: %s' % cmd) system(cmd) rmdir(tmp) if isdir(tmp): print('Warning: unexpected files left in %s' % tmp) if nSizeError > 0: return False else: return True def verify(self, queuedir, allowPartial, usecwd): if usecwd: dir = getcwd() else: dir = queuedir + '/' + self.kv['exper'] completeJobs = [] OK = True for j in self.jobs: if self.npsrbins != j.nPulsarBins: print('Jobs have inconsistent number of bins (%d, %d)' % (self.npsrbins, j.nPulsarBins)) if j.verify(dir) == False: OK = False else: completeJobs.append(j) if self.npsrbins > 0 and self.nsrcfiles > 1: print('Cannot build multiple-field and pulsar files simultaneously!') return False if not allowPartial: return OK nFail = len(self.jobs) - len(completeJobs) if nFail == 0: return True if len(completeJobs) == 0: print('No jobs are complete. Cannot continue') return False print('%d of %d jobs were not complete. Continuing anyway...' % (nFail, len(self.jobs))) self.jobs = completeJobs return True def show(self, indent=0): id = ' '*indent print('%sJobList: %s' % (id, self.filename)) id = ' '*(indent+2) for key in self.kv.keys(): print('%sKV: %s = %s' % (id, key, self.kv[key])) for j in self.jobs: j.show(indent+2) # main below here umask(2) difxQueueBase = getenv('DIFX_QUEUE_BASE') if difxQueueBase == None: print('Error: env var DIFX_QUEUE_BASE is not set. Cannot proceed.') exit(0) overrideVersion = False allowPartial = False verbose = False sniff = True usecwd = False profilemode = False opts = '' args = [] for a in argv[1:]: if a[0] == '-': if a == '--override-version': overrideVersion = True opts = opts + ' ' + '--override-version' elif a == '--allow-partial': allowPartial = True elif a == '--profilemode': profilemode = True elif a == '--cwd': usecwd = True elif a in ('-h', '--help'): usage(argv[0]) opts = opts + ' -v' else: opts = opts + ' ' + a if a in ('-v', '--verbose'): verbose = True if a in ('-x', '--dont-sniff'): sniff = False elif isfile(a) or isfile(a + '.joblist'): args.append(a) else: opts = opts + ' ' + a if opts == '': opts = '-v' if len(args) < 1: print('No pass names on command line. Run with -h for help.') exit(0) print('options: %s' % opts) gcFilesMissing = testgainfiles() for arg in args: if arg[-8:] == '.joblist': joblistfile = arg else: joblistfile = arg + '.joblist' if not isfile(joblistfile): print('Error: file %s not found' % joblistfile) exit(0) jl = JobList(joblistfile) jl.testversion(overrideVersion) if verbose: jl.show() OK = jl.verify(difxQueueBase, allowPartial, usecwd) if not OK: if not allowPartial: print('\nJob list verification failed!') print('Run with --allow-partial to process a subset of data.\n') exit(0) OK = jl.makefits(difxQueueBase, verbose, sniff, usecwd, opts) if not OK: print('Serious errors were detected!') if len(gcFilesMissing) > 0: print('\nWarning: some gain curve files were not found:') for f in fcFilesMissing: print(' %s' % f) print('')