XCube Stream Reader SDK
x3c/include/packetReader/CPacketReader.h
00001 /*
00002  * CPacketReader.h
00003  *
00004  *  Created on: Nov 18, 2011
00005  *      Author: mtaveniku
00006  *  The packet reader class is a multithreaded class that reads X3C packets
00007  *  from a parallel file system. The packets in the stream are delivered in
00008  *  chronological sequence from the files.
00009  *  This class is supposed to work on both Windows and Linux machines.
00010  */
00011 
00012 #ifndef CPACKETREADER_H_
00013 #define CPACKETREADER_H_
00014 
00015 #include <vector>
00016 
00017 #include "X3cPacket.h"
00018 #include "CFileReader.h"
00019 #include "CFrameFilter.h"
00020 #include "CTimeFilter.h"
00021 
00022 using namespace std;
00023 //#define INTERNAL_PACKET_HANDLING
00024 
00026 
00043 class CPacketReader {
00044 
00045 public:
00047         static const size_t MAX_DISK_READ_SIZE_MB = 128;
00048 
00050         typedef struct _packet_handle_ {
00052                 int index;
00054                 int streamNo;
00056                 X3cPacket *pkt;
00057         } packet_handle_t;
00058 
00059 public:
00060 
00062 
00064         CPacketReader();
00065 
00067 
00068         virtual ~CPacketReader();
00069 
00071 
00073         X3cPacket *getPacket();
00074 
00076 
00078         void freePacket(X3cPacket *pkt);
00079 
00086     void setPacketFilter(UINT64 start, UINT64 end);
00087 
00094     void setTimeFilter(CTimeFilter const& tFilter);
00095 
00096 
00097     UINT64 getPacketCount() const;
00098 
00099         // int seekTime(struct timespec ts);
00100         // int seek(size_t position);
00101         // int seekPacket(unsigned long pktNo);
00102 
00103 
00105 
00115         bool initialize(std::vector<std::string> const& fileNames, size_t bufSz,
00116                     size_t chunkSz);
00117 
00118 
00120 
00122         int start();
00123 
00125 
00128         bool stop();
00129 
00130 #ifdef INTERNAL_PACKET_HANDLING
00131 
00132 
00134         packet_handle_t *getPacketHandle();
00135 #else
00136 
00137 
00142         void getPacketHandle(packet_handle_t *hdl);
00143 #endif
00144 
00146 
00148         void freePacketHandle(packet_handle_t *hdl);
00149 
00151 
00152     void reset();
00153 
00154 private:
00156 
00157         X3cPacket *persistent_get_packet(int streamNo);
00158 
00160 
00161         void sortPackets();
00162 
00164         void freePktHandleIdx(int index);
00165 
00167         int  getFreePktHandleIdx();
00168 
00170         void initPktHandles();
00171 
00173     void resetFileReaders();
00174 
00176     void resetPacketBuffers();
00177 
00179     void resumeFileReaders();
00180 
00182 
00187         CStreamFilter::FilterReturn getShouldBeTimeFiltered(X3cPacket *pkt);
00188 
00190 
00191         bool mIsStarted;
00192 
00194 
00195         bool mIsInitialized;
00196 
00198 
00199         int mNumInput;
00200 
00202 
00203         vector<std::string> mFileNames;
00204 
00206 
00208         size_t mRdChunkSz;
00209 
00211 
00212         size_t mBufSz;
00213 
00215         vector <CPacketBuffer*> mPacketBuffers;
00216 
00218 
00220         vector <CFileReader*> mFileReaders;
00221 
00223 
00224         vector <X3cPacket *> mPktPtrs;
00225 
00227 
00228         int mOldestStream;
00229 
00231 
00232         UINT64 mOldestTs;
00233 
00235 
00236         UINT64 mNextTs;
00237 
00239 
00240         static const int mFHListSz = 512;
00241 
00242         //A filter on the packet count;
00243         CFrameFilter mPacketFilter;
00244 
00246     CTimeFilter mTimeFilter;
00247 
00248         UINT64 mPacketCounter;
00249 
00250 #ifdef INTERNAL_PACKET_HANDLING
00251 
00252         packet_handle_t *mPacketHandles;
00253 
00255         int *mFreeHandles;
00256 
00258         int mFHHd;
00259 
00261         int mFHTl;
00262 #endif
00263 };
00264 
00265 inline void CPacketReader::setPacketFilter(UINT64 start, UINT64 end)
00266 {
00267     mPacketFilter.setFilter(start, end);
00268 }
00269 
00270 inline void CPacketReader::setTimeFilter(CTimeFilter const& tFilter)
00271 {
00272     mTimeFilter = tFilter;
00273 }
00274 
00275 inline UINT64 CPacketReader::getPacketCount() const
00276 {
00277     return mPacketCounter;
00278 }
00279 
00280 #endif /* CPACKETREADER_H_ */
 All Classes Files Functions Variables Typedefs Friends Defines