source: trunk/test/test_scantable.py@ 2104

Last change on this file since 2104 was 1925, checked in by Malte Marquarding, 14 years ago

Add some basic tests of poly_baseline

File size: 7.9 KB
RevLine 
[1840]1import sys
2import os
3import shutil
[1600]4import datetime
[1903]5from nose.tools import *
[1840]6from asap import scantable, selector, mask_not
7from asap.logging import asaplog
[1903]8# no need for log messages
[1824]9asaplog.disable()
[1544]10
[1840]11def tempdir_setup():
12 os.makedirs("test_temp")
13
14def tempdir_teardown():
15 shutil.rmtree("test_temp", True)
16
17class TestScantable(object):
18 def setup(self):
[1824]19 s = scantable("data/MOPS.rpf", average=True)
20 sel = selector()
21 # make sure this order is always correct - in can be random
22 sel.set_order(["SCANNO", "POLNO"])
23 s.set_selection(sel)
24 self.st = s.copy()
[1725]25 restfreqs = [86.243] # 13CO-1/0, SiO the two IF
26 self.st.set_restfreqs(restfreqs,"GHz")
[1544]27
28 def test_init(self):
29 st = scantable("data/MOPS.rpf", average=False)
[1840]30 assert_equal(st.ncycle(), 32)
[1544]31 st = scantable("data/MOPS.rpf", average=True)
[1840]32 assert_equal(st.ncycle(), 2)
[1544]33 st = scantable("data/MOPS.rpf", unit="Jy")
[1840]34 assert_equal(st.get_fluxunit(), "Jy")
[1544]35 st = scantable("data/MOPS.rpf", unit="K")
[1840]36 assert_equal(st.get_fluxunit(), "K")
37 assert_raises(RuntimeError, scantable, "data/MOPS.rpf", unit="junk")
[1544]38 st = scantable(["data/MOPS.rpf","data/MOPS.rpf"], average=False)
[1840]39 assert_equal(st.nscan(), 4)
[1544]40
41 def test_copy(self):
42 st = self.st.copy()
[1840]43 assert_not_equal(id(st), id(self.st))
[1592]44
[1544]45 def test_drop_scan(self):
46 st = self.st.drop_scan([1])
[1840]47 assert_equal(st.nscan(), 1)
[1544]48
49 def test_get_scan(self):
50 st = self.st.get_scan([1])
[1840]51 assert_equal(st.nscan(), 1)
[1544]52 st = self.st.get_scan("Orion_SiO_R")
[1840]53 assert_equal(st.get_sourcename()[-1], "Orion_SiO_R")
54 assert_equal(st.nscan(), 1)
[1544]55
56 def test_get_spectrum(self):
57 spec = self.st.get_spectrum(0)
[1840]58 assert_almost_equal(max(spec), 215.279830933)
[1544]59
60 def test_get_mask(self):
61 spec = self.st.get_mask(0)
[1840]62 assert_equal(len(spec), 4096)
[1592]63
[1544]64 def test_set_spectrum(self):
65 spec = [ 1.0 for i in range(self.st.nchan()) ]
66 self.st.set_spectrum(spec, 0)
67 spec1 = self.st.get_spectrum(0)
[1840]68 assert_almost_equal(max(spec1), 1.0)
[1544]69
70 def test_selection(self):
71 sel = selector()
72 sel.set_polarisations("YY")
73 self.st.set_selection(sel)
[1903]74 assert_equal(self.st.getpolnos(), [1])
[1544]75 sel1 = self.st.get_selection()
[1840]76 assert_equal(sel1.get_pols(), [1])
[1600]77 self.st.set_selection(pols="XX")
[1903]78 assert_equal(self.st.getpolnos(), [0])
[1544]79
[1842]80
81 def stats(self, key, value, mask=False):
82 msk = None
83 if mask:
84 msk = self.st.create_mask([0,100], [3900,4096])
85 sval = self.st.stats(stat=key, mask=msk)
86 assert_almost_equal(sval[0], value)
87
88 def test_masked_stats(self):
89 stats = { 'min': 113.767166138,
90 'max':128.21571350, 'sumsq':4180516.75,
91 'sum':35216.87890625, 'mean':118.5753479,
92 'var':15.75608253, 'stddev':3.9693932533,
93 'avdev':3.395271778, 'rms':118.6415405,
94 'median':117.5024261}
95 for k,v in stats.items():
96 yield self.stats, k, v, True
97
[1544]98 def test_stats(self):
99 stats = { 'min': 113.767166138,
100 'max':215.279830933, 'sumsq':128759200.0,
101 'sum':720262.375, 'mean':175.845306396,
102 'var':513.95324707, 'stddev':22.6705360413,
[1592]103 'avdev':16.3966751099, 'rms':177.300170898,
104 'median':182.891845703}
[1842]105 for k,v in stats.items():
106 yield self.stats, k, v
[1544]107
108 def test_get_column_names(self):
[1592]109 cnames = ['SCANNO', 'CYCLENO', 'BEAMNO', 'IFNO',
[1824]110 'POLNO', 'FREQ_ID', 'MOLECULE_ID', 'REFBEAMNO', 'FLAGROW',
[1592]111 'TIME', 'INTERVAL', 'SRCNAME', 'SRCTYPE',
[1544]112 'FIELDNAME', 'SPECTRA', 'FLAGTRA', 'TSYS',
[1592]113 'DIRECTION', 'AZIMUTH', 'ELEVATION',
114 'OPACITY', 'TCAL_ID', 'FIT_ID',
[1544]115 'FOCUS_ID', 'WEATHER_ID', 'SRCVELOCITY',
116 'SRCPROPERMOTION', 'SRCDIRECTION',
117 'SCANRATE']
[1840]118 assert_equal(self.st.get_column_names(), cnames)
[1544]119
[1600]120 def test_get_tsys(self):
[1840]121 assert_almost_equal(self.st.get_tsys()[0], 175.830429077)
[1600]122
123 def test_get_time(self):
[1840]124 assert_equal(self.st.get_time(0), '2008/03/12/09:32:50')
[1600]125 dt = datetime.datetime(2008,3,12,9,32,50)
[1840]126 assert_equal(self.st.get_time(0, True), dt)
[1600]127
128 def test_get_inttime(self):
[1840]129 assert_almost_equal(self.st.get_inttime()[0], 30.720016479)
[1600]130
131 def test_get_sourcename(self):
[1840]132 assert_equal(self.st.get_sourcename(0), 'Orion_SiO_R')
133 assert_equal(self.st.get_sourcename(),
[1824]134 ['Orion_SiO_R', 'Orion_SiO_R',
135 'Orion_SiO', 'Orion_SiO'])
[1600]136
137 def test_get_azimuth(self):
[1840]138 assert_almost_equal(self.st.get_azimuth()[0], 5.628767013)
[1600]139
140 def test_get_elevation(self):
[1840]141 assert_almost_equal(self.st.get_elevation()[0], 1.01711678504)
[1600]142
143 def test_get_parangle(self):
[1840]144 assert_almost_equal(self.st.get_parangle()[0], 2.5921990871)
[1600]145
146 def test_get_direction(self):
[1840]147 assert_equal(self.st.get_direction()[0], '05:35:14.5 -04.52.29.5')
[1600]148
149 def test_get_directionval(self):
150 dv = self.st.get_directionval()[0]
[1840]151 assert_almost_equal(dv[0], 1.4627692699)
152 assert_almost_equal(dv[1], -0.0850824415)
[1600]153
154 def test_unit(self):
155 self.st.set_unit('')
156 self.st.set_unit('GHz')
157 self.st.set_unit('km/s')
[1840]158 assert_raises(RuntimeError, self.st.set_unit, 'junk')
159 assert_equals(self.st.get_unit(), 'km/s')
[1600]160
[1592]161 def test_average_pol(self):
162 ap = self.st.average_pol()
[1840]163 assert_equal(ap.npol(), 1)
[1544]164
[1600]165 def test_drop_scan(self):
166 s0 = self.st.drop_scan(1)
[1903]167 assert_equal(s0.getscannos(), [0])
[1600]168 s1 = self.st.drop_scan([0])
[1903]169 assert_equal(s1.getscannos(), [1])
[1592]170
[1725]171 def test_flag(self):
172 q = self.st.auto_quotient()
173 q.set_unit('km/s')
174 q0 = q.copy()
175 q1 = q.copy()
176 msk = q0.create_mask([-10,20])
177 q0.flag(mask=mask_not(msk))
[1824]178 q1.flag(mask=msk)
[1840]179 assert_almost_equal(q0.stats(stat='max')[0], 95.62171936)
180 assert_almost_equal(q1.stats(stat='max')[0], 2.66563416)
[1600]181
[1840]182 @with_setup(tempdir_setup, tempdir_teardown)
183 def test_save(self):
184 fname = os.path.join("test_temp", 'scantable_test.%s')
185 formats = [(fname % 'sdfits', 'SDFITS', True),
186 (fname % 'ms', 'MS2', True),
187 (fname % 'class.fits', 'CLASS', False),
188 (fname % 'fits', 'FITS', False),
189 (fname % 'txt', 'ASCII', False),
190 ]
191 for format in formats:
192 yield self.save, format
193
194 def save(self, args):
195 fname = args[0]
196 self.st.save(fname, args[1], True)
[1844]197 # do some verification args[2] == True
[1840]198 if args[-1]:
199 s = scantable(fname)
200 ds = self.st - s
201 assert_equals(self.st.getpolnos(), s.getpolnos())
202 assert_equals(self.st.getscannos(), s.getscannos())
203 assert_equals(self.st.getifnos(), s.getifnos())
204 assert_equals(self.st.getbeamnos(), s.getbeamnos())
[1844]205 # see if the residual spectra are ~ 0.0
[1840]206 for spec in ds:
207 assert_almost_equals(sum(spec)/len(spec), 0.0, 5)
[1925]208
209 def test_auto_poly_baseline(self):
210 q = self.st.auto_quotient()
211 b = q.auto_poly_baseline(insitu=False)
212 res_rms = (q-b).stats('rms')
213 assert_almost_equals(res_rms[0], 0.38370, 5)
214 assert_almost_equals(res_rms[1], 0.38780, 5)
215
216
217 def test_poly_baseline(self):
218 q = self.st.auto_quotient()
219 msk = q.create_mask([0.0, 1471.0], [1745.0, 4095.0])
220 b = q.poly_baseline(order=0, mask=msk,insitu=False)
221 res_rms = (q-b).stats('rms')
222 assert_almost_equals(res_rms[0], 0.38346, 5)
223 assert_almost_equals(res_rms[1], 0.38780, 5)
Note: See TracBrowser for help on using the repository browser.