XCube Stream Reader SDK
x3c/include/packetReader/CPacketReader.h
00001 /*
00002  * CPacketReader.h
00003  *
00004  *  Created on: Nov 18, 2011
00005  *      Author: mtaveniku
00006  *  The Packet reader class is a multithreaded class reading X3C packets from
00007  *  a parallel file system. The Packets in the stream are delivered in
00008  *  chronological sequence from the files
00009  *  This class is supposed to work on both Windows and Linux machines
00010  */
00011 
00012 #ifndef CPACKETREADER_H_
00013 #define CPACKETREADER_H_
00014 
00015 #include <vector>
00016 
00017 #include "X3cPacket.h"
00018 #include "CFileReader.h"
00019 #include "CFrameFilter.h"
00020 #include "CTimeFilter.h"
00021 
00022 using namespace std;
00023 //#define INTERNAL_PACKET_HANDLING
00024 
00026 
00043 class CPacketReader {
00044 
00045 public:
00047         static const size_t MAX_DISK_READ_SIZE_MB = 128;
00048 
00050         typedef struct _packet_handle_ {
00052                 int index;
00054                 int streamNo;
00056                 X3cPacket *pkt;
00057         } packet_handle_t;
00058 
00059 public:
00060 
00062 
00064         CPacketReader();
00065 
00067 
00068         virtual ~CPacketReader();
00069 
00071 
00073         X3cPacket *getPacket();
00074 
00076 
00078         void freePacket(X3cPacket *pkt);
00079 
00086     void setPacketFilter(UINT64 start, UINT64 end);
00087 
00094     void setTimeFilter(CTimeFilter const& tFilter);
00095 
00096 
00097     UINT64 getPacketCount() const;
00098 
00099         // int seekTime(struct timespec ts);
00100         // int seek(size_t position);
00101         // int seekPacket(unsigned long pktNo);
00102 
00103 
00105 
00115         bool initialize(int fileCnt, char **fileNames, size_t bufSz, size_t chunkSz);
00116 
00117 
00119 
00121         int start();
00122 
00124 
00127         bool stop();
00128 
00129 #ifdef INTERNAL_PACKET_HANDLING
00130 
00131 
00133         packet_handle_t *getPacketHandle();
00134 #else
00135 
00136 
00141         void getPacketHandle(packet_handle_t *hdl);
00142 #endif
00143 
00145 
00147         void freePacketHandle(packet_handle_t *hdl);
00148 
00149 private:
00151 
00152         X3cPacket *persistent_get_packet(int streamNo);
00153 
00155 
00156         void sortPackets();
00157 
00159         void freePktHandleIdx(int index);
00160 
00162         int  getFreePktHandleIdx();
00163 
00165         void initPktHandles();
00166 
00167 
00169 
00174         CStreamFilter::FilterReturn getShouldBeTimeFiltered(X3cPacket *pkt);
00175 
00176 
00178 
00179         bool mIsStarted;
00180 
00182 
00183         bool mIsInitialized;
00184 
00186 
00187         int mNumInput;
00188 
00190 
00191         vector<char*> mFileNames;
00192 
00194 
00196         size_t mRdChunkSz;
00197 
00199 
00200         size_t mBufSz;
00201 
00203         vector <CPacketBuffer*> mPacketBuffers;
00204 
00206 
00208         vector <CFileReader*> mFileReaders;
00209 
00211 
00212         vector <X3cPacket *> mPktPtrs;
00213 
00215 
00216         int mOldestStream;
00217 
00219 
00220         UINT64 mOldestTs;
00221 
00223 
00224         UINT64 mNextTs;
00225 
00227 
00228         static const int mFHListSz = 512;
00229 
00230         //A filter on the packet count;
00231         CFrameFilter mPacketFilter;
00232 
00234     CTimeFilter mTimeFilter;
00235 
00236         UINT64 mPacketCounter;
00237 
00238 #ifdef INTERNAL_PACKET_HANDLING
00239 
00240         packet_handle_t *mPacketHandles;
00241 
00243         int *mFreeHandles;
00244 
00246         int mFHHd;
00247 
00249         int mFHTl;
00250 #endif
00251 };
00252 
00253 inline void CPacketReader::setPacketFilter(UINT64 start, UINT64 end)
00254 {
00255     mPacketFilter.setFilter(start, end);
00256 }
00257 
00258 inline void CPacketReader::setTimeFilter(CTimeFilter const& tFilter)
00259 {
00260     mTimeFilter = tFilter;
00261 }
00262 
00263 inline UINT64 CPacketReader::getPacketCount() const
00264 {
00265     return mPacketCounter;
00266 }
00267 
00268 #endif /* CPACKETREADER_H_ */
 All Classes Files Functions Variables Typedefs Friends Defines