Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit bfe16d3

Browse files
author
Takashi Matsuo
authored
feat: support cancellation for long running operations (#160)
* feat: support cancellation for long running operations * address review comments * address code review comments
1 parent 79ab7b2 commit bfe16d3

File tree

6 files changed

+108
-5
lines changed

6 files changed

+108
-5
lines changed

google/cloud/future_generic.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class future final : private internal::future_base<T> {
7676
return tmp->get();
7777
}
7878

79+
using internal::future_base<T>::cancel;
7980
using internal::future_base<T>::is_ready;
8081
using internal::future_base<T>::valid;
8182
using internal::future_base<T>::wait;
@@ -132,7 +133,8 @@ template <typename T>
132133
class promise final : private internal::promise_base<T> {
133134
public:
134135
/// Creates a promise with an unsatisfied shared state.
135-
promise() = default;
136+
promise(std::function<void()> cancellation_callback = [] {})
137+
: internal::promise_base<T>(cancellation_callback) {}
136138

137139
/// Constructs a new promise and transfer any shared state from @p rhs.
138140
promise(promise&&) noexcept = default;

google/cloud/future_generic_test.cc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,38 @@ TEST(FutureTestInt, conform_30_6_6_25_3) {
635635
// raise too. We do not need to test for that, exceptions are always propagated,
636636
// this is just giving implementors freedom.
637637

638+
/// @test Verify the behavior around cancellation.
639+
TEST(FutureTestInt, cancellation_without_satisfaction) {
640+
bool cancelled = false;
641+
promise<int> p0([&cancelled] { cancelled = true; });
642+
auto f0 = p0.get_future();
643+
ASSERT_TRUE(f0.cancel());
644+
ASSERT_TRUE(cancelled);
645+
}
646+
647+
/// @test Verify the case for cancel then satisfy.
648+
TEST(FutureTestInt, cancellation_and_satisfaction) {
649+
bool cancelled = false;
650+
promise<int> p0([&cancelled] { cancelled = true; });
651+
auto f0 = p0.get_future();
652+
ASSERT_TRUE(f0.cancel());
653+
p0.set_value(1);
654+
ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms));
655+
ASSERT_EQ(1, f0.get());
656+
ASSERT_TRUE(cancelled);
657+
}
658+
659+
/// @test Verify the cancellation fails on satisfied promise.
660+
TEST(FutureTestInt, cancellation_after_satisfaction) {
661+
bool cancelled = false;
662+
promise<int> p0([&cancelled] { cancelled = true; });
663+
auto f0 = p0.get_future();
664+
p0.set_value(1);
665+
ASSERT_FALSE(f0.cancel());
666+
ASSERT_FALSE(cancelled);
667+
ASSERT_EQ(1, f0.get());
668+
}
669+
638670
} // namespace
639671
} // namespace GOOGLE_CLOUD_CPP_NS
640672
} // namespace cloud

google/cloud/future_void.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class future<void> final : private internal::future_base<void> {
7373
return tmp->get();
7474
}
7575

76+
using future_base::cancel;
7677
using future_base::is_ready;
7778
using future_base::valid;
7879
using future_base::wait;
@@ -128,7 +129,8 @@ template <>
128129
class promise<void> final : private internal::promise_base<void> {
129130
public:
130131
/// Creates a promise with an unsatisfied shared state.
131-
promise() = default;
132+
promise(std::function<void()> cancellation_callback = [] {})
133+
: promise_base(cancellation_callback) {}
132134

133135
/// Constructs a new promise and transfer any shared state from @p rhs.
134136
promise(promise&&) noexcept = default;

google/cloud/future_void_test.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,36 @@ TEST(FutureTestVoid, conform_30_6_6_25_3) {
644644
// raise too. We do not need to test for that, exceptions are always propagated,
645645
// this is just giving implementors freedom.
646646

647+
/// @test Verify the behavior around cancellation.
648+
TEST(FutureTestVoid, cancellation_without_satisfaction) {
649+
bool cancelled = false;
650+
promise<void> p0([&cancelled] { cancelled = true; });
651+
auto f0 = p0.get_future();
652+
ASSERT_TRUE(f0.cancel());
653+
ASSERT_TRUE(cancelled);
654+
}
655+
656+
/// @test Verify the case for cancel then satisfy.
657+
TEST(FutureTestVoid, cancellation_and_satisfaction) {
658+
bool cancelled = false;
659+
promise<void> p0([&cancelled] { cancelled = true; });
660+
auto f0 = p0.get_future();
661+
ASSERT_TRUE(f0.cancel());
662+
p0.set_value();
663+
ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms));
664+
ASSERT_TRUE(cancelled);
665+
}
666+
667+
/// @test Verify the cancellation fails on satisfied promise.
668+
TEST(FutureTestVoid, cancellation_after_satisfaction) {
669+
bool cancelled = false;
670+
promise<void> p0([&cancelled] { cancelled = true; });
671+
auto f0 = p0.get_future();
672+
p0.set_value();
673+
ASSERT_FALSE(f0.cancel());
674+
ASSERT_FALSE(cancelled);
675+
}
676+
647677
} // namespace
648678
} // namespace GOOGLE_CLOUD_CPP_NS
649679
} // namespace cloud

google/cloud/internal/future_base.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ class future_base {
123123
return shared_state_->is_ready();
124124
}
125125

126+
/**
127+
* Cancel the future by invoking cancel() on the shared state.
128+
*/
129+
bool cancel() { return shared_state_->cancel(); }
130+
126131
protected:
127132
/// Shorthand to refer to the shared state type.
128133
using shared_state_type = internal::future_shared_state<T>;
@@ -144,7 +149,9 @@ class future_base {
144149
template <typename T>
145150
class promise_base {
146151
public:
147-
promise_base() : shared_state_(std::make_shared<shared_state_type>()) {}
152+
explicit promise_base(std::function<void()> cancellation_callback)
153+
: shared_state_(
154+
std::make_shared<shared_state_type>(cancellation_callback)) {}
148155
promise_base(promise_base&&) noexcept = default;
149156

150157
~promise_base() {

google/cloud/internal/future_impl.h

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,21 @@ class continuation_base {
6868
*/
6969
class future_shared_state_base {
7070
public:
71-
future_shared_state_base() : mu_(), cv_(), current_state_(state::not_ready) {}
72-
71+
future_shared_state_base() : future_shared_state_base([] {}) {}
72+
future_shared_state_base(std::function<void()> cancellation_callback)
73+
: mu_(),
74+
cv_(),
75+
current_state_(state::not_ready),
76+
cancellation_callback_(cancellation_callback) {}
7377
/// Return true if the shared state has a value or an exception.
7478
bool is_ready() const {
7579
std::unique_lock<std::mutex> lk(mu_);
7680
return is_ready_unlocked();
7781
}
7882

83+
/// Return true if the shared state can be cancelled.
84+
bool cancellable() const { return !is_ready() && !cancelled_; }
85+
7986
/// Block until is_ready() returns true ...
8087
void wait() {
8188
std::unique_lock<std::mutex> lk(mu_);
@@ -197,6 +204,19 @@ class future_shared_state_base {
197204
continuation_ = std::move(c);
198205
}
199206

207+
// Try to cancel the task by invoking the cancellation_callback.
208+
bool cancel() {
209+
if (!cancellable()) {
210+
return false;
211+
}
212+
cancellation_callback_();
213+
// If the callback fails with an exception we assume it had no effect.
214+
// Incidentally this means we provide the strong exception guarantee for
215+
// this function.
216+
cancelled_ = true;
217+
return true;
218+
}
219+
200220
protected:
201221
bool is_ready_unlocked() const { return current_state_ != state::not_ready; }
202222

@@ -279,6 +299,10 @@ class future_shared_state_base {
279299
* member variable and does not satisfy the shared state.
280300
*/
281301
std::unique_ptr<continuation_base> continuation_;
302+
303+
// Allow users "cancel" the future with the given callback.
304+
std::atomic<bool> cancelled_ = ATOMIC_VAR_INIT(false);
305+
std::function<void()> cancellation_callback_;
282306
};
283307

284308
/**
@@ -306,6 +330,8 @@ template <typename T>
306330
class future_shared_state final : private future_shared_state_base {
307331
public:
308332
future_shared_state() : future_shared_state_base(), buffer_() {}
333+
future_shared_state(std::function<void()> cancellation_callback)
334+
: future_shared_state_base(cancellation_callback), buffer_() {}
309335
~future_shared_state() {
310336
if (current_state_ == state::has_value) {
311337
// Recall that state::has_value is a terminal state, once a value is
@@ -318,6 +344,7 @@ class future_shared_state final : private future_shared_state_base {
318344
}
319345

320346
using future_shared_state_base::abandon;
347+
using future_shared_state_base::cancel;
321348
using future_shared_state_base::is_ready;
322349
using future_shared_state_base::set_continuation;
323350
using future_shared_state_base::set_exception;
@@ -441,8 +468,11 @@ template <>
441468
class future_shared_state<void> final : private future_shared_state_base {
442469
public:
443470
future_shared_state() : future_shared_state_base() {}
471+
future_shared_state(std::function<void()> cancellation_callback)
472+
: future_shared_state_base(cancellation_callback) {}
444473

445474
using future_shared_state_base::abandon;
475+
using future_shared_state_base::cancel;
446476
using future_shared_state_base::is_ready;
447477
using future_shared_state_base::set_continuation;
448478
using future_shared_state_base::set_exception;

0 commit comments

Comments
 (0)