Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 160 additions & 141 deletions include/seqan3/contrib/stream/bgzf_istream.hpp

Large diffs are not rendered by default.

194 changes: 106 additions & 88 deletions include/seqan3/contrib/stream/bgzf_ostream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,27 @@ namespace seqan3::contrib
// Class basic_bgzf_ostreambuf
// --------------------------------------------------------------------------

template<
typename Elem,
typename Tr = std::char_traits<Elem>,
typename ElemA = std::allocator<Elem>,
typename ByteT = char,
typename ByteAT = std::allocator<ByteT>
>
template <typename Elem,
typename Tr = std::char_traits<Elem>,
typename ElemA = std::allocator<Elem>,
typename ByteT = char,
typename ByteAT = std::allocator<ByteT>>
class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
{
private:

typedef std::basic_ostream<Elem, Tr>& ostream_reference;
typedef ElemA char_allocator_type;
typedef ByteT byte_type;
typedef ByteAT byte_allocator_type;
typedef byte_type* byte_buffer_type;
typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
typedef std::basic_ostream<Elem, Tr> & ostream_reference;
typedef ElemA char_allocator_type;
typedef ByteT byte_type;
typedef ByteAT byte_allocator_type;
typedef byte_type * byte_buffer_type;
typedef ConcurrentQueue<size_t, Suspendable<Limit>> job_queue_type;

public:

typedef Tr traits_type;
typedef typename traits_type::char_type char_type;
typedef typename traits_type::int_type int_type;
typedef typename traits_type::pos_type pos_type;
typedef typename traits_type::off_type off_type;
typedef Tr traits_type;
typedef typename traits_type::char_type char_type;
typedef typename traits_type::int_type int_type;
typedef typename traits_type::pos_type pos_type;
typedef typename traits_type::off_type off_type;

struct ScopedLock
{
Expand All @@ -80,20 +76,19 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
// One compressed block.
struct OutputBuffer
{
char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
size_t size;
char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
size_t size;
};

// Writes the output to the underlying stream when invoked.
struct BufferWriter
{
ostream_reference ostream;

BufferWriter(ostream_reference ostream) :
ostream(ostream)
BufferWriter(ostream_reference ostream) : ostream(ostream)
{}

bool operator() (OutputBuffer const & outputBuffer)
bool operator()(OutputBuffer const & outputBuffer)
{
ostream.write(outputBuffer.buffer, outputBuffer.size);
return ostream.good();
Expand All @@ -104,9 +99,9 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
{
typedef std::vector<char_type, char_allocator_type> TBuffer;

TBuffer buffer;
size_t size;
OutputBuffer *outputBuffer;
TBuffer buffer;
size_t size;
OutputBuffer * outputBuffer;

CompressionJob() :
buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
Expand All @@ -116,25 +111,31 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
};

// string of recyclable jobs
size_t numThreads;
size_t numJobs;
std::vector<CompressionJob> jobs;
job_queue_type jobQueue;
job_queue_type idleQueue;
size_t numThreads;
size_t numJobs;
std::vector<CompressionJob> jobs;
job_queue_type jobQueue;
job_queue_type idleQueue;
Serializer<OutputBuffer, BufferWriter> serializer;
size_t currentJobId;
bool currentJobAvail;
size_t currentJobId;
bool currentJobAvail;

struct CompressionThread
{
basic_bgzf_ostreambuf *streamBuf;
basic_bgzf_ostreambuf * streamBuf;
CompressionContext<detail::bgzf_compression> compressionCtx;

void operator()()
{
ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
ScopedLock readLock{[this]() mutable
{
unlockReading(this->streamBuf->jobQueue);
}};
// ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
ScopedLock writeLock{[this]() mutable
{
unlockWriting(this->streamBuf->idleQueue);
}};
// ScopedWriteLock<TJobQueue> writeLock(streamBuf->idleQueue);

// wait for a new job to become available
Expand All @@ -145,12 +146,14 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
if (!popFront(jobId, streamBuf->jobQueue))
return;

CompressionJob &job = streamBuf->jobs[jobId];
CompressionJob & job = streamBuf->jobs[jobId];

// compress block with zlib
job.outputBuffer->size = _compressBlock(
job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
&job.buffer[0], job.size, compressionCtx);
job.outputBuffer->size = _compressBlock(job.outputBuffer->buffer,
sizeof(job.outputBuffer->buffer),
&job.buffer[0],
job.size,
compressionCtx);

success = releaseValue(streamBuf->serializer, job.outputBuffer);
appendValue(streamBuf->idleQueue, jobId);
Expand All @@ -160,11 +163,9 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>

// array of worker threads
// using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
std::vector<std::thread> pool;
std::vector<std::thread> pool;

basic_bgzf_ostreambuf(ostream_reference ostream_,
size_t numThreads = bgzf_thread_count,
size_t jobsPerThread = 8) :
basic_bgzf_ostreambuf(ostream_reference ostream_, size_t numThreads = bgzf_thread_count, size_t jobsPerThread = 8) :
numThreads(numThreads),
numJobs(numThreads * jobsPerThread),
jobQueue(numJobs),
Expand Down Expand Up @@ -193,7 +194,7 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
currentJobAvail = popFront(currentJobId, idleQueue);
assert(currentJobAvail);

CompressionJob &job = jobs[currentJobId];
CompressionJob & job = jobs[currentJobId];
job.outputBuffer = aquireValue(serializer);
this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
}
Expand Down Expand Up @@ -236,20 +237,20 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
int_type overflow(int_type c)
{
int w = static_cast<int>(this->pptr() - this->pbase());
if (c != static_cast<int_type>(EOF))
if (!Tr::eq_int_type(c, Tr::eof()))
{
*this->pptr() = c;
++w;
}
if (compressBuffer(w))
{
CompressionJob &job = jobs[currentJobId];
CompressionJob & job = jobs[currentJobId];
this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
return c;
return Tr::not_eof(c);
}
else
{
return EOF;
return Tr::eof();
}
}

Expand All @@ -258,7 +259,7 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
int w = static_cast<int>(this->pptr() - this->pbase());
if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
{
CompressionJob &job = jobs[currentJobId];
CompressionJob & job = jobs[currentJobId];
this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
}
else
Expand Down Expand Up @@ -292,42 +293,57 @@ class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
}

// returns a reference to the output stream
ostream_reference get_ostream() const { return serializer.worker.ostream; };
ostream_reference get_ostream() const
{
return serializer.worker.ostream;
};
};

// --------------------------------------------------------------------------
// Class basic_bgzf_ostreambase
// --------------------------------------------------------------------------

template<
typename Elem,
typename Tr = std::char_traits<Elem>,
typename ElemA = std::allocator<Elem>,
typename ByteT = char,
typename ByteAT = std::allocator<ByteT>
>
class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
template <typename Elem,
typename Tr = std::char_traits<Elem>,
typename ElemA = std::allocator<Elem>,
typename ByteT = char,
typename ByteAT = std::allocator<ByteT>>
class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem, Tr>
{
public:
typedef std::basic_ostream<Elem, Tr>& ostream_reference;
typedef std::basic_ostream<Elem, Tr> & ostream_reference;
typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;

basic_bgzf_ostreambase(ostream_reference ostream_)
: m_buf(ostream_)
basic_bgzf_ostreambase(ostream_reference ostream_) : m_buf(ostream_)
{
this->init(&m_buf );
this->init(&m_buf);
};

// returns a pointer to the underlying bgzf stream buffer
bgzf_streambuf_type* rdbuf() { return &m_buf; };
bgzf_streambuf_type * rdbuf()
{
return &m_buf;
};
// returns the bgzf error state
int get_zerr() const { return m_buf.get_err(); };
int get_zerr() const
{
return m_buf.get_err();
};
// returns the uncompressed data crc
long get_crc() const { return m_buf.get_crc(); };
long get_crc() const
{
return m_buf.get_crc();
};
// returns the compressed data size
long get_out_size() const { return m_buf.get_out_size(); };
long get_out_size() const
{
return m_buf.get_out_size();
};
// returns the uncompressed data size
long get_in_size() const { return m_buf.get_in_size(); };
long get_in_size() const
{
return m_buf.get_in_size();
};

private:
bgzf_streambuf_type m_buf;
Expand All @@ -337,31 +353,31 @@ class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
// Class basic_bgzf_ostream
// --------------------------------------------------------------------------

template<
typename Elem,
typename Tr = std::char_traits<Elem>,
typename ElemA = std::allocator<Elem>,
typename ByteT = char,
typename ByteAT = std::allocator<ByteT>
>
template <typename Elem,
typename Tr = std::char_traits<Elem>,
typename ElemA = std::allocator<Elem>,
typename ByteT = char,
typename ByteAT = std::allocator<ByteT>>
class basic_bgzf_ostream :
public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
public std::basic_ostream<Elem,Tr>
public basic_bgzf_ostreambase<Elem, Tr, ElemA, ByteT, ByteAT>,
public std::basic_ostream<Elem, Tr>
{
public:
typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
typedef std::basic_ostream<Elem,Tr> ostream_type;
typedef ostream_type& ostream_reference;
typedef basic_bgzf_ostreambase<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_ostreambase_type;
typedef std::basic_ostream<Elem, Tr> ostream_type;
typedef ostream_type & ostream_reference;

basic_bgzf_ostream(ostream_reference ostream_) :
bgzf_ostreambase_type(ostream_),
ostream_type(bgzf_ostreambase_type::rdbuf())
{}

// flush inner buffer and zipper buffer
basic_bgzf_ostream<Elem,Tr>& flush()
basic_bgzf_ostream<Elem, Tr> & flush()
{
ostream_type::flush(); this->rdbuf()->flush(); return *this;
ostream_type::flush();
this->rdbuf()->flush();
return *this;
};

~basic_bgzf_ostream()
Expand All @@ -371,10 +387,12 @@ class basic_bgzf_ostream :

private:
static void put_long(ostream_reference out_, unsigned long x_);
#ifdef _WIN32
void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
#endif
# ifdef _WIN32
void _Add_vtordisp1()
{} // Required to avoid VC++ warning C4250
void _Add_vtordisp2()
{} // Required to avoid VC++ warning C4250
# endif
};

// ===========================================================================
Expand Down
8 changes: 4 additions & 4 deletions include/seqan3/contrib/stream/bgzf_stream_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ inline TDestCapacity _compressBlock(TDestValue * dstBegin,
TSourceLength srcLength,
CompressionContext<detail::bgzf_compression> & ctx)
{
const size_t BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
const size_t BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;
size_t const BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
size_t const BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;

assert(dstCapacity > BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH);
assert(sizeof(TDestValue) == 1u);
Expand Down Expand Up @@ -249,8 +249,8 @@ inline TDestCapacity _decompressBlock(TDestValue * dstBegin,
TSourceLength srcLength,
CompressionContext<detail::bgzf_compression> & ctx)
{
const size_t BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
const size_t BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;
size_t const BLOCK_HEADER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_HEADER_LENGTH;
size_t const BLOCK_FOOTER_LENGTH = DefaultPageSize<detail::bgzf_compression>::BLOCK_FOOTER_LENGTH;

assert(sizeof(TSourceValue) == 1u);
assert(sizeof(unsigned) == 4u);
Expand Down
Loading