Skip to content

Commit b7986ff

Browse files
committed
Move event management code outside of the critical section
1 parent 735739b commit b7986ff

File tree

3 files changed

+32
-28
lines changed

3 files changed

+32
-28
lines changed

src/io/CompressedInputStream.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,7 @@ void CompressedInputStream::close()
727727
// Force subsequent reads to trigger submitBlock immediately
728728
_bufferThreshold = 0;
729729

730-
// Buffer cleanup: force error on any subsequent write attempt
730+
// Buffer cleanup: force error on any subsequent read attempt
731731
for (int i = 0; i < 2 * _jobs; i++) {
732732
if (_buffers[i]->_array != nullptr)
733733
delete[] _buffers[i]->_array;
@@ -773,6 +773,9 @@ template <class T>
773773
T DecodingTask<T>::run()
774774
{
775775
int blockId = _ctx.getInt("blockId");
776+
bool streamPerTask = _ctx.getInt("tasks") > 1;
777+
uint64 tType = _ctx.getLong("tType");
778+
short eType = short(_ctx.getInt("eType"));
776779

777780
// Lock free synchronization
778781
while (true) {
@@ -794,9 +797,6 @@ T DecodingTask<T>::run()
794797
EntropyDecoder* ed = nullptr;
795798
InputBitStream* ibs = nullptr;
796799
TransformSequence<byte>* transform = nullptr;
797-
bool streamPerTask = _ctx.getInt("tasks") > 1;
798-
uint64 tType = _ctx.getLong("tType");
799-
short eType = short(_ctx.getInt("eType"));
800800

801801
try {
802802
// Read shared bitstream sequentially (each task is gated by _processedBlockId)
@@ -816,11 +816,11 @@ T DecodingTask<T>::run()
816816
return T(*_data, blockId, 0, 0, Error::ERR_BLOCK_SIZE, "Invalid block size");
817817
}
818818

819-
const int r = int((read + 7) >> 3);
819+
const uint r = uint((read + 7) >> 3);
820820

821821
if (streamPerTask == true) {
822-
if (_data->_length < max(_blockLength, r)) {
823-
_data->_length = max(_blockLength, r);
822+
if (_data->_length < int(max(_blockLength, r))) {
823+
_data->_length = int(max(_blockLength, r));
824824
delete[] _data->_array;
825825
_data->_array = new byte[_data->_length];
826826
}
@@ -871,7 +871,8 @@ T DecodingTask<T>::run()
871871
const int length = dataSize << 3;
872872
const uint64 mask = (uint64(1) << length) - 1;
873873
const int preTransformLength = int(ibs->readBits(length) & mask);
874-
const int maxTransformSize = min(max(_blockLength + _blockLength / 2, 2048), CompressedInputStream::MAX_BITSTREAM_BLOCK_SIZE);
874+
const int maxTransformSize = int(min(max(_blockLength + _blockLength / 2, 2048u),
875+
uint(CompressedInputStream::MAX_BITSTREAM_BLOCK_SIZE)));
875876

876877
if ((preTransformLength <= 0) || (preTransformLength > maxTransformSize)) {
877878
// Error => cancel concurrent decoding tasks
@@ -911,7 +912,7 @@ T DecodingTask<T>::run()
911912
CompressedInputStream::notifyListeners(_listeners, evt2);
912913
}
913914

914-
const int bufferSize = max(_blockLength, preTransformLength + CompressedInputStream::EXTRA_BUFFER_SIZE);
915+
const int bufferSize = max(int(_blockLength), preTransformLength + CompressedInputStream::EXTRA_BUFFER_SIZE);
915916

916917
if (_buffer->_length < bufferSize) {
917918
_buffer->_length = bufferSize;

src/io/CompressedInputStream.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ namespace kanzi
110110
private:
111111
SliceArray<byte>* _data;
112112
SliceArray<byte>* _buffer;
113-
int _blockLength;
113+
uint _blockLength;
114114
DefaultInputBitStream* _ibs;
115115
XXHash32* _hasher32;
116116
XXHash64* _hasher64;

src/io/CompressedOutputStream.cpp

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,9 @@ void CompressedOutputStream::processBuffer()
453453
#ifdef CONCURRENCY_ENABLED
454454
if (_futures[_bufferId].valid()) {
455455
EncodingTaskResult res = _futures[_bufferId].get();
456-
if (res._error != 0) throw IOException(res._msg, res._error);
456+
457+
if (res._error != 0)
458+
throw IOException(res._msg, res._error);
457459
}
458460
#endif
459461

@@ -554,6 +556,7 @@ ostream& CompressedOutputStream::put(char c)
554556
#ifdef CONCURRENCY_ENABLED
555557
if (_futures[_bufferId].valid()) {
556558
EncodingTaskResult res = _futures[_bufferId].get();
559+
557560
if (res._error != 0)
558561
throw IOException(res._msg, res._error);
559562
}
@@ -792,6 +795,7 @@ T EncodingTask<T>::run()
792795
ee = nullptr;
793796
obs.close();
794797
uint64 written = obs.written();
798+
const uint lw = (written < 8) ? 3 : uint(Global::log2(uint32(written >> 3)) + 4);
795799

796800
// Lock free synchronization
797801
while (true) {
@@ -807,6 +811,22 @@ T EncodingTask<T>::run()
807811
CPU_PAUSE();
808812
}
809813

814+
// Emit block size in bits (max size pre-entropy is 1 GB = 1 << 30 bytes)
815+
_obs->writeBits(lw - 3, 5); // write length-3 (5 bits max)
816+
_obs->writeBits(written, lw);
817+
818+
// Emit data to shared bitstream
819+
for (uint n = 0; written > 0; ) {
820+
uint chkSize = uint(min(written, uint64(1) << 30));
821+
_obs->writeBits(&_data->_array[n], chkSize);
822+
n += ((chkSize + 7) >> 3);
823+
written -= uint64(chkSize);
824+
}
825+
826+
// After completion of the entropy coding, increment the block id.
827+
// It unblocks the task processing the next block (if any).
828+
_processedBlockId->store(blockId, memory_order_release);
829+
810830
if (_listeners.size() > 0) {
811831
// Notify after entropy
812832
Event evt1(Event::AFTER_ENTROPY, blockId,
@@ -829,23 +849,6 @@ T EncodingTask<T>::run()
829849
#endif
830850
}
831851

832-
// Emit block size in bits (max size pre-entropy is 1 GB = 1 << 30 bytes)
833-
const uint lw = (written < 8) ? 3 : uint(Global::log2(uint32(written >> 3)) + 4);
834-
_obs->writeBits(lw - 3, 5); // write length-3 (5 bits max)
835-
_obs->writeBits(written, lw);
836-
837-
// Emit data to shared bitstream
838-
for (uint n = 0; written > 0; ) {
839-
uint chkSize = uint(min(written, uint64(1) << 30));
840-
_obs->writeBits(&_data->_array[n], chkSize);
841-
n += ((chkSize + 7) >> 3);
842-
written -= uint64(chkSize);
843-
}
844-
845-
// After completion of the entropy coding, increment the block id.
846-
// It unblocks the task processing the next block (if any).
847-
_processedBlockId->store(blockId, memory_order_release);
848-
849852
return T(blockId, 0, "Success");
850853
}
851854
catch (const exception& e) {

0 commit comments

Comments
 (0)