Skip to content

Commit 6765a46

Browse files
authored
Merge pull request #3349 from eseiler/fix/contrib
[FIX] compressed stream's overflow
2 parents 472a80a + cdd4fe5 commit 6765a46

File tree

8 files changed

+735
-896
lines changed

8 files changed

+735
-896
lines changed

include/seqan3/contrib/stream/bgzf_istream.hpp

Lines changed: 160 additions & 141 deletions
Large diffs are not rendered by default.

include/seqan3/contrib/stream/bgzf_ostream.hpp

Lines changed: 106 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -38,31 +38,27 @@ namespace seqan3::contrib
3838
// Class basic_bgzf_ostreambuf
3939
// --------------------------------------------------------------------------
4040

41-
template<
42-
typename Elem,
43-
typename Tr = std::char_traits<Elem>,
44-
typename ElemA = std::allocator<Elem>,
45-
typename ByteT = char,
46-
typename ByteAT = std::allocator<ByteT>
47-
>
41+
template <typename Elem,
42+
typename Tr = std::char_traits<Elem>,
43+
typename ElemA = std::allocator<Elem>,
44+
typename ByteT = char,
45+
typename ByteAT = std::allocator<ByteT>>
4846
class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
4947
{
5048
private:
51-
52-
typedef std::basic_ostream<Elem, Tr>& ostream_reference;
53-
typedef ElemA char_allocator_type;
54-
typedef ByteT byte_type;
55-
typedef ByteAT byte_allocator_type;
56-
typedef byte_type* byte_buffer_type;
57-
typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
49+
typedef std::basic_ostream<Elem, Tr> & ostream_reference;
50+
typedef ElemA char_allocator_type;
51+
typedef ByteT byte_type;
52+
typedef ByteAT byte_allocator_type;
53+
typedef byte_type * byte_buffer_type;
54+
typedef ConcurrentQueue<size_t, Suspendable<Limit>> job_queue_type;
5855

5956
public:
60-
61-
typedef Tr traits_type;
62-
typedef typename traits_type::char_type char_type;
63-
typedef typename traits_type::int_type int_type;
64-
typedef typename traits_type::pos_type pos_type;
65-
typedef typename traits_type::off_type off_type;
57+
typedef Tr traits_type;
58+
typedef typename traits_type::char_type char_type;
59+
typedef typename traits_type::int_type int_type;
60+
typedef typename traits_type::pos_type pos_type;
61+
typedef typename traits_type::off_type off_type;
6662

6763
struct ScopedLock
6864
{
@@ -80,20 +76,19 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
8076
// One compressed block.
8177
struct OutputBuffer
8278
{
83-
char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
84-
size_t size;
79+
char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
80+
size_t size;
8581
};
8682

8783
// Writes the output to the underlying stream when invoked.
8884
struct BufferWriter
8985
{
9086
ostream_reference ostream;
9187

92-
BufferWriter(ostream_reference ostream) :
93-
ostream(ostream)
88+
BufferWriter(ostream_reference ostream) : ostream(ostream)
9489
{}
9590

96-
bool operator() (OutputBuffer const & outputBuffer)
91+
bool operator()(OutputBuffer const & outputBuffer)
9792
{
9893
ostream.write(outputBuffer.buffer, outputBuffer.size);
9994
return ostream.good();
@@ -104,9 +99,9 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
10499
{
105100
typedef std::vector<char_type, char_allocator_type> TBuffer;
106101

107-
TBuffer buffer;
108-
size_t size;
109-
OutputBuffer *outputBuffer;
102+
TBuffer buffer;
103+
size_t size;
104+
OutputBuffer * outputBuffer;
110105

111106
CompressionJob() :
112107
buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
@@ -116,25 +111,31 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
116111
};
117112

118113
// string of recycable jobs
119-
size_t numThreads;
120-
size_t numJobs;
121-
std::vector<CompressionJob> jobs;
122-
job_queue_type jobQueue;
123-
job_queue_type idleQueue;
114+
size_t numThreads;
115+
size_t numJobs;
116+
std::vector<CompressionJob> jobs;
117+
job_queue_type jobQueue;
118+
job_queue_type idleQueue;
124119
Serializer<OutputBuffer, BufferWriter> serializer;
125-
size_t currentJobId;
126-
bool currentJobAvail;
120+
size_t currentJobId;
121+
bool currentJobAvail;
127122

128123
struct CompressionThread
129124
{
130-
basic_bgzf_ostreambuf *streamBuf;
125+
basic_bgzf_ostreambuf * streamBuf;
131126
CompressionContext<detail::bgzf_compression> compressionCtx;
132127

133128
void operator()()
134129
{
135-
ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
130+
ScopedLock readLock{[this]() mutable
131+
{
132+
unlockReading(this->streamBuf->jobQueue);
133+
}};
136134
// ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
137-
ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
135+
ScopedLock writeLock{[this]() mutable
136+
{
137+
unlockWriting(this->streamBuf->idleQueue);
138+
}};
138139
// ScopedWriteLock{obQueue> writeLock{str}amBuf->idleQueue);
139140

140141
// wait for a new job to become available
@@ -145,12 +146,14 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
145146
if (!popFront(jobId, streamBuf->jobQueue))
146147
return;
147148

148-
CompressionJob &job = streamBuf->jobs[jobId];
149+
CompressionJob & job = streamBuf->jobs[jobId];
149150

150151
// compress block with zlib
151-
job.outputBuffer->size = _compressBlock(
152-
job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
153-
&job.buffer[0], job.size, compressionCtx);
152+
job.outputBuffer->size = _compressBlock(job.outputBuffer->buffer,
153+
sizeof(job.outputBuffer->buffer),
154+
&job.buffer[0],
155+
job.size,
156+
compressionCtx);
154157

155158
success = releaseValue(streamBuf->serializer, job.outputBuffer);
156159
appendValue(streamBuf->idleQueue, jobId);
@@ -160,11 +163,9 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
160163

161164
// array of worker threads
162165
// using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
163-
std::vector<std::thread> pool;
166+
std::vector<std::thread> pool;
164167

165-
basic_bgzf_ostreambuf(ostream_reference ostream_,
166-
size_t numThreads = bgzf_thread_count,
167-
size_t jobsPerThread = 8) :
168+
basic_bgzf_ostreambuf(ostream_reference ostream_, size_t numThreads = bgzf_thread_count, size_t jobsPerThread = 8) :
168169
numThreads(numThreads),
169170
numJobs(numThreads * jobsPerThread),
170171
jobQueue(numJobs),
@@ -193,7 +194,7 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
193194
currentJobAvail = popFront(currentJobId, idleQueue);
194195
assert(currentJobAvail);
195196

196-
CompressionJob &job = jobs[currentJobId];
197+
CompressionJob & job = jobs[currentJobId];
197198
job.outputBuffer = aquireValue(serializer);
198199
this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
199200
}
@@ -236,20 +237,20 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
236237
int_type overflow(int_type c)
237238
{
238239
int w = static_cast<int>(this->pptr() - this->pbase());
239-
if (c != static_cast<int_type>(EOF))
240+
if (!Tr::eq_int_type(c, Tr::eof()))
240241
{
241242
*this->pptr() = c;
242243
++w;
243244
}
244245
if (compressBuffer(w))
245246
{
246-
CompressionJob &job = jobs[currentJobId];
247+
CompressionJob & job = jobs[currentJobId];
247248
this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
248-
return c;
249+
return Tr::not_eof(c);
249250
}
250251
else
251252
{
252-
return EOF;
253+
return Tr::eof();
253254
}
254255
}
255256

@@ -258,7 +259,7 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
258259
int w = static_cast<int>(this->pptr() - this->pbase());
259260
if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
260261
{
261-
CompressionJob &job = jobs[currentJobId];
262+
CompressionJob & job = jobs[currentJobId];
262263
this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
263264
}
264265
else
@@ -292,42 +293,57 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
292293
}
293294

294295
// returns a reference to the output stream
295-
ostream_reference get_ostream() const { return serializer.worker.ostream; };
296+
ostream_reference get_ostream() const
297+
{
298+
return serializer.worker.ostream;
299+
};
296300
};
297301

298302
// --------------------------------------------------------------------------
299303
// Class basic_bgzf_ostreambase
300304
// --------------------------------------------------------------------------
301305

302-
template<
303-
typename Elem,
304-
typename Tr = std::char_traits<Elem>,
305-
typename ElemA = std::allocator<Elem>,
306-
typename ByteT = char,
307-
typename ByteAT = std::allocator<ByteT>
308-
>
309-
class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
306+
template <typename Elem,
307+
typename Tr = std::char_traits<Elem>,
308+
typename ElemA = std::allocator<Elem>,
309+
typename ByteT = char,
310+
typename ByteAT = std::allocator<ByteT>>
311+
class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem, Tr>
310312
{
311313
public:
312-
typedef std::basic_ostream<Elem, Tr>& ostream_reference;
314+
typedef std::basic_ostream<Elem, Tr> & ostream_reference;
313315
typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
314316

315-
basic_bgzf_ostreambase(ostream_reference ostream_)
316-
: m_buf(ostream_)
317+
basic_bgzf_ostreambase(ostream_reference ostream_) : m_buf(ostream_)
317318
{
318-
this->init(&m_buf );
319+
this->init(&m_buf);
319320
};
320321

321322
// returns the underlying zip ostream object
322-
bgzf_streambuf_type* rdbuf() { return &m_buf; };
323+
bgzf_streambuf_type * rdbuf()
324+
{
325+
return &m_buf;
326+
};
323327
// returns the bgzf error state
324-
int get_zerr() const { return m_buf.get_err(); };
328+
int get_zerr() const
329+
{
330+
return m_buf.get_err();
331+
};
325332
// returns the uncompressed data crc
326-
long get_crc() const { return m_buf.get_crc(); };
333+
long get_crc() const
334+
{
335+
return m_buf.get_crc();
336+
};
327337
// returns the compressed data size
328-
long get_out_size() const { return m_buf.get_out_size(); };
338+
long get_out_size() const
339+
{
340+
return m_buf.get_out_size();
341+
};
329342
// returns the uncompressed data size
330-
long get_in_size() const { return m_buf.get_in_size(); };
343+
long get_in_size() const
344+
{
345+
return m_buf.get_in_size();
346+
};
331347

332348
private:
333349
bgzf_streambuf_type m_buf;
@@ -337,31 +353,31 @@ class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
337353
// Class basic_bgzf_ostream
338354
// --------------------------------------------------------------------------
339355

340-
template<
341-
typename Elem,
342-
typename Tr = std::char_traits<Elem>,
343-
typename ElemA = std::allocator<Elem>,
344-
typename ByteT = char,
345-
typename ByteAT = std::allocator<ByteT>
346-
>
356+
template <typename Elem,
357+
typename Tr = std::char_traits<Elem>,
358+
typename ElemA = std::allocator<Elem>,
359+
typename ByteT = char,
360+
typename ByteAT = std::allocator<ByteT>>
347361
class basic_bgzf_ostream :
348-
public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
349-
public std::basic_ostream<Elem,Tr>
362+
public basic_bgzf_ostreambase<Elem, Tr, ElemA, ByteT, ByteAT>,
363+
public std::basic_ostream<Elem, Tr>
350364
{
351365
public:
352-
typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
353-
typedef std::basic_ostream<Elem,Tr> ostream_type;
354-
typedef ostream_type& ostream_reference;
366+
typedef basic_bgzf_ostreambase<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_ostreambase_type;
367+
typedef std::basic_ostream<Elem, Tr> ostream_type;
368+
typedef ostream_type & ostream_reference;
355369

356370
basic_bgzf_ostream(ostream_reference ostream_) :
357371
bgzf_ostreambase_type(ostream_),
358372
ostream_type(bgzf_ostreambase_type::rdbuf())
359373
{}
360374

361375
// flush inner buffer and zipper buffer
362-
basic_bgzf_ostream<Elem,Tr>& flush()
376+
basic_bgzf_ostream<Elem, Tr> & flush()
363377
{
364-
ostream_type::flush(); this->rdbuf()->flush(); return *this;
378+
ostream_type::flush();
379+
this->rdbuf()->flush();
380+
return *this;
365381
};
366382

367383
~basic_bgzf_ostream()
@@ -371,10 +387,12 @@ class basic_bgzf_ostream :
371387

372388
private:
373389
static void put_long(ostream_reference out_, unsigned long x_);
374-
#ifdef _WIN32
375-
void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
376-
void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
377-
#endif
390+
# ifdef _WIN32
391+
void _Add_vtordisp1()
392+
{} // Required to avoid VC++ warning C4250
393+
void _Add_vtordisp2()
394+
{} // Required to avoid VC++ warning C4250
395+
# endif
378396
};
379397

380398
// ===========================================================================

include/seqan3/contrib/stream/bgzf_stream_util.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ inline TDestCapacity _compressBlock(TDestValue * dstBegin,
171171
TSourceLength srcLength,
172172
CompressionContext<detail::bgzf_compression> & ctx)
173173
{
174-
const size_t BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
175-
const size_t BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;
174+
size_t const BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
175+
size_t const BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;
176176

177177
assert(dstCapacity > BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH);
178178
assert(sizeof(TDestValue) == 1u);
@@ -249,8 +249,8 @@ inline TDestCapacity _decompressBlock(TDestValue * dstBegin,
249249
TSourceLength srcLength,
250250
CompressionContext<detail::bgzf_compression> & ctx)
251251
{
252-
const size_t BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
253-
const size_t BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;
252+
size_t const BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
253+
size_t const BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;
254254

255255
assert(sizeof(TSourceValue) == 1u);
256256
assert(sizeof(unsigned) == 4u);

0 commit comments

Comments
 (0)