Skip to content

Commit a1a905a

Browse files
author
Duc Hieu Pham
committed
When CancelJob, wait for it to finish if it's running
Summary: Right now, CancelJob only removes the job from the queue. However, in some cases the job has already started, and it might refer to objects that have already been destroyed. This diff forces CancelJob to wait for the job to finish if it has started. Test Plan: Add a unit test. Reviewers: dhruba, igor, mdcallag Reviewed By: igor Subscribers: ben, ngbronson Differential Revision: https://rockset.phacility.com/D9853
1 parent 3d31986 commit a1a905a

File tree

2 files changed

+110
-29
lines changed

2 files changed

+110
-29
lines changed

cloud/cloud_scheduler.cc

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#ifndef ROCKSDB_LITE
33
#include "cloud/cloud_scheduler.h"
44

5+
#include <chrono>
56
#include <condition_variable>
67
#include <mutex>
78
#include <thread>
@@ -23,6 +24,8 @@ struct ScheduledJob {
2324
std::chrono::steady_clock::time_point when;
2425
std::chrono::microseconds frequency;
2526
std::function<void(void*)> callback;
27+
28+
// Caller is responsible for the lifetime of arg.
2629
void* arg;
2730
};
2831

@@ -49,9 +52,13 @@ class CloudSchedulerImpl : public CloudScheduler {
4952
long next_id_;
5053

5154
std::mutex mutex_;
52-
// Notified when the earliest job to be scheduled has changed.
55+
// Notified when the earliest job to be scheduled has changed or
56+
// currently_running_ has changed.
5357
std::condition_variable jobs_changed_cv_;
5458
std::multiset<ScheduledJob, Comp> scheduled_jobs_;
59+
// Whether the job in the front of scheduled_jobs_ is running
60+
bool currently_running_;
61+
5562
bool shutting_down_{false};
5663

5764
std::unique_ptr<std::thread> thread_;
@@ -97,14 +104,19 @@ class LocalCloudScheduler : public CloudScheduler {
97104
// Cancels the job referred to by handle if it is active and associated with
98105
// this scheduler
99106
bool CancelJob(long handle) override {
100-
std::lock_guard<std::mutex> lk(job_mutex_);
101-
const auto& it = jobs_.find(handle);
102-
if (it != jobs_.end()) {
103-
jobs_.erase(it);
104-
return scheduler_->CancelJob(it->second);
105-
} else {
106-
return false;
107+
long internal_job_id = -1;
108+
{
109+
std::lock_guard<std::mutex> lk(job_mutex_);
110+
const auto& it = jobs_.find(handle);
111+
if (it != jobs_.end()) {
112+
internal_job_id = it->second;
113+
jobs_.erase(it);
114+
} else {
115+
return false;
116+
}
107117
}
118+
119+
return scheduler_->CancelJob(internal_job_id);
108120
}
109121

110122
private:
@@ -127,6 +139,7 @@ std::shared_ptr<CloudScheduler> CloudScheduler::Get() {
127139

128140
CloudSchedulerImpl::CloudSchedulerImpl() {
129141
next_id_ = 1;
142+
currently_running_ = false;
130143
auto lambda = [this]() { DoWork(); };
131144
thread_.reset(new std::thread(lambda));
132145
}
@@ -175,7 +188,28 @@ long CloudSchedulerImpl::ScheduleRecurringJob(
175188
}
176189

177190
bool CloudSchedulerImpl::CancelJob(long id) {
178-
std::lock_guard<std::mutex> lk(mutex_);
191+
if (id < 0) {
192+
return false;
193+
}
194+
195+
std::unique_lock<std::mutex> lk(mutex_);
196+
jobs_changed_cv_.wait(lk, [this, id]() {
197+
// No job is scheduled. Good to continue.
198+
if (scheduled_jobs_.empty()) {
199+
return true;
200+
}
201+
202+
// Front of the queue is not this job. That means this job is not running.
203+
// We're clear to cancel this job.
204+
if (scheduled_jobs_.begin()->id != id) {
205+
return true;
206+
}
207+
208+
// Front of the queue is the current job. That means only continue if this
209+
// job is not running. If it's still running, wait.
210+
return !currently_running_;
211+
});
212+
179213
for (auto it = scheduled_jobs_.begin(); it != scheduled_jobs_.end(); ++it) {
180214
if (it->id == id) {
181215
bool is_first = (it == scheduled_jobs_.begin());
@@ -205,18 +239,28 @@ void CloudSchedulerImpl::DoWork() {
205239
jobs_changed_cv_.wait_until(lk, earliest_job_time);
206240
continue;
207241
}
208-
// invoke the function
242+
243+
currently_running_ = true;
209244
lk.unlock();
210245

246+
// invoke the function
211247
earliest_job->callback(earliest_job->arg);
212248

213249
lk.lock();
250+
// Finished running the job. Clear the running flag; the job itself is removed from the queue below.
251+
currently_running_ = false;
252+
253+
// If this is a recurring job, add back to the queue.
214254
if (earliest_job->frequency.count() > 0) {
215255
ScheduledJob new_job = *earliest_job;
216256
new_job.when = std::chrono::steady_clock::now() + new_job.frequency;
217257
scheduled_jobs_.emplace(new_job);
218258
}
219259
scheduled_jobs_.erase(earliest_job);
260+
261+
// We might be waiting for the change in currently_running_ when
262+
// cancelling a job.
263+
jobs_changed_cv_.notify_all();
220264
}
221265
}
222266
} // namespace ROCKSDB_NAMESPACE

cloud/cloud_scheduler_test.cc

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@
22

33
#include "cloud/cloud_scheduler.h"
44

5+
#include <gtest/gtest.h>
6+
7+
#include <atomic>
58
#include <chrono>
9+
#include <condition_variable>
10+
#include <iostream>
11+
#include <mutex>
12+
#include <thread>
613
#include <unordered_set>
714

815
#include "test_util/testharness.h"
@@ -17,29 +24,20 @@ class CloudSchedulerTest : public testing::Test {
1724
std::shared_ptr<CloudScheduler> scheduler_;
1825
};
1926

27+
// This test exercises basic scheduling. There are 2 jobs; job2 is
28+
// scheduled before job1. We check that job2 is actually run before job1.
2029
TEST_F(CloudSchedulerTest, TestSchedule) {
21-
static int job1 = 1;
22-
static int job2 = 1;
23-
auto doJob = [](void *arg) { (*(reinterpret_cast<int *>(arg)))++; };
30+
std::chrono::steady_clock::time_point p1;
31+
std::chrono::steady_clock::time_point p2;
2432

25-
scheduler_->ScheduleJob(std::chrono::microseconds(100), doJob, &job1);
26-
scheduler_->ScheduleJob(std::chrono::microseconds(300), doJob, &job2);
33+
auto job1 = [&p1](void *) { p1 = std::chrono::steady_clock::now(); };
34+
auto job2 = [&p2](void *) { p2 = std::chrono::steady_clock::now(); };
2735

28-
usleep(200);
29-
ASSERT_EQ(job2, 1);
30-
ASSERT_EQ(job1, 2);
31-
usleep(200);
32-
ASSERT_EQ(job2, 2);
33-
ASSERT_EQ(job1, 2);
36+
scheduler_->ScheduleJob(std::chrono::milliseconds(300), job1, nullptr);
37+
scheduler_->ScheduleJob(std::chrono::milliseconds(100), job2, nullptr);
3438

35-
scheduler_->ScheduleJob(std::chrono::microseconds(300), doJob, &job1);
36-
scheduler_->ScheduleJob(std::chrono::microseconds(100), doJob, &job2);
37-
usleep(200);
38-
ASSERT_EQ(job1, 2);
39-
ASSERT_EQ(job2, 3);
40-
usleep(200);
41-
ASSERT_EQ(job1, 3);
42-
ASSERT_EQ(job2, 3);
39+
std::this_thread::sleep_for(std::chrono::milliseconds(400));
40+
ASSERT_LT(p2, p1);
4341
}
4442

4543
TEST_F(CloudSchedulerTest, TestCancel) {
@@ -116,6 +114,45 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
116114
ASSERT_GT(job1, old1);
117115
}
118116

117+
// This test covers the scenario where a job has started before we attempt to
118+
// cancel it. CancelJob should wait for the job to finish.
119+
TEST_F(CloudSchedulerTest, TestLongRunningJobCancel) {
120+
enum class JobStatus {
121+
NOT_STARTED = 0,
122+
STARTED = 1,
123+
FINISHED = 2,
124+
};
125+
126+
std::mutex statusLock;
127+
std::atomic<JobStatus> status{JobStatus::NOT_STARTED};
128+
std::condition_variable statusVar;
129+
130+
auto doJob = [&](void *) {
131+
{
132+
std::unique_lock<std::mutex> lk(statusLock);
133+
status.store(JobStatus::STARTED);
134+
statusVar.notify_all();
135+
}
136+
137+
std::this_thread::sleep_for(std::chrono::seconds(1));
138+
{
139+
std::unique_lock<std::mutex> lk(statusLock);
140+
status.store(JobStatus::FINISHED);
141+
}
142+
};
143+
144+
auto handle =
145+
scheduler_->ScheduleJob(std::chrono::microseconds(0), doJob, nullptr);
146+
147+
{
148+
std::unique_lock<std::mutex> lg(statusLock);
149+
statusVar.wait(lg, [&]() { return status == JobStatus::STARTED; });
150+
}
151+
152+
scheduler_->CancelJob(handle);
153+
ASSERT_EQ(status.load(), JobStatus::FINISHED);
154+
}
155+
119156
} // namespace ROCKSDB_NAMESPACE
120157

121158
int main(int argc, char **argv) {

0 commit comments

Comments
 (0)