Skip to content

Commit e3f9a67

Browse files
committed
Add gzip implementation
1 parent f124112 commit e3f9a67

File tree

1 file changed

+142
-65
lines changed

1 file changed

+142
-65
lines changed

core/src/dataio.cxx

Lines changed: 142 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -156,56 +156,112 @@ class RemoteInputStreamBuffer : public std::streambuf {
156156
std::vector<char> buffer_;
157157
};
158158

159-
class GZipDecompressor : public std::streambuf {
159+
/**
 * Common base for the decompressing stream buffers.
 *
 * Holds a reference to the compressed input stream plus two equally sized
 * scratch buffers: inbuf_ for raw bytes read from the stream and outbuf_ for
 * decompressed bytes exposed through the streambuf get area.  This class does
 * not override underflow(), so on its own every read reports end-of-file;
 * subclasses supply underflow() to refill outbuf_.
 */
class Decompressor : public std::streambuf {
public:
	/**
	 * @param file  stream the compressed bytes are read from; stored by
	 *              reference, so it must outlive this object
	 * @param size  size in bytes of each of the two internal buffers
	 */
	Decompressor(std::istream& file, size_t size)
	  : file_(file), inbuf_(size), outbuf_(size) {
		// Start with an empty get area so the first read calls underflow().
		setg(outbuf_.data(), outbuf_.data(), outbuf_.data());
	}

protected:
	/**
	 * Bulk read: copy up to n decompressed bytes into s, calling underflow()
	 * whenever the get area runs dry.
	 *
	 * @return number of bytes actually copied; less than n only when
	 *         underflow() reports end-of-file.
	 */
	std::streamsize xsgetn(char* s, std::streamsize n) override {
		std::streamsize n_read = 0;
		while (n_read < n) {
			if (gptr() == egptr() && underflow() == traits_type::eof())
				break;

			const std::streamsize available = egptr() - gptr();
			const std::streamsize to_read = std::min(n - n_read, available);
			if (to_read > 0)
				std::memcpy(s + n_read, gptr(), static_cast<size_t>(to_read));

			// Advance with setg() instead of gbump(): gbump() takes an int,
			// so a chunk larger than INT_MAX would be silently truncated.
			setg(eback(), gptr() + to_read, egptr());
			n_read += to_read;
		}
		return n_read;
	}

	std::istream& file_;         // source of compressed bytes
	std::vector<char> inbuf_;    // staging area for compressed input
	std::vector<char> outbuf_;   // decompressed bytes backing the get area
};
190+
191+
class GZipDecompressor : public Decompressor {
160192
public:
161193
GZipDecompressor(std::istream& file, size_t size)
194+
: Decompressor(file, size)
162195
#ifdef ZLIB_FOUND
163-
: file_(file), buffer_(size) {
164-
(void) file_;
165-
log_fatal("Not implmented");
196+
{
197+
zstream_.zalloc = Z_NULL;
198+
zstream_.zfree = Z_NULL;
199+
zstream_.opaque = Z_NULL;
200+
zstream_.avail_in = 0;
201+
zstream_.next_in = Z_NULL;
202+
inflateInit2(&zstream_, 16 + MAX_WBITS);
203+
}
204+
205+
~GZipDecompressor() {
206+
inflateEnd(&zstream_);
207+
}
208+
209+
protected:
210+
int_type underflow() {
211+
if (gptr() < egptr())
212+
return traits_type::to_int_type(*gptr());
213+
214+
zstream_.avail_in = file_.read(inbuf_.data(), inbuf_.size()).gcount();
215+
if (zstream_.avail_in <= 0)
216+
return traits_type::eof();
217+
zstream_.next_in = reinterpret_cast<unsigned char*>(inbuf_.data());
218+
219+
zstream_.avail_out = outbuf_.size();
220+
zstream_.next_out = reinterpret_cast<unsigned char*>(outbuf_.data());
221+
222+
int ret = inflate(&zstream_, Z_NO_FLUSH);
223+
if (ret != Z_OK && ret != Z_STREAM_END)
224+
return traits_type::eof();
225+
226+
setg(outbuf_.data(), outbuf_.data(),
227+
outbuf_.data() + outbuf_.size() - zstream_.avail_out);
228+
return traits_type::to_int_type(*gptr());
166229
}
167230

168231
private:
169-
std::istream& file_;
170-
std::vector<char> buffer_;
232+
z_stream zstream_;
171233
#else
172234
{
173235
log_fatal("Library not compiled with gzip support.");
174236
}
175237
#endif
176238
};
177239

178-
class BZip2Decompressor : public std::streambuf {
240+
class BZip2Decompressor : public Decompressor {
179241
public:
180242
BZip2Decompressor(std::istream& file, size_t size)
243+
: Decompressor(file, size)
181244
#ifdef BZIP2_FOUND
182-
: file_(file), buffer_(size) {
245+
{
183246
(void) file_;
184247
log_fatal("Not implmented");
185248
}
186-
187-
private:
188-
std::istream& file_;
189-
std::vector<char> buffer_;
190249
#else
191250
{
192251
log_fatal("Library not compiled with bzip2 support.");
193252
}
194253
#endif
195254
};
196255

197-
class LZMADecompressor : public std::streambuf {
256+
class LZMADecompressor : public Decompressor {
198257
public:
199258
LZMADecompressor(std::istream& file, size_t size)
259+
: Decompressor(file, size)
200260
#ifdef LZMA_FOUND
201-
: file_(file), buffer_(size) {
261+
{
202262
(void) file_;
203263
log_fatal("Not implmented");
204264
}
205-
206-
private:
207-
std::istream& file_;
208-
std::vector<char> buffer_;
209265
#else
210266
{
211267
log_fatal("Library not compiled with LZMA support.");
@@ -234,31 +290,30 @@ g3_istream_from_path(std::istream &stream, const std::string &path, float timeou
234290
log_fatal("Could not open file %s", path.c_str());
235291
}
236292

237-
// Read buffer
238-
fbuf = new std::vector<char>(buffersize);
239-
file->rdbuf()->pubsetbuf(fbuf->data(), buffersize);
240-
241293
if (path.size() > 3 && !path.compare(path.size() - 3, 3, ".gz")) {
242294
sbuf = new GZipDecompressor(*file, buffersize);
243295
} else if (path.size() > 4 && !path.compare(path.size() - 4, 4, ".bz2")) {
244296
sbuf = new BZip2Decompressor(*file, buffersize);
245297
} else if (path.size() > 3 && !path.compare(path.size() - 3, 3, ".xz")) {
246298
sbuf = new LZMADecompressor(*file, buffersize);
247299
} else {
300+
// Read buffer
301+
fbuf = new std::vector<char>(buffersize);
302+
file->rdbuf()->pubsetbuf(fbuf->data(), buffersize);
248303
sbuf = file->rdbuf();
249304
}
250305
}
251306

252307
stream.rdbuf(sbuf);
253308
stream.pword(0) = file;
254-
stream.pword(1) = fbuf;
255-
stream.pword(2) = sbuf;
309+
stream.pword(1) = sbuf;
310+
stream.pword(2) = fbuf;
256311
}
257312

258313
int
259314
g3_istream_handle(std::istream &stream)
260315
{
261-
std::streambuf* sbuf = static_cast<std::streambuf*>(stream.pword(2));
316+
std::streambuf* sbuf = static_cast<std::streambuf*>(stream.pword(1));
262317
if (!sbuf)
263318
return -1;
264319
RemoteInputStreamBuffer* rbuf = dynamic_cast<RemoteInputStreamBuffer*>(sbuf);
@@ -271,15 +326,15 @@ void
271326
g3_istream_close(std::istream &stream)
272327
{
273328
std::ifstream* file = static_cast<std::ifstream*>(stream.pword(0));
274-
std::vector<char>* fbuf = static_cast<std::vector<char>*>(stream.pword(1));
275-
std::streambuf* sbuf = static_cast<std::streambuf*>(stream.pword(2));
276-
277-
if (sbuf && (!file || sbuf != file->rdbuf()))
278-
delete sbuf;
279-
stream.pword(2) = nullptr;
329+
std::streambuf* sbuf = static_cast<std::streambuf*>(stream.pword(1));
330+
std::vector<char>* fbuf = static_cast<std::vector<char>*>(stream.pword(2));
280331

281332
if (fbuf)
282333
delete fbuf;
334+
stream.pword(2) = nullptr;
335+
336+
if (sbuf && (!file || sbuf != file->rdbuf()))
337+
delete sbuf;
283338
stream.pword(1) = nullptr;
284339

285340
if (file)
@@ -292,41 +347,12 @@ g3_istream_close(std::istream &stream)
292347
/**
 * Shared state for the compressing stream buffers.
 *
 * Keeps a reference to the destination stream together with two scratch
 * buffers of identical size.  No streambuf virtuals are overridden here, so
 * writing through a bare Compressor fails; subclasses provide the actual
 * overflow()/xsputn()/sync() behaviour.
 */
class Compressor : public std::streambuf {
public:
	/**
	 * @param file  destination stream; held by reference and therefore
	 *              required to outlive this object
	 * @param size  number of bytes allocated for each scratch buffer
	 */
	Compressor(std::ostream &file, size_t size)
	  : file_(file),
	    inbuf_(size),
	    outbuf_(size)
	{
	}

protected:
	// Destination for the compressed bytes.
	std::ostream &file_;
	// Scratch space for uncompressed input.
	std::vector<char> inbuf_;
	// Scratch space for compressed output.
	std::vector<char> outbuf_;
};
331357

332358
class GZipCompressor : public Compressor {
@@ -335,9 +361,60 @@ class GZipCompressor : public Compressor {
335361
: Compressor(file, size)
336362
#ifdef ZLIB_FOUND
337363
{
338-
(void) file_;
339-
log_fatal("Not implemented");
364+
zstream_.zalloc = Z_NULL;
365+
zstream_.zfree = Z_NULL;
366+
zstream_.opaque = Z_NULL;
367+
deflateInit2(&zstream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
368+
16 + MAX_WBITS, 8, Z_DEFAULT_STRATEGY);
369+
}
370+
371+
~GZipCompressor() {
372+
deflateEnd(&zstream_);
340373
}
374+
375+
protected:
376+
/**
 * Called when the put area is full (or not yet set up).  Compresses any
 * bytes accumulated in the put area, then compresses c itself, and finally
 * re-arms the put area over the whole of inbuf_.
 *
 * @param c  character that did not fit, or eof to request a plain flush
 * @return eof if the destination stream is in a failed state, otherwise a
 *         value different from eof.
 */
int_type overflow(int_type c) override {
	if (pptr() && pbase()) {
		zstream_.avail_in = static_cast<uInt>(pptr() - pbase());
		zstream_.next_in = reinterpret_cast<unsigned char*>(pbase());
		compress();
	}

	if (c != traits_type::eof()) {
		// Feed the character from a local rather than inbuf_[0], which does
		// not exist when the buffers were created with size 0.
		unsigned char ch =
		    static_cast<unsigned char>(traits_type::to_char_type(c));
		zstream_.avail_in = 1;
		zstream_.next_in = &ch;
		compress();
		// Do not leave zlib pointing at the local once it goes out of scope.
		zstream_.avail_in = 0;
		zstream_.next_in = Z_NULL;
	}

	setp(inbuf_.data(), inbuf_.data() + inbuf_.size());

	// Surface write failures instead of always claiming success.
	if (!file_)
		return traits_type::eof();
	return traits_type::not_eof(c);
}
392+
393+
std::streamsize xsputn(const char* s, std::streamsize n) {
394+
zstream_.avail_in = n;
395+
zstream_.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(s));
396+
compress();
397+
return n;
398+
}
399+
400+
int sync() {
401+
zstream_.avail_in = 0;
402+
compress(Z_FINISH);
403+
file_.flush();
404+
return 0;
405+
}
406+
407+
private:
408+
void compress(int flush=Z_NO_FLUSH) {
409+
do {
410+
zstream_.avail_out = outbuf_.size();
411+
zstream_.next_out = reinterpret_cast<unsigned char*>(outbuf_.data());
412+
deflate(&zstream_, flush);
413+
file_.write(outbuf_.data(), outbuf_.size() - zstream_.avail_out);
414+
} while (zstream_.avail_out == 0);
415+
}
416+
417+
z_stream zstream_;
341418
#else
342419
{
343420
log_fatal("Library not implemented with gzip support.");

0 commit comments

Comments
 (0)