// XCube Stream Reader SDK
00001 /* 00002 * CPacketReader.h 00003 * 00004 * Created on: Nov 18, 2011 00005 * Author: mtaveniku 00006 * The Packet reader class is a multithreaded class reading X3C packets from 00007 * a parallel file system. The Packets in the stream are delivered in 00008 * chronological sequence from the files 00009 * This class is supposed to work on both Windows and Linux machines 00010 */ 00011 00012 #ifndef CPACKETREADER_H_ 00013 #define CPACKETREADER_H_ 00014 00015 #include <vector> 00016 00017 #include "X3cPacket.h" 00018 #include "CFileReader.h" 00019 #include "CFrameFilter.h" 00020 #include "CTimeFilter.h" 00021 00022 using namespace std; 00023 //#define INTERNAL_PACKET_HANDLING 00024 00026 00043 class CPacketReader { 00044 00045 public: 00047 static const size_t MAX_DISK_READ_SIZE_MB = 128; 00048 00050 typedef struct _packet_handle_ { 00052 int index; 00054 int streamNo; 00056 X3cPacket *pkt; 00057 } packet_handle_t; 00058 00059 public: 00060 00062 00064 CPacketReader(); 00065 00067 00068 virtual ~CPacketReader(); 00069 00071 00073 X3cPacket *getPacket(); 00074 00076 00078 void freePacket(X3cPacket *pkt); 00079 00086 void setPacketFilter(UINT64 start, UINT64 end); 00087 00094 void setTimeFilter(CTimeFilter const& tFilter); 00095 00096 00097 UINT64 getPacketCount() const; 00098 00099 // int seekTime(struct timespec ts); 00100 // int seek(size_t position); 00101 // int seekPacket(unsigned long pktNo); 00102 00103 00105 00115 bool initialize(int fileCnt, char **fileNames, size_t bufSz, size_t chunkSz); 00116 00117 00119 00121 int start(); 00122 00124 00127 bool stop(); 00128 00129 #ifdef INTERNAL_PACKET_HANDLING 00130 00131 00133 packet_handle_t *getPacketHandle(); 00134 #else 00135 00136 00141 void getPacketHandle(packet_handle_t *hdl); 00142 #endif 00143 00145 00147 void freePacketHandle(packet_handle_t *hdl); 00148 00149 private: 00151 00152 X3cPacket *persistent_get_packet(int streamNo); 00153 00155 00156 void sortPackets(); 00157 00159 void freePktHandleIdx(int 
index); 00160 00162 int getFreePktHandleIdx(); 00163 00165 void initPktHandles(); 00166 00167 00169 00174 CStreamFilter::FilterReturn getShouldBeTimeFiltered(X3cPacket *pkt); 00175 00176 00178 00179 bool mIsStarted; 00180 00182 00183 bool mIsInitialized; 00184 00186 00187 int mNumInput; 00188 00190 00191 vector<char*> mFileNames; 00192 00194 00196 size_t mRdChunkSz; 00197 00199 00200 size_t mBufSz; 00201 00203 vector <CPacketBuffer*> mPacketBuffers; 00204 00206 00208 vector <CFileReader*> mFileReaders; 00209 00211 00212 vector <X3cPacket *> mPktPtrs; 00213 00215 00216 int mOldestStream; 00217 00219 00220 UINT64 mOldestTs; 00221 00223 00224 UINT64 mNextTs; 00225 00227 00228 static const int mFHListSz = 512; 00229 00230 //A filter on the packet count; 00231 CFrameFilter mPacketFilter; 00232 00234 CTimeFilter mTimeFilter; 00235 00236 UINT64 mPacketCounter; 00237 00238 #ifdef INTERNAL_PACKET_HANDLING 00239 00240 packet_handle_t *mPacketHandles; 00241 00243 int *mFreeHandles; 00244 00246 int mFHHd; 00247 00249 int mFHTl; 00250 #endif 00251 }; 00252 00253 inline void CPacketReader::setPacketFilter(UINT64 start, UINT64 end) 00254 { 00255 mPacketFilter.setFilter(start, end); 00256 } 00257 00258 inline void CPacketReader::setTimeFilter(CTimeFilter const& tFilter) 00259 { 00260 mTimeFilter = tFilter; 00261 } 00262 00263 inline UINT64 CPacketReader::getPacketCount() const 00264 { 00265 return mPacketCounter; 00266 } 00267 00268 #endif /* CPACKETREADER_H_ */