Changeset 2393


Ignore:
Timestamp:
01/05/12 12:10:37 (13 years ago)
Author:
Takeshi Nakazato
Message:

New Development: No

JIRA Issue: Yes CAS-2816

Ready for Test: Yes

Interface Changes: No

What Interface Changed: Please list interface changes

Test Programs: List test programs

Put in Release Notes: Yes/No

Module(s): Module Names change impacts.

Description: Describe your changes here...

Parallel version of STGrid. Implemented using concurrent module.
Two threads are running in parallel. Those are responsible for
retrieving data and processing data respectively.


Location:
trunk/src
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/CMakeLists.txt

    r2370 r2393  
    2121# source files for libpyrap
    2222set( ASAP_SRCS
     23     ${SRCDIR}/concurrent.cpp
    2324     ${SRCDIR}/MathUtils.cpp
    2425     ${SRCDIR}/TableTraverse.cpp
  • trunk/src/STGrid.cpp

    r2390 r2393  
    2727
    2828using namespace std ;
     29using namespace concurrent ;
    2930using namespace casa ;
    3031using namespace asap ;
     
    3839// constructor
    3940STGrid::STGrid()
     41  : vshape_( 1 ), wshape_( 2 ), dshape_( 2 )
    4042{
    4143  init() ;
     
    4345
    4446STGrid::STGrid( const string infile )
     47  : vshape_( 1 ), wshape_( 2 ), dshape_( 2 )
    4548{
    4649  init() ;
     
    8588void STGrid::setFileIn( const string infile )
    8689{
     90  nfile_ = 1 ;
    8791  String name( infile ) ;
    88   if ( infileList_.size() == 0 || infileList_[0].compare( name ) != 0 ) {
    89     infileList_.resize( 1 ) ;
    90     infileList_[0] = String(infile) ;
    91     nfile_ = 1 ;
    92   }
     92  infileList_.resize( nfile_ ) ;
     93  infileList_[0] = String(infile) ;
    9394}
    9495
     
    250251}
    251252
    252 Bool STGrid::pastEnd()
    253 {
    254   LogIO os( LogOrigin("STGrid","pastEnd",WHERE) ) ;
    255   Bool b = nprocessed_ >= nrow_ ;
    256   return b ;
    257 }
    258253
    259254void STGrid::grid()
     
    312307  Bool b = nchunk_ >= nrow_ ;
    313308  nchunk_ = min( nchunk_, nrow_ ) ;
     309  vshape_ = IPosition( 1, nchunk_ ) ;
     310  wshape_ = IPosition( 2, nchan_, nchunk_ ) ;
     311  dshape_ = IPosition( 2, 2, nchunk_ ) ;
    314312  return b ;
     313}
     314
     315struct STGChunk {
     316  Int nrow ;
     317  Array<Complex> spectra;
     318  Array<Int> flagtra;
     319  Array<Int> rflag;
     320  Array<Float> weight;
     321  Array<Double> direction;
     322  STGChunk(IPosition const &wshape, IPosition const &vshape,
     323           IPosition const &dshape)
     324    : spectra(wshape), flagtra(wshape), rflag(vshape), weight(wshape),
     325      direction(dshape)
     326  { }
     327};
     328
     329struct STCommonData {
     330  Int gnx;
     331  Int gny;
     332  Int *chanMap;
     333  Vector<Float> convFunc ;
     334  Array<Complex> gdataArrC;
     335  Array<Float> gwgtArr;
     336  STCommonData(IPosition const &gshape, Array<Float> const &data)
     337    : gdataArrC(gshape, 0.0), gwgtArr(data) {}
     338};
     339
     340#define DO_AHEAD 3
     341
     342struct STContext {
     343  STCommonData &common;
     344  FIFO<STGChunk *, DO_AHEAD> queue;
     345  STGrid *const self;
     346  const Int pol;
     347  STContext(STGrid *obj, STCommonData &common, Int pol)
     348    : self(obj), common(common), pol(pol) {}
     349};
     350
     351
     352bool STGrid::produceChunk(void *ctx) throw(PCException)
     353{
     354  STContext &context = *(STContext *)ctx;
     355  if ( context.self->nprocessed_ >= context.self->nrow_ ) {
     356    return false;
     357  }
     358  STGChunk *chunk = new STGChunk(context.self->wshape_,
     359                                 context.self->vshape_,
     360                                 context.self->dshape_);
     361
     362  double t0 = mathutil::gettimeofday_sec() ;
     363  chunk->nrow = context.self->getDataChunk(
     364        context.self->wshape_, context.self->vshape_, context.self->dshape_,
     365        chunk->spectra, chunk->direction,
     366        chunk->flagtra, chunk->rflag, chunk->weight);
     367  double t1 = mathutil::gettimeofday_sec() ;
     368  context.self->eGetData_ += t1-t0 ;
     369
     370  context.queue.lock();
     371  context.queue.put(chunk);
     372  context.queue.unlock();
     373  return true;
     374}
     375
     376void STGrid::consumeChunk(void *ctx) throw(PCException)
     377{
     378  STContext &context = *(STContext *)ctx;
     379  STGChunk *chunk = NULL;
     380  try {
     381    context.queue.lock();
     382    chunk = context.queue.get();
     383    context.queue.unlock();
     384  } catch (FullException &e) {
     385    context.queue.unlock();
     386    // TODO: log error
     387    throw PCException();
     388  }
     389
     390  double t0, t1 ;
     391  // world -> pixel
     392  Array<Double> xypos( context.self->dshape_ ) ;
     393  t0 = mathutil::gettimeofday_sec() ;
     394  context.self->toPixel( chunk->direction, xypos ) ;
     395  t1 = mathutil::gettimeofday_sec() ;
     396  context.self->eToPixel_ += t1-t0 ;
     397   
     398  // call ggridsd
     399  Int nvispol = 1 ;
     400  Int irow = -1 ;
     401  t0 = mathutil::gettimeofday_sec() ;
     402  context.self->call_ggridsd( xypos,
     403                chunk->spectra,
     404                nvispol,
     405                context.self->nchan_,
     406                chunk->flagtra,
     407                chunk->rflag,
     408                chunk->weight,
     409                chunk->nrow,
     410                irow,
     411                context.common.gdataArrC,
     412                context.common.gwgtArr,
     413                context.common.gnx,
     414                context.common.gny,
     415                context.self->npol_,
     416                context.self->nchan_,
     417                context.self->convSupport_,
     418                context.self->convSampling_,
     419                context.common.convFunc,
     420                context.common.chanMap,
     421                (Int*)&context.pol ) ;
     422  t1 = mathutil::gettimeofday_sec() ;
     423  context.self->eGGridSD_ += t1-t0 ;
     424 
     425  delete chunk;
    315426}
    316427
     
    320431  double t0, t1 ;
    321432
    322   // convolution kernel
    323   Vector<Float> convFunc ;
    324   t0 = mathutil::gettimeofday_sec() ;
    325   setConvFunc( convFunc ) ;
    326   t1 = mathutil::gettimeofday_sec() ;
    327   os << "setConvFunc: elapsed time is " << t1-t0 << " sec." << LogIO::POST ;
    328433
    329434  // grid data
    330   Int gnx = nx_ ;
    331   Int gny = ny_ ;
    332435  // Extend grid plane with convSupport_
    333 //   Int gnx = nx_+convSupport_*2 ;
    334 //   Int gny = ny_+convSupport_*2 ;
     436  //   Int gnx = nx_+convSupport_*2 ;
     437  //   Int gny = ny_+convSupport_*2 ;
     438  Int gnx = nx_;
     439  Int gny = ny_;
     440
    335441  IPosition gshape( 4, gnx, gny, npol_, nchan_ ) ;
    336   Array<Complex> gdataArrC( gshape, 0.0 ) ;
    337442  // 2011/12/20 TN
    338443  // data_ and gwgtArr share storage
    339444  data_.resize( gshape ) ;
    340445  data_ = 0.0 ;
    341   Array<Float> gwgtArr( data_ ) ;
     446  STCommonData common = STCommonData(gshape, data_);
     447  common.gnx = gnx ;
     448  common.gny = gny ;
    342449
    343450  // parameters for gridding
    344451  Int *chanMap = new Int[nchan_] ;
    345   {
    346     Int *work_p = chanMap ;
    347     for ( Int i = 0 ; i < nchan_ ; i++ ) {
    348       *work_p = i ;
    349       work_p++ ;
    350     }
    351   }
    352   Int *polMap = new Int[1] ;
    353   Int nvispol = 1 ;
    354   Int irow = -1 ;
     452  for ( Int i = 0 ; i < nchan_ ; i++ ) {
     453    chanMap[i] = i ;
     454  }
     455  common.chanMap = chanMap;
     456
     457  // convolution kernel
     458  t0 = mathutil::gettimeofday_sec() ;
     459  setConvFunc( common.convFunc ) ;
     460  t1 = mathutil::gettimeofday_sec() ;
     461  os << "setConvFunc: elapsed time is " << t1-t0 << " sec." << LogIO::POST ;
    355462
    356463  // for performance check
    357   double eGetData = 0.0 ;
    358   double eToPixel = 0.0 ;
    359   double eGGridSD = 0.0 ;
     464  eGetData_ = 0.0 ;
     465  eToPixel_ = 0.0 ;
     466  eGGridSD_ = 0.0 ;
    360467  double eInitPol = 0.0 ;
    361468
     469  Broker broker = Broker(produceChunk, consumeChunk);
    362470  for ( uInt ifile = 0 ; ifile < nfile_ ; ifile++ ) {
    363471    initTable( ifile ) ;
    364472
    365     os << "start table " << ifile << ": " << infileList_[ifile] << LogIO::POST ;
    366 
     473    os << "start table " << ifile << ": " << infileList_[ifile] << LogIO::POST ;   
    367474    for ( Int ipol = 0 ; ipol < npol_ ; ipol++ ) {
    368475      t0 = mathutil::gettimeofday_sec() ;
    369       initPol( ipol ) ;
     476      initPol( ipol ) ; // set ptab_ and attach()
    370477      t1 = mathutil::gettimeofday_sec() ;
    371478      eInitPol += t1-t0 ;
    372479     
    373       polMap[0] = ipol ;
     480      STContext context(this, common, ipol);
    374481     
    375482      os << "start pol " << ipol << LogIO::POST ;
    376483     
    377       while( !pastEnd() ) {
    378         // data storage
    379         IPosition cshape( 3, npol_, nchan_, nchunk_ ) ;
    380         IPosition mshape( 2, npol_, nchunk_ ) ;
    381         IPosition vshape( 1, nchunk_ ) ;
    382         IPosition dshape( 2, 2, nchunk_ ) ;
    383         IPosition wshape( 2, nchan_, nchunk_ ) ;
    384         Array<Complex> spectra( wshape ) ;
    385         Array<Int> flagtra( wshape ) ;
    386         Array<Int> rflag( vshape ) ;
    387         Array<Float> weight( wshape ) ;
    388         Array<Double> direction( dshape ) ;
    389         Array<Double> xypos( dshape ) ;
    390        
    391         spectraF_.resize( wshape ) ;
    392         flagtraUC_.resize( wshape ) ;
    393         rflagUI_.resize( vshape ) ;
    394        
    395         // retrieve data
    396         t0 = mathutil::gettimeofday_sec() ;
    397         Int nrow = getDataChunk( spectra, direction, flagtra, rflag, weight ) ;
    398         t1 = mathutil::gettimeofday_sec() ;
    399         eGetData += t1-t0 ;
    400        
    401         // world -> pixel
    402         t0 = mathutil::gettimeofday_sec() ;
    403         toPixel( direction, xypos ) ;
    404         t1 = mathutil::gettimeofday_sec() ;
    405         eToPixel += t1-t0 ;
    406        
    407         // call ggridsd
    408         t0 = mathutil::gettimeofday_sec() ;
    409         call_ggridsd( xypos,
    410                       spectra,
    411                       nvispol,
    412                       nchan_,
    413                       flagtra,
    414                       rflag,
    415                       weight,
    416                       nrow,
    417                       irow,
    418                       gdataArrC,
    419                       gwgtArr,
    420                       gnx,
    421                       gny,
    422                       npol_,
    423                       nchan_,
    424                       convSupport_,
    425                       convSampling_,
    426                       convFunc,
    427                       chanMap,
    428                       polMap ) ;
    429         t1 = mathutil::gettimeofday_sec() ;
    430         eGGridSD += t1-t0 ;
    431        
     484      nprocessed_ = 0 ;
     485#if 1
     486      broker.runProducerAsMasterThread(&context, DO_AHEAD);
     487#else
     488      for (;;) {
     489        bool produced = produceChunk(&context);
     490        if (! produced) {
     491          break;
     492        }
     493        consumeChunk(&context);
    432494      }
    433      
     495#endif
     496
    434497      os << "end pol " << ipol << LogIO::POST ;
    435      
    436       nprocessed_ = 0 ;
    437     }
     498
     499    }
     500    os << "end table " << ifile << LogIO::POST ;   
    438501  }
    439502  os << "initPol: elapsed time is " << eInitPol << " sec." << LogIO::POST ;
    440   os << "getData: elapsed time is " << eGetData-eToInt-eGetWeight << " sec." << LogIO::POST ;
    441   os << "toPixel: elapsed time is " << eToPixel << " sec." << LogIO::POST ;
    442   os << "ggridsd: elapsed time is " << eGGridSD << " sec." << LogIO::POST ;
     503  os << "getData: elapsed time is " << eGetData_-eToInt-eGetWeight << " sec." << LogIO::POST ;
     504  os << "toPixel: elapsed time is " << eToPixel_ << " sec." << LogIO::POST ;
     505  os << "ggridsd: elapsed time is " << eGGridSD_ << " sec." << LogIO::POST ;
    443506  os << "toInt: elapsed time is " << eToInt << " sec." << LogIO::POST ;
    444507  os << "getWeight: elapsed time is " << eGetWeight << " sec." << LogIO::POST ;
    445508 
    446   delete polMap ;
    447509  delete chanMap ;
    448510
    449511  // set data
    450   setData( gdataArrC, gwgtArr ) ;
     512  setData( common.gdataArrC, common.gwgtArr ) ;
    451513
    452514}
     
    488550    }
    489551  }
    490   Int *polMap = new Int[1] ;
     552  Int polMap[1] ;
    491553
    492554  // some parameters for ggridsd
     
    504566    initTable( ifile ) ;
    505567
    506     os << "start table " << ifile << ": " << infileList_[ifile] << LogIO::POST ;
     568    os << "start table " << ifile << ": " << infileList_[ifile] << LogIO::POST ;   
    507569    // to read data from the table
    508570    IPosition mshape( 2, nchan_, nrow_ ) ;
     
    520582      t1 = mathutil::gettimeofday_sec() ;
    521583      eInitPol += t1-t0 ;
    522 
     584     
     585      os << "start pol " << ipol << LogIO::POST ;
     586     
    523587      // retrieve data
    524588      t0 = mathutil::gettimeofday_sec() ;
     
    558622      t1 = mathutil::gettimeofday_sec() ;
    559623      eGGridSD += t1-t0 ;
    560     }
     624
     625      os << "end pol " << ipol << LogIO::POST ;
     626
     627    }
     628    os << "end table " << ifile << LogIO::POST ;   
    561629  }
    562630  os << "initPol: elapsed time is " << eInitPol << " sec." << LogIO::POST ;
     
    568636
    569637  // delete maps
    570   delete polMap ;
    571638  delete chanMap ;
    572639
     
    780847  Int ifno = ifno_ ;
    781848  tableList_.resize( nfile_ ) ;
     849  if ( ifno == -1 ) {
     850    Table taborg( infileList_[0] ) ;
     851    ROScalarColumn<uInt> ifnoCol( taborg, "IFNO" ) ;
     852    ifno = ifnoCol( 0 ) ;
     853    os << LogIO::WARN
     854       << "IFNO is not given. Using default IFNO: " << ifno << LogIO::POST ;
     855  }
    782856  for ( uInt i = 0 ; i < nfile_ ; i++ ) {
    783857    Table taborg( infileList_[i] ) ;
    784     if ( ifno == -1 ) {
    785       ROScalarColumn<uInt> ifnoCol( taborg, "IFNO" ) ;
    786       ifno = ifnoCol( 0 ) ;
    787       os << LogIO::WARN
    788        << "IFNO is not given. Using default IFNO: " << ifno << LogIO::POST ;
    789     }
    790858    TableExprNode node ;
    791859    if ( isMultiIF( taborg ) ) {
     
    803871      tableList_[i] = taborg( node ) ;
    804872    }
     873    os << "tableList_[" << i << "].nrow()=" << tableList_[i].nrow() << LogIO::POST ;
    805874    if ( tableList_[i].nrow() == 0 ) {
    806875      os << LogIO::SEVERE
     
    890959}
    891960
    892 Int STGrid::getDataChunk( Array<Complex> &spectra,
    893                           Array<Double> &direction,
    894                           Array<Int> &flagtra,
    895                           Array<Int> &rflag,
    896                           Array<Float> &weight )
     961Int STGrid::getDataChunk(
     962                         IPosition const &wshape,
     963                         IPosition const &vshape,
     964                         IPosition const &dshape,
     965                         Array<Complex> &spectra,
     966                         Array<Double> &direction,
     967                         Array<Int> &flagtra,
     968                         Array<Int> &rflag,
     969                         Array<Float> &weight )
    897970{
    898971  LogIO os( LogOrigin("STGrid","getDataChunk",WHERE) ) ;
     972
     973  Array<Float> spectraF_(wshape);
     974  Array<uChar> flagtraUC_(wshape);
     975  Array<uInt> rflagUI_(vshape);
    899976  Int nrow = getDataChunk( spectraF_, direction, flagtraUC_, rflagUI_, weight ) ;
    900977  if ( nrow < nchunk_ ) {
     
    913990  return nrow ;
    914991}
     992
     993#if 0
     994Int STGrid::getDataChunk( Array<Complex> &spectra,
     995                          Array<Double> &direction,
     996                          Array<Int> &flagtra,
     997                          Array<Int> &rflag,
     998                          Array<Float> &weight )
     999{
     1000  LogIO os( LogOrigin("STGrid","getDataChunk",WHERE) ) ;
     1001  Int nrow = getDataChunk( spectraF_, direction, flagtraUC_, rflagUI_, weight ) ;
     1002  if ( nrow < nchunk_ ) {
     1003    spectra.resize( spectraF_.shape() ) ;
     1004    flagtra.resize( flagtraUC_.shape() ) ;
     1005    rflag.resize( rflagUI_.shape() ) ;
     1006  }
     1007  double t0, t1 ;
     1008  t0 = mathutil::gettimeofday_sec() ;
     1009  convertArray( spectra, spectraF_ ) ;
     1010  toInt( flagtraUC_, flagtra ) ;
     1011  toInt( rflagUI_, rflag ) ;
     1012  t1 = mathutil::gettimeofday_sec() ;
     1013  eToInt = t1 - t0 ;
     1014 
     1015  return nrow ;
     1016}
     1017#endif
    9151018
    9161019Int STGrid::getDataChunk( Array<Float> &spectra,
     
    9751078  uInt polno ;
    9761079  for ( uInt i = 0 ; i < polnoCol.nrow() ; i++ ) {
     1080    //polno = polnoCol( i ) ;
    9771081    polno = pols( i ) ;
    9781082    if ( allNE( pollistOrg, polno ) ) {
     
    10031107  for ( uInt i = 0 ; i < nfile_ ; i++ ) {
    10041108    rows_[i] = tableList_[i].nrow() / npolOrg_ ;
     1109    if ( nrow_ < rows_[i] )
     1110      nrow_ = rows_[i] ;
    10051111  }
    10061112  flagtraCol_.attach( tableList_[0], "FLAGTRA" ) ;
     
    12451351  t0 = mathutil::gettimeofday_sec() ;
    12461352
     1353  //Int polno = 0 ;
    12471354  String outfile_ ;
    12481355  if ( outfile.size() == 0 ) {
  • trunk/src/STGrid.h

    r2390 r2393  
    2626#include <tables/Tables/ScalarColumn.h>
    2727#include <tables/Tables/ArrayColumn.h>
     28
     29#include "concurrent.h"
    2830//#include <tables/Tables/TableRow.h>
    2931
     
    100102                      Array<uInt> &rflag,
    101103                      Array<Float> &weight ) ;
     104  Int getDataChunk( IPosition const &wshape,
     105                    IPosition const &vshape,
     106                    IPosition const &dshape,
     107                    Array<Complex> &spectra,
     108                    Array<Double> &direction,
     109                    Array<Int> &flagtra,
     110                    Array<Int> &rflag,
     111                    Array<Float> &weight ) ;
    102112  Int getDataChunk( Array<Complex> &spectra,
    103113                    Array<Double> &direction,
     
    128138  void prepareTable( Table &tab, String &name ) ;
    129139
    130   Bool pastEnd() ;
     140//   Bool pastEnd() ;
    131141
    132142  void selectData() ;
     
    160170  void initTable( uInt idx ) ;
    161171  Bool isMultiIF( Table &tab ) ;
     172  static bool produceChunk(void *ctx) throw(concurrent::PCException);
     173  static void consumeChunk(void *ctx) throw(concurrent::PCException);
    162174
    163175
     
    172184  uInt nfile_ ;
    173185  Int ifno_ ;
     186
    174187  Int nx_ ;
    175188  Int ny_ ;
     
    177190  Int npolOrg_ ;
    178191  Int nchan_ ;
    179   Int nrow_ ;
    180   Block<Int> rows_ ;
    181192  Double cellx_ ;
    182193  Double celly_ ;
     
    186197  Int userSupport_ ;
    187198  Int convSampling_ ;
    188   Array<Float> data_ ;
    189199  Vector<uInt> pollist_ ;
    190200  Vector<uInt> scanlist_ ;
    191201  String wtype_ ;
     202  Block<Table> tableList_ ;
     203  Vector<uInt> rows_ ;
     204  Int nchunk_ ;
     205
     206  /////////////// gridPerRow variable
     207  IPosition vshape_;
     208  IPosition wshape_;
     209  IPosition dshape_;
     210  // loop variable
     211  Int nrow_ ;
     212  Array<Float> data_ ;
    192213
    193214  Table tab_ ;
    194   Block<Table> tableList_ ;
     215  // per pol
    195216  Table ptab_ ;
    196217  ROArrayColumn<Float> spectraCol_ ;
     
    200221  ROArrayColumn<Float> tsysCol_ ;
    201222  ROScalarColumn<Double> intervalCol_ ;
     223
    202224  Int nprocessed_ ;
    203   Int nchunk_ ;
    204 
    205   Array<Float> spectraF_ ;
    206   Array<uChar> flagtraUC_ ;
    207   Array<uInt> rflagUI_ ;
    208 
     225
     226
     227  double eGetData_;
     228  double eToPixel_;
     229  double eGGridSD_;
    209230};
    210231}
Note: See TracChangeset for help on using the changeset viewer.