#!/usr/bin/env python
#**************************************************************************
# Copyright (C) 2008-2011 by Walter Brisken *
# *
# This program is free software; you can redistribute it and/or modify *
# it under the terms of the GNU General Public License as published by *
# the Free Software Foundation; either version 3 of the License, or *
# (at your option) any later version. *
# *
# This program is distributed in the hope that it will be useful, *
# but WITHOUT ANY WARRANTY; without even the implied warranty of *
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
# GNU General Public License for more details. *
# *
# You should have received a copy of the GNU General Public License *
# along with this program; if not, write to the *
# Free Software Foundation, Inc., *
# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
#**************************************************************************
#===========================================================================
# SVN properties (DO NOT CHANGE)
#
# $Id: genmachines 5068 2012-12-16 19:25:27Z WalterBrisken $
# $HeadURL: $
# $LastChangedRevision: 5068 $
# $Author: WalterBrisken $
# $LastChangedDate: 2012-12-16 14:25:27 -0500 (Sun, 16 Dec 2012) $
#
#============================================================================
from string import split, strip, find, upper, lower
from sys import argv, exit
from os import popen, getenv, umask
from glob import glob
import socket
import struct
import subprocess
import signal
import sys
from xml.parsers import expat
from copy import deepcopy
program = 'genmachines'
author = 'Walter Brisken'
version = '0.16'
verdate = '20121216'
defaultDifxMessagePort = 50200
defaultDifxMessageGroup = '224.2.2.1'
def usage():
print '\n%s ver. %s %s %s' % (program, version, author, verdate)
print '\nA program to find required Mark5 modules and write the machines file'
print 'appropriate for a particular DiFX job.'
print '\nUsage : %s [options] ' % argv[0]
print '\noptions can include:'
print '\n -h or --help'
print ' print this usage info and exit'
print '\n -v or --verbose'
print ' increase verbosity of output'
print '\n -o or --overheadcores '
print ' set overheadcores to , default = 1'
print '\n -m or --machines '
print ' use instead of $DIFX_MACHINES'
print '\n -n or --no-threads'
print ' don\'t write a .threads file'
print '\n -d or --difxdb'
print ' use difxdb database to obtain module slots'
print '\n is a DiFX .input file.'
print '\nEnv. Var. DIFX_MACHINES must point to the machines file if no '
print 'is specifided.\n'
exit(1)
class Parser:
def __init__(self):
self._parser = expat.ParserCreate()
self._parser.StartElementHandler = self.start
self._parser.EndElementHandler = self.end
self._parser.CharacterDataHandler = self.data
self.vsnA = 'none'
self.vsnB = 'none'
self.state = 'Unknown'
self.unit = 'unknown'
self.sender = 'unknown'
self.tmp = ''
self.ok = False
def feed(self, sender, data):
self._parser.Parse(data, 0)
self.sender = sender
def close(self):
self._parser.Parse("", 1) # end of data
del self._parser # get rid of circular references
def start(self, tag, attrs):
if tag == 'mark5Status':
self.ok = True
def end(self, tag):
if tag == 'bankAVSN' and self.ok:
if len(self.tmp) != 8:
self.vsnA = 'none'
else:
self.vsnA = upper(self.tmp)
if tag == 'bankBVSN' and self.ok:
if len(self.tmp) != 8:
self.vsnB = 'none'
else:
self.vsnB = upper(self.tmp)
if tag == 'from':
self.unit = lower(self.tmp)
if tag == 'state' and self.ok:
self.state = self.tmp
def data(self, data):
self.tmp = data
def getinfo(self):
if self.ok:
return [self.unit, self.vsnA, self.vsnB, self.state, self.sender]
else:
return ['unknown', 'none', 'none', 'Unknown', 'unknown']
def vsn_request():
src = socket.gethostname()
dest = 'mark5'
cmd = 'getvsn'
message = \
'\n' \
'' \
'' \
'%s' \
'%s' \
'-1' \
'genmachines' \
'DifxCommand' \
'' \
'' \
'0' \
'' \
'%s' \
'' \
'' \
'' % (src, dest, cmd)
return message
def getVsnsByMulticast(maxtime, modlist, verbose):
dt = 0.2
t = 0.0
port = getenv('DIFX_MESSAGE_PORT')
if port == None:
port = defaultDifxMessagePort
else:
port = int(port)
group = getenv('DIFX_MESSAGE_GROUP')
if group == None:
group = defaultDifxMessageGroup
missing = deepcopy(modlist)
message = vsn_request()
# First send out a call for VSNs
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.sendto(message, (group, port))
# Now listen for responses, until either time runs out or we get all we need
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', port))
mreq = struct.pack("4sl", socket.inet_aton(group), socket.INADDR_ANY)
s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
s.settimeout(dt)
conflicts = []
results = []
machines = []
notidle = []
while t < maxtime and len(missing) > 0:
try:
message, address = s.recvfrom(2048)
sender = split(socket.gethostbyaddr(address[0])[0], '.')[0]
if verbose > 1:
print message
p = Parser()
p.feed(sender, message)
info = p.getinfo()
p.close()
if info[0] == 'unknown':
continue
if info[0] in machines:
continue
machines.append(info[0])
results.append(info)
if info[1] in missing and info[2] in missing:
conflicts.append(info)
if info[1] in missing:
missing.remove(info[1])
if info[3] != 'Idle' and info[3] != 'Close':
notidle.append(info[1])
if info[2] in missing:
missing.remove(info[2])
if info[3] != 'Idle' and info[3] != 'Close':
notidle.append(info[2])
except socket.timeout:
t += dt
except socket.herror:
print 'Weird: cannot gethostbyaddr for %s' % address[0]
results.sort()
conflicts.sort()
missing.sort()
notidle.sort()
return results, conflicts, missing, notidle
def getVsnsFromInputFile(inputfile):
vsns = []
nds = 0
input = open(inputfile).readlines()
dsindices = []
dssources = []
dscount = 0
dsfilecount = 0
for inputLine in input:
s = split(inputLine, ':')
if len(s) < 2:
continue;
key = s[0].strip()
keyparts = key.split()
value = s[1].strip()
if key == 'ACTIVE DATASTREAMS':
nds = int(value)
if len(keyparts) == 3 and keyparts[0] == 'DATASTREAM' and keyparts[2] == 'INDEX':
dsindices.append(int(value))
if key == 'DATA SOURCE':
if dscount in dsindices:
dssources.append(value)
dscount += 1
if len(keyparts) == 2 and keyparts[0] == 'FILE':
# VSNs only required for MODULES - will assume files are there for now
s = split(keyparts[1], '/')
ds = int(strip(s[0]))
if dssources[ds] == 'MODULE':
if ds < nds:
vsns.append(value)
return (dssources, vsns)
def readmachines(machinesfile, verbose):
machines = []
cores = []
ismk5 = {}
lines = open(machinesfile).readlines()
for l in lines:
l = split(strip(l), '#')[0]
s = split(l)
if len(s) >= 2:
machines.append(s[0])
cores.append(int(s[1]))
if len(s) >= 3:
ismk5[s[0]] = int(s[2])
else:
if s[0][:5] == 'mark5':
ismk5[s[0]] = 1
else:
ismk5[s[0]] = 0
if verbose > 1:
print 'MACHINES = ', machines
print 'CORES = ', cores
print 'IS MK5 = ', ismk5
return machines,cores,ismk5
def writethreads(basename, threads):
o = open(basename+'threads', 'w')
o.write('NUMBER OF CORES: %d\n' % len(threads))
for t in threads:
o.write('%d\n' % t)
o.close()
def writemachines(basename, hostname, machines, cores, results, vsns, dssources, ismk5, \
overheadcores, verbose):
dsnodes = []
threads = []
extrathreads = []
quit = False
moduleCount = 0
headCount = 0
for ds in dssources:
if ds == "FILE":
foundsuitable = False
for m in machines:
# don't use the head node
if m == hostname:
continue
# don't use machines already used as ds nodes
if m in dsnodes:
continue
# don't use mark5 machines
if ismk5[m] == 1:
continue
foundsuitable = True
dsnodes.append(m)
break
if not foundsuitable:
print "Could not find a machine not already used"
print "Will allocate a FILE datastream to %s" % (hostname)
dsnodes.append(hostname)
headCount += 1
elif ds == "MODULE":
for r in results:
if r[1] == vsns[moduleCount] or r[2] == vsns[moduleCount]:
if r[0] in machines:
dsnodes.append(r[0])
else:
dsnodes.append(r[4])
moduleCount += 1
for d in dsnodes:
if not d in machines:
print '%s not enabled in machines file' % d
quit = True
if quit:
return []
# Check if we must add the head node as a correlation node, too
foundsuitable = False
for m in range(len(machines)):
if machines[m] in dsnodes:
continue
if cores[m] < 1:
continue
foundsuitable = True
break
if not foundsuitable:
headCount += 1
# write file
o = open(basename+'machines', 'w')
# head node
maxslots = 1
if hostname in machines:
m = machines.index(hostname)
if headCount > 0:
maxslots = headCount + 1
elif cores[m] > overheadcores+1:
maxslots = 2
extrathreads.append(cores[m] - (overheadcores+1))
o.write('%s\n' % (hostname))
# datastream nodes
for d in dsnodes:
m = machines.index(d)
if headCount > 0:
maxslots = headCount + 1
elif cores[m] > overheadcores+1:
maxslots = 2
extrathreads.append(cores[m] - (overheadcores+1))
else:
maxslots = 1
o.write('%s\n' % (d))
# core nodes
foundsuitable = False
for m in range(len(machines)):
if ismk5[machines[m]] == 1:
continue
if machines[m] in dsnodes:
nthreads = cores[m] - 2
if nthreads < 1:
nthreads = 1
threads.append(nthreads)
foundsuitable = True
o.write('%s\n' % (machines[m]))
elif cores[m] > 0:
threads.append(cores[m])
foundsuitable = True
o.write('%s\n' % (machines[m]))
if not foundsuitable:
print "Could not find a core machine not already used"
print "Will allocate %s as a core, too" % (hostname)
if hostname in machines:
m = machines.index(hostname)
nthreads = cores[m] - 2
if nthreads < 1:
nthreads = 1
else:
nthreads = 1
threads.append(nthreads)
o.write('%s\n' % (machines[m]))
o.close()
for e in extrathreads:
threads.append(e)
return threads
def uniqueVsns(vsns):
d = {}
for v in vsns:
d[v] = 1
if len(d) != len(vsns):
return 0
else:
return 1
def run(files, machinesfile, overheadcores, verbose, dothreads, useDifxDb):
ok = True
machines, cores, ismk5 = readmachines(machinesfile, verbose)
hostname = socket.gethostname()
if not hostname in machines:
print 'ERROR: hostname not in machines file : %s' % machinesfile
exit(1)
infile = files[0]
basename = infile[0:-5]
if basename + 'input' != infile:
print 'expecting input file'
exit(1)
(dssources,vsns) = getVsnsFromInputFile(infile)
if not uniqueVsns(vsns):
print 'ERROR: at least one duplicate VSN exists in %s !' % infile
exit(1)
results, conflicts, missing, notidle = getVsnsByMulticast(5, vsns, verbose)
if verbose > 0:
print 'Found modules:'
for r in results:
print ' %-10s : %10s %10s %s' % (r[0], r[1], r[2], r[3])
if len(conflicts) > 0:
ok = False
print 'Module conflicts:'
for c in conflicts:
print ' %-10s : %10s %10s' % (c[0], c[1], c[2])
if len(missing) > 0:
ok = False
print 'Missing modules:'
for m in missing:
slot = "unknown"
if useDifxDb:
child = subprocess.Popen(["getslot", m], stdout=subprocess.PIPE)
(slot, stderr) = child.communicate()
print ' %s (slot = %s )' % (m, strip(slot))
if len(notidle) > 0:
ok = False
print 'Modules not ready:'
for n in notidle:
print ' %s' % n
if not ok:
return 1
t = writemachines(basename, hostname, machines, cores, results, \
vsns, dssources, ismk5, overheadcores, verbose)
if len(t) == 0:
return 1
if dothreads:
writethreads(basename, t)
return 0
def signal_handler(signal, frame):
print 'You pressed Ctrl+C!'
sys.exit(8)
if len(argv) < 2:
usage()
# catch ctrl+c
signal.signal(signal.SIGINT, signal_handler)
files = []
machinesfile = getenv('DIFX_MACHINES')
overheadcores = 1
verbose = 0
dothreads = True
useDifxDb = False
if getenv('DIFX_GROUP_ID'):
umask(2)
a = 1
while a < len(argv):
arg = argv[a]
if arg == '-h' or arg == '--help':
usage()
elif arg == '-v' or arg == '--verbose':
verbose += 1
elif arg == '-n' or arg == '--no-threads':
dothreads = False
elif arg == '-m' or arg == '--machinesfile':
a += 1
if a >= len(argv):
print 'No machinesfile specified!'
exit(1)
machinesfile = argv[a]
elif arg == '-o' or arg == '--overheadcores':
a += 1
if a >= len(argv):
print 'No overhead cores specified1'
exit(1)
overheadcores = int(argv[a])
elif arg == '-d' or arg == '--difxdb':
useDifxDb = True
else:
files.append(arg)
a += 1
if len(files) != 1:
usage()
quit = False
for f in files:
if len(glob(f)) != 1:
print 'File %s not found' % f
quit = True
if machinesfile == None:
print 'DIFX_MACHINES env var not defined!';
quit = True
elif len(glob(machinesfile)) != 1:
print 'Machinesfile %s not found.' % machinesfile
quit = True
if quit:
print 'genmachines quitting.'
exit(1)
if verbose > 0:
print 'DIFX_MACHINES -> %s' % machinesfile
v = run(files, machinesfile, overheadcores, verbose, dothreads, useDifxDb)
if v != 0:
exit(v)