Skip to content

Commit 7dbe94d

Browse files
committed
Move compression classes to separate file
Handle fstream construction/destruction in streambuf classes. This also avoids the need for storing pointers in the pword() array.
1 parent 3704d9c commit 7dbe94d

File tree

8 files changed

+472
-424
lines changed

8 files changed

+472
-424
lines changed

core/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ add_spt3g_library(core SHARED
1212
src/G3Data.cxx src/G3Vector.cxx src/G3Map.cxx src/G3Timestream.cxx
1313
src/G3Timesample.cxx
1414
src/G3TriggeredBuilder.cxx src/G3MultiFileWriter.cxx src/dataio.cxx
15-
src/crc32.c ${CORE_EXTRA_SRCS}
15+
src/compression.cxx src/crc32.c ${CORE_EXTRA_SRCS}
1616
src/G3NetworkSender.cxx src/G3SyslogLogger.cxx
1717
src/G3PipelineInfo.cxx src/G3Quat.cxx src/G3Units.cxx
1818
src/int_storage.cxx src/pybindings.cxx

core/include/core/G3Writer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
class G3Writer : public G3Module {
1010
public:
1111
G3Writer(std::string filename,
12-
std::vector<G3Frame::FrameType> streams={}, bool append=false);
12+
std::vector<G3Frame::FrameType> streams={}, bool append=false,
13+
size_t buffersize=1024*1024);
1314
// Writes to file <filename> all frames with types in <streams>.
1415
// If <streams> is empty (default), writes all frames.
1516
virtual ~G3Writer();

core/include/core/dataio.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,6 @@ void
5959
g3_ostream_to_path(std::ostream &stream, const std::string &path, bool append=false,
6060
size_t buffersize=1024*1024, const std::string &ext=".g3");
6161

62-
/**
63-
* Flush the output file stream.
64-
*
65-
* @param stream A reference to the output stream, as configured by
66-
* g3_ostream_to_path.
67-
*/
68-
void g3_ostream_flush(std::ostream &stream);
69-
7062
/**
7163
* Clean up the output stream.
7264
*

core/src/G3MultiFileWriter.cxx

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ class G3MultiFileWriter : public G3Module {
88
public:
99
G3MultiFileWriter(boost::python::object filename,
1010
size_t size_limit,
11-
boost::python::object divide_on = boost::python::object());
11+
boost::python::object divide_on = boost::python::object(),
12+
size_t buffersize=1024*1024);
1213
~G3MultiFileWriter();
1314
void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
1415
std::string CurrentFile() { return current_filename_; }
@@ -19,6 +20,7 @@ class G3MultiFileWriter : public G3Module {
1920
boost::python::object filename_callback_;
2021
std::string current_filename_;
2122
size_t size_limit_;
23+
size_t buffersize_;
2224

2325
std::vector<G3Frame::FrameType> always_break_on_;
2426
boost::python::object newfile_callback_;
@@ -31,8 +33,8 @@ class G3MultiFileWriter : public G3Module {
3133
};
3234

3335
G3MultiFileWriter::G3MultiFileWriter(boost::python::object filename,
34-
size_t size_limit, boost::python::object divide_on)
35-
: size_limit_(size_limit), stream_(nullptr), seqno(0)
36+
size_t size_limit, boost::python::object divide_on, size_t buffersize)
37+
: size_limit_(size_limit), buffersize_(buffersize), stream_(nullptr), seqno(0)
3638
{
3739
boost::python::extract<std::string> fstr(filename);
3840

@@ -122,7 +124,7 @@ G3MultiFileWriter::CheckNewFile(G3FramePtr frame)
122124
}
123125

124126
current_filename_ = filename;
125-
g3_ostream_to_path(stream_, filename, false);
127+
g3_ostream_to_path(stream_, filename, false, buffersize_);
126128

127129
for (auto i = metadata_cache_.begin(); i != metadata_cache_.end(); i++)
128130
(*i)->saves(stream_);
@@ -192,8 +194,8 @@ PYBINDINGS("core") {
192194
"python callable as divide_on. This callable will be passed each "
193195
"frame in turn. If it returns True (or something with positive "
194196
"truth-value), a new file will be started at that frame.",
195-
init<object, size_t, optional<object> >((arg("filename"),
196-
arg("size_limit"), arg("divide_on")=object())))
197+
init<object, size_t, optional<object, size_t> >((arg("filename"),
198+
arg("size_limit"), arg("divide_on")=object(), arg("buffersize")=1024*1024)))
197199
.def_readonly("current_file", &G3MultiFileWriter::CurrentFile)
198200
.def_readonly("__g3module__", true)
199201
;

core/src/G3Writer.cxx

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
G3Writer::G3Writer(std::string filename,
66
std::vector<G3Frame::FrameType> streams,
7-
bool append) :
7+
bool append, size_t buffersize) :
88
filename_(filename), stream_(nullptr), streams_(streams)
99
{
1010
g3_check_output_path(filename);
11-
g3_ostream_to_path(stream_, filename, append);
11+
g3_ostream_to_path(stream_, filename, append, buffersize);
1212
}
1313

1414
G3Writer::~G3Writer()
@@ -39,7 +39,7 @@ void G3Writer::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
3939
void G3Writer::Flush()
4040
{
4141
try {
42-
g3_ostream_flush(stream_);
42+
stream_.flush();
4343
} catch (...) {
4444
}
4545
}
@@ -56,9 +56,10 @@ PYBINDINGS("core") {
5656
"types to the second optional argument (streams). If no streams argument "
5757
"is given, writes all types of frames. If append is set to True, will "
5858
"append frames to its output file rather than overwriting it.",
59-
init<std::string, std::vector<G3Frame::FrameType>, bool>((arg("filename"),
60-
arg("streams")=std::vector<G3Frame::FrameType>(), arg("append")=false)))
61-
.def("Flush", &G3Writer::Flush)
62-
.def_readonly("__g3module__", true)
59+
init<std::string, std::vector<G3Frame::FrameType>, bool, size_t>((arg("filename"),
60+
arg("streams")=std::vector<G3Frame::FrameType>(), arg("append")=false,
61+
arg("buffersize")=1024*1024)))
62+
.def("Flush", &G3Writer::Flush)
63+
.def_readonly("__g3module__", true)
6364
;
6465
}

core/src/compression.cxx

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#include "compression.h"
2+
3+
#ifdef ZLIB_FOUND
4+
GZipDecoder::GZipDecoder(const std::string& path, size_t size) : Decoder(path, size)
5+
{
6+
stream_.zalloc = Z_NULL;
7+
stream_.zfree = Z_NULL;
8+
stream_.opaque = Z_NULL;
9+
stream_.avail_in = 0;
10+
stream_.next_in = Z_NULL;
11+
if (inflateInit2(&stream_, 16 + MAX_WBITS) != Z_OK)
12+
log_fatal("Error initializing gzip decoder: %s", stream_.msg);
13+
}
14+
15+
GZipDecoder::~GZipDecoder()
16+
{
17+
inflateEnd(&stream_);
18+
}
19+
20+
int GZipDecoder::decode()
21+
{
22+
int ret = inflate(&stream_, Z_NO_FLUSH);
23+
if (ret != Z_OK && ret != Z_STREAM_END) {
24+
log_error("Error running gzip decoder: %s", stream_.msg);
25+
return ret;
26+
}
27+
return 0;
28+
}
29+
30+
GZipEncoder::GZipEncoder(const std::string& path, size_t size) : Encoder(path, size)
31+
{
32+
stream_.zalloc = Z_NULL;
33+
stream_.zfree = Z_NULL;
34+
stream_.opaque = Z_NULL;
35+
if (deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
36+
16 + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK)
37+
log_fatal("Error initializing gzip encoder: %s", stream_.msg);
38+
}
39+
40+
GZipEncoder::~GZipEncoder()
41+
{
42+
sync();
43+
deflateEnd(&stream_);
44+
}
45+
46+
int GZipEncoder::encode(bool flush)
47+
{
48+
int ret = deflate(&stream_, flush ? Z_FINISH : Z_NO_FLUSH);
49+
if (ret == Z_STREAM_ERROR) {
50+
log_error("Error running gzip encoder: %s", stream_.msg);
51+
return ret;
52+
}
53+
return 0;
54+
}
55+
#endif
56+
57+
#ifdef BZIP2_FOUND
58+
BZip2Decoder::BZip2Decoder(const std::string& path, size_t size) : Decoder(path, size)
59+
{
60+
stream_.bzalloc = nullptr;
61+
stream_.bzfree = nullptr;
62+
stream_.opaque = nullptr;
63+
stream_.avail_in = 0;
64+
stream_.next_in = nullptr;
65+
if (BZ2_bzDecompressInit(&stream_, 0, 0) != BZ_OK)
66+
log_fatal("Error initializing bzip2 decoder");
67+
}
68+
69+
BZip2Decoder::~BZip2Decoder()
70+
{
71+
BZ2_bzDecompressEnd(&stream_);
72+
}
73+
74+
int BZip2Decoder::decode()
75+
{
76+
int ret = BZ2_bzDecompress(&stream_);
77+
if (ret != BZ_OK && ret != BZ_STREAM_END) {
78+
log_error("Error running bzip2 decoder");
79+
return ret;
80+
}
81+
return 0;
82+
}
83+
84+
BZip2Encoder::BZip2Encoder(const std::string& path, size_t size) : Encoder(path, size)
85+
{
86+
stream_.bzalloc = nullptr;
87+
stream_.bzfree = nullptr;
88+
stream_.opaque = nullptr;
89+
if (BZ2_bzCompressInit(&stream_, 9, 0, 0) != BZ_OK)
90+
log_fatal("Error initializing bzip2 encoder");
91+
}
92+
93+
BZip2Encoder::~BZip2Encoder()
94+
{
95+
sync();
96+
BZ2_bzCompressEnd(&stream_);
97+
}
98+
99+
int BZip2Encoder::encode(bool flush)
100+
{
101+
int ret = BZ2_bzCompress(&stream_, flush ? BZ_FINISH : BZ_RUN);
102+
if (ret == BZ_SEQUENCE_ERROR) {
103+
log_error("Error running bzip2 encoder");
104+
return ret;
105+
}
106+
return 0;
107+
}
108+
#endif
109+
110+
#ifdef LZMA_FOUND
111+
LZMADecoder::LZMADecoder(const std::string& path, size_t size) : Decoder(path, size)
112+
{
113+
stream_ = LZMA_STREAM_INIT;
114+
lzma_ret ret = lzma_stream_decoder(&stream_, UINT64_MAX,
115+
LZMA_CONCATENATED);
116+
if (ret != LZMA_OK)
117+
log_fatal("Error initializing LZMA decoder.");
118+
}
119+
120+
LZMADecoder::~LZMADecoder()
121+
{
122+
lzma_end(&stream_);
123+
}
124+
125+
int LZMADecoder::decode()
126+
{
127+
lzma_ret ret = lzma_code(&stream_, LZMA_RUN);
128+
if (ret != LZMA_OK && ret != LZMA_STREAM_END) {
129+
log_error("Error running LZMA decoder");
130+
return ret;
131+
}
132+
return 0;
133+
}
134+
135+
LZMAEncoder::LZMAEncoder(const std::string& path, size_t size) : Encoder(path, size)
136+
{
137+
stream_ = LZMA_STREAM_INIT;
138+
lzma_ret ret = lzma_easy_encoder(&stream_, LZMA_PRESET_DEFAULT,
139+
LZMA_CHECK_CRC64);
140+
if (ret != LZMA_OK)
141+
log_fatal("Error initializing LZMA encoder.");
142+
}
143+
144+
LZMAEncoder::~LZMAEncoder()
145+
{
146+
sync();
147+
lzma_end(&stream_);
148+
}
149+
150+
int LZMAEncoder::encode(bool flush)
151+
{
152+
lzma_action action = flush ? LZMA_FINISH : LZMA_RUN;
153+
lzma_ret ret = lzma_code(&stream_, action);
154+
if (ret != LZMA_OK && ret != LZMA_STREAM_END) {
155+
log_error("Error running LZMA encoder");
156+
return ret;
157+
}
158+
return 0;
159+
}
160+
#endif

0 commit comments

Comments
 (0)