Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 946b6d3

Browse files
authored
feat: cancel futures returned by .then() (#166)
1 parent c84fa28 commit 946b6d3

File tree

6 files changed

+74
-12
lines changed

6 files changed

+74
-12
lines changed

google/cloud/completion_queue.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ namespace {
3838
class AsyncTimerFuture : public internal::AsyncGrpcOperation {
3939
public:
4040
explicit AsyncTimerFuture(std::unique_ptr<grpc::Alarm> alarm)
41-
: alarm_(std::move(alarm)) {}
41+
: promise_(/*cancellation_callback=*/[this] { Cancel(); }),
42+
alarm_(std::move(alarm)) {}
4243

4344
future<StatusOr<std::chrono::system_clock::time_point>> GetFuture() {
4445
return promise_.get_future();

google/cloud/completion_queue_test.cc

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ TEST(CompletionQueueTest, RunAsync) {
330330
runner.join();
331331
}
332332

333-
// Sets up a timer that reschedules itself and verfies we can shut down
333+
// Sets up a timer that reschedules itself and verifies we can shut down
334334
// cleanly whether we call `CancelAll()` on the queue first or not.
335335
namespace {
336336
void RunAndReschedule(CompletionQueue& cq, bool ok) {
@@ -363,6 +363,35 @@ TEST(CompletionQueueTest, CancelAndShutdownWithReschedulingTimer) {
363363
t.join();
364364
}
365365

366+
TEST(CompletionQueueTest, CancelTimerSimple) {
367+
CompletionQueue cq;
368+
std::thread t([&cq] { cq.Run(); });
369+
370+
using ms = std::chrono::milliseconds;
371+
auto fut = cq.MakeRelativeTimer(ms(20000));
372+
fut.cancel();
373+
auto tp = fut.get();
374+
EXPECT_FALSE(tp.ok()) << ", status=" << tp.status();
375+
cq.Shutdown();
376+
t.join();
377+
}
378+
379+
TEST(CompletionQueueTest, CancelTimerContinuation) {
380+
CompletionQueue cq;
381+
std::thread t([&cq] { cq.Run(); });
382+
383+
using ms = std::chrono::milliseconds;
384+
auto fut = cq.MakeRelativeTimer(ms(20000)).then(
385+
[](future<StatusOr<std::chrono::system_clock::time_point>> f) {
386+
return f.get().status();
387+
});
388+
fut.cancel();
389+
auto status = fut.get();
390+
EXPECT_FALSE(status.ok()) << ", status=" << status;
391+
cq.Shutdown();
392+
t.join();
393+
}
394+
366395
} // namespace
367396
} // namespace GOOGLE_CLOUD_CPP_NS
368397
} // namespace cloud

google/cloud/future_generic_then_test.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,18 @@ TEST(FutureTestInt, ThenByCopy) {
175175
EXPECT_FALSE(next.valid());
176176
}
177177

178+
/// @test Verify the behavior around cancellation.
179+
TEST(FutureTestInt, CancelThroughContinuation) {
180+
bool cancelled = false;
181+
promise<int> p0([&cancelled] { cancelled = true; });
182+
auto f0 = p0.get_future();
183+
auto f1 = f0.then([](future<int> f) { return f.get() * 2; });
184+
EXPECT_TRUE(f1.cancel());
185+
EXPECT_TRUE(cancelled);
186+
p0.set_value(42);
187+
EXPECT_EQ(84, f1.get());
188+
}
189+
178190
// The following tests reference the technical specification:
179191
// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/p0159r0.html
180192
// The test names match the section and paragraph from the TS.

google/cloud/future_void_then_test.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,18 @@ TEST(FutureTestVoid, ThenByCopy) {
166166
// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/p0159r0.html
167167
// The test names match the section and paragraph from the TS.
168168

169+
/// @test Verify the behavior around cancellation.
170+
TEST(FutureTestVoid, CancelThroughContinuation) {
171+
bool cancelled = false;
172+
promise<void> p0([&cancelled] { cancelled = true; });
173+
auto f0 = p0.get_future();
174+
auto f1 = f0.then([](future<void>) { return 7; });
175+
EXPECT_TRUE(f1.cancel());
176+
EXPECT_TRUE(cancelled);
177+
p0.set_value();
178+
EXPECT_EQ(7, f1.get());
179+
}
180+
169181
/// @test Verify conformance with section 2.3 of the Concurrency TS.
170182
TEST(FutureTestVoid, conform_2_3_2_a) {
171183
// future<void> should have an unwrapping constructor.

google/cloud/internal/future_impl.h

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@ class continuation_base {
6969
class future_shared_state_base {
7070
public:
7171
future_shared_state_base() : future_shared_state_base([] {}) {}
72-
future_shared_state_base(std::function<void()> cancellation_callback)
72+
explicit future_shared_state_base(std::function<void()> cancellation_callback)
7373
: mu_(),
7474
cv_(),
7575
current_state_(state::not_ready),
76-
cancellation_callback_(cancellation_callback) {}
76+
cancellation_callback_(std::move(cancellation_callback)) {}
7777
/// Return true if the shared state has a value or an exception.
7878
bool is_ready() const {
7979
std::unique_lock<std::mutex> lk(mu_);
@@ -204,6 +204,10 @@ class future_shared_state_base {
204204
continuation_ = std::move(c);
205205
}
206206

207+
std::function<void()> extract_cancellation_callback() {
208+
return std::move(cancellation_callback_);
209+
}
210+
207211
// Try to cancel the task by invoking the cancellation_callback.
208212
bool cancel() {
209213
if (!cancellable()) {
@@ -330,8 +334,8 @@ template <typename T>
330334
class future_shared_state final : private future_shared_state_base {
331335
public:
332336
future_shared_state() : future_shared_state_base(), buffer_() {}
333-
future_shared_state(std::function<void()> cancellation_callback)
334-
: future_shared_state_base(cancellation_callback), buffer_() {}
337+
explicit future_shared_state(std::function<void()> cancellation_callback)
338+
: future_shared_state_base(std::move(cancellation_callback)), buffer_() {}
335339
~future_shared_state() {
336340
if (current_state_ == state::has_value) {
337341
// Recall that state::has_value is a terminal state, once a value is
@@ -345,6 +349,7 @@ class future_shared_state final : private future_shared_state_base {
345349

346350
using future_shared_state_base::abandon;
347351
using future_shared_state_base::cancel;
352+
using future_shared_state_base::extract_cancellation_callback;
348353
using future_shared_state_base::is_ready;
349354
using future_shared_state_base::set_continuation;
350355
using future_shared_state_base::set_exception;
@@ -468,11 +473,12 @@ template <>
468473
class future_shared_state<void> final : private future_shared_state_base {
469474
public:
470475
future_shared_state() : future_shared_state_base() {}
471-
future_shared_state(std::function<void()> cancellation_callback)
472-
: future_shared_state_base(cancellation_callback) {}
476+
explicit future_shared_state(std::function<void()> cancellation_callback)
477+
: future_shared_state_base(std::move(cancellation_callback)) {}
473478

474479
using future_shared_state_base::abandon;
475480
using future_shared_state_base::cancel;
481+
using future_shared_state_base::extract_cancellation_callback;
476482
using future_shared_state_base::is_ready;
477483
using future_shared_state_base::set_continuation;
478484
using future_shared_state_base::set_exception;
@@ -666,7 +672,8 @@ struct continuation : public continuation_base {
666672
continuation(Functor&& f, std::shared_ptr<input_shared_state_t> s)
667673
: functor(std::move(f)),
668674
input(std::move(s)),
669-
output(std::make_shared<future_shared_state<result_t>>()) {}
675+
output(std::make_shared<future_shared_state<result_t>>(
676+
input.lock()->extract_cancellation_callback())) {}
670677

671678
continuation(Functor&& f, std::shared_ptr<input_shared_state_t> s,
672679
std::shared_ptr<output_shared_state_t> o)
@@ -721,7 +728,8 @@ struct unwrapping_continuation : public continuation_base {
721728
: functor(std::move(f)),
722729
input(std::move(s)),
723730
intermediate(),
724-
output(std::make_shared<output_shared_state_t>()) {}
731+
output(std::make_shared<output_shared_state_t>(
732+
input.lock()->extract_cancellation_callback())) {}
725733

726734
void execute() override {
727735
auto tmp = input.lock();

google/cloud/internal/future_then_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ typename internal::then_helper<F, T>::future_t future<T>::then_impl(
5454
typename internal::then_helper<F, T>::functor_result_t;
5555
using future_t = typename internal::then_helper<F, T>::future_t;
5656

57-
// The `shared_state_type` (aka `future_shared_state<T>`) is be written
57+
// The `shared_state_type` (aka `future_shared_state<T>`) is written
5858
// without any reference to the `future<T>` class, otherwise there would
59-
// be cycling dependencies between the two classes. We must adapt the
59+
// be cyclic dependencies between the two classes. We must adapt the
6060
// provided functor, which takes a `future<T>` parameter to take a
6161
// `shared_ptr<shared_state_type` parameter so it can be consumed by the
6262
// underlying class. Because we need to support C++11, we use a local class

0 commit comments

Comments
 (0)