Skip to content

Commit 0cd6c30

Browse files
Remove signaled threads on TaskScheduler destruction on Windows with timeout (#2582)
#### Reference Issues/PRs <!--Example: Fixes #1234. See also #3456.--> #### What does this implement or fix? This reapplies the reverted PR #2544 with two changes that fix what previously caused flaky tests: 1. Increase the test timeout from 5 to 10 seconds. Previously some successful tests took up to 4+ seconds, so it is reasonable to increase the limit. 2. Increase the timeout of the Windows API call. Previously the function returned immediately, which in some rare cases made it wrongly report that the thread was not signaled, causing the tests to fail. I've tested both before the change and after. Before: * Out of 100 repetitions of all parameter combinations of the test, ~4 were failing After: * Out of ~200~ 1000 repetitions of all parameter combinations - no failures For easier review here is the diff with the reverted PR: https://github.com/man-group/ArcticDB/compare/threads_already_dead..threads_already_dead_after_revert For safety on the CI I additionally increased the test timeout to 15 seconds. #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent 52859b0 commit 0cd6c30

File tree

4 files changed

+55
-14
lines changed

4 files changed

+55
-14
lines changed

cpp/arcticdb/async/task_scheduler.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,12 @@ std::once_flag TaskScheduler::init_flag_;
1919
std::once_flag TaskScheduler::shutdown_flag_;
2020
bool TaskScheduler::forked_ = false;
2121

22-
void TaskScheduler::destroy_instance() {
23-
std::call_once(TaskScheduler::shutdown_flag_, &TaskScheduler::stop_and_destroy);
24-
}
25-
26-
void TaskScheduler::stop_and_destroy() {
27-
if(TaskScheduler::instance_) {
28-
TaskScheduler::instance()->stop();
29-
30-
TaskScheduler::instance_.reset();
31-
}
22+
void TaskScheduler::stop_active_threads() {
23+
std::call_once(shutdown_flag_, [] {
24+
if (instance_) {
25+
instance()->stop();
26+
}
27+
});
3228
}
3329

3430
void TaskScheduler::reattach_instance() {
@@ -54,6 +50,7 @@ void TaskScheduler::init(){
5450
}
5551

5652
TaskSchedulerPtrWrapper::~TaskSchedulerPtrWrapper() {
53+
ptr_->stop_orphaned_threads();
5754
delete ptr_;
5855
}
5956

cpp/arcticdb/async/task_scheduler.hpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,18 @@ struct SchedulerWrapper : public SchedulerType {
101101
void ensure_active_threads() {
102102
SchedulerType::ensureActiveThreads();
103103
}
104+
105+
void stop_orphaned_threads() {
106+
#ifdef _WIN32
107+
const auto to_remove_copy = SchedulerType::threadList_.get();
108+
for (const auto& thread : to_remove_copy) {
109+
const bool is_signaled = WaitForSingleObject(thread->handle.native_handle(), 5000) == WAIT_OBJECT_0;
110+
if (is_signaled) {
111+
SchedulerType::threadList_.remove(thread);
112+
}
113+
}
114+
#endif
115+
}
104116
};
105117

106118
struct CGroupValues {
@@ -215,8 +227,7 @@ class TaskScheduler {
215227

216228
static TaskScheduler* instance();
217229
static void reattach_instance();
218-
static void destroy_instance();
219-
static void stop_and_destroy();
230+
static void stop_active_threads();
220231
static bool forked_;
221232
static bool is_forked();
222233
static void set_forked(bool);
@@ -229,8 +240,8 @@ class TaskScheduler {
229240

230241
void stop() {
231242
ARCTICDB_DEBUG(log::schedule(), "Stopping task scheduler");
232-
cpu_exec_.stop();
233243
io_exec_.stop();
244+
cpu_exec_.stop();
234245
}
235246

236247
void set_active_threads(size_t n) {
@@ -275,6 +286,11 @@ class TaskScheduler {
275286
return io_thread_count_;
276287
}
277288

289+
void stop_orphaned_threads() {
290+
io_exec_.stop_orphaned_threads();
291+
cpu_exec_.stop_orphaned_threads();
292+
}
293+
278294
private:
279295
std::string cgroup_folder_;
280296
size_t cpu_thread_count_;

cpp/arcticdb/util/global_lifetimes.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ std::shared_ptr<ModuleData> ModuleData::instance_;
6565
std::once_flag ModuleData::init_flag_;
6666

6767
void shutdown_globals() {
68-
async::TaskScheduler::destroy_instance();
68+
async::TaskScheduler::stop_active_threads();
6969
storage::mongo::MongoInstance::destroy_instance();
7070
ModuleData::destroy_instance();
7171
}

python/tests/stress/arcticdb/version_store/test_deallocation.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
66
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
77
"""
8+
import os
9+
from multiprocessing import Process
10+
811
import pandas as pd
912
import numpy as np
1013
import pytest
@@ -25,3 +28,28 @@ def test_many_version_store(basic_store_factory):
2528
version_store.write(symbol, df2)
2629
vit = version_store.read(symbol)
2730
assert_frame_equal(vit.data, df2)
31+
32+
33+
def killed_worker(lib, io_threads, cpu_threads):
34+
if io_threads:
35+
lib.delete_snapshot("snap")
36+
if cpu_threads:
37+
lib.read("sym")
38+
os._exit(0)
39+
40+
@pytest.mark.parametrize("io_threads_spawned_in_child", [True, False])
41+
@pytest.mark.parametrize("cpu_threads_spawned_in_child", [True, False])
42+
def test_os_exit_exits_within_timeout(lmdb_storage, lib_name, io_threads_spawned_in_child, cpu_threads_spawned_in_child):
43+
lib = lmdb_storage.create_arctic().create_library(lib_name)
44+
df = pd.DataFrame()
45+
lib.write("sym", df)
46+
lib.snapshot("snap")
47+
proc = Process(target=killed_worker, args=(lib, io_threads_spawned_in_child, cpu_threads_spawned_in_child))
48+
proc.start()
49+
proc.join(timeout=15)
50+
51+
if proc.is_alive():
52+
proc.terminate()
53+
pytest.fail("os._exit did not exit within 15 seconds")
54+
55+
assert proc.exitcode == 0

0 commit comments

Comments
 (0)