source: trunk/test/test_scantable.py @ 1840

Last change on this file since 1840 was 1840, checked in by Malte Marquarding, 14 years ago

movde to nose based testing

File size: 6.7 KB
RevLine 
[1840]1import sys
2import os
3import shutil
[1600]4import datetime
[1840]5from asap import scantable, selector, mask_not
6from asap.logging import asaplog
[1824]7asaplog.disable()
[1544]8
[1840]9from nose.tools import *
10
11def tempdir_setup():
12    os.makedirs("test_temp")
13
14def tempdir_teardown():
15    shutil.rmtree("test_temp", True)
16
17class TestScantable(object):
18    def setup(self):
[1824]19        s = scantable("data/MOPS.rpf", average=True)
20        sel = selector()
21        # make sure this order is always correct - in can be random
22        sel.set_order(["SCANNO", "POLNO"])
23        s.set_selection(sel)
24        self.st = s.copy()
[1725]25        restfreqs = [86.243]     # 13CO-1/0, SiO the two IF
26        self.st.set_restfreqs(restfreqs,"GHz")
[1544]27
28    def test_init(self):
29        st = scantable("data/MOPS.rpf", average=False)
[1840]30        assert_equal(st.ncycle(), 32)
[1544]31        st = scantable("data/MOPS.rpf", average=True)
[1840]32        assert_equal(st.ncycle(), 2)
[1544]33        st = scantable("data/MOPS.rpf", unit="Jy")
[1840]34        assert_equal(st.get_fluxunit(), "Jy")
[1544]35        st = scantable("data/MOPS.rpf", unit="K")
[1840]36        assert_equal(st.get_fluxunit(), "K")
37        assert_raises(RuntimeError, scantable, "data/MOPS.rpf", unit="junk")
[1544]38        st = scantable(["data/MOPS.rpf","data/MOPS.rpf"], average=False)
[1840]39        assert_equal(st.nscan(), 4)
[1544]40
41    def test_copy(self):
42        st = self.st.copy()
[1840]43        assert_not_equal(id(st), id(self.st))
[1592]44
[1544]45    def test_drop_scan(self):
46        st = self.st.drop_scan([1])
[1840]47        assert_equal(st.nscan(), 1)
[1544]48
49    def test_get_scan(self):
50        st = self.st.get_scan([1])
[1840]51        assert_equal(st.nscan(), 1)
[1544]52        st = self.st.get_scan("Orion_SiO_R")
[1840]53        assert_equal(st.get_sourcename()[-1], "Orion_SiO_R")
54        assert_equal(st.nscan(), 1)
[1544]55
56    def test_get_spectrum(self):
57        spec = self.st.get_spectrum(0)
[1840]58        assert_almost_equal(max(spec), 215.279830933)
[1544]59
60    def test_get_mask(self):
61        spec = self.st.get_mask(0)
[1840]62        assert_equal(len(spec), 4096)
[1592]63
[1544]64    def test_set_spectrum(self):
65        spec = [ 1.0 for i in range(self.st.nchan()) ]
66        self.st.set_spectrum(spec, 0)
67        spec1 = self.st.get_spectrum(0)
[1840]68        assert_almost_equal(max(spec1), 1.0)
[1544]69
70    def test_selection(self):
71        sel = selector()
72        sel.set_polarisations("YY")
73        self.st.set_selection(sel)
[1840]74        assert_equal(self.st.getpolnos(), (1,))
[1544]75        sel1 = self.st.get_selection()
[1840]76        assert_equal(sel1.get_pols(), [1])
[1600]77        self.st.set_selection(pols="XX")
[1840]78        assert_equal(self.st.getpolnos(), (0,))
[1544]79
80    def test_stats(self):
81        stats = { 'min': 113.767166138,
82                  'max':215.279830933, 'sumsq':128759200.0,
83                  'sum':720262.375, 'mean':175.845306396,
84                  'var':513.95324707, 'stddev':22.6705360413,
[1592]85                  'avdev':16.3966751099, 'rms':177.300170898,
86                  'median':182.891845703}
[1544]87        for k,v in stats.iteritems():
88            sval = self.st.stats(stat=k)
[1840]89            assert_almost_equal(sval[0], v)
[1544]90        msk = self.st.create_mask([0,100], [3900,4096])
[1840]91        assert_almost_equal(self.st.stats("sum", msk)[0], 35216.87890625)
[1544]92
93    def test_get_column_names(self):
[1592]94        cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO',
[1824]95                  'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW',
[1592]96                  'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE',
[1544]97                  'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS',
[1592]98                  'DIRECTION', 'AZIMUTH', 'ELEVATION',
99                  'OPACITY', 'TCAL_ID', 'FIT_ID',
[1544]100                  'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY',
101                  'SRCPROPERMOTION', 'SRCDIRECTION',
102                  'SCANRATE']
[1840]103        assert_equal(self.st.get_column_names(), cnames)
[1544]104
[1600]105    def test_get_tsys(self):
[1840]106        assert_almost_equal(self.st.get_tsys()[0], 175.830429077)
[1600]107
108    def test_get_time(self):
[1840]109        assert_equal(self.st.get_time(0), '2008/03/12/09:32:50')
[1600]110        dt = datetime.datetime(2008,3,12,9,32,50)
[1840]111        assert_equal(self.st.get_time(0, True), dt)
[1600]112
113    def test_get_inttime(self):
[1840]114        assert_almost_equal(self.st.get_inttime()[0], 30.720016479)
[1600]115
116    def test_get_sourcename(self):
[1840]117        assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R')
118        assert_equal(self.st.get_sourcename(),
[1824]119                         ['Orion_SiO_R', 'Orion_SiO_R',
120                          'Orion_SiO', 'Orion_SiO'])
[1600]121
122    def test_get_azimuth(self):
[1840]123        assert_almost_equal(self.st.get_azimuth()[0], 5.628767013)
[1600]124
125    def test_get_elevation(self):
[1840]126        assert_almost_equal(self.st.get_elevation()[0], 1.01711678504)
[1600]127
128    def test_get_parangle(self):
[1840]129        assert_almost_equal(self.st.get_parangle()[0], 2.5921990871)
[1600]130
131    def test_get_direction(self):
[1840]132        assert_equal(self.st.get_direction()[0], '05:35:14.5 -04.52.29.5')
[1600]133
134    def test_get_directionval(self):
135        dv = self.st.get_directionval()[0]
[1840]136        assert_almost_equal(dv[0], 1.4627692699)
137        assert_almost_equal(dv[1], -0.0850824415)
[1600]138
139    def test_unit(self):
140        self.st.set_unit('')
141        self.st.set_unit('GHz')
142        self.st.set_unit('km/s')
[1840]143        assert_raises(RuntimeError, self.st.set_unit, 'junk')
144        assert_equals(self.st.get_unit(), 'km/s')
[1600]145
[1592]146    def test_average_pol(self):
147        ap = self.st.average_pol()
[1840]148        assert_equal(ap.npol(), 1)
[1544]149
[1600]150    def test_drop_scan(self):
151        s0 = self.st.drop_scan(1)
[1840]152        assert_equal(s0.getscannos(), (0,))
[1600]153        s1 = self.st.drop_scan([0])
[1840]154        assert_equal(s1.getscannos(), (1,))
[1592]155
[1725]156    def test_flag(self):
157        q = self.st.auto_quotient()
158        q.set_unit('km/s')
159        q0 = q.copy()
160        q1 = q.copy()
161        msk = q0.create_mask([-10,20])
162        q0.flag(mask=mask_not(msk))
[1824]163        q1.flag(mask=msk)
[1840]164        assert_almost_equal(q0.stats(stat='max')[0], 95.62171936)
165        assert_almost_equal(q1.stats(stat='max')[0], 2.66563416)
[1600]166
[1840]167
168    @with_setup(tempdir_setup, tempdir_teardown)
169    def test_save(self):
170        fname = os.path.join("test_temp", 'scantable_test.%s')
171        formats = [(fname % 'sdfits', 'SDFITS', True),
172                   (fname % 'ms', 'MS2', True),
173                   (fname % 'class.fits', 'CLASS', False),
174                   (fname % 'fits', 'FITS', False),
175                   (fname % 'txt', 'ASCII', False),
176                   ]
177        for format in formats:
178            yield self.save, format
179
180    def save(self, args):
181        fname = args[0]
182        self.st.save(fname, args[1], True)
183        if args[-1]:
184            s = scantable(fname)
185            ds = self.st - s
186            assert_equals(self.st.getpolnos(), s.getpolnos())
187            assert_equals(self.st.getscannos(), s.getscannos())
188            assert_equals(self.st.getifnos(), s.getifnos())
189            assert_equals(self.st.getbeamnos(), s.getbeamnos())
190            for spec in ds:
191                assert_almost_equals(sum(spec)/len(spec), 0.0, 5)
Note: See TracBrowser for help on using the repository browser.