Skip to content

Commit 77fc08e

Browse files
committed
Rewrite the atomics code to be simpler, cleaner and more portable.
1 parent 91db63d commit 77fc08e

File tree

5 files changed

+92
-212
lines changed

5 files changed

+92
-212
lines changed

src/concurrent.hpp

Lines changed: 44 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ limitations under the License.
2222
#if __cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)
2323
// C++ 11 (or partial)
2424
#include <atomic>
25+
#define HAVE_STD_ATOMICS 1
2526

2627
#ifndef CONCURRENCY_DISABLED
2728
#ifdef __clang__
@@ -37,7 +38,7 @@ limitations under the License.
3738
#endif
3839
#endif
3940
#else
40-
#define CONCURRENT_FALLBACK_ATOMICS
41+
#define HAVE_STD_ATOMICS 0
4142
#endif
4243

4344

@@ -204,171 +205,51 @@ class Task {
204205
#endif
205206

206207

207-
#ifdef CONCURRENT_FALLBACK_ATOMICS
208-
// ! Stubs for NON CONCURRENT USAGE !
209-
// Used when compiling for older C++ standards (C++98/03)
208+
#if HAVE_STD_ATOMICS
209+
typedef std::atomic_int atomic_int_t;
210210

211-
/*
212-
// Use enum instead of const int to prevent linkage issues
213-
enum fallback_memory_order {
214-
memory_order_relaxed = 0,
215-
memory_order_consume = 1,
216-
memory_order_acquire = 2,
217-
memory_order_release = 3,
218-
memory_order_acq_rel = 4,
219-
memory_order_seq_cst = 5
220-
};
221-
*/
222-
223-
224-
// Naming the class 'fallback_...' avoids conflicts if the macro
225-
// matches the class name recursively in some preprocessors.
226-
class fallback_atomic_int {
227-
private:
228-
volatile int _n;
229-
230-
// Disable copy constructor and assignment operator
231-
// (Atomics should not be copyable)
232-
fallback_atomic_int(const fallback_atomic_int&);
233-
fallback_atomic_int& operator=(const fallback_atomic_int&);
234-
235-
public:
236-
fallback_atomic_int(int n = 0) : _n(n) {}
237-
238-
// Assignment returns the value (int), NOT the object reference
239-
int operator=(int n)
240-
{
241-
_n = n;
242-
return n;
243-
}
244-
245-
operator int() const
246-
{
247-
return _n;
248-
}
249-
250-
int load(int mo = 0 /*memory_order_relaxed*/) const
251-
{
252-
(void)mo;
253-
return _n;
254-
}
255-
256-
void store(int n, int mo = 3 /*memory_order_release*/)
257-
{
258-
(void)mo;
259-
_n = n;
260-
}
261-
262-
// Postfix ++ (x++) returns OLD value
263-
int operator++(int)
264-
{
265-
int old = _n;
266-
_n++;
267-
return old;
268-
}
269-
270-
// Prefix ++ (++x) returns NEW value
271-
int operator++()
272-
{
273-
return ++_n;
274-
}
275-
276-
// Standard signature: takes int delta, returns OLD value
277-
int fetch_add(int delta, int mo = 5 /*memory_order_seq_cst*/)
278-
{
279-
(void)mo;
280-
int old = _n;
281-
_n += delta;
282-
return old;
283-
}
284-
285-
int fetch_sub(int delta, int mo = 5 /*memory_order_seq_cst*/)
286-
{
287-
(void)mo;
288-
int old = _n;
289-
_n -= delta;
290-
return old;
291-
}
211+
#define LOAD_ATOMIC(a) ((a).load(std::memory_order_acquire))
212+
#define STORE_ATOMIC(a, v) ((a).store((v), std::memory_order_release))
213+
#define EXCHANGE_ATOMIC(a, v) ((a).exchange((v), std::memory_order_acq_rel))
214+
#define FETCH_ADD_ATOMIC(a, v) ((a).fetch_add((v), std::memory_order_acq_rel))
215+
#define COMPARE_EXCHANGE_ATOMIC(obj, expected, desired) \
216+
((obj).compare_exchange_strong((expected), (desired), \
217+
std::memory_order_release, std::memory_order_acquire))
292218

293-
// CRITICAL FIX: Must update 'expected' on failure
294-
bool compare_exchange_strong(int& expected, int desired, int mo = 5 /*memory_order_seq_cst*/)
295-
{
296-
(void)mo;
297-
298-
if (_n == expected) {
299-
_n = desired;
300-
return true;
301-
} else {
302-
expected = _n;
303-
return false;
304-
}
305-
}
306-
307-
bool compare_exchange_weak(int& expected, int desired, int mo = 5 /*memory_order_seq_cst*/)
308-
{
309-
return compare_exchange_strong(expected, desired, mo);
310-
}
311-
};
312-
313-
class fallback_atomic_bool {
314-
private:
315-
volatile bool _b;
316-
317-
fallback_atomic_bool(const fallback_atomic_bool&);
318-
fallback_atomic_bool& operator=(const fallback_atomic_bool&);
319-
320-
public:
321-
fallback_atomic_bool(bool b = false) : _b(b) {}
322-
323-
bool operator=(bool b)
324-
{
325-
_b = b;
326-
return b;
327-
}
328-
329-
operator bool() const
330-
{
331-
return _b;
332-
}
333-
334-
bool load(int mo = 0 /*memory_order_relaxed*/) const
335-
{
336-
(void)mo;
337-
return _b;
338-
}
339-
340-
void store(bool b, int mo = 3 /*memory_order_release*/)
341-
{
342-
(void)mo;
343-
_b = b;
344-
}
345-
346-
bool exchange(bool val, int mo = 2 /*memory_order_acquire*/)
347-
{
348-
(void)mo;
349-
bool old = _b;
350-
_b = val;
351-
return old;
352-
}
353-
354-
bool compare_exchange_strong(bool& expected, bool desired, int mo = 5 /*memory_order_seq_cst*/)
355-
{
356-
(void)mo;
357-
if (_b == expected) {
358-
_b = desired;
359-
return true;
360-
} else {
361-
expected = _b;
362-
return false;
363-
}
364-
}
365-
};
366-
367-
#define ATOMIC_INT fallback_atomic_int
368-
#define ATOMIC_BOOL fallback_atomic_bool
369219
#else
370-
#define ATOMIC_INT std::atomic_int
371-
#define ATOMIC_BOOL std::atomic_bool
220+
221+
typedef int atomic_int_t;
222+
#define LOAD_ATOMIC(a) (a)
223+
#define STORE_ATOMIC(a, v) ((a) = (v))
224+
#define EXCHANGE_ATOMIC(a, v) exchange_atomic_int((a), (v))
225+
#define FETCH_ADD_ATOMIC(a, v) fetch_add_atomic_int((a), (v))
226+
#define COMPARE_EXCHANGE_ATOMIC(obj, expected, desired) \
227+
compare_exchange_fallback((obj), (expected), (desired))
228+
229+
inline int exchange_atomic_int(int& a, int v)
230+
{
231+
int old = a;
232+
a = v;
233+
return old;
234+
}
235+
236+
inline int fetch_add_atomic_int(int& a, int v)
237+
{
238+
int old = a;
239+
a += v;
240+
return old;
241+
}
242+
243+
inline bool compare_exchange_fallback(int& obj, int& expected, int desired)
244+
{
245+
if (obj == expected) {
246+
obj = desired;
247+
return true;
248+
} else {
249+
expected = obj; // update expected on failure
250+
return false;
251+
}
252+
}
372253

373254
#endif
374255

src/io/CompressedInputStream.cpp

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ CompressedInputStream::CompressedInputStream(InputStream& is,
7676
_available = 0;
7777
_entropyType = EntropyDecoderFactory::getType(entropy.c_str()); // throws on error
7878
_transformType = TransformFactory<byte>::getType(transform.c_str()); // throws on error
79-
_initialized = false;
80-
_closed = false;
79+
_initialized = 0;
80+
_closed = 0;
8181
_gcount = 0;
8282
_ibs = new DefaultInputBitStream(is, DEFAULT_BUFFER_SIZE);
8383
_jobs = tasks;
@@ -152,8 +152,8 @@ CompressedInputStream::CompressedInputStream(InputStream& is, Context& ctx, bool
152152
_available = 0;
153153
_entropyType = EntropyDecoderFactory::NONE_TYPE;
154154
_transformType = TransformFactory<byte>::NONE_TYPE;
155-
_initialized = false;
156-
_closed = false;
155+
_initialized = 0;
156+
_closed = 0;
157157
_gcount = 0;
158158
_ibs = new DefaultInputBitStream(is, DEFAULT_BUFFER_SIZE);
159159
_jobs = tasks;
@@ -326,15 +326,15 @@ void CompressedInputStream::submitBlock(int bufferId)
326326
int CompressedInputStream::_get(int inc)
327327
{
328328
try {
329-
if (_initialized.load(memory_order_acquire) == false) {
329+
if (LOAD_ATOMIC(_initialized) == 0) {
330330
readHeader();
331331

332332
for (int i = 0; i < _jobs; i++)
333333
submitBlock(i);
334334
}
335335

336336
if (_available == 0) {
337-
if (_closed.load(memory_order_relaxed) == true)
337+
if (LOAD_ATOMIC(_closed) == 1)
338338
throw ios_base::failure("Stream closed");
339339

340340
DecodingTaskResult res;
@@ -427,7 +427,7 @@ istream& CompressedInputStream::read(char* data, streamsize length)
427427

428428
while (remaining > 0) {
429429
// Reuse _get(0) logic logic implicitly
430-
if (_initialized.load(memory_order_acquire) == false) {
430+
if (LOAD_ATOMIC(_initialized) == 0) {
431431
readHeader();
432432

433433
for (int i = 0; i < _jobs; i++)
@@ -507,7 +507,7 @@ istream& CompressedInputStream::read(char* data, streamsize length)
507507

508508
void CompressedInputStream::readHeader()
509509
{
510-
if (_initialized.exchange(true, memory_order_acquire))
510+
if (EXCHANGE_ATOMIC(_initialized, 1) == 1)
511511
return;
512512

513513
if (_headless == true)
@@ -695,11 +695,11 @@ bool CompressedInputStream::removeListener(Listener<Event>& bl)
695695

696696
void CompressedInputStream::close()
697697
{
698-
if (_closed.exchange(true, memory_order_relaxed))
698+
if (EXCHANGE_ATOMIC(_closed, 1) == 1)
699699
return;
700700

701701
// Signal to break the spin-loops in DecodingTask::run immediately.
702-
_blockId.store(CANCEL_TASKS_ID, memory_order_release);
702+
STORE_ATOMIC(_blockId, CANCEL_TASKS_ID);
703703

704704
#ifdef CONCURRENCY_ENABLED
705705
// We must ensure no thread is writing to _buffers before we delete them.
@@ -748,7 +748,7 @@ void CompressedInputStream::notifyListeners(vector<Listener<Event>*>& listeners,
748748
template <class T>
749749
DecodingTask<T>::DecodingTask(SliceArray<byte>* iBuffer, SliceArray<byte>* oBuffer,
750750
int blockSize, DefaultInputBitStream* ibs, XXHash32* hasher32, XXHash64* hasher64,
751-
ATOMIC_INT* processedBlockId, vector<Listener<Event>*>& listeners,
751+
atomic_int_t* processedBlockId, vector<Listener<Event>*>& listeners,
752752
const Context& ctx)
753753
: _listeners(listeners)
754754
, _ctx(ctx)
@@ -779,7 +779,7 @@ T DecodingTask<T>::run()
779779

780780
// Lock free synchronization
781781
while (true) {
782-
const int taskId = _processedBlockId->load(memory_order_acquire);
782+
const int taskId = LOAD_ATOMIC(*_processedBlockId);
783783

784784
if (taskId == CompressedInputStream::CANCEL_TASKS_ID) {
785785
// Skip, an error occurred
@@ -807,12 +807,12 @@ T DecodingTask<T>::run()
807807
uint64 read = _ibs->readBits(lr);
808808

809809
if (read == 0) {
810-
_processedBlockId->store(CompressedInputStream::CANCEL_TASKS_ID, memory_order_release);
810+
STORE_ATOMIC(*_processedBlockId, CompressedInputStream::CANCEL_TASKS_ID);
811811
return T(*_data, blockId, 0, 0, 0, "Success");
812812
}
813813

814814
if (read > (uint64(1) << 34)) {
815-
_processedBlockId->store(CompressedInputStream::CANCEL_TASKS_ID, memory_order_release);
815+
STORE_ATOMIC(*_processedBlockId, CompressedInputStream::CANCEL_TASKS_ID);
816816
return T(*_data, blockId, 0, 0, Error::ERR_BLOCK_SIZE, "Invalid block size");
817817
}
818818

@@ -835,7 +835,7 @@ T DecodingTask<T>::run()
835835

836836
// After completion of the bitstream reading, increment the block id.
837837
// It unblocks the task processing the next block (if any)
838-
_processedBlockId->store(blockId, memory_order_release);
838+
STORE_ATOMIC(*_processedBlockId, blockId);
839839

840840
const int from = _ctx.getInt("from", 1);
841841
const int to = _ctx.getInt("to", CompressedInputStream::MAX_BLOCK_ID);
@@ -876,7 +876,7 @@ T DecodingTask<T>::run()
876876

877877
if ((preTransformLength <= 0) || (preTransformLength > maxTransformSize)) {
878878
// Error => cancel concurrent decoding tasks
879-
_processedBlockId->store(CompressedInputStream::CANCEL_TASKS_ID, memory_order_release);
879+
STORE_ATOMIC(*_processedBlockId, CompressedInputStream::CANCEL_TASKS_ID);
880880
stringstream ss;
881881
ss << "Invalid compressed block length: " << preTransformLength;
882882

@@ -933,7 +933,7 @@ T DecodingTask<T>::run()
933933
if (ed->decode(_buffer->_array, 0, preTransformLength) != preTransformLength) {
934934
// Error => cancel concurrent decoding tasks
935935
delete ed;
936-
_processedBlockId->store(CompressedInputStream::CANCEL_TASKS_ID, memory_order_release);
936+
STORE_ATOMIC(*_processedBlockId, CompressedInputStream::CANCEL_TASKS_ID);
937937
return T(*_data, blockId, 0, checksum1, Error::ERR_PROCESS_BLOCK,
938938
"Entropy decoding failed");
939939
}
@@ -998,8 +998,8 @@ T DecodingTask<T>::run()
998998
}
999999
catch (const exception& e) {
10001000
// Make sure to unfreeze next block
1001-
if (_processedBlockId->load(memory_order_acquire) == blockId - 1)
1002-
_processedBlockId->store(blockId, memory_order_release);
1001+
int curBlockId = blockId - 1;
1002+
COMPARE_EXCHANGE_ATOMIC(*_processedBlockId, curBlockId, blockId);
10031003

10041004
if (transform != nullptr)
10051005
delete transform;

0 commit comments

Comments
 (0)