Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/include/core/G3Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ class G3Reader : public G3Module {
off_t Seek(off_t offset);
off_t Tell();

private:
void StartFile(std::string path);
protected:
virtual void StartFile(std::string path);
virtual G3FramePtr FillFrame();
bool prefix_file_;
std::string cur_file_;
std::deque<std::string> filename_;
std::shared_ptr<std::istream> stream_;
int fd_;
int n_frames_to_read_;
int n_frames_read_;
int n_frames_cur_;
Expand Down
2 changes: 1 addition & 1 deletion core/src/G3MultiFileWriter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <string>
#include <G3Module.h>

#include <dataio.h>
#include "dataio.h"

class G3MultiFileWriter : public G3Module {
public:
Expand Down
17 changes: 12 additions & 5 deletions core/src/G3Reader.cxx
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#include <pybindings.h>
#include <dataio.h>
#include "dataio.h"
#include <G3Reader.h>

G3Reader::G3Reader(std::string filename, int n_frames_to_read,
float timeout, bool track_filename, size_t buffersize) :
prefix_file_(false), n_frames_to_read_(n_frames_to_read),
prefix_file_(false), fd_(-1), n_frames_to_read_(n_frames_to_read),
n_frames_read_(0), n_frames_cur_(0), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize)
{
Expand All @@ -14,7 +14,7 @@ G3Reader::G3Reader(std::string filename, int n_frames_to_read,

G3Reader::G3Reader(std::vector<std::string> filename, int n_frames_to_read,
float timeout, bool track_filename, size_t buffersize) :
prefix_file_(false), n_frames_to_read_(n_frames_to_read),
prefix_file_(false), fd_(-1), n_frames_to_read_(n_frames_to_read),
n_frames_read_(0), n_frames_cur_(0), timeout_(timeout),
track_filename_(track_filename), buffersize_(buffersize)
{
Expand All @@ -35,6 +35,14 @@ void G3Reader::StartFile(std::string path)
cur_file_ = path;
n_frames_cur_ = 0;
stream_ = g3_istream_from_path(path, timeout_, buffersize_);
fd_ = g3_istream_handle(stream_);
}

G3FramePtr G3Reader::FillFrame()
{
G3FramePtr frame(new G3Frame);
frame->loads(stream_);
return frame;
}

void G3Reader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
Expand Down Expand Up @@ -82,9 +90,8 @@ void G3Reader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
return;
}
}
frame = G3FramePtr(new G3Frame);
try {
frame->loads(stream_);
frame = FillFrame();
} catch (...) {
log_error("Exception raised while reading file %s",
cur_file_.c_str());
Expand Down
2 changes: 1 addition & 1 deletion core/src/G3Writer.cxx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <pybindings.h>
#include <G3Writer.h>
#include <dataio.h>
#include "dataio.h"

G3Writer::G3Writer(std::string filename,
std::vector<G3Frame::FrameType> streams,
Expand Down
2 changes: 1 addition & 1 deletion core/src/dataio.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <G3Logging.h>
#include <dataio.h>
#include "dataio.h"

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/gzip.hpp>
Expand Down
File renamed without changes.
173 changes: 73 additions & 100 deletions gcp/src/ARCFileReader.cxx
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#include <pybindings.h>

#include <G3Module.h>
#include <G3Reader.h>
#include <G3TimeStamp.h>
#include <G3Map.h>
#include <G3Vector.h>
#include <G3Data.h>
#include <G3Units.h>
#include <dataio.h>
#include <gcp/Experiments.h>

#include <string.h>
Expand Down Expand Up @@ -76,21 +75,22 @@ enum {

#define REG_TYPEMASK 0xffe01

class ARCFileReader : public G3Module {
class ARCFileReader : public G3Reader {
public:
ARCFileReader(const std::string &path,
Experiment experiment=Experiment::SPT, bool track_filename=false);
Experiment experiment=Experiment::SPT, int n_frames_to_read=-1,
float timeout=-1., bool track_filename=false,
size_t buffersize=1024*1024);
ARCFileReader(const std::vector<std::string> & filename,
Experiment experiment=Experiment::SPT, bool track_filename=false);
virtual ~ARCFileReader() {}

void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
Experiment experiment=Experiment::SPT, int n_frames_to_read=-1,
float timeout=-1., bool track_filename=false,
size_t buffersize=1024*1024);

private:
void StartFile(const std::string & path);
void StartFile(std::string path) override;
G3FramePtr FillFrame() override;
void ReadHeader();

std::shared_ptr<std::istream> stream_;

struct block_stats {
int flags;
int mode;
Expand All @@ -108,80 +108,70 @@ class ARCFileReader : public G3Module {
bool has_string_flag_;
off_t frame_length_;
uint64_t ms_jiffie_base_;
int fd_;
int32_t revision_;

void ParseArrayMap(uint8_t *buffer, size_t size);
G3FrameObjectPtr GCPToFrameObject(uint8_t *buffer,
const struct block_stats &block, bool has_string_flag,
int depth = 0, int base_offset = 0);
std::deque<std::string> filename_;
std::string cur_file_;

Experiment experiment;
void SetExperiment(Experiment exp);
G3TimePtr GCPToTime(uint8_t *buffer, off_t offset);

bool track_filename_;

SET_LOGGER("ARCFileReader");
};


ARCFileReader::ARCFileReader(const std::string &path,
Experiment experiment, bool track_filename) :
experiment(experiment), track_filename_(track_filename)
ARCFileReader::ARCFileReader(const std::string &path, Experiment experiment,
int n_frames_to_read, float timeout, bool track_filename, size_t buffersize) :
G3Reader(path, n_frames_to_read, timeout, track_filename, buffersize)
{
if (experiment == Experiment::SPT || experiment == Experiment::BK) {
ms_jiffie_base_ = G3Units::ms;
} else if (experiment == Experiment::PB) {
ms_jiffie_base_ = 86400/INT_MAX;
} else {
log_fatal("Unrecognized Experiment");
}

g3_check_input_path(path);
StartFile(path);
SetExperiment(experiment);
ReadHeader();
}



ARCFileReader::ARCFileReader(const std::vector<std::string> &filename,
Experiment experiment, bool track_filename) :
experiment(experiment), track_filename_(track_filename)
Experiment experiment, int n_frames_to_read, float timeout, bool track_filename,
size_t buffersize) :
G3Reader(filename, n_frames_to_read, timeout, track_filename, buffersize)
{
SetExperiment(experiment);
ReadHeader();
}


void ARCFileReader::SetExperiment(Experiment exp)
{
experiment = exp;

if (experiment == Experiment::SPT || experiment == Experiment::BK) {
ms_jiffie_base_ = G3Units::ms;
} else if (experiment == Experiment::PB) {
ms_jiffie_base_ = 86400/INT_MAX;
} else {
log_fatal("Unrecognized Experiment");
}
}

if (filename.size() == 0)
log_fatal("Empty file list provided to G3Reader");

for (auto i = filename.begin(); i != filename.end(); i++){
g3_check_input_path(*i);
filename_.push_back(*i);
}

const std::string path = filename_.front();
filename_.pop_front();
StartFile(path);
void ARCFileReader::StartFile(std::string path)
{
// Open file, including whatever decompression/network access/etc.
// may be required
G3Reader::StartFile(path);

ReadHeader();
}


void ARCFileReader::StartFile(const std::string & path)
void ARCFileReader::ReadHeader()
{
int32_t size, opcode;
uint8_t *buffer;

// Open file, including whatever decompression/network access/etc.
// may be required
stream_ = g3_istream_from_path(path);
fd_ = g3_istream_handle(stream_);
cur_file_ = path;
revision_ = 0;
has_string_flag_ = false;

Expand All @@ -200,7 +190,7 @@ void ARCFileReader::StartFile(const std::string & path)

if (opcode != ARC_SIZE_RECORD)
log_fatal("No ARC_SIZE_RECORD at beginning of %s",
path.c_str());
cur_file_.c_str());
if ((fd_ < 0 && size != 4) || (fd_ >= 0 && size != 8))
log_fatal("Incorrectly sized ARC_SIZE_RECORD (%d)", size);
stream_->read((char *)&size, sizeof(size)); /* Skip size field */
Expand All @@ -214,19 +204,19 @@ void ARCFileReader::StartFile(const std::string & path)
opcode = ntohl(opcode);
if (opcode != ARC_ARRAYMAP_RECORD)
log_fatal("No ARC_ARRAYMAP_RECORD at beginning of %s",
path.c_str());
cur_file_.c_str());

buffer = new uint8_t[size];
stream_->read((char *)buffer, size);
if (stream_->eof()) {
delete [] buffer;
log_fatal("%s truncated; unable to read register map",
path.c_str());
cur_file_.c_str());
}
if (!stream_->good()) {
delete [] buffer;
log_fatal("Read error on %s while reading register map",
path.c_str());
cur_file_.c_str());
}

ParseArrayMap(buffer, size);
Expand Down Expand Up @@ -784,52 +774,34 @@ G3FrameObjectPtr ARCFileReader::GCPToFrameObject(uint8_t *buffer,
return (G3FrameObjectPtr());
}

void ARCFileReader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
G3FramePtr ARCFileReader::FillFrame()
{
G3FramePtr outframe(new G3Frame(G3Frame::GcpSlow));
int32_t size, opcode;
uint8_t *buffer;

G3PythonContext ctx("ARCFileReader", false);

try {
while (stream_->peek() == EOF) {
if (filename_.size() > 0) {
const std::string path = filename_.front();
filename_.pop_front();
StartFile(path);
} else {
return;
}
}

stream_->read((char *)&size, sizeof(size));
size = ntohl(size) - 8;
stream_->read((char *)&opcode, sizeof(opcode));
opcode = ntohl(opcode);
if (opcode != ARC_FRAME_RECORD)
log_fatal("Message not an ARC_FRAME_RECORD mid-file");
if (fd_ >= 0) {
// network source (e.g. GCP) can support changing register selection on the fly
int32_t rev;
stream_->read((char *)&rev, sizeof(rev));
rev = ntohl(rev);
if (rev != revision_)
log_fatal("Frame regset revision %d does not match expected revision (%d)",
rev, revision_);
stream_->read((char *)&size, sizeof(size));
size = ntohl(size) - 8;
stream_->read((char *)&opcode, sizeof(opcode));
opcode = ntohl(opcode);
if (opcode != ARC_FRAME_RECORD)
log_fatal("Message not an ARC_FRAME_RECORD mid-file");
if (fd_ >= 0) {
// network source (e.g. GCP) can support changing register selection on the fly
int32_t rev;
stream_->read((char *)&rev, sizeof(rev));
rev = ntohl(rev);
if (rev != revision_)
log_fatal("Frame regset revision %d does not match expected revision (%d)",
rev, revision_);
stream_->read((char *)&size, sizeof(size));
size = ntohl(size);
}
if (size != frame_length_)
log_fatal("%zd-byte frame does not match expected length (%zd)",
(size_t)size, (size_t)frame_length_);
buffer = new uint8_t[size];
stream_->read((char *)buffer, size);

} catch (...) {
log_error("Exception raised while reading file %s", cur_file_.c_str());
throw;
size = ntohl(size);
}
if (size != frame_length_)
log_fatal("%zd-byte frame does not match expected length (%zd)",
(size_t)size, (size_t)frame_length_);
buffer = new uint8_t[size];
stream_->read((char *)buffer, size);

for (auto temp = array_map_.begin(); temp != array_map_.end(); temp++) {
G3MapFrameObjectPtr templ(new G3MapFrameObject);
Expand All @@ -846,28 +818,29 @@ void ARCFileReader::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
outframe->Put(temp->first, templ);
}

if (track_filename_)
outframe->_filename = cur_file_;
out.push_back(outframe);

delete [] buffer;

return outframe;
}

PYBINDINGS("gcp") {
using namespace boost::python;

// Instead of EXPORT_G3MODULE since there are two constructors
class_<ARCFileReader, bases<G3Module>, std::shared_ptr<ARCFileReader>,
class_<ARCFileReader, bases<G3Reader>, std::shared_ptr<ARCFileReader>,
boost::noncopyable>("ARCFileReader",
"Read GCP archive file (or files if you pass an iterable of paths). "
"For non-SPT ARC file formats, please set Experiment to the "
"appropriate value. Set track_filename to True to record the "
"filename for each frame in the ._filename attribute (fragile).",
init<std::string, Experiment, bool>((arg("filename"),
arg("experiment")=Experiment::SPT, arg("track_filename")=false)))
.def(init<std::vector<std::string>, Experiment, bool>((arg("filename"),
arg("experiment")=Experiment::SPT, arg("track_filename")=false)))
.def_readonly("__g3module__", true)
init<std::string, Experiment, int, float, bool, size_t>((arg("filename"),
arg("experiment")=Experiment::SPT, arg("n_frames_to_read")=0,
arg("timeout")=-1., arg("track_filename")=false,
arg("buffersize")=1024*1024)))
.def(init<std::vector<std::string>, Experiment, int, float, bool, size_t>(
(arg("filename"), arg("experiment")=Experiment::SPT,
arg("n_frames_to_read")=0, arg("timeout")=-1.,
arg("track_filename")=false, arg("buffersize")=1024*1024)))
;
}