source: trunk/test/test_scantable.py @ 2791

Last change on this file since 2791 was 2791, checked in by Malte Marquarding, 11 years ago

Ticket #289: added ability to set Tsys, both for scalar and vector values

File size: 8.6 KB
Line 
1import sys
2import os
3import shutil
4import datetime
5from nose.tools import *
6from asap import scantable, selector, mask_not
7from asap.logging import asaplog
8# no need for log messages
9asaplog.disable()
10
def tempdir_setup():
    """Create the ``test_temp`` scratch directory used by the save tests.

    The directory may already exist when an earlier run was interrupted
    before ``tempdir_teardown`` ran; in that case it is reused instead of
    letting ``os.makedirs`` raise ``OSError``.
    """
    if not os.path.isdir("test_temp"):
        os.makedirs("test_temp")
13
def tempdir_teardown():
    """Remove the ``test_temp`` scratch directory, ignoring any errors."""
    scratch_dir = "test_temp"
    shutil.rmtree(scratch_dir, ignore_errors=True)
16
class TestScantable(object):
    """nose tests for the ``scantable`` interface, run against ``data/MOPS.rpf``.

    ``setup`` builds a fresh scantable copy in ``self.st`` for the tests to
    work on. Methods that ``yield`` are nose test generators; the helper
    methods they yield (``stats``, ``save``) perform the actual checks.
    """

    def setup(self):
        """Load the averaged test data and store a working copy in ``self.st``."""
        s = scantable("data/MOPS.rpf", average=True)
        sel = selector()
        # make sure this order is always correct - it can be random
        sel.set_order(["SCANNO", "POLNO"])
        s.set_selection(sel)
        self.st = s.copy()
        restfreqs = [86.243]     # 13CO-1/0, SiO the two IF
        self.st.set_restfreqs(restfreqs, "GHz")

    def test_init(self):
        """Check constructor options: averaging, flux unit and file lists."""
        st = scantable("data/MOPS.rpf", average=False)
        assert_equal(st.ncycle(), 32)
        st = scantable("data/MOPS.rpf", average=True)
        assert_equal(st.ncycle(), 2)
        st = scantable("data/MOPS.rpf", unit="Jy")
        assert_equal(st.get_fluxunit(), "Jy")
        st = scantable("data/MOPS.rpf", unit="K")
        assert_equal(st.get_fluxunit(), "K")
        assert_raises(RuntimeError, scantable, "data/MOPS.rpf", unit="junk")
        st = scantable(["data/MOPS.rpf", "data/MOPS.rpf"], average=False)
        assert_equal(st.nscan(), 4)

    def test_copy(self):
        """``copy`` must return a distinct object."""
        st = self.st.copy()
        assert_not_equal(id(st), id(self.st))

    def test_drop_scan(self):
        """Dropping one scan (given as scalar or list) leaves only the other.

        This test used to be defined twice in the class; the later
        definition silently replaced the earlier one, so the ``nscan``
        check never ran. Both sets of checks are combined here.
        """
        s0 = self.st.drop_scan(1)
        assert_equal(s0.nscan(), 1)
        assert_equal(s0.getscannos(), [0])
        s1 = self.st.drop_scan([0])
        assert_equal(s1.nscan(), 1)
        assert_equal(s1.getscannos(), [1])

    def test_get_scan(self):
        """Scans can be selected by scan number or by source name."""
        st = self.st.get_scan([1])
        assert_equal(st.nscan(), 1)
        st = self.st.get_scan("Orion_SiO_R")
        assert_equal(st.get_sourcename()[-1], "Orion_SiO_R")
        assert_equal(st.nscan(), 1)

    def test_get_spectrum(self):
        """The maximum of spectrum 0 matches the reference value."""
        spec = self.st.get_spectrum(0)
        assert_almost_equal(max(spec), 215.279830933)

    def test_get_mask(self):
        """The mask of row 0 has 4096 entries."""
        mask = self.st.get_mask(0)
        assert_equal(len(mask), 4096)

    def test_set_spectrum(self):
        """A spectrum written with ``set_spectrum`` is read back unchanged."""
        spec = [1.0] * self.st.nchan()
        self.st.set_spectrum(spec, 0)
        spec1 = self.st.get_spectrum(0)
        assert_almost_equal(max(spec1), 1.0)

    def test_selection(self):
        """Polarisation selection works via a selector and via keywords."""
        sel = selector()
        sel.set_polarisations("YY")
        self.st.set_selection(sel)
        assert_equal(self.st.getpolnos(), [1])
        sel1 = self.st.get_selection()
        assert_equal(sel1.get_pols(), [1])
        self.st.set_selection(pols="XX")
        assert_equal(self.st.getpolnos(), [0])

    def stats(self, key, value, mask=False):
        """Assert that statistic ``key`` of the first row equals ``value``.

        Helper yielded by the ``test_stats``/``test_masked_stats`` generators.
        When ``mask`` is true, the statistic is computed with a mask created
        from the windows [0, 100] and [3900, 4096].
        """
        msk = None
        if mask:
            msk = self.st.create_mask([0, 100], [3900, 4096])
        sval = self.st.stats(stat=key, mask=msk)
        assert_almost_equal(sval[0], value)

    def test_masked_stats(self):
        """Generate one masked-statistics check per statistic name."""
        stats = {'min': 113.767166138,
                 'max': 128.21571350, 'sumsq': 4180516.75,
                 'sum': 35216.87890625, 'mean': 118.5753479,
                 'var': 15.75608253, 'stddev': 3.9693932533,
                 'avdev': 3.395271778, 'rms': 118.6415405,
                 'median': 117.5024261}
        for k, v in stats.items():
            yield self.stats, k, v, True

    def test_stats(self):
        """Generate one unmasked-statistics check per statistic name."""
        stats = {'min': 113.767166138,
                 'max': 215.279830933, 'sumsq': 128759200.0,
                 'sum': 720262.375, 'mean': 175.845306396,
                 'var': 513.95324707, 'stddev': 22.6705360413,
                 'avdev': 16.3966751099, 'rms': 177.300170898,
                 'median': 182.891845703}
        for k, v in stats.items():
            yield self.stats, k, v

    def test_get_column_names(self):
        """The column names match the expected list, in order."""
        cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO',
                  'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW',
                  'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE',
                  'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS',
                  'DIRECTION', 'AZIMUTH', 'ELEVATION',
                  'OPACITY', 'TCAL_ID', 'FIT_ID',
                  'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY',
                  'SRCPROPERMOTION', 'SRCDIRECTION',
                  'SCANRATE']
        assert_equal(self.st.get_column_names(), cnames)

    def test_get_tsys(self):
        """The first Tsys value matches the reference value."""
        assert_almost_equal(self.st.get_tsys()[0], 175.830429077)

    def test_set_tsys(self):
        """``set_tsys`` works with and without a second (row) argument."""
        s = self.st.copy()
        newval = 100.0
        s.set_tsys(newval, 0)
        assert_almost_equal(s.get_tsys()[0], newval)
        s2 = self.st.copy()
        s2.set_tsys(newval)
        # without a row argument every returned value must be the new one
        for tsys in s2.get_tsys():
            assert_almost_equal(tsys, newval)

    def test_get_time(self):
        """``get_time`` returns a string, or a datetime when asked to."""
        assert_equal(self.st.get_time(0), '2008/03/12/09:32:50')
        dt = datetime.datetime(2008, 3, 12, 9, 32, 50)
        assert_equal(self.st.get_time(0, True), dt)

    def test_get_inttime(self):
        """The first integration time matches the reference value."""
        assert_almost_equal(self.st.get_inttime()[0], 30.720016479)

    def test_get_sourcename(self):
        """Source names are available per row and as a full list."""
        assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R')
        assert_equal(self.st.get_sourcename(),
                     ['Orion_SiO_R', 'Orion_SiO_R',
                      'Orion_SiO', 'Orion_SiO'])

    def test_get_azimuth(self):
        """The first azimuth value matches the reference value."""
        assert_almost_equal(self.st.get_azimuth()[0], 5.628767013)

    def test_get_elevation(self):
        """The first elevation value matches the reference value."""
        assert_almost_equal(self.st.get_elevation()[0], 1.01711678504)

    def test_get_parangle(self):
        """The first parallactic-angle value matches the reference value."""
        assert_almost_equal(self.st.get_parangle()[0], 2.5921990871)

    def test_get_direction(self):
        """The first direction string matches the reference value."""
        assert_equal(self.st.get_direction()[0],
                     'J2000 05:35:14.5 -04.52.29.5')

    def test_get_directionval(self):
        """The first direction value pair matches the reference values."""
        dv = self.st.get_directionval()[0]
        assert_almost_equal(dv[0], 1.4627692699)
        assert_almost_equal(dv[1], -0.0850824415)

    def test_unit(self):
        """Valid units are accepted; an invalid one raises and changes nothing."""
        self.st.set_unit('')
        self.st.set_unit('GHz')
        self.st.set_unit('km/s')
        assert_raises(RuntimeError, self.st.set_unit, 'junk')
        # the failed call must leave the last valid unit in place
        assert_equal(self.st.get_unit(), 'km/s')

    def test_average_pol(self):
        """Averaging polarisations leaves a single polarisation."""
        ap = self.st.average_pol()
        assert_equal(ap.npol(), 1)

    def test_flag(self):
        """Flagging a mask and its complement gives the reference maxima."""
        q = self.st.auto_quotient()
        q.set_unit('km/s')
        q0 = q.copy()
        q1 = q.copy()
        msk = q0.create_mask([-10, 20])
        q0.flag(mask=mask_not(msk))
        q1.flag(mask=msk)
        assert_almost_equal(q0.stats(stat='max')[0], 95.62171936)
        assert_almost_equal(q1.stats(stat='max')[0], 2.66563416)

    # NOTE(review): the nose documentation describes with_setup as meant for
    # test functions rather than test methods - confirm these fixtures
    # actually run for this generator.
    @with_setup(tempdir_setup, tempdir_teardown)
    def test_save(self):
        """Generate one save check per output format.

        Each yielded tuple is ``(file name, format name, verify)``; ``verify``
        tells ``save`` whether to read the file back and compare it.
        """
        fname = os.path.join("test_temp", 'scantable_test.%s')
        formats = [(fname % 'sdfits', 'SDFITS', True),
                   (fname % 'ms', 'MS2', True),
                   (fname % 'class.fits', 'CLASS', False),
                   (fname % 'fits', 'FITS', False),
                   (fname % 'txt', 'ASCII', False),
                   ]
        for fmt_args in formats:
            yield self.save, fmt_args

    def save(self, args):
        """Save ``self.st`` and optionally verify the written file.

        ``args`` is a ``(file name, format name, verify)`` tuple as yielded
        by ``test_save``. When ``verify`` is true the file is read back and
        compared against ``self.st``.
        """
        fname, fmt, verify = args
        # third positional argument is passed as True, as before
        # (presumably "overwrite" - confirm against scantable.save)
        self.st.save(fname, fmt, True)
        if verify:
            s = scantable(fname)
            ds = self.st - s
            assert_equal(self.st.getpolnos(), s.getpolnos())
            assert_equal(self.st.getscannos(), s.getscannos())
            assert_equal(self.st.getifnos(), s.getifnos())
            assert_equal(self.st.getbeamnos(), s.getbeamnos())
            # see if the residual spectra are ~ 0.0
            for spec in ds:
                assert_almost_equal(sum(spec)/len(spec), 0.0, 5)

    def test_auto_poly_baseline(self):
        """The rms of the automatically fitted baseline matches the reference."""
        q = self.st.auto_quotient()
        b = q.auto_poly_baseline(insitu=False)
        res_rms = (q-b).stats('rms')
        assert_almost_equal(res_rms[0], 0.38370, 5)
        assert_almost_equal(res_rms[1], 0.38780, 5)

    def test_poly_baseline(self):
        """The rms of an order-0 masked baseline fit matches the reference."""
        q = self.st.auto_quotient()
        msk = q.create_mask([0.0, 1471.0], [1745.0, 4095.0])
        b = q.poly_baseline(order=0, mask=msk, insitu=False)
        res_rms = (q-b).stats('rms')
        assert_almost_equal(res_rms[0], 0.38346, 5)
        assert_almost_equal(res_rms[1], 0.38780, 5)

    def test_reshape(self):
        """``reshape`` trims channels, either on a new table or in place."""
        cp = self.st.copy()
        n = cp.nchan()
        # third argument False: the result is returned, cp stays unchanged
        rs = cp.reshape(10, -10, False)
        assert_equal(n-20, rs.nchan())
        assert_equal(cp.nchan(), n)
        rs = cp.reshape(10, n-11, False)
        assert_equal(n-20, rs.nchan())
        # third argument True: cp itself is reshaped
        cp.reshape(10, -10, True)
        assert_equal(n-20, cp.nchan())
Note: See TracBrowser for help on using the repository browser.