source: trunk/test/test_scantable.py @ 2807

Last change on this file since 2807 was 2802, checked in by Malte Marquarding, 11 years ago

reverted previous jenkins test changeset

File size: 8.6 KB
RevLine 
[1840]1import sys
2import os
3import shutil
[1600]4import datetime
[1903]5from nose.tools import *
[2797]6import asap
[1840]7from asap import scantable, selector, mask_not
8from asap.logging import asaplog
# ASAP log messages are not needed while the tests run, so turn the logger off
asaplog.disable()
[1544]11
def tempdir_setup():
    """Create the ``test_temp`` scratch directory in the working directory.

    A directory that already exists (for example one left behind by an
    interrupted run) is reused instead of raising ``OSError``.

    Raises:
        OSError: if the directory is missing and could not be created.
    """
    try:
        os.makedirs("test_temp")
    except OSError:
        # Only "already exists" is tolerated; any other failure (permissions,
        # a plain file in the way, ...) is still reported to the caller.
        if not os.path.isdir("test_temp"):
            raise
14
def tempdir_teardown():
    """Remove the ``test_temp`` directory tree, ignoring any removal errors."""
    scratch_dir = "test_temp"
    shutil.rmtree(scratch_dir, ignore_errors=True)
17
class TestScantable(object):
    """nose test class for ``asap.scantable`` based on ``data/MOPS.rpf``.

    Every test works on ``self.st``, a copy of the averaged data set that
    ``setup`` builds.  Notes on individual tests are written as ``#``
    comments rather than docstrings so that the test names shown in the
    nose report stay unchanged.
    """

    def setup(self):
        # Fixture: load the averaged reference data, fix the row order and
        # keep a copy in self.st for the test methods to work on.
        pth = os.path.dirname(__file__)
        s = scantable(os.path.join(pth, "data", "MOPS.rpf"), average=True)
        sel = selector()
        # make sure this order is always correct - it can be random
        sel.set_order(["SCANNO", "POLNO"])
        s.set_selection(sel)
        self.st = s.copy()
        restfreqs = [86.243]     # 13CO-1/0, SiO the two IF
        self.st.set_restfreqs(restfreqs, "GHz")

    def test_init(self):
        # Constructor options: averaging, flux unit, and a list of files.
        fname = os.path.join(os.path.dirname(__file__), "data", "MOPS.rpf")
        st = scantable(fname, average=False)
        assert_equal(st.ncycle(), 32)
        st = scantable(fname, average=True)
        assert_equal(st.ncycle(), 2)
        st = scantable(fname, unit="Jy")
        assert_equal(st.get_fluxunit(), "Jy")
        st = scantable(fname, unit="K")
        assert_equal(st.get_fluxunit(), "K")
        assert_raises(RuntimeError, scantable, fname, unit="junk")
        st = scantable([fname, fname], average=False)
        assert_equal(st.nscan(), 4)

    def test_copy(self):
        # copy() must hand back a different object.
        st = self.st.copy()
        assert_not_equal(id(st), id(self.st))

    def test_drop_scan_nscan(self):
        # This method used to be called test_drop_scan as well and was
        # silently replaced by the later definition of the same name, so
        # its assertion never ran.
        st = self.st.drop_scan([1])
        assert_equal(st.nscan(), 1)

    def test_get_scan(self):
        # Scans can be selected by number or by source name.
        st = self.st.get_scan([1])
        assert_equal(st.nscan(), 1)
        st = self.st.get_scan("Orion_SiO_R")
        assert_equal(st.get_sourcename()[-1], "Orion_SiO_R")
        assert_equal(st.nscan(), 1)

    def test_get_spectrum(self):
        spec = self.st.get_spectrum(0)
        assert_almost_equal(max(spec), 215.279830933)

    def test_get_mask(self):
        spec = self.st.get_mask(0)
        assert_equal(len(spec), 4096)

    def test_set_spectrum(self):
        # Overwrite row 0 with a flat spectrum and read it back.
        spec = [1.0] * self.st.nchan()
        self.st.set_spectrum(spec, 0)
        spec1 = self.st.get_spectrum(0)
        assert_almost_equal(max(spec1), 1.0)

    def test_selection(self):
        # Selection via a selector object and via keyword arguments.
        sel = selector()
        sel.set_polarisations("YY")
        self.st.set_selection(sel)
        assert_equal(self.st.getpolnos(), [1])
        sel1 = self.st.get_selection()
        assert_equal(sel1.get_pols(), [1])
        self.st.set_selection(pols="XX")
        assert_equal(self.st.getpolnos(), [0])

    def stats(self, key, value, mask=False):
        # Helper run once per statistic by the generators test_stats and
        # test_masked_stats: compares statistic `key` of the first row
        # with `value`, optionally using a channel mask.
        msk = None
        if mask:
            msk = self.st.create_mask([0, 100], [3900, 4096])
        sval = self.st.stats(stat=key, mask=msk)
        assert_almost_equal(sval[0], value)

    def test_masked_stats(self):
        # Generator test: one check per statistic, with the mask applied.
        expected = {'min': 113.767166138,
                    'max': 128.21571350, 'sumsq': 4180516.75,
                    'sum': 35216.87890625, 'mean': 118.5753479,
                    'var': 15.75608253, 'stddev': 3.9693932533,
                    'avdev': 3.395271778, 'rms': 118.6415405,
                    'median': 117.5024261}
        for k, v in expected.items():
            yield self.stats, k, v, True

    def test_stats(self):
        # Generator test: one check per statistic, without a mask.
        expected = {'min': 113.767166138,
                    'max': 215.279830933, 'sumsq': 128759200.0,
                    'sum': 720262.375, 'mean': 175.845306396,
                    'var': 513.95324707, 'stddev': 22.6705360413,
                    'avdev': 16.3966751099, 'rms': 177.300170898,
                    'median': 182.891845703}
        for k, v in expected.items():
            yield self.stats, k, v

    def test_get_column_names(self):
        cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO',
                  'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW',
                  'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE',
                  'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS',
                  'DIRECTION', 'AZIMUTH', 'ELEVATION',
                  'OPACITY', 'TCAL_ID', 'FIT_ID',
                  'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY',
                  'SRCPROPERMOTION', 'SRCDIRECTION',
                  'SCANRATE']
        assert_equal(self.st.get_column_names(), cnames)

    def test_get_tsys(self):
        assert_almost_equal(self.st.get_tsys()[0], 175.830429077)

    def test_set_tsys(self):
        # Set Tsys for a single row, then for the whole table.
        s = self.st.copy()
        newval = 100.0
        s.set_tsys(newval, 0)
        assert_almost_equal(s.get_tsys()[0], newval)
        s2 = self.st.copy()
        s2.set_tsys(newval)
        for tsys in s2.get_tsys():
            assert_almost_equal(tsys, newval)

    def test_get_time(self):
        # Time is available as a string and as a datetime object.
        assert_equal(self.st.get_time(0), '2008/03/12/09:32:50')
        dt = datetime.datetime(2008, 3, 12, 9, 32, 50)
        assert_equal(self.st.get_time(0, True), dt)

    def test_get_inttime(self):
        assert_almost_equal(self.st.get_inttime()[0], 30.720016479)

    def test_get_sourcename(self):
        assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R')
        assert_equal(self.st.get_sourcename(),
                     ['Orion_SiO_R', 'Orion_SiO_R',
                      'Orion_SiO', 'Orion_SiO'])

    def test_get_azimuth(self):
        assert_almost_equal(self.st.get_azimuth()[0], 5.628767013)

    def test_get_elevation(self):
        assert_almost_equal(self.st.get_elevation()[0], 1.01711678504)

    def test_get_parangle(self):
        assert_almost_equal(self.st.get_parangle()[0], 2.5921990871)

    def test_get_direction(self):
        assert_equal(self.st.get_direction()[0],
                     'J2000 05:35:14.5 -04.52.29.5')

    def test_get_directionval(self):
        dv = self.st.get_directionval()[0]
        assert_almost_equal(dv[0], 1.4627692699)
        assert_almost_equal(dv[1], -0.0850824415)

    def test_unit(self):
        # Valid units are accepted; a rejected unit must leave the last
        # valid one in place.
        self.st.set_unit('')
        self.st.set_unit('GHz')
        self.st.set_unit('km/s')
        assert_raises(RuntimeError, self.st.set_unit, 'junk')
        assert_equal(self.st.get_unit(), 'km/s')

    def test_average_pol(self):
        ap = self.st.average_pol()
        assert_equal(ap.npol(), 1)

    def test_drop_scan(self):
        # drop_scan takes a single scan number as well as a list.
        s0 = self.st.drop_scan(1)
        assert_equal(s0.getscannos(), [0])
        s1 = self.st.drop_scan([0])
        assert_equal(s1.getscannos(), [1])

    def test_flag(self):
        # Flag a velocity window in one copy and its complement in another,
        # then compare the maxima of what remains.
        q = self.st.auto_quotient()
        q.set_unit('km/s')
        q0 = q.copy()
        q1 = q.copy()
        msk = q0.create_mask([-10, 20])
        q0.flag(mask=mask_not(msk))
        q1.flag(mask=msk)
        assert_almost_equal(q0.stats(stat='max')[0], 95.62171936)
        assert_almost_equal(q1.stats(stat='max')[0], 2.66563416)

    # NOTE(review): the nose documentation describes with_setup as useful
    # only for test functions, not test methods - confirm that test_temp is
    # really created/removed around these generated tests.
    @with_setup(tempdir_setup, tempdir_teardown)
    def test_save(self):
        # Generator test: one save() check per output format.  Each entry
        # is (file name, format name, verify the written file).
        fname = os.path.join("test_temp", 'scantable_test.%s')
        formats = [(fname % 'sdfits', 'SDFITS', True),
                   (fname % 'ms', 'MS2', True),
                   (fname % 'class.fits', 'CLASS', False),
                   (fname % 'fits', 'FITS', False),
                   (fname % 'txt', 'ASCII', False),
                   ]
        for entry in formats:
            yield self.save, entry

    def save(self, args):
        # Helper for test_save; `args` is one (file name, format name,
        # verify) tuple.
        fname, fmt, verify = args
        # third argument presumably allows overwriting - TODO confirm
        self.st.save(fname, fmt, True)
        if not verify:
            return
        # read the file back and compare it with the table that was saved
        s = scantable(fname)
        ds = self.st - s
        assert_equal(self.st.getpolnos(), s.getpolnos())
        assert_equal(self.st.getscannos(), s.getscannos())
        assert_equal(self.st.getifnos(), s.getifnos())
        assert_equal(self.st.getbeamnos(), s.getbeamnos())
        # see if the residual spectra are ~ 0.0
        for spec in ds:
            assert_almost_equal(sum(spec)/len(spec), 0.0, 5)

    def test_auto_poly_baseline(self):
        # rms of the removed baseline (quotient minus baselined copy).
        q = self.st.auto_quotient()
        b = q.auto_poly_baseline(insitu=False)
        res_rms = (q-b).stats('rms')
        assert_almost_equal(res_rms[0], 0.38370, 5)
        assert_almost_equal(res_rms[1], 0.38780, 5)

    def test_poly_baseline(self):
        # Same check with an explicit mask and a zeroth-order polynomial.
        q = self.st.auto_quotient()
        msk = q.create_mask([0.0, 1471.0], [1745.0, 4095.0])
        b = q.poly_baseline(order=0, mask=msk, insitu=False)
        res_rms = (q-b).stats('rms')
        assert_almost_equal(res_rms[0], 0.38346, 5)
        assert_almost_equal(res_rms[1], 0.38780, 5)

    def test_reshape(self):
        # reshape with insitu False returns a trimmed copy and leaves the
        # original untouched; with insitu True it trims the table itself.
        cp = self.st.copy()
        n = cp.nchan()
        rs = cp.reshape(10, -10, False)
        assert_equal(n-20, rs.nchan())
        assert_equal(cp.nchan(), n)
        rs = cp.reshape(10, n-11, False)
        assert_equal(n-20, rs.nchan())
        cp.reshape(10, -10, True)
        assert_equal(n-20, cp.nchan())
Note: See TracBrowser for help on using the repository browser.