XCube Stream Reader SDK
|
00001 /* 00002 * CPacketReader.h 00003 * 00004 * Created on: Nov 18, 2011 00005 * Author: mtaveniku 00006 * The Packet reader class is a multithreaded class reading X3C packets from 00007 * a parallel file system. The Packets in the stream are delivered in 00008 * chronological sequence from the files 00009 * This class is supposed to work on both Windows and Linux machines 00010 */ 00011 00012 #ifndef CPACKETREADER_H_ 00013 #define CPACKETREADER_H_ 00014 00015 #include <vector> 00016 00017 #include "X3cPacket.h" 00018 #include "CFileReader.h" 00019 #include "CFrameFilter.h" 00020 #include "CTimeFilter.h" 00021 00022 using namespace std; 00023 //#define INTERNAL_PACKET_HANDLING 00024 00026 00043 class CPacketReader { 00044 00045 public: 00047 static const size_t MAX_DISK_READ_SIZE_MB = 128; 00048 00050 typedef struct _packet_handle_ { 00052 int index; 00054 int streamNo; 00056 X3cPacket *pkt; 00057 } packet_handle_t; 00058 00059 public: 00060 00062 00064 CPacketReader(); 00065 00067 00068 virtual ~CPacketReader(); 00069 00071 00073 X3cPacket *getPacket(); 00074 00076 00078 void freePacket(X3cPacket *pkt); 00079 00086 void setPacketFilter(UINT64 start, UINT64 end); 00087 00094 void setTimeFilter(CTimeFilter const& tFilter); 00095 00096 00097 UINT64 getPacketCount() const; 00098 00099 // int seekTime(struct timespec ts); 00100 // int seek(size_t position); 00101 // int seekPacket(unsigned long pktNo); 00102 00103 00105 00115 bool initialize(std::vector<std::string> const& fileNames, size_t bufSz, 00116 size_t chunkSz); 00117 00118 00120 00122 int start(); 00123 00125 00128 bool stop(); 00129 00130 #ifdef INTERNAL_PACKET_HANDLING 00131 00132 00134 packet_handle_t *getPacketHandle(); 00135 #else 00136 00137 00142 void getPacketHandle(packet_handle_t *hdl); 00143 #endif 00144 00146 00148 void freePacketHandle(packet_handle_t *hdl); 00149 00151 00152 void reset(); 00153 00154 private: 00156 00157 X3cPacket *persistent_get_packet(int streamNo); 00158 00160 00161 void sortPackets(); 00162 00164 void freePktHandleIdx(int index); 00165 00167 int getFreePktHandleIdx(); 00168 00170 void initPktHandles(); 00171 00173 void resetFileReaders(); 00174 00176 void resetPacketBuffers(); 00177 00179 void resumeFileReaders(); 00180 00182 00187 CStreamFilter::FilterReturn getShouldBeTimeFiltered(X3cPacket *pkt); 00188 00190 00191 bool mIsStarted; 00192 00194 00195 bool mIsInitialized; 00196 00198 00199 int mNumInput; 00200 00202 00203 vector<std::string> mFileNames; 00204 00206 00208 size_t mRdChunkSz; 00209 00211 00212 size_t mBufSz; 00213 00215 vector <CPacketBuffer*> mPacketBuffers; 00216 00218 00220 vector <CFileReader*> mFileReaders; 00221 00223 00224 vector <X3cPacket *> mPktPtrs; 00225 00227 00228 int mOldestStream; 00229 00231 00232 UINT64 mOldestTs; 00233 00235 00236 UINT64 mNextTs; 00237 00239 00240 static const int mFHListSz = 512; 00241 00242 //A filter on the packet count; 00243 CFrameFilter mPacketFilter; 00244 00246 CTimeFilter mTimeFilter; 00247 00248 UINT64 mPacketCounter; 00249 00250 #ifdef INTERNAL_PACKET_HANDLING 00251 00252 packet_handle_t *mPacketHandles; 00253 00255 int *mFreeHandles; 00256 00258 int mFHHd; 00259 00261 int mFHTl; 00262 #endif 00263 }; 00264 00265 inline void CPacketReader::setPacketFilter(UINT64 start, UINT64 end) 00266 { 00267 mPacketFilter.setFilter(start, end); 00268 } 00269 00270 inline void CPacketReader::setTimeFilter(CTimeFilter const& tFilter) 00271 { 00272 mTimeFilter = tFilter; 00273 } 00274 00275 inline UINT64 CPacketReader::getPacketCount() const 00276 { 00277 return mPacketCounter; 00278 } 00279 00280 #endif /* CPACKETREADER_H_ */