Skip to content

Commit 6683867

Browse files
committed
Bugfix: Removed deadlock when shutting down active Reader
1 parent 9c1cfc0 commit 6683867

File tree

5 files changed

+63
-23
lines changed

5 files changed

+63
-23
lines changed

include/osmium/io/detail/queue_util.hpp

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,11 @@ namespace osmium {
101101
class queue_wrapper {
102102

103103
future_queue_type<T>& m_queue;
104-
bool m_has_reached_end_of_data;
105104

106105
public:
107106

108107
explicit queue_wrapper(future_queue_type<T>& queue) :
109-
m_queue(queue),
110-
m_has_reached_end_of_data(false) {
108+
m_queue(queue) {
111109
}
112110

113111
queue_wrapper(const queue_wrapper&) = delete;
@@ -117,33 +115,30 @@ namespace osmium {
117115
queue_wrapper& operator=(queue_wrapper&&) = delete;
118116

119117
~queue_wrapper() noexcept {
120-
drain();
118+
try {
119+
shutdown();
120+
} catch (...) {
121+
}
121122
}
122123

123-
void drain() {
124-
while (!m_queue.empty()) {
125-
try {
126-
std::future<T> data_future;
127-
m_queue.try_pop(data_future);
128-
} catch (...) {
129-
// Ignore any exceptions.
130-
}
131-
}
124+
void shutdown() {
125+
m_queue.shutdown();
132126
}
133127

134128
bool has_reached_end_of_data() const noexcept {
135-
return m_has_reached_end_of_data;
129+
return !m_queue.in_use();
136130
}
137131

138132
T pop() {
139133
T data;
140-
if (!m_has_reached_end_of_data) {
134+
if (m_queue.in_use()) {
141135
std::future<T> data_future;
142136
m_queue.wait_and_pop(data_future);
143-
assert(data_future.valid());
144-
data = std::move(data_future.get());
145-
if (at_end_of_data(data)) {
146-
m_has_reached_end_of_data = true;
137+
if (data_future.valid()) {
138+
data = std::move(data_future.get());
139+
if (at_end_of_data(data)) {
140+
m_queue.shutdown();
141+
}
147142
}
148143
}
149144
return data;

include/osmium/io/detail/write_thread.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ namespace osmium {
9797
} catch (...) {
9898
m_notification->store(true);
9999
m_promise.set_exception(std::current_exception());
100-
m_queue.drain();
100+
m_queue.shutdown();
101101
}
102102
}
103103

include/osmium/io/reader.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ namespace osmium {
386386

387387
m_read_thread_manager.stop();
388388

389-
m_osmdata_queue_wrapper.drain();
389+
m_osmdata_queue_wrapper.shutdown();
390390

391391
try {
392392
m_read_thread_manager.close();

include/osmium/thread/queue.hpp

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ DEALINGS IN THE SOFTWARE.
3333
3434
*/
3535

36+
#include <atomic>
3637
#include <chrono>
3738
#include <condition_variable>
3839
#include <cstddef>
@@ -42,7 +43,6 @@ DEALINGS IN THE SOFTWARE.
4243
#include <utility> // IWYU pragma: keep
4344

4445
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
45-
# include <atomic>
4646
# include <iostream>
4747
#endif
4848

@@ -73,6 +73,8 @@ namespace osmium {
7373
/// Used to signal producers when queue is not full.
7474
std::condition_variable m_space_available;
7575

76+
std::atomic<bool> m_in_use{true};
77+
7678
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
7779
/// The largest size the queue has been so far.
7880
std::size_t m_largest_size;
@@ -145,6 +147,9 @@ namespace osmium {
145147
* this call will block if the queue is full.
146148
*/
147149
void push(T value) {
150+
if (!m_in_use) {
151+
return;
152+
}
148153
constexpr const std::chrono::milliseconds max_wait{10};
149154
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
150155
++m_push_counter;
@@ -181,7 +186,7 @@ namespace osmium {
181186
}
182187
#endif
183188
m_data_available.wait(lock, [this] {
184-
return !m_queue.empty();
189+
return !m_in_use || !m_queue.empty();
185190
});
186191
if (!m_queue.empty()) {
187192
value = std::move(m_queue.front());
@@ -224,6 +229,19 @@ namespace osmium {
224229
return m_queue.size();
225230
}
226231

232+
bool in_use() const noexcept {
233+
return m_in_use;
234+
}
235+
236+
void shutdown() {
237+
m_in_use = false;
238+
std::lock_guard<std::mutex> lock{m_mutex};
239+
while (!m_queue.empty()) {
240+
m_queue.pop();
241+
}
242+
m_data_available.notify_all();
243+
}
244+
227245
}; // class Queue
228246

229247
} // namespace thread

test/t/thread/test_queue.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,30 @@ TEST_CASE("Queue can have max elements and can be named") {
1818
osmium::thread::Queue<int> queue{100, "Queue of max size 100"};
1919
}
2020

21+
TEST_CASE("When queue is shut down, nothing goes in or out") {
22+
osmium::thread::Queue<std::string> queue;
23+
REQUIRE(queue.in_use());
24+
REQUIRE(queue.empty());
25+
queue.push("foo");
26+
queue.push("bar");
27+
queue.push("baz");
28+
REQUIRE(queue.size() == 3);
29+
30+
std::string value;
31+
32+
queue.wait_and_pop(value);
33+
REQUIRE(value == "foo");
34+
REQUIRE(queue.size() == 2);
35+
REQUIRE(queue.in_use());
36+
queue.shutdown();
37+
REQUIRE_FALSE(queue.in_use());
38+
REQUIRE(queue.empty());
39+
queue.push("lost");
40+
REQUIRE(queue.empty());
41+
42+
value.clear();
43+
queue.try_pop(value);
44+
REQUIRE(value.empty());
45+
queue.wait_and_pop(value);
46+
REQUIRE(value.empty());
47+
}

0 commit comments

Comments
 (0)