Skip to content

Commit b32e9cb

Browse files
committed
Make more tests pass TSAN checks
The db_cloud_tests currently do not pass TSAN but the other cloud-specific ones should now pass.
1 parent 9d3e9a1 commit b32e9cb

File tree

4 files changed

+97
-21
lines changed

4 files changed

+97
-21
lines changed

cloud/cloud_scheduler.cc

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class CloudSchedulerImpl : public CloudScheduler {
4646
std::function<void(void*)> callback,
4747
void* arg) override;
4848
bool CancelJob(long handle) override;
49+
bool IsScheduled(long handle) override;
4950

5051
private:
5152
void DoWork();
@@ -70,8 +71,14 @@ class LocalCloudScheduler : public CloudScheduler {
7071
public:
7172
LocalCloudScheduler(const std::shared_ptr<CloudScheduler>& scheduler,
7273
long local_id)
73-
: scheduler_(scheduler), next_local_id_(local_id) {}
74+
: scheduler_(scheduler),
75+
next_local_id_(local_id),
76+
shutting_down_(false) {}
7477
~LocalCloudScheduler() override {
78+
{
79+
std::lock_guard<std::mutex> lk(job_mutex_);
80+
shutting_down_ = true;
81+
}
7582
for (const auto& job : jobs_) {
7683
scheduler_->CancelJob(job.second);
7784
}
@@ -80,6 +87,9 @@ class LocalCloudScheduler : public CloudScheduler {
8087

8188
long ScheduleJob(std::chrono::microseconds when,
8289
std::function<void(void*)> callback, void* arg) override {
90+
if (shutting_down_) {
91+
return -1;
92+
}
8393
std::lock_guard<std::mutex> lk(job_mutex_);
8494
long local_id = next_local_id_++;
8595
auto job = [this, local_id, callback](void* a) {
@@ -95,12 +105,37 @@ class LocalCloudScheduler : public CloudScheduler {
95105
std::chrono::microseconds frequency,
96106
std::function<void(void*)> callback,
97107
void* arg) override {
108+
if (shutting_down_) {
109+
return -1;
110+
}
98111
auto job = scheduler_->ScheduleRecurringJob(when, frequency, callback, arg);
99112
std::lock_guard<std::mutex> lk(job_mutex_);
100113
long local_id = next_local_id_++;
101114
jobs_[local_id] = job;
102115
return local_id;
103116
}
117+
118+
bool IsScheduled(long handle) override {
119+
if (shutting_down_) {
120+
return false;
121+
} else {
122+
std::lock_guard<std::mutex> lk(job_mutex_);
123+
const auto& it = jobs_.find(handle);
124+
if (it == jobs_.end()) {
125+
// We do not have the job in our queue. Return false
126+
return false;
127+
} else if (scheduler_->IsScheduled(it->second)) {
128+
// The job is still scheduled. Return false
129+
return true;
130+
} else {
131+
// We have the job in our queue but it has already
132+
// completed. Erase from our queue and return false
133+
jobs_.erase(it);
134+
return false;
135+
}
136+
}
137+
}
138+
104139
// Cancels the job referred to by handle if it is active and associated with
105140
// this scheduler
106141
bool CancelJob(long handle) override {
@@ -123,6 +158,7 @@ class LocalCloudScheduler : public CloudScheduler {
123158
std::mutex job_mutex_;
124159
std::shared_ptr<CloudScheduler> scheduler_;
125160
long next_local_id_;
161+
bool shutting_down_;
126162
std::unordered_map<long, long> jobs_;
127163
};
128164

@@ -187,6 +223,22 @@ long CloudSchedulerImpl::ScheduleRecurringJob(
187223
return id;
188224
}
189225

226+
bool CloudSchedulerImpl::IsScheduled(long id) {
227+
if (id < 0) {
228+
return false;
229+
} else {
230+
std::unique_lock<std::mutex> lk(mutex_);
231+
if (!scheduled_jobs_.empty()) {
232+
for (const auto& job : scheduled_jobs_) {
233+
if (job.id == id) {
234+
return true;
235+
}
236+
}
237+
}
238+
return false;
239+
}
240+
}
241+
190242
bool CloudSchedulerImpl::CancelJob(long id) {
191243
if (id < 0) {
192244
return false;

cloud/cloud_scheduler.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class CloudScheduler {
3737
// was canceled, false otherwise.
3838
virtual bool CancelJob(long handle) = 0;
3939

40+
// Returns true if the job represented by handle has not completed,
41+
// meaning that it is either currently scheduled or actively running
42+
virtual bool IsScheduled(long handle) = 0;
4043
// Returns a new instance of a cloud scheduler. The caller is responsible
4144
// for freeing the scheduler when it is no longer required.
4245
static std::shared_ptr<CloudScheduler> Get();

cloud/cloud_scheduler_test.cc

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,21 @@ class CloudSchedulerTest : public testing::Test {
2323
~CloudSchedulerTest() {}
2424

2525
std::shared_ptr<CloudScheduler> scheduler_;
26+
void WaitForJobs(const std::vector<long> &jobs, uint32_t delay) {
27+
bool running = true;
28+
while (running) {
29+
running = false;
30+
for (const auto &job : jobs) {
31+
if (scheduler_->IsScheduled(job)) {
32+
running = true;
33+
break;
34+
}
35+
}
36+
if (running) {
37+
usleep(delay);
38+
}
39+
}
40+
}
2641
};
2742

2843
// This test tests basic scheduling function. There are 2 jobs, job2 is
@@ -34,10 +49,13 @@ TEST_F(CloudSchedulerTest, TestSchedule) {
3449
auto job1 = [&p1](void *) { p1 = std::chrono::steady_clock::now(); };
3550
auto job2 = [&p2](void *) { p2 = std::chrono::steady_clock::now(); };
3651

37-
scheduler_->ScheduleJob(std::chrono::milliseconds(300), job1, nullptr);
38-
scheduler_->ScheduleJob(std::chrono::milliseconds(100), job2, nullptr);
39-
40-
std::this_thread::sleep_for(std::chrono::milliseconds(400));
52+
auto h1 =
53+
scheduler_->ScheduleJob(std::chrono::milliseconds(300), job1, nullptr);
54+
auto h2 =
55+
scheduler_->ScheduleJob(std::chrono::milliseconds(100), job2, nullptr);
56+
while (scheduler_->IsScheduled(h1) && scheduler_->IsScheduled(h2)) {
57+
std::this_thread::sleep_for(std::chrono::milliseconds(400));
58+
}
4159
ASSERT_LT(p2, p1);
4260
}
4361

@@ -51,16 +69,16 @@ TEST_F(CloudSchedulerTest, TestCancel) {
5169
auto handle2 =
5270
scheduler_->ScheduleJob(std::chrono::microseconds(200), doJob, &job2);
5371
ASSERT_TRUE(scheduler_->CancelJob(handle2));
54-
usleep(300);
72+
WaitForJobs({handle1, handle2}, 300);
5573
ASSERT_EQ(job1, 2);
5674
ASSERT_EQ(job2, 0);
5775
ASSERT_FALSE(scheduler_->CancelJob(handle1));
5876
ASSERT_FALSE(scheduler_->CancelJob(handle2));
5977
}
6078

6179
TEST_F(CloudSchedulerTest, TestRecurring) {
62-
int job1 = 1;
63-
int job2 = 1;
80+
std::atomic<int> job1(1);
81+
std::atomic<int> job2(1);
6482
auto doJob1 = [&job1](void *) { job1++; };
6583
auto doJob2 = [&job2](void *) { job2++; };
6684

@@ -70,22 +88,24 @@ TEST_F(CloudSchedulerTest, TestRecurring) {
7088
scheduler_->ScheduleRecurringJob(std::chrono::microseconds(120),
7189
std::chrono::microseconds(100), doJob2,
7290
nullptr);
73-
usleep(700);
74-
ASSERT_GE(job2, 4);
75-
ASSERT_GT(job1, job2);
91+
while (job2 <= 4) {
92+
usleep(100);
93+
}
94+
ASSERT_GE(job2.load(), 4);
95+
ASSERT_GT(job1.load(), job2);
7696
ASSERT_TRUE(scheduler_->CancelJob(handle1));
77-
auto old1 = job1;
78-
auto old2 = job2;
97+
auto old1 = job1.load();
98+
auto old2 = job2.load();
7999
usleep(200);
80-
ASSERT_EQ(job1, old1);
81-
ASSERT_GT(job2, old2);
100+
ASSERT_EQ(job1.load(), old1);
101+
ASSERT_GT(job2.load(), old2);
82102
}
83103

84104
TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
85105
auto scheduler2 = CloudScheduler::Get();
86106

87-
int job1 = 1;
88-
int job2 = 1;
107+
std::atomic<int> job1(1);
108+
std::atomic<int> job2(1);
89109
auto doJob1 = [&job1](void *) { job1++; };
90110
auto doJob2 = [&job2](void *) { job2++; };
91111

@@ -108,8 +128,8 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
108128
std::chrono::microseconds(20), doJob2,
109129
nullptr);
110130
scheduler2.reset();
111-
auto old1 = job1;
112-
auto old2 = job2;
131+
auto old1 = job1.load();
132+
auto old2 = job2.load();
113133
usleep(200);
114134
ASSERT_EQ(job2, old2);
115135
ASSERT_GT(job1, old1);

cloud/db_cloud_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ class CloudTest : public testing::Test {
6464

6565
CloudEnv* aenv;
6666
// create a dummy aws env
67-
CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, options_.info_log,
68-
&aenv);
67+
ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, cloud_env_options_,
68+
options_.info_log, &aenv));
69+
ASSERT_NE(aenv, nullptr);
6970
aenv_.reset(aenv);
7071
// delete all pre-existing contents from the bucket
7172
Status st = aenv_->GetCloudEnvOptions().storage_provider->EmptyBucket(

0 commit comments

Comments
 (0)