1 | import sys |
---|
2 | import os |
---|
3 | import shutil |
---|
4 | import datetime |
---|
5 | from nose.tools import * |
---|
6 | from asap import scantable, selector, mask_not |
---|
7 | from asap.logging import asaplog |
---|
8 | # Log messages are not needed while the tests run, so disable asaplog |
---|
9 | asaplog.disable() |
---|
10 | |
---|
def tempdir_setup():
    """Create the ``test_temp`` scratch directory in the working directory.

    A directory that already exists (for example one left behind by an
    earlier, aborted run) is reused instead of making the fixture fail.

    Raises:
        OSError: if the directory cannot be created and does not already
            exist as a directory.
    """
    try:
        os.makedirs("test_temp")
    except OSError:
        # makedirs raises when the path already exists; only re-raise when
        # the directory is genuinely unavailable (permissions, or a plain
        # file occupying the name).
        if not os.path.isdir("test_temp"):
            raise
---|
13 | |
---|
def tempdir_teardown():
    """Delete the ``test_temp`` directory tree, ignoring any errors."""
    shutil.rmtree("test_temp", ignore_errors=True)
---|
16 | |
---|
class TestScantable(object):
    """Tests for ``asap.scantable`` run against the ``data/MOPS.rpf`` file."""

    def setup(self):
        """Load the averaged test file into ``self.st`` (nose per-test fixture)."""
        s = scantable("data/MOPS.rpf", average=True)
        sel = selector()
        # make sure this order is always correct - it can be random
        sel.set_order(["SCANNO", "POLNO"])
        s.set_selection(sel)
        self.st = s.copy()
        restfreqs = [86.243]  # 13CO-1/0, SiO the two IF
        self.st.set_restfreqs(restfreqs, "GHz")

    def test_init(self):
        """Check the constructor's ``average`` and ``unit`` options and list input."""
        st = scantable("data/MOPS.rpf", average=False)
        assert_equal(st.ncycle(), 32)
        st = scantable("data/MOPS.rpf", average=True)
        assert_equal(st.ncycle(), 2)
        st = scantable("data/MOPS.rpf", unit="Jy")
        assert_equal(st.get_fluxunit(), "Jy")
        st = scantable("data/MOPS.rpf", unit="K")
        assert_equal(st.get_fluxunit(), "K")
        assert_raises(RuntimeError, scantable, "data/MOPS.rpf", unit="junk")
        st = scantable(["data/MOPS.rpf", "data/MOPS.rpf"], average=False)
        assert_equal(st.nscan(), 4)

    def test_copy(self):
        """``copy()`` must return a different object."""
        st = self.st.copy()
        assert_not_equal(id(st), id(self.st))

    # This test used to be called ``test_drop_scan`` as well; the later
    # definition of that name replaced it, so it was never collected.
    def test_drop_scan_nscan(self):
        """Dropping one scan leaves a single scan."""
        st = self.st.drop_scan([1])
        assert_equal(st.nscan(), 1)

    def test_get_scan(self):
        """``get_scan`` accepts a scan number list or a source name."""
        st = self.st.get_scan([1])
        assert_equal(st.nscan(), 1)
        st = self.st.get_scan("Orion_SiO_R")
        assert_equal(st.get_sourcename()[-1], "Orion_SiO_R")
        assert_equal(st.nscan(), 1)

    def test_get_spectrum(self):
        """The first spectrum has the expected maximum."""
        spec = self.st.get_spectrum(0)
        assert_almost_equal(max(spec), 215.279830933)

    def test_get_mask(self):
        """The mask of row 0 has 4096 entries."""
        spec = self.st.get_mask(0)
        assert_equal(len(spec), 4096)

    def test_set_spectrum(self):
        """A spectrum written with ``set_spectrum`` is read back unchanged."""
        spec = [1.0 for i in range(self.st.nchan())]
        self.st.set_spectrum(spec, 0)
        spec1 = self.st.get_spectrum(0)
        assert_almost_equal(max(spec1), 1.0)

    def test_selection(self):
        """Polarisation selection works via a selector and via keywords."""
        sel = selector()
        sel.set_polarisations("YY")
        self.st.set_selection(sel)
        assert_equal(self.st.getpolnos(), [1])
        sel1 = self.st.get_selection()
        assert_equal(sel1.get_pols(), [1])
        self.st.set_selection(pols="XX")
        assert_equal(self.st.getpolnos(), [0])

    def stats(self, key, value, mask=False):
        """Assert that statistic ``key`` of the first row is almost ``value``.

        This is a helper yielded by the generator tests, not a test itself.
        With ``mask`` true the statistic is computed with the mask built
        from the windows [0, 100] and [3900, 4096].
        """
        msk = None
        if mask:
            msk = self.st.create_mask([0, 100], [3900, 4096])
        sval = self.st.stats(stat=key, mask=msk)
        assert_almost_equal(sval[0], value)

    def test_masked_stats(self):
        """Yield one masked ``stats`` check per statistic."""
        expected = {'min': 113.767166138,
                    'max': 128.21571350, 'sumsq': 4180516.75,
                    'sum': 35216.87890625, 'mean': 118.5753479,
                    'var': 15.75608253, 'stddev': 3.9693932533,
                    'avdev': 3.395271778, 'rms': 118.6415405,
                    'median': 117.5024261}
        for k, v in expected.items():
            yield self.stats, k, v, True

    def test_stats(self):
        """Yield one unmasked ``stats`` check per statistic."""
        expected = {'min': 113.767166138,
                    'max': 215.279830933, 'sumsq': 128759200.0,
                    'sum': 720262.375, 'mean': 175.845306396,
                    'var': 513.95324707, 'stddev': 22.6705360413,
                    'avdev': 16.3966751099, 'rms': 177.300170898,
                    'median': 182.891845703}
        for k, v in expected.items():
            yield self.stats, k, v

    def test_get_column_names(self):
        """The column names come back in the expected order."""
        cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO',
                  'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW',
                  'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE',
                  'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS',
                  'DIRECTION', 'AZIMUTH', 'ELEVATION',
                  'OPACITY', 'TCAL_ID', 'FIT_ID',
                  'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY',
                  'SRCPROPERMOTION', 'SRCDIRECTION',
                  'SCANRATE']
        assert_equal(self.st.get_column_names(), cnames)

    def test_get_tsys(self):
        assert_almost_equal(self.st.get_tsys()[0], 175.830429077)

    def test_get_time(self):
        """``get_time`` returns a string, or a datetime when asked to."""
        assert_equal(self.st.get_time(0), '2008/03/12/09:32:50')
        dt = datetime.datetime(2008, 3, 12, 9, 32, 50)
        assert_equal(self.st.get_time(0, True), dt)

    def test_get_inttime(self):
        assert_almost_equal(self.st.get_inttime()[0], 30.720016479)

    def test_get_sourcename(self):
        """Source names are available per row and as a full list."""
        assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R')
        assert_equal(self.st.get_sourcename(),
                     ['Orion_SiO_R', 'Orion_SiO_R',
                      'Orion_SiO', 'Orion_SiO'])

    def test_get_azimuth(self):
        assert_almost_equal(self.st.get_azimuth()[0], 5.628767013)

    def test_get_elevation(self):
        assert_almost_equal(self.st.get_elevation()[0], 1.01711678504)

    def test_get_parangle(self):
        assert_almost_equal(self.st.get_parangle()[0], 2.5921990871)

    def test_get_direction(self):
        assert_equal(self.st.get_direction()[0], '05:35:14.5 -04.52.29.5')

    def test_get_directionval(self):
        dv = self.st.get_directionval()[0]
        assert_almost_equal(dv[0], 1.4627692699)
        assert_almost_equal(dv[1], -0.0850824415)

    def test_unit(self):
        """Valid units are accepted; an invalid one raises and changes nothing."""
        self.st.set_unit('')
        self.st.set_unit('GHz')
        self.st.set_unit('km/s')
        assert_raises(RuntimeError, self.st.set_unit, 'junk')
        assert_equal(self.st.get_unit(), 'km/s')

    def test_average_pol(self):
        ap = self.st.average_pol()
        assert_equal(ap.npol(), 1)

    def test_drop_scan(self):
        """``drop_scan`` accepts an int or a list and keeps the other scan."""
        s0 = self.st.drop_scan(1)
        assert_equal(s0.getscannos(), [0])
        s1 = self.st.drop_scan([0])
        assert_equal(s1.getscannos(), [1])

    def test_flag(self):
        """Flagging inside and outside the [-10, 20] km/s window changes the max."""
        q = self.st.auto_quotient()
        q.set_unit('km/s')
        q0 = q.copy()
        q1 = q.copy()
        msk = q0.create_mask([-10, 20])
        q0.flag(mask=mask_not(msk))
        q1.flag(mask=msk)
        assert_almost_equal(q0.stats(stat='max')[0], 95.62171936)
        assert_almost_equal(q1.stats(stat='max')[0], 2.66563416)

    def test_save(self):
        """Yield one ``save`` check for each output format listed below."""
        fname = os.path.join("test_temp", 'scantable_test.%s')
        # Each entry: (output file name, format name, read back and compare?)
        formats = [(fname % 'sdfits', 'SDFITS', True),
                   (fname % 'ms', 'MS2', True),
                   (fname % 'class.fits', 'CLASS', False),
                   (fname % 'fits', 'FITS', False),
                   (fname % 'txt', 'ASCII', False),
                   ]
        for case in formats:
            yield self.save, case

    def save(self, args):
        """Save ``self.st`` to ``args[0]`` using the format named ``args[1]``.

        When the last element of ``args`` is true, the file is read back and
        compared with ``self.st``: the polarisation, scan, IF and beam
        numbers must match and the mean of every residual spectrum must be
        zero to 5 places.
        """
        # nose's with_setup() only takes effect on test functions, not on
        # test methods, so the scratch directory is managed here instead.
        # Clearing it first also removes leftovers from an aborted run.
        tempdir_teardown()
        tempdir_setup()
        try:
            fname = args[0]
            self.st.save(fname, args[1], True)
            # do some verification args[2] == True
            if args[-1]:
                s = scantable(fname)
                ds = self.st - s
                assert_equal(self.st.getpolnos(), s.getpolnos())
                assert_equal(self.st.getscannos(), s.getscannos())
                assert_equal(self.st.getifnos(), s.getifnos())
                assert_equal(self.st.getbeamnos(), s.getbeamnos())
                # see if the residual spectra are ~ 0.0
                for spec in ds:
                    assert_almost_equal(sum(spec)/len(spec), 0.0, 5)
        finally:
            tempdir_teardown()
---|