#!/usr/bin/env python2 #************************************************************************** # Copyright (C) 2008-2013 by Walter Brisken & Helge Rottmann * # * # This program is free software; you can redistribute it and/or modify * # it under the terms of the GNU General Public License as published by * # the Free Software Foundation; either version 3 of the License, or * # (at your option) any later version. * # * # This program is distributed in the hope that it will be useful, * # but WITHOUT ANY WARRANTY; without even the implied warranty of * # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * # GNU General Public License for more details. * # * # You should have received a copy of the GNU General Public License * # along with this program; if not, write to the * # Free Software Foundation, Inc., * # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * #************************************************************************** #=========================================================================== # SVN properties (DO NOT CHANGE) # # $Id$ # $HeadURL: $ # $LastChangedRevision$ # $Author$ # $LastChangedDate$ # #============================================================================ from string import split, strip, upper, lower from sys import exit from os import getenv, umask, environ from os.path import isfile import re import socket import struct import subprocess import signal import sys from optparse import OptionParser from xml.parsers import expat from copy import deepcopy from ast import literal_eval from datetime import datetime try: from difxfile.difxmachines import * except ImportError: print "ERROR: Cannot find difxmachines library. Please include $DIFXROOT/lib/python in your $PYTHONPATH environment" sys.exit(1) author = 'Walter Brisken and Helge Rottmann' version = '2.5.0' verdate = '20160112' minMachinefileVersion = "1.0" # cluster definition file must have at least this version defaultDifxMessagePort = 50200 defaultDifxMessageGroup = '224.2.2.1' #ignoreIncompleteModules = True MARK6_VSN_IDS = ['%'] def getmonthdate(daynumber, yearnumber): md = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334] mdl = [0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335] if yearnumber % 4 == 0: dl = mdl else: dl = md for i in range(len(dl)): if daynumber > dl[i] and daynumber <= dl[i+1]: return [i + 1, daynumber - dl[i]] def convertDateString2Mjd(dateStr): mjd0 = datetime(1858, 11, 17, 0, 0) if len(dateStr) == 18 and dateStr[4] == 'y' and dateStr[8] == 'd' and dateStr[11] == 'h' and dateStr[14] == 'm' and dateStr[17] == 's': startyear = int(dateStr[0:4]) startday = int(dateStr[5:8]) mondate = getmonthdate(startday, startyear) startmonth = mondate[0] startdate = mondate[1] starthour = int(dateStr[9:11]) startminute = int(dateStr[12:14]) startsecond = int(dateStr[15:17]) startdt = datetime(startyear, startmonth, startdate, starthour, startminute, startsecond) startmjd = startdt - mjd0 return startmjd else: print 'invalid date string', dateStr return datetime(1900, 1, 1, 0, 0) def getUsage(): """ Compile usage text for OptionParser """ usage = "%prog [options] [ [] ...]\n" usage += '\n is a DiFX .input file.' usage += '\nA program to find required Mark5 modules and write the machines file' usage += '\nappropriate for a particular DiFX job.' usage += '\n\nNote: %prog respects the following environment variables:' usage += '\nDIFX_MACHINES: required, unless -m option is given. -m overrides DIFX_MACHINES.' usage += '\nDIFX_GROUP: if not defined a default of %s will be used.' % defaultDifxMessageGroup usage += '\nDIFX_PORT: if not defined a default of %s will be used.' % defaultDifxMessagePort usage += '\nSee http://cira.ivec.org/dokuwiki/doku.php/difx/clusterdef for documentation on the machines file format' return(usage) class MessageParser: """ Parses Mark5StatusMessage and Mark6StatusMessage """ def __init__(self): self._parser = expat.ParserCreate() self._parser.StartElementHandler = self.start self._parser.EndElementHandler = self.end self._parser.CharacterDataHandler = self.data self.fields = {} self.type = 'unknown' self.vsnA = 'none' self.vsnB = 'none' self.state = 'Unknown' self.unit = 'unknown' self.sender = 'unknown' self.tmp = '' self.ok = False def feed(self, sender, data): self._parser.Parse(data, 0) self.sender = sender def close(self): self._parser.Parse("", 1) # end of data del self._parser # get rid of circular references def start(self, tag, attrs): if tag == 'mark5Status' or tag=='mark6Status': self.type = tag self.ok = True def parseMark6(self,tag): self.fields[tag] = self.tmp def parseMark5(self,tag): if tag == 'bankAVSN' and self.ok: if len(self.tmp) != 8: self.vsnA = 'none' else: self.vsnA = upper(self.tmp) if tag == 'bankBVSN' and self.ok: if len(self.tmp) != 8: self.vsnB = 'none' else: self.vsnB = upper(self.tmp) def end(self, tag): if tag == 'from': self.unit = lower(self.tmp) if tag == 'state' and self.ok: self.state = self.tmp if self.type == 'mark5Status': self.parseMark5(tag) if self.type == 'mark6Status': self.parseMark6(tag) def data(self, data): self.tmp = data def getinfo(self): if self.ok: if self.type == 'mark5Status': return [self.unit, self.type, self.vsnA, self.vsnB, self.state, self.sender] elif self.type == 'mark6Status': return [self.unit, self.type, self.fields, self.state, self.sender] else: return ['unknown', 'none', 'none', 'Unknown', 'unknown'] def sendRequest(destination, command): src = socket.gethostname() dest = '%s' %(destination) message = \ '\n' \ '' \ '
' \ '%s' \ '%s' \ '-1' \ 'genmachines' \ 'DifxCommand' \ '
' \ '' \ '0' \ '' \ '%s' \ '' \ '' \ '
' % (src, dest, command) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) sock.sendto(message, (group, port)) return message def isModuleComplete(slot, message): """ Checks whether a Mark6 module has the expected number of disks """ key = slot + "Disks" if key in message.keys(): disks = int(message[key]) key = slot + "MissingDisks" if key in message.keys(): missingDisks = int(message[key]) if disks ==0: return False if missingDisks > 0: return False return True def getVsnsByMulticast(maxtime, datastreams, verbose): dt = 0.2 t = 0.0 vsnlist = [] count = 0 for stream in datastreams: count += 1 if len(stream.vsn) > 0: vsnlist.append(stream.vsn) if len(stream.msn) > 0: for m in stream.msn: if isinstance(m,list): vsnlist += m else: vsnlist.append(m) missingVsns = deepcopy(vsnlist) # First send out a call for VSNs sendRequest("mark5","getvsn") sendRequest("mark6","getvsn") # Now listen for responses, until either time runs out or we get all we need s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('', port)) mreq = struct.pack("4sl", socket.inet_aton(group), socket.INADDR_ANY) s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) s.settimeout(dt) conflicts = [] results = [] machines = [] notidle = [] incomplete = [] while t < maxtime and len(missingVsns) > 0: try: message, address = s.recvfrom(2048) sender = split(socket.gethostbyaddr(address[0])[0], '.')[0] if verbose > 1: print message p = MessageParser() p.feed(sender, message) info = p.getinfo() p.close() if info[0] == 'unknown': continue if info[0] in machines: continue machines.append(info[0]) results.append(info) if info[1] == "mark5Status": if info[2] in missingVsns and info[3] in missingVsns: conflicts.append(info) if info[2] in missingVsns: missingVsns.remove(info[2]) if info[4] != 'Idle' and info[4] != 'Close': notidle.append(info[2]) if info[3] in missingVsns: missingVsns.remove(info[3]) if info[4] != 'Idle' and info[4] != 'Close': notidle.append(info[3]) elif info[1] == 'mark6Status': for key in ['slot1MSN', 'slot2MSN', 'slot3MSN', 'slot4MSN']: if key in info[2].keys(): if info[2][key] in missingVsns: missingVsns.remove(info[2][key]) print 'found',info[2][key],'on',info[0] if not isModuleComplete(key[:5], info[2]): incomplete.append(info[2][key]) except socket.timeout: t += dt except socket.herror: print 'Weird: cannot gethostbyaddr for %s' % address[0] results.sort() conflicts.sort() missingVsns.sort() notidle.sort() incomplete.sort() return results, conflicts, missingVsns, notidle, incomplete def startTimeInPackTimeRange(startTime, packStartStr, packEndStr): packStartMjd = convertDateString2Mjd(packStartStr) packEndMjd = convertDateString2Mjd(packEndStr) if startTime.days > packStartMjd.days or (startTime.days == packStartMjd.days and startTime.seconds >= packStartMjd.seconds): if startTime.days < packEndMjd.days or (startTime.days == packEndMjd.days and startTime.seconds <= packEndMjd.seconds): return True return False def getvexantennamodlists(vexobsfile, startTime): ''' Returns tuple (experName,antennaModuleList,modList) for Mark6 antennas and VSNs found in the given file with path vexobsfile. On errors this function exits the script. ''' experNameFound = False tapelogObsFound = False experName = '' ant = '' antennaModuleList = [] modList = [] # Regexps to flexibly detect VEX entries and fields; can test with https://regex101.com/ reComment = re.compile("^\s*\*") reSection = re.compile("^\s*\$\s*(.*?)\s*;") reDef = re.compile("^\s*def\s+(.*?)\s*;") reVSN = re.compile("\W*VSN\s*=\s*(.*?)\s*:\s*(.*?)\s*:\s*(.*?)\s*:\s*(.*?)\s*;") reEnddef = re.compile("\W*enddef\s*;") # Look up expt name and module info from VEX vexobs = open(vexobsfile, "r") for line in vexobs: if reComment.match(line): continue # detect EXPER info i = line.find('exper_name') if i != -1: experNameFound = True lineend = line[i+10:len(line)-1] for i in range(len(lineend)): if lineend[i] not in [' ', '=', ';']: experName += lineend[i].upper() continue # detect change of VEX section section = reSection.search(line) if section and tapelogObsFound: # encountered section after TAPELOG_OBS and if expt name also present by now, finished! if experNameFound: break if section and not tapelogObsFound: tapelogObsFound = section.group(1) == 'TAPELOG_OBS' # detect entries in TAPELOG_OBS section if tapelogObsFound: # note the possibility of one-liners e.g. 'def Pv; VSN=0::