source: trunk/test/test_scantable.py@ 2937

Last change on this file since 2937 was 2818, checked in by Malte Marquarding, 11 years ago

Issue #291: added scantable.set_sourcename to overwrite sourcename to allow freq_align to work. Exposed 'SOURCE' averaging to python api. Added some logging to freq_align refering to the sources it aligns to

File size: 9.3 KB
Line 
1import sys
2import os
3import shutil
4import datetime
5from nose.tools import *
6import asap
7from asap import scantable, selector, mask_not
8from asap.logging import asaplog
9# no need for log messages
10asaplog.disable()
11
12def tempdir_setup():
13 os.makedirs("test_temp")
14
15def tempdir_teardown():
16 shutil.rmtree("test_temp", True)
17
18class TestScantable(object):
19 def setup(self):
20 pth = os.path.dirname(__file__)
21 s = scantable(os.path.join(pth, "data", "MOPS.rpf"), average=True)
22 sel = selector()
23 # make sure this order is always correct - in can be random
24 sel.set_order(["SCANNO", "POLNO"])
25 s.set_selection(sel)
26 self.st = s.copy()
27 restfreqs = [86.243] # 13CO-1/0, SiO the two IF
28 self.st.set_restfreqs(restfreqs,"GHz")
29
30 def test_init(self):
31 fname = os.path.join(os.path.dirname(__file__), "data", "MOPS.rpf")
32 st = scantable(fname, average=False)
33 assert_equal(st.ncycle(), 32)
34 st = scantable(fname, average=True)
35 assert_equal(st.ncycle(), 2)
36 st = scantable(fname, unit="Jy")
37 assert_equal(st.get_fluxunit(), "Jy")
38 st = scantable(fname, unit="K")
39 assert_equal(st.get_fluxunit(), "K")
40 assert_raises(RuntimeError, scantable, fname, unit="junk")
41 st = scantable([fname,fname], average=False)
42 assert_equal(st.nscan(), 4)
43
44 def test_copy(self):
45 st = self.st.copy()
46 assert_not_equal(id(st), id(self.st))
47
48 def test_drop_scan(self):
49 st = self.st.drop_scan([1])
50 assert_equal(st.nscan(), 1)
51
52 def test_get_scan(self):
53 st = self.st.get_scan([1])
54 assert_equal(st.nscan(), 1)
55 st = self.st.get_scan("Orion_SiO_R")
56 assert_equal(st.get_sourcename()[-1], "Orion_SiO_R")
57 assert_equal(st.nscan(), 1)
58
59 def test_get_spectrum(self):
60 spec = self.st.get_spectrum(0)
61 assert_almost_equal(max(spec), 215.279830933)
62
63 def test_get_mask(self):
64 spec = self.st.get_mask(0)
65 assert_equal(len(spec), 4096)
66
67 def test_set_spectrum(self):
68 spec = [ 1.0 for i in range(self.st.nchan()) ]
69 self.st.set_spectrum(spec, 0)
70 spec1 = self.st.get_spectrum(0)
71 assert_almost_equal(max(spec1), 1.0)
72
73 def test_selection(self):
74 sel = selector()
75 sel.set_polarisations("YY")
76 self.st.set_selection(sel)
77 assert_equal(self.st.getpolnos(), [1])
78 sel1 = self.st.get_selection()
79 assert_equal(sel1.get_pols(), [1])
80 self.st.set_selection(pols="XX")
81 assert_equal(self.st.getpolnos(), [0])
82
83
84 def stats(self, key, value, mask=False):
85 msk = None
86 if mask:
87 msk = self.st.create_mask([0,100], [3900,4096])
88 sval = self.st.stats(stat=key, mask=msk)
89 assert_almost_equal(sval[0], value)
90
91 def test_masked_stats(self):
92 stats = { 'min': 113.767166138,
93 'max':128.21571350, 'sumsq':4180516.75,
94 'sum':35216.87890625, 'mean':118.5753479,
95 'var':15.75608253, 'stddev':3.9693932533,
96 'avdev':3.395271778, 'rms':118.6415405,
97 'median':117.5024261}
98 for k,v in stats.items():
99 yield self.stats, k, v, True
100
101 def test_stats(self):
102 stats = { 'min': 113.767166138,
103 'max':215.279830933, 'sumsq':128759200.0,
104 'sum':720262.375, 'mean':175.845306396,
105 'var':513.95324707, 'stddev':22.6705360413,
106 'avdev':16.3966751099, 'rms':177.300170898,
107 'median':182.891845703}
108 for k,v in stats.items():
109 yield self.stats, k, v
110
111 def test_get_column_names(self):
112 cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO',
113 'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW',
114 'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE',
115 'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS',
116 'DIRECTION', 'AZIMUTH', 'ELEVATION',
117 'OPACITY', 'TCAL_ID', 'FIT_ID',
118 'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY',
119 'SRCPROPERMOTION', 'SRCDIRECTION',
120 'SCANRATE']
121 assert_equal(self.st.get_column_names(), cnames)
122
123 def test_get_tsys(self):
124 assert_almost_equal(self.st.get_tsys()[0], 175.830429077)
125
126 def test_set_tsys(self):
127 s = self.st.copy()
128 newval = 100.0
129 s.set_tsys(newval, 0)
130 assert_almost_equal(s.get_tsys()[0], newval)
131 s2 = self.st.copy()
132 s2.set_tsys(newval)
133 out = s2.get_tsys()
134 for i in xrange(len(out)):
135 assert_almost_equal(out[i], newval)
136
137 def test_get_time(self):
138 assert_equal(self.st.get_time(0), '2008/03/12/09:32:50')
139 dt = datetime.datetime(2008,3,12,9,32,50)
140 assert_equal(self.st.get_time(0, True), dt)
141
142 def test_get_inttime(self):
143 assert_almost_equal(self.st.get_inttime()[0], 30.720016479)
144
145 def test_get_sourcename(self):
146 assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R')
147 assert_equal(self.st.get_sourcename(),
148 ['Orion_SiO_R', 'Orion_SiO_R',
149 'Orion_SiO', 'Orion_SiO'])
150
151 def test_set_sourcename(self):
152 s = self.st.copy()
153 newname = "TEST"
154 s.set_sourcename(newname)
155 assert_equal(s.get_sourcename(), [newname]*4)
156
157 def test_get_azimuth(self):
158 assert_almost_equal(self.st.get_azimuth()[0], 5.628767013)
159
160 def test_get_elevation(self):
161 assert_almost_equal(self.st.get_elevation()[0], 1.01711678504)
162
163 def test_get_parangle(self):
164 assert_almost_equal(self.st.get_parangle()[0], 2.5921990871)
165
166 def test_get_direction(self):
167 assert_equal(self.st.get_direction()[0], 'J2000 05:35:14.5 -04.52.29.5')
168
169 def test_get_directionval(self):
170 dv = self.st.get_directionval()[0]
171 assert_almost_equal(dv[0], 1.4627692699)
172 assert_almost_equal(dv[1], -0.0850824415)
173
174 def test_unit(self):
175 self.st.set_unit('')
176 self.st.set_unit('GHz')
177 self.st.set_unit('km/s')
178 assert_raises(RuntimeError, self.st.set_unit, 'junk')
179 assert_equals(self.st.get_unit(), 'km/s')
180
181 def test_average_pol(self):
182 ap = self.st.average_pol()
183 assert_equal(ap.npol(), 1)
184
185 def test_drop_scan(self):
186 s0 = self.st.drop_scan(1)
187 assert_equal(s0.getscannos(), [0])
188 s1 = self.st.drop_scan([0])
189 assert_equal(s1.getscannos(), [1])
190
191 def test_flag(self):
192 q = self.st.auto_quotient()
193 q.set_unit('km/s')
194 q0 = q.copy()
195 q1 = q.copy()
196 msk = q0.create_mask([-10,20])
197 q0.flag(mask=mask_not(msk))
198 q1.flag(mask=msk)
199 assert_almost_equal(q0.stats(stat='max')[0], 95.62171936)
200 assert_almost_equal(q1.stats(stat='max')[0], 2.66563416)
201
202
203 def test_average_time_weight(self):
204 weights = {'none': 236.61423,
205 'var' : 232.98752,
206 'tsys' : 236.37482,
207 'tint' : 236.61423,
208 'tintsys' : 236.37485,
209 'median' : 236.61423,
210 }
211
212 for k,v in weights.items():
213 yield self.av_weight, k, v
214
215 def av_weight(self, weight, result):
216 out = self.st.average_time(weight=weight)
217 assert_almost_equals(max(out.get_spectrum(0)), result, 5)
218
219 @with_setup(tempdir_setup, tempdir_teardown)
220 def test_save(self):
221 fname = os.path.join("test_temp", 'scantable_test.%s')
222 formats = [(fname % 'sdfits', 'SDFITS', True),
223 (fname % 'ms', 'MS2', True),
224 (fname % 'class.fits', 'CLASS', False),
225 (fname % 'fits', 'FITS', False),
226 (fname % 'txt', 'ASCII', False),
227 ]
228 for fmt in formats:
229 yield self.save, fmt
230
231 def save(self, args):
232 fname = args[0]
233 self.st.save(fname, args[1], True)
234 # do some verification args[2] == True
235 if args[-1]:
236 s = scantable(fname)
237 ds = self.st - s
238 assert_equals(self.st.getpolnos(), s.getpolnos())
239 assert_equals(self.st.getscannos(), s.getscannos())
240 assert_equals(self.st.getifnos(), s.getifnos())
241 assert_equals(self.st.getbeamnos(), s.getbeamnos())
242 # see if the residual spectra are ~ 0.0
243 for spec in ds:
244 assert_almost_equals(sum(spec)/len(spec), 0.0, 5)
245
246 def test_auto_poly_baseline(self):
247 q = self.st.auto_quotient()
248 b = q.auto_poly_baseline(insitu=False)
249 res_rms = (q-b).stats('rms')
250 assert_almost_equals(res_rms[0], 0.38370, 5)
251 assert_almost_equals(res_rms[1], 0.38780, 5)
252
253
254 def test_poly_baseline(self):
255 q = self.st.auto_quotient()
256 msk = q.create_mask([0.0, 1471.0], [1745.0, 4095.0])
257 b = q.poly_baseline(order=0, mask=msk,insitu=False)
258 res_rms = (q-b).stats('rms')
259 assert_almost_equals(res_rms[0], 0.38346, 5)
260 assert_almost_equals(res_rms[1], 0.38780, 5)
261
262
263 def test_reshape(self):
264 cp = self.st.copy()
265 n = cp.nchan()
266 rs = cp.reshape(10,-10,False)
267 assert_equals(n-20, rs.nchan())
268 assert_equals(cp.nchan(), n)
269 rs = cp.reshape(10,n-11,False)
270 assert_equals(n-20, rs.nchan())
271 cp.reshape(10,-10,True)
272 assert_equals(n-20, cp.nchan())
Note: See TracBrowser for help on using the repository browser.