Skip to content

Commit f558210

Browse files
Revert "Remove signaled threads on TaskScheduler destruction on Windo… (#2568)
…ws (#2544)" This reverts commit 7534e22. Tests are intermittently failing on the CI. Will investigate further #### Reference Issues/PRs <!--Example: Fixes #1234. See also #3456.--> #### What does this implement or fix? #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent a53dc85 commit f558210

File tree

4 files changed

+14
-55
lines changed

4 files changed

+14
-55
lines changed

cpp/arcticdb/async/task_scheduler.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ std::once_flag TaskScheduler::init_flag_;
1919
std::once_flag TaskScheduler::shutdown_flag_;
2020
bool TaskScheduler::forked_ = false;
2121

22-
void TaskScheduler::stop_active_threads() {
23-
std::call_once(shutdown_flag_, [] {
24-
if (instance_) {
25-
instance()->stop();
26-
}
27-
});
22+
void TaskScheduler::destroy_instance() {
23+
std::call_once(TaskScheduler::shutdown_flag_, &TaskScheduler::stop_and_destroy);
24+
}
25+
26+
void TaskScheduler::stop_and_destroy() {
27+
if(TaskScheduler::instance_) {
28+
TaskScheduler::instance()->stop();
29+
30+
TaskScheduler::instance_.reset();
31+
}
2832
}
2933

3034
void TaskScheduler::reattach_instance() {
@@ -50,7 +54,6 @@ void TaskScheduler::init(){
5054
}
5155

5256
TaskSchedulerPtrWrapper::~TaskSchedulerPtrWrapper() {
53-
ptr_->stop_orphaned_threads();
5457
delete ptr_;
5558
}
5659

cpp/arcticdb/async/task_scheduler.hpp

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,6 @@ struct SchedulerWrapper : public SchedulerType {
101101
void ensure_active_threads() {
102102
SchedulerType::ensureActiveThreads();
103103
}
104-
105-
void stop_orphaned_threads() {
106-
#ifdef _WIN32
107-
const auto to_remove_copy = SchedulerType::threadList_.get();
108-
for (const auto& thread : to_remove_copy) {
109-
const bool is_signaled = WaitForSingleObject(thread->handle.native_handle(), 0) == WAIT_OBJECT_0;
110-
if (is_signaled) {
111-
SchedulerType::threadList_.remove(thread);
112-
}
113-
}
114-
#endif
115-
}
116104
};
117105

118106
struct CGroupValues {
@@ -227,7 +215,8 @@ class TaskScheduler {
227215

228216
static TaskScheduler* instance();
229217
static void reattach_instance();
230-
static void stop_active_threads();
218+
static void destroy_instance();
219+
static void stop_and_destroy();
231220
static bool forked_;
232221
static bool is_forked();
233222
static void set_forked(bool);
@@ -240,8 +229,8 @@ class TaskScheduler {
240229

241230
void stop() {
242231
ARCTICDB_DEBUG(log::schedule(), "Stopping task scheduler");
243-
io_exec_.stop();
244232
cpu_exec_.stop();
233+
io_exec_.stop();
245234
}
246235

247236
void set_active_threads(size_t n) {
@@ -286,11 +275,6 @@ class TaskScheduler {
286275
return io_thread_count_;
287276
}
288277

289-
void stop_orphaned_threads() {
290-
io_exec_.stop_orphaned_threads();
291-
cpu_exec_.stop_orphaned_threads();
292-
}
293-
294278
private:
295279
std::string cgroup_folder_;
296280
size_t cpu_thread_count_;

cpp/arcticdb/util/global_lifetimes.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ std::shared_ptr<ModuleData> ModuleData::instance_;
6565
std::once_flag ModuleData::init_flag_;
6666

6767
void shutdown_globals() {
68-
async::TaskScheduler::stop_active_threads();
68+
async::TaskScheduler::destroy_instance();
6969
storage::mongo::MongoInstance::destroy_instance();
7070
ModuleData::destroy_instance();
7171
}

python/tests/stress/arcticdb/version_store/test_deallocation.py

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
66
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
77
"""
8-
import os
9-
from multiprocessing import Process
10-
118
import pandas as pd
129
import numpy as np
1310
import pytest
@@ -28,28 +25,3 @@ def test_many_version_store(basic_store_factory):
2825
version_store.write(symbol, df2)
2926
vit = version_store.read(symbol)
3027
assert_frame_equal(vit.data, df2)
31-
32-
33-
def killed_worker(lib, io_threads, cpu_threads):
34-
if io_threads:
35-
lib.delete_snapshot("snap")
36-
if cpu_threads:
37-
lib.read("sym")
38-
os._exit(0)
39-
40-
@pytest.mark.parametrize("io_threads_spawned_in_child", [True, False])
41-
@pytest.mark.parametrize("cpu_threads_spawned_in_child", [True, False])
42-
def test_os_exit_exits_within_timeout(lmdb_storage, lib_name, io_threads_spawned_in_child, cpu_threads_spawned_in_child):
43-
lib = lmdb_storage.create_arctic().create_library(lib_name)
44-
df = pd.DataFrame()
45-
lib.write("sym", df)
46-
lib.snapshot("snap")
47-
proc = Process(target=killed_worker, args=(lib, io_threads_spawned_in_child, cpu_threads_spawned_in_child))
48-
proc.start()
49-
proc.join(timeout=10)
50-
51-
if proc.is_alive():
52-
proc.terminate()
53-
pytest.fail("os._exit did not exit within 5 seconds")
54-
55-
assert proc.exitcode == 0

0 commit comments

Comments
 (0)