#!/usr/bin/env python #************************************************************************** # Copyright (C) 2008-2011 by Walter Brisken * # * # This program is free software; you can redistribute it and/or modify * # it under the terms of the GNU General Public License as published by * # the Free Software Foundation; either version 3 of the License, or * # (at your option) any later version. * # * # This program is distributed in the hope that it will be useful, * # but WITHOUT ANY WARRANTY; without even the implied warranty of * # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * # GNU General Public License for more details. * # * # You should have received a copy of the GNU General Public License * # along with this program; if not, write to the * # Free Software Foundation, Inc., * # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * #************************************************************************** #=========================================================================== # SVN properties (DO NOT CHANGE) # # $Id$ # $HeadURL: $ # $LastChangedRevision$ # $Author$ # $LastChangedDate$ # #============================================================================ from string import split, strip, find, upper, lower from sys import argv, exit from os import popen, getenv, umask from glob import glob import socket import struct import subprocess import signal import sys from xml.parsers import expat from copy import deepcopy program = 'genmachines' author = 'Walter Brisken' version = '0.16' verdate = '20121216' defaultDifxMessagePort = 50200 defaultDifxMessageGroup = '224.2.2.1' def usage(): print '\n%s ver. %s %s %s' % (program, version, author, verdate) print '\nA program to find required Mark5 modules and write the machines file' print 'appropriate for a particular DiFX job.' print '\nUsage : %s [options] ' % argv[0] print '\noptions can include:' print '\n -h or --help' print ' print this usage info and exit' print '\n -v or --verbose' print ' increase verbosity of output' print '\n -o or --overheadcores ' print ' set overheadcores to , default = 1' print '\n -m or --machines ' print ' use instead of $DIFX_MACHINES' print '\n -n or --no-threads' print ' don\'t write a .threads file' print '\n -d or --difxdb' print ' use difxdb database to obtain module slots' print '\n is a DiFX .input file.' print '\nEnv. Var. DIFX_MACHINES must point to the machines file if no ' print 'is specifided.\n' exit(1) class Parser: def __init__(self): self._parser = expat.ParserCreate() self._parser.StartElementHandler = self.start self._parser.EndElementHandler = self.end self._parser.CharacterDataHandler = self.data self.vsnA = 'none' self.vsnB = 'none' self.state = 'Unknown' self.unit = 'unknown' self.sender = 'unknown' self.tmp = '' self.ok = False def feed(self, sender, data): self._parser.Parse(data, 0) self.sender = sender def close(self): self._parser.Parse("", 1) # end of data del self._parser # get rid of circular references def start(self, tag, attrs): if tag == 'mark5Status': self.ok = True def end(self, tag): if tag == 'bankAVSN' and self.ok: if len(self.tmp) != 8: self.vsnA = 'none' else: self.vsnA = upper(self.tmp) if tag == 'bankBVSN' and self.ok: if len(self.tmp) != 8: self.vsnB = 'none' else: self.vsnB = upper(self.tmp) if tag == 'from': self.unit = lower(self.tmp) if tag == 'state' and self.ok: self.state = self.tmp def data(self, data): self.tmp = data def getinfo(self): if self.ok: return [self.unit, self.vsnA, self.vsnB, self.state, self.sender] else: return ['unknown', 'none', 'none', 'Unknown', 'unknown'] def vsn_request(): src = socket.gethostname() dest = 'mark5' cmd = 'getvsn' message = \ '\n' \ '' \ '
' \ '%s' \ '%s' \ '-1' \ 'genmachines' \ 'DifxCommand' \ '
' \ '' \ '0' \ '' \ '%s' \ '' \ '' \ '
' % (src, dest, cmd) return message def getVsnsByMulticast(maxtime, modlist, verbose): dt = 0.2 t = 0.0 port = getenv('DIFX_MESSAGE_PORT') if port == None: port = defaultDifxMessagePort else: port = int(port) group = getenv('DIFX_MESSAGE_GROUP') if group == None: group = defaultDifxMessageGroup missing = deepcopy(modlist) message = vsn_request() # First send out a call for VSNs sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) sock.sendto(message, (group, port)) # Now listen for responses, until either time runs out or we get all we need s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('', port)) mreq = struct.pack("4sl", socket.inet_aton(group), socket.INADDR_ANY) s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) s.settimeout(dt) conflicts = [] results = [] machines = [] notidle = [] while t < maxtime and len(missing) > 0: try: message, address = s.recvfrom(2048) sender = split(socket.gethostbyaddr(address[0])[0], '.')[0] if verbose > 1: print message p = Parser() p.feed(sender, message) info = p.getinfo() p.close() if info[0] == 'unknown': continue if info[0] in machines: continue machines.append(info[0]) results.append(info) if info[1] in missing and info[2] in missing: conflicts.append(info) if info[1] in missing: missing.remove(info[1]) if info[3] != 'Idle' and info[3] != 'Close': notidle.append(info[1]) if info[2] in missing: missing.remove(info[2]) if info[3] != 'Idle' and info[3] != 'Close': notidle.append(info[2]) except socket.timeout: t += dt results.sort() conflicts.sort() missing.sort() notidle.sort() return results, conflicts, missing, notidle def getVsnsFromInputFile(inputfile): vsns = [] nds = 0 input = open(inputfile).readlines() dsindices = [] dssources = [] dscount = 0 dsfilecount = 0 for inputLine in input: s = split(inputLine, ':') if len(s) < 2: continue; key = s[0].strip() keyparts = key.split() value = s[1].strip() if key == 'ACTIVE DATASTREAMS': nds = int(value) if len(keyparts) == 3 and keyparts[0] == 'DATASTREAM' and keyparts[2] == 'INDEX': dsindices.append(int(value)) if key == 'DATA SOURCE': if dscount in dsindices: dssources.append(value) dscount += 1 if len(keyparts) == 2 and keyparts[0] == 'FILE': # VSNs only required for MODULES - will assume files are there for now s = split(keyparts[1], '/') ds = int(strip(s[0])) if dssources[ds] == 'MODULE': if ds < nds: vsns.append(value) return (dssources, vsns) def readmachines(machinesfile, verbose): machines = [] cores = [] ismk5 = {} lines = open(machinesfile).readlines() for l in lines: l = split(strip(l), '#')[0] s = split(l) if len(s) >= 2: machines.append(s[0]) cores.append(int(s[1])) if len(s) >= 3: ismk5[s[0]] = int(s[2]) else: if s[0][:5] == 'mark5': ismk5[s[0]] = 1 else: ismk5[s[0]] = 0 if verbose > 1: print 'MACHINES = ', machines print 'CORES = ', cores print 'IS MK5 = ', ismk5 return machines,cores,ismk5 def writethreads(basename, threads): o = open(basename+'threads', 'w') o.write('NUMBER OF CORES: %d\n' % len(threads)) for t in threads: o.write('%d\n' % t) o.close() def writemachines(basename, hostname, machines, cores, results, vsns, dssources, ismk5, \ overheadcores, verbose): dsnodes = [] threads = [] extrathreads = [] quit = False moduleCount = 0 headCount = 0 for ds in dssources: if ds == "FILE": foundsuitable = False for m in machines: # don't use the head node if m == hostname: continue # don't use machines already used as ds nodes if m in dsnodes: continue # don't use mark5 machines if ismk5[m] == 1: continue foundsuitable = True dsnodes.append(m) break if not foundsuitable: print "Could not find a machine not already used" print "Will allocate a FILE datastream to %s" % (hostname) dsnodes.append(hostname) headCount += 1 elif ds == "MODULE": for r in results: if r[1] == vsns[moduleCount] or r[2] == vsns[moduleCount]: if r[0] in machines: dsnodes.append(r[0]) else: dsnodes.append(r[4]) moduleCount += 1 for d in dsnodes: if not d in machines: print '%s not enabled in machines file' % d quit = True if quit: return [] # Check if we must add the head node as a correlation node, too foundsuitable = False for m in range(len(machines)): if machines[m] in dsnodes: continue if cores[m] < 1: continue foundsuitable = True break if not foundsuitable: headCount += 1 # write file o = open(basename+'machines', 'w') # head node maxslots = 1 if hostname in machines: m = machines.index(hostname) if headCount > 0: maxslots = headCount + 1 elif cores[m] > overheadcores+1: maxslots = 2 extrathreads.append(cores[m] - (overheadcores+1)) o.write('%s slots=1 max-slots=%d\n' % \ (hostname, maxslots)) # datastream nodes for d in dsnodes: m = machines.index(d) if headCount > 0: maxslots = headCount + 1 elif cores[m] > overheadcores+1: maxslots = 2 extrathreads.append(cores[m] - (overheadcores+1)) else: maxslots = 1 o.write('%s slots=1 max-slots=%d\n' % \ (d, maxslots)) # core nodes foundsuitable = False for m in range(len(machines)): if machines[m] in dsnodes: continue if cores[m] < 1: continue threads.append(cores[m]) foundsuitable = True o.write('%s slots=1 max-slots=1\n' % machines[m]) if not foundsuitable: print "Could not find a core machine not already used" print "Will allocate %s as a core, too" % (hostname) if hostname in machines: m = machines.index(hostname) nthreads = cores[m] - 2 if nthreads < 1: nthreads = 1 else: nthreads = 1 threads.append(nthreads) o.write('%s slots=1 max-slots=%s\n' % (machines[m], headCount+1)) o.close() for e in extrathreads: threads.append(e) return threads def uniqueVsns(vsns): d = {} for v in vsns: d[v] = 1 if len(d) != len(vsns): return 0 else: return 1 def run(files, machinesfile, overheadcores, verbose, dothreads, useDifxDb): ok = True machines, cores, ismk5 = readmachines(machinesfile, verbose) hostname = socket.gethostname() if not hostname in machines: print 'ERROR: hostname not in machines file : %s' % machinesfile exit(1) infile = files[0] basename = infile[0:-5] if basename + 'input' != infile: print 'expecting input file' exit(1) (dssources,vsns) = getVsnsFromInputFile(infile) if not uniqueVsns(vsns): print 'ERROR: at least one duplicate VSN exists in %s !' % infile exit(1) results, conflicts, missing, notidle = getVsnsByMulticast(5, vsns, verbose) if verbose > 0: print 'Found modules:' for r in results: print ' %-10s : %10s %10s %s' % (r[0], r[1], r[2], r[3]) if len(conflicts) > 0: ok = False print 'Module conflicts:' for c in conflicts: print ' %-10s : %10s %10s' % (c[0], c[1], c[2]) if len(missing) > 0: ok = False print 'Missing modules:' for m in missing: slot = "unknown" if useDifxDb: child = subprocess.Popen(["getslot", m], stdout=subprocess.PIPE) (slot, stderr) = child.communicate() print ' %s (slot = %s )' % (m, strip(slot)) if len(notidle) > 0: ok = False print 'Modules not ready:' for n in notidle: print ' %s' % n if not ok: return 1 t = writemachines(basename, hostname, machines, cores, results, \ vsns, dssources, ismk5, overheadcores, verbose) if len(t) == 0: return 1 if dothreads: writethreads(basename, t) return 0 def signal_handler(signal, frame): print 'You pressed Ctrl+C!' sys.exit(8) if len(argv) < 2: usage() # catch ctrl+c signal.signal(signal.SIGINT, signal_handler) files = [] machinesfile = getenv('DIFX_MACHINES') overheadcores = 1 verbose = 0 dothreads = True useDifxDb = False if getenv('DIFX_GROUP_ID'): umask(2) a = 1 while a < len(argv): arg = argv[a] if arg == '-h' or arg == '--help': usage() elif arg == '-v' or arg == '--verbose': verbose += 1 elif arg == '-n' or arg == '--no-threads': dothreads = False elif arg == '-m' or arg == '--machinesfile': a += 1 if a >= len(argv): print 'No machinesfile specified!' exit(1) machinesfile = argv[a] elif arg == '-o' or arg == '--overheadcores': a += 1 if a >= len(argv): print 'No overhead cores specified1' exit(1) overheadcores = int(argv[a]) elif arg == '-d' or arg == '--difxdb': useDifxDb = True else: files.append(arg) a += 1 if len(files) != 1: usage() quit = False for f in files: if len(glob(f)) != 1: print 'File %s not found' % f quit = True if machinesfile == None: print 'DIFX_MACHINES env var not defined!'; quit = True elif len(glob(machinesfile)) != 1: print 'Machinesfile %s not found.' % machinesfile quit = True if quit: print 'genmachines quitting.' exit(1) if verbose > 0: print 'DIFX_MACHINES -> %s' % machinesfile v = run(files, machinesfile, overheadcores, verbose, dothreads, useDifxDb) if v != 0: exit(v)