import sys import os import shutil import datetime from asap import scantable, selector, mask_not from asap.logging import asaplog asaplog.disable() from nose.tools import * def tempdir_setup(): os.makedirs("test_temp") def tempdir_teardown(): shutil.rmtree("test_temp", True) class TestScantable(object): def setup(self): s = scantable("data/MOPS.rpf", average=True) sel = selector() # make sure this order is always correct - in can be random sel.set_order(["SCANNO", "POLNO"]) s.set_selection(sel) self.st = s.copy() restfreqs = [86.243] # 13CO-1/0, SiO the two IF self.st.set_restfreqs(restfreqs,"GHz") def test_init(self): st = scantable("data/MOPS.rpf", average=False) assert_equal(st.ncycle(), 32) st = scantable("data/MOPS.rpf", average=True) assert_equal(st.ncycle(), 2) st = scantable("data/MOPS.rpf", unit="Jy") assert_equal(st.get_fluxunit(), "Jy") st = scantable("data/MOPS.rpf", unit="K") assert_equal(st.get_fluxunit(), "K") assert_raises(RuntimeError, scantable, "data/MOPS.rpf", unit="junk") st = scantable(["data/MOPS.rpf","data/MOPS.rpf"], average=False) assert_equal(st.nscan(), 4) def test_copy(self): st = self.st.copy() assert_not_equal(id(st), id(self.st)) def test_drop_scan(self): st = self.st.drop_scan([1]) assert_equal(st.nscan(), 1) def test_get_scan(self): st = self.st.get_scan([1]) assert_equal(st.nscan(), 1) st = self.st.get_scan("Orion_SiO_R") assert_equal(st.get_sourcename()[-1], "Orion_SiO_R") assert_equal(st.nscan(), 1) def test_get_spectrum(self): spec = self.st.get_spectrum(0) assert_almost_equal(max(spec), 215.279830933) def test_get_mask(self): spec = self.st.get_mask(0) assert_equal(len(spec), 4096) def test_set_spectrum(self): spec = [ 1.0 for i in range(self.st.nchan()) ] self.st.set_spectrum(spec, 0) spec1 = self.st.get_spectrum(0) assert_almost_equal(max(spec1), 1.0) def test_selection(self): sel = selector() sel.set_polarisations("YY") self.st.set_selection(sel) assert_equal(self.st.getpolnos(), (1,)) sel1 = self.st.get_selection() assert_equal(sel1.get_pols(), [1]) self.st.set_selection(pols="XX") assert_equal(self.st.getpolnos(), (0,)) def stats(self, key, value, mask=False): msk = None if mask: msk = self.st.create_mask([0,100], [3900,4096]) sval = self.st.stats(stat=key, mask=msk) assert_almost_equal(sval[0], value) def test_masked_stats(self): stats = { 'min': 113.767166138, 'max':128.21571350, 'sumsq':4180516.75, 'sum':35216.87890625, 'mean':118.5753479, 'var':15.75608253, 'stddev':3.9693932533, 'avdev':3.395271778, 'rms':118.6415405, 'median':117.5024261} for k,v in stats.items(): yield self.stats, k, v, True def test_stats(self): stats = { 'min': 113.767166138, 'max':215.279830933, 'sumsq':128759200.0, 'sum':720262.375, 'mean':175.845306396, 'var':513.95324707, 'stddev':22.6705360413, 'avdev':16.3966751099, 'rms':177.300170898, 'median':182.891845703} for k,v in stats.items(): yield self.stats, k, v def test_get_column_names(self): cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO', 'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW', 'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE', 'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS', 'DIRECTION', 'AZIMUTH', 'ELEVATION', 'OPACITY', 'TCAL_ID', 'FIT_ID', 'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY', 'SRCPROPERMOTION', 'SRCDIRECTION', 'SCANRATE'] assert_equal(self.st.get_column_names(), cnames) def test_get_tsys(self): assert_almost_equal(self.st.get_tsys()[0], 175.830429077) def test_get_time(self): assert_equal(self.st.get_time(0), '2008/03/12/09:32:50') dt = datetime.datetime(2008,3,12,9,32,50) assert_equal(self.st.get_time(0, True), dt) def test_get_inttime(self): assert_almost_equal(self.st.get_inttime()[0], 30.720016479) def test_get_sourcename(self): assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R') assert_equal(self.st.get_sourcename(), ['Orion_SiO_R', 'Orion_SiO_R', 'Orion_SiO', 'Orion_SiO']) def test_get_azimuth(self): assert_almost_equal(self.st.get_azimuth()[0], 5.628767013) def test_get_elevation(self): assert_almost_equal(self.st.get_elevation()[0], 1.01711678504) def test_get_parangle(self): assert_almost_equal(self.st.get_parangle()[0], 2.5921990871) def test_get_direction(self): assert_equal(self.st.get_direction()[0], '05:35:14.5 -04.52.29.5') def test_get_directionval(self): dv = self.st.get_directionval()[0] assert_almost_equal(dv[0], 1.4627692699) assert_almost_equal(dv[1], -0.0850824415) def test_unit(self): self.st.set_unit('') self.st.set_unit('GHz') self.st.set_unit('km/s') assert_raises(RuntimeError, self.st.set_unit, 'junk') assert_equals(self.st.get_unit(), 'km/s') def test_average_pol(self): ap = self.st.average_pol() assert_equal(ap.npol(), 1) def test_drop_scan(self): s0 = self.st.drop_scan(1) assert_equal(s0.getscannos(), (0,)) s1 = self.st.drop_scan([0]) assert_equal(s1.getscannos(), (1,)) def test_flag(self): q = self.st.auto_quotient() q.set_unit('km/s') q0 = q.copy() q1 = q.copy() msk = q0.create_mask([-10,20]) q0.flag(mask=mask_not(msk)) q1.flag(mask=msk) assert_almost_equal(q0.stats(stat='max')[0], 95.62171936) assert_almost_equal(q1.stats(stat='max')[0], 2.66563416) @with_setup(tempdir_setup, tempdir_teardown) def test_save(self): fname = os.path.join("test_temp", 'scantable_test.%s') formats = [(fname % 'sdfits', 'SDFITS', True), (fname % 'ms', 'MS2', True), (fname % 'class.fits', 'CLASS', False), (fname % 'fits', 'FITS', False), (fname % 'txt', 'ASCII', False), ] for format in formats: yield self.save, format def save(self, args): fname = args[0] self.st.save(fname, args[1], True) # do some verification args[2] == True if args[-1]: s = scantable(fname) ds = self.st - s assert_equals(self.st.getpolnos(), s.getpolnos()) assert_equals(self.st.getscannos(), s.getscannos()) assert_equals(self.st.getifnos(), s.getifnos()) assert_equals(self.st.getbeamnos(), s.getbeamnos()) # see if the residual spectra are ~ 0.0 for spec in ds: assert_almost_equals(sum(spec)/len(spec), 0.0, 5)