Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions core/include/core/G3Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,31 @@ class G3Reader : public G3Module {
public:
G3Reader(const std::string &filename, int n_frames_to_read = -1,
float timeout = -1., bool track_filename = false,
size_t buffersize = 1024*1024);
size_t buffersize = 1024*1024, const std::string &ext=".g3");
G3Reader(const std::vector<std::string> &filenames, int n_frames_to_read = -1,
float timeout = -1., bool track_filename = false,
size_t buffersize = 1024*1024);
size_t buffersize = 1024*1024, const std::string &ext=".g3");

void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
off_t Seek(off_t offset);
off_t Tell();

private:
void StartFile(const std::string &path);
bool prefix_file_;
protected:
virtual G3FramePtr FillFrame();
std::string cur_file_;
std::deque<std::string> filename_;
std::istream stream_;
int n_frames_to_read_;
int n_frames_read_;
int n_frames_cur_;

private:
void StartFile(const std::string &path);
bool prefix_file_;
std::deque<std::string> filename_;
float timeout_;
bool track_filename_;
size_t buffersize_;
std::string ext_;

SET_LOGGER("G3Reader");
};
Expand Down
28 changes: 17 additions & 11 deletions core/src/G3Reader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
#include <G3Reader.h>

G3Reader::G3Reader(const std::string &filename, int n_frames_to_read,
float timeout, bool track_filename, size_t buffersize) :
prefix_file_(false), stream_(nullptr), n_frames_to_read_(n_frames_to_read),
n_frames_read_(0), n_frames_cur_(0), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize)
float timeout, bool track_filename, size_t buffersize, const std::string &ext) :
stream_(nullptr), n_frames_to_read_(n_frames_to_read), n_frames_read_(0),
n_frames_cur_(0), prefix_file_(false), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize), ext_(ext)
{
StartFile(filename);
}

G3Reader::G3Reader(const std::vector<std::string> &filename, int n_frames_to_read,
float timeout, bool track_filename, size_t buffersize) :
prefix_file_(false), stream_(nullptr), n_frames_to_read_(n_frames_to_read),
n_frames_read_(0), n_frames_cur_(0), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize)
float timeout, bool track_filename, size_t buffersize, const std::string &ext) :
stream_(nullptr), n_frames_to_read_(n_frames_to_read), n_frames_read_(0),
n_frames_cur_(0), prefix_file_(false), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize), ext_(ext)
{
if (filename.size() == 0)
log_fatal("Empty file list provided to G3Reader");
Expand All @@ -32,7 +32,14 @@ void G3Reader::StartFile(const std::string &path)
log_info("Starting file %s\n", path.c_str());
cur_file_ = path;
n_frames_cur_ = 0;
g3_istream_from_path(stream_, path, timeout_, buffersize_);
g3_istream_from_path(stream_, path, timeout_, buffersize_, ext_);
}

G3FramePtr G3Reader::FillFrame()
{
G3FramePtr frame(new G3Frame);
frame->loads(stream_);
return frame;
}

void G3Reader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
Expand Down Expand Up @@ -80,9 +87,8 @@ void G3Reader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
return;
}
}
frame = G3FramePtr(new G3Frame);
try {
frame->loads(stream_);
frame = FillFrame();
} catch (...) {
log_error("Exception raised while reading file %s",
cur_file_.c_str());
Expand Down
137 changes: 49 additions & 88 deletions gcp/src/ARCFileReader.cxx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <pybindings.h>

#include <G3Module.h>
#include <G3Reader.h>
#include <G3TimeStamp.h>
#include <G3Map.h>
#include <G3Vector.h>
Expand Down Expand Up @@ -76,22 +76,21 @@ enum {

#define REG_TYPEMASK 0xffe01

class ARCFileReader : public G3Module {
class ARCFileReader : public G3Reader {
public:
ARCFileReader(const std::string &path,
Experiment experiment=Experiment::SPT, float timeout=-1.,
bool track_filename=false, size_t buffersize=1024*1024);
Experiment experiment=Experiment::SPT, int n_frames_to_read=-1,
float timeout=-1., bool track_filename=false,
size_t buffersize=1024*1024);
ARCFileReader(const std::vector<std::string> & filename,
Experiment experiment=Experiment::SPT, float timeout=-1.,
bool track_filename=false, size_t buffersize=1024*1024);

void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
Experiment experiment=Experiment::SPT, int n_frames_to_read=-1,
float timeout=-1., bool track_filename=false,
size_t buffersize=1024*1024);

private:
void StartFile(const std::string & path);
G3FramePtr FillFrame() override;
void ReadHeader();

std::istream stream_;

struct block_stats {
int flags;
int mode;
Expand All @@ -116,45 +115,28 @@ class ARCFileReader : public G3Module {
G3FrameObjectPtr GCPToFrameObject(uint8_t *buffer,
const struct block_stats &block,
int depth = 0, int base_offset = 0);
std::deque<std::string> filename_;
std::string cur_file_;

Experiment experiment;
void SetExperiment(Experiment exp);
G3TimePtr GCPToTime(uint8_t *buffer, off_t offset);

float timeout_;
bool track_filename_;
size_t buffersize_;

SET_LOGGER("ARCFileReader");
};


ARCFileReader::ARCFileReader(const std::string &path,
Experiment experiment, float timeout, bool track_filename, size_t buffersize) :
stream_(nullptr), timeout_(timeout), track_filename_(track_filename),
buffersize_(buffersize)
ARCFileReader::ARCFileReader(const std::string &path, Experiment experiment,
int n_frames_to_read, float timeout, bool track_filename, size_t buffersize) :
G3Reader(path, n_frames_to_read, timeout, track_filename, buffersize, ".dat")
{
SetExperiment(experiment);
StartFile(path);
}

ARCFileReader::ARCFileReader(const std::vector<std::string> &filename,
Experiment experiment, float timeout, bool track_filename, size_t buffersize) :
stream_(nullptr), timeout_(timeout), track_filename_(track_filename),
buffersize_(buffersize)
Experiment experiment, int n_frames_to_read, float timeout, bool track_filename,
size_t buffersize) :
G3Reader(filename, n_frames_to_read, timeout, track_filename, buffersize, ".dat")
{
SetExperiment(experiment);

if (filename.size() == 0)
log_fatal("Empty file list provided to G3Reader");

for (auto i = filename.begin(); i != filename.end(); i++)
filename_.push_back(*i);

StartFile(filename_.front());
filename_.pop_front();
}


Expand All @@ -171,16 +153,12 @@ void ARCFileReader::SetExperiment(Experiment exp)
}
}

void ARCFileReader::StartFile(const std::string & path)
void ARCFileReader::ReadHeader()
{
int32_t size, opcode;
uint8_t *buffer;

// Open file, including whatever decompression/network access/etc.
// may be required
g3_istream_from_path(stream_, path, timeout_, buffersize_, ".dat");
fd_ = g3_istream_handle(stream_);
cur_file_ = path;
revision_ = 0;
has_string_flag_ = false;

Expand Down Expand Up @@ -782,52 +760,37 @@ G3FrameObjectPtr ARCFileReader::GCPToFrameObject(uint8_t *buffer,
return (G3FrameObjectPtr());
}

void ARCFileReader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
G3FramePtr ARCFileReader::FillFrame()
{
G3FramePtr outframe(new G3Frame(G3Frame::GcpSlow));
int32_t size, opcode;
uint8_t *buffer;

G3PythonContext ctx("ARCFileReader", false);

try {
while (stream_.peek() == EOF) {
if (filename_.size() > 0) {
const std::string path = filename_.front();
filename_.pop_front();
StartFile(path);
} else {
return;
}
}
if (n_frames_cur_ == 0)
ReadHeader();

stream_.read((char *)&size, sizeof(size));
size = ntohl(size) - 8;
stream_.read((char *)&opcode, sizeof(opcode));
opcode = ntohl(opcode);
if (opcode != ARC_FRAME_RECORD)
log_fatal("Message not an ARC_FRAME_RECORD mid-file");
if (fd_ >= 0) {
// network source (e.g. GCP) can support changing register selection on the fly
int32_t rev;
stream_.read((char *)&rev, sizeof(rev));
rev = ntohl(rev);
if (rev != revision_)
log_fatal("Frame regset revision %d does not match expected revision (%d)",
rev, revision_);
stream_.read((char *)&size, sizeof(size));
size = ntohl(size) - 8;
stream_.read((char *)&opcode, sizeof(opcode));
opcode = ntohl(opcode);
if (opcode != ARC_FRAME_RECORD)
log_fatal("Message not an ARC_FRAME_RECORD mid-file");
if (fd_ >= 0) {
// network source (e.g. GCP) can support changing register selection on the fly
int32_t rev;
stream_.read((char *)&rev, sizeof(rev));
rev = ntohl(rev);
if (rev != revision_)
log_fatal("Frame regset revision %d does not match expected revision (%d)",
rev, revision_);
stream_.read((char *)&size, sizeof(size));
size = ntohl(size);
}
if (size != frame_length_)
log_fatal("%zd-byte frame does not match expected length (%zd)",
(size_t)size, (size_t)frame_length_);
buffer = new uint8_t[size];
stream_.read((char *)buffer, size);

} catch (...) {
log_error("Exception raised while reading file %s", cur_file_.c_str());
throw;
size = ntohl(size);
}
if (size != frame_length_)
log_fatal("%zd-byte frame does not match expected length (%zd)",
(size_t)size, (size_t)frame_length_);
buffer = new uint8_t[size];
stream_.read((char *)buffer, size);

for (auto temp = array_map_.begin(); temp != array_map_.end(); temp++) {
G3MapFrameObjectPtr templ(new G3MapFrameObject);
Expand All @@ -844,31 +807,29 @@ void ARCFileReader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
outframe->Put(temp->first, templ);
}

if (track_filename_)
outframe->_filename = cur_file_;
out.push_back(outframe);

delete [] buffer;

return outframe;
}

PYBINDINGS("gcp") {
using namespace boost::python;

// Instead of EXPORT_G3MODULE since there are two constructors
class_<ARCFileReader, bases<G3Module>, std::shared_ptr<ARCFileReader>,
class_<ARCFileReader, bases<G3Reader>, std::shared_ptr<ARCFileReader>,
boost::noncopyable>("ARCFileReader",
"Read GCP archive file (or files if you pass an iterable of paths). "
"For non-SPT ARC file formats, please set Experiment to the "
"appropriate value. Set track_filename to True to record the "
"filename for each frame in the ._filename attribute (fragile).",
init<std::string, Experiment, float, bool, size_t>((arg("filename"),
arg("experiment")=Experiment::SPT, arg("timeout")=-1.,
arg("track_filename")=false, arg("buffersize")=1024*1024)))
.def(init<std::vector<std::string>, Experiment, float, bool, size_t>(
(arg("filename"), arg("experiment")=Experiment::SPT,
init<std::string, Experiment, int, float, bool, size_t>((arg("filename"),
arg("experiment")=Experiment::SPT, arg("n_frames_to_read")=0,
arg("timeout")=-1., arg("track_filename")=false,
arg("buffersize")=1024*1024)))
.def_readonly("__g3module__", true)
.def(init<std::vector<std::string>, Experiment, int, float, bool, size_t>(
(arg("filename"), arg("experiment")=Experiment::SPT,
arg("n_frames_to_read")=0, arg("timeout")=-1.,
arg("track_filename")=false, arg("buffersize")=1024*1024)))
;
}

Loading