# Standard library
import sys
import os
import shutil
import datetime

# Test helpers; the star import supplies the assert_* functions and
# with_setup used by the tests in this module.
from nose.tools import *

# Package under test
import asap
from asap import scantable, selector, mask_not
from asap.logging import asaplog

# no need for log messages
asaplog.disable()

def tempdir_setup():
    """Create the ``test_temp`` scratch directory in the current directory.

    A directory left behind by an earlier run is reused instead of raising,
    so the fixture can be called repeatedly.

    Raises:
        OSError: if the directory cannot be created and does not already
            exist as a directory.
    """
    try:
        os.makedirs("test_temp")
    except OSError:
        # Only a pre-existing directory is acceptable; anything else
        # (permissions, a plain file in the way, ...) is a real failure.
        if not os.path.isdir("test_temp"):
            raise


def tempdir_teardown():
    """Remove the ``test_temp`` scratch directory and everything below it.

    Removal errors (including the directory not existing) are ignored.
    """
    shutil.rmtree("test_temp", ignore_errors=True)


class TestScantable(object):
    """nose tests for ``asap.scantable`` using the ``data/MOPS.rpf`` file.

    ``setup`` stores a fresh, averaged scantable in ``self.st``.  ``stats``,
    ``av_weight`` and ``save`` are check helpers yielded by the generator
    tests; their names do not start with ``test`` so they are only run
    through those generators.
    """

    def setup(self):
        # Load the reference data that lives next to this module.
        data_dir = os.path.join(os.path.dirname(__file__), "data")
        s = scantable(os.path.join(data_dir, "MOPS.rpf"), average=True)
        sel = selector()
        # make sure this order is always correct - it can be random
        sel.set_order(["SCANNO", "POLNO"])
        s.set_selection(sel)
        # The tests operate on a copy taken with the ordering applied.
        self.st = s.copy()
        restfreqs = [86.243]  # 13CO-1/0, SiO the two IF
        self.st.set_restfreqs(restfreqs, "GHz")

    def test_init(self):
        # Constructor options: averaging, flux unit, and a list of files.
        fname = os.path.join(os.path.dirname(__file__), "data", "MOPS.rpf")
        st = scantable(fname, average=False)
        assert_equal(st.ncycle(), 32)
        st = scantable(fname, average=True)
        assert_equal(st.ncycle(), 2)
        st = scantable(fname, unit="Jy")
        assert_equal(st.get_fluxunit(), "Jy")
        st = scantable(fname, unit="K")
        assert_equal(st.get_fluxunit(), "K")
        assert_raises(RuntimeError, scantable, fname, unit="junk")
        st = scantable([fname, fname], average=False)
        assert_equal(st.nscan(), 4)

    def test_copy(self):
        # copy() must hand back a distinct Python object.
        st = self.st.copy()
        assert_not_equal(id(st), id(self.st))

    def test_drop_scan_nscan(self):
        # Previously also named ``test_drop_scan``; the later definition of
        # that name replaced this one, so this check never ran.
        st = self.st.drop_scan([1])
        assert_equal(st.nscan(), 1)

    def test_get_scan(self):
        # Scans can be picked by number or by source name.
        st = self.st.get_scan([1])
        assert_equal(st.nscan(), 1)
        st = self.st.get_scan("Orion_SiO_R")
        assert_equal(st.get_sourcename()[-1], "Orion_SiO_R")
        assert_equal(st.nscan(), 1)

    def test_get_spectrum(self):
        spec = self.st.get_spectrum(0)
        assert_almost_equal(max(spec), 215.279830933)

    def test_get_mask(self):
        mask = self.st.get_mask(0)
        assert_equal(len(mask), 4096)

    def test_set_spectrum(self):
        # Overwrite row 0 with a flat spectrum and read it back.
        spec = [1.0] * self.st.nchan()
        self.st.set_spectrum(spec, 0)
        spec1 = self.st.get_spectrum(0)
        assert_almost_equal(max(spec1), 1.0)

    def test_selection(self):
        # Selection via a selector object ...
        sel = selector()
        sel.set_polarisations("YY")
        self.st.set_selection(sel)
        assert_equal(self.st.getpolnos(), [1])
        sel1 = self.st.get_selection()
        assert_equal(sel1.get_pols(), [1])
        # ... and via keyword arguments.
        self.st.set_selection(pols="XX")
        assert_equal(self.st.getpolnos(), [0])

    def stats(self, key, value, mask=False):
        # Helper: compare statistic ``key`` of the first row with ``value``.
        # With ``mask`` true the statistic is restricted to the mask built
        # from the windows [0, 100] and [3900, 4096].
        msk = None
        if mask:
            msk = self.st.create_mask([0, 100], [3900, 4096])
        sval = self.st.stats(stat=key, mask=msk)
        assert_almost_equal(sval[0], value)

    def test_masked_stats(self):
        # Generator test: one ``stats`` check per statistic, mask enabled.
        expected = {'min': 113.767166138,
                    'max': 128.21571350, 'sumsq': 4180516.75,
                    'sum': 35216.87890625, 'mean': 118.5753479,
                    'var': 15.75608253, 'stddev': 3.9693932533,
                    'avdev': 3.395271778, 'rms': 118.6415405,
                    'median': 117.5024261}
        for k, v in expected.items():
            yield self.stats, k, v, True

    def test_stats(self):
        # Generator test: one ``stats`` check per statistic, no mask.
        expected = {'min': 113.767166138,
                    'max': 215.279830933, 'sumsq': 128759200.0,
                    'sum': 720262.375, 'mean': 175.845306396,
                    'var': 513.95324707, 'stddev': 22.6705360413,
                    'avdev': 16.3966751099, 'rms': 177.300170898,
                    'median': 182.891845703}
        for k, v in expected.items():
            yield self.stats, k, v

    def test_get_column_names(self):
        cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO',
                  'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW',
                  'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE',
                  'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS',
                  'DIRECTION', 'AZIMUTH', 'ELEVATION',
                  'OPACITY', 'TCAL_ID', 'FIT_ID',
                  'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY',
                  'SRCPROPERMOTION', 'SRCDIRECTION',
                  'SCANRATE']
        assert_equal(self.st.get_column_names(), cnames)

    def test_get_tsys(self):
        assert_almost_equal(self.st.get_tsys()[0], 175.830429077)

    def test_set_tsys(self):
        newval = 100.0
        # Set a single row ...
        s = self.st.copy()
        s.set_tsys(newval, 0)
        assert_almost_equal(s.get_tsys()[0], newval)
        # ... then every row at once.
        s2 = self.st.copy()
        s2.set_tsys(newval)
        for tsys in s2.get_tsys():
            assert_almost_equal(tsys, newval)

    def test_get_time(self):
        # Same timestamp as a string and, with the flag set, as a datetime.
        assert_equal(self.st.get_time(0), '2008/03/12/09:32:50')
        dt = datetime.datetime(2008, 3, 12, 9, 32, 50)
        assert_equal(self.st.get_time(0, True), dt)

    def test_get_inttime(self):
        assert_almost_equal(self.st.get_inttime()[0], 30.720016479)

    def test_get_sourcename(self):
        assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R')
        assert_equal(self.st.get_sourcename(),
                     ['Orion_SiO_R', 'Orion_SiO_R',
                      'Orion_SiO', 'Orion_SiO'])

    def test_set_sourcename(self):
        s = self.st.copy()
        newname = "TEST"
        s.set_sourcename(newname)
        assert_equal(s.get_sourcename(), [newname] * 4)

    def test_get_azimuth(self):
        assert_almost_equal(self.st.get_azimuth()[0], 5.628767013)

    def test_get_elevation(self):
        assert_almost_equal(self.st.get_elevation()[0], 1.01711678504)

    def test_get_parangle(self):
        assert_almost_equal(self.st.get_parangle()[0], 2.5921990871)

    def test_get_direction(self):
        assert_equal(self.st.get_direction()[0],
                     'J2000 05:35:14.5 -04.52.29.5')

    def test_get_directionval(self):
        dv = self.st.get_directionval()[0]
        assert_almost_equal(dv[0], 1.4627692699)
        assert_almost_equal(dv[1], -0.0850824415)

    def test_unit(self):
        self.st.set_unit('')
        self.st.set_unit('GHz')
        self.st.set_unit('km/s')
        assert_raises(RuntimeError, self.st.set_unit, 'junk')
        # The rejected unit must not have replaced the last valid one.
        assert_equal(self.st.get_unit(), 'km/s')

    def test_average_pol(self):
        ap = self.st.average_pol()
        assert_equal(ap.npol(), 1)

    def test_drop_scan(self):
        # drop_scan takes a single scan number or a list of them.
        s0 = self.st.drop_scan(1)
        assert_equal(s0.getscannos(), [0])
        s1 = self.st.drop_scan([0])
        assert_equal(s1.getscannos(), [1])

    def test_flag(self):
        q = self.st.auto_quotient()
        q.set_unit('km/s')
        q0 = q.copy()
        q1 = q.copy()
        msk = q0.create_mask([-10, 20])
        # q0 is flagged with the inverted mask, q1 with the mask itself.
        q0.flag(mask=mask_not(msk))
        q1.flag(mask=msk)
        assert_almost_equal(q0.stats(stat='max')[0], 95.62171936)
        assert_almost_equal(q1.stats(stat='max')[0], 2.66563416)

    def test_average_time_weight(self):
        # Generator test: one ``av_weight`` check per weighting scheme.
        weights = {'none': 236.61423,
                   'var': 232.98752,
                   'tsys': 236.37482,
                   'tint': 236.61423,
                   'tintsys': 236.37485,
                   'median': 236.61423,
                   }

        for k, v in weights.items():
            yield self.av_weight, k, v

    def av_weight(self, weight, result):
        # Helper: peak of row 0 after time averaging, compared to 5 places.
        out = self.st.average_time(weight=weight)
        assert_almost_equal(max(out.get_spectrum(0)), result, 5)

    # NOTE(review): nose documents with_setup as intended for test
    # functions rather than test methods - confirm these fixtures actually
    # run for this generator method.
    @with_setup(tempdir_setup, tempdir_teardown)
    def test_save(self):
        # Generator test: one ``save`` check per output format.  Each entry
        # is (file name, format name, verify-by-reading-back flag).
        fname = os.path.join("test_temp", 'scantable_test.%s')
        formats = [(fname % 'sdfits', 'SDFITS', True),
                   (fname % 'ms', 'MS2', True),
                   (fname % 'class.fits', 'CLASS', False),
                   (fname % 'fits', 'FITS', False),
                   (fname % 'txt', 'ASCII', False),
                   ]
        for fmt in formats:
            yield self.save, fmt

    def save(self, args):
        # Helper: write ``self.st`` to args[0] in format args[1]; when the
        # last element of ``args`` is true, read the file back and compare.
        fname = args[0]
        self.st.save(fname, args[1], True)
        # do some verification args[2] == True
        if args[-1]:
            s = scantable(fname)
            ds = self.st - s
            assert_equal(self.st.getpolnos(), s.getpolnos())
            assert_equal(self.st.getscannos(), s.getscannos())
            assert_equal(self.st.getifnos(), s.getifnos())
            assert_equal(self.st.getbeamnos(), s.getbeamnos())
            # see if the residual spectra are ~ 0.0
            for spec in ds:
                assert_almost_equal(sum(spec) / len(spec), 0.0, 5)

    def test_auto_poly_baseline(self):
        q = self.st.auto_quotient()
        b = q.auto_poly_baseline(insitu=False)
        # rms of (q - b), checked for the first two rows.
        res_rms = (q - b).stats('rms')
        assert_almost_equal(res_rms[0], 0.38370, 5)
        assert_almost_equal(res_rms[1], 0.38780, 5)

    def test_poly_baseline(self):
        q = self.st.auto_quotient()
        msk = q.create_mask([0.0, 1471.0], [1745.0, 4095.0])
        b = q.poly_baseline(order=0, mask=msk, insitu=False)
        # rms of (q - b), checked for the first two rows.
        res_rms = (q - b).stats('rms')
        assert_almost_equal(res_rms[0], 0.38346, 5)
        assert_almost_equal(res_rms[1], 0.38780, 5)

    def test_reshape(self):
        cp = self.st.copy()
        n = cp.nchan()
        # Third argument False: the result is returned, ``cp`` is untouched.
        rs = cp.reshape(10, -10, False)
        assert_equal(n - 20, rs.nchan())
        assert_equal(cp.nchan(), n)
        # A positive end channel gives the same length as the negative one.
        rs = cp.reshape(10, n - 11, False)
        assert_equal(n - 20, rs.nchan())
        # Third argument True: ``cp`` itself is reshaped.
        cp.reshape(10, -10, True)
        assert_equal(n - 20, cp.nchan())