Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit c84fa28

Browse files
authored
Copy BackgroundThreads from google::cloud::spanner, planning to
later remove the class from `g-c-cpp-spanner`.
1 parent b3834be commit c84fa28

File tree

7 files changed

+231
-1
lines changed

7 files changed

+231
-1
lines changed

google/cloud/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
316316
add_library(
317317
google_cloud_cpp_grpc_utils
318318
async_operation.h
319+
background_threads.h
319320
completion_queue.cc
320321
completion_queue.h
321322
grpc_error_delegate.cc
@@ -325,6 +326,8 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
325326
grpc_utils/grpc_error_delegate.h
326327
grpc_utils/version.h
327328
internal/async_read_stream_impl.h
329+
internal/background_threads_impl.cc
330+
internal/background_threads_impl.h
328331
internal/completion_queue_impl.cc
329332
internal/completion_queue_impl.h)
330333
target_link_libraries(
@@ -352,7 +355,8 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
352355
# List the unit tests, then setup the targets and dependencies.
353356
set(google_cloud_cpp_grpc_utils_unit_tests
354357
# cmake-format: sort
355-
completion_queue_test.cc grpc_error_delegate_test.cc)
358+
completion_queue_test.cc grpc_error_delegate_test.cc
359+
internal/background_threads_impl_test.cc)
356360

357361
# Export the list of unit tests so the Bazel BUILD file can pick it up.
358362
export_list_to_bazel("google_cloud_cpp_grpc_utils_unit_tests.bzl"

google/cloud/background_threads.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BACKGROUND_THREADS_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BACKGROUND_THREADS_H
17+
18+
#include "google/cloud/completion_queue.h"
19+
#include "google/cloud/version.h"
20+
21+
namespace google {
22+
namespace cloud {
23+
inline namespace GOOGLE_CLOUD_CPP_NS {
24+
/**
25+
* A object representing the background threads available to a Client.
26+
*/
27+
class BackgroundThreads {
28+
public:
29+
virtual ~BackgroundThreads() = default;
30+
31+
/// The completion queue used for the background operations.
32+
virtual CompletionQueue cq() const = 0;
33+
};
34+
35+
} // namespace GOOGLE_CLOUD_CPP_NS
36+
} // namespace cloud
37+
} // namespace google
38+
39+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BACKGROUND_THREADS_H

google/cloud/google_cloud_cpp_grpc_utils.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818

1919
google_cloud_cpp_grpc_utils_hdrs = [
2020
"async_operation.h",
21+
"background_threads.h",
2122
"completion_queue.h",
2223
"grpc_error_delegate.h",
2324
"grpc_utils/async_operation.h",
2425
"grpc_utils/completion_queue.h",
2526
"grpc_utils/grpc_error_delegate.h",
2627
"grpc_utils/version.h",
2728
"internal/async_read_stream_impl.h",
29+
"internal/background_threads_impl.h",
2830
"internal/completion_queue_impl.h",
2931
]
3032

3133
google_cloud_cpp_grpc_utils_srcs = [
3234
"completion_queue.cc",
3335
"grpc_error_delegate.cc",
36+
"internal/background_threads_impl.cc",
3437
"internal/completion_queue_impl.cc",
3538
]

google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@
1919
google_cloud_cpp_grpc_utils_unit_tests = [
2020
"completion_queue_test.cc",
2121
"grpc_error_delegate_test.cc",
22+
"internal/background_threads_impl_test.cc",
2223
]
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/internal/background_threads_impl.h"
16+
17+
namespace google {
18+
namespace cloud {
19+
inline namespace GOOGLE_CLOUD_CPP_NS {
20+
namespace internal {
21+
22+
AutomaticallyCreatedBackgroundThreads::AutomaticallyCreatedBackgroundThreads()
23+
: runner_([](CompletionQueue cq) { cq.Run(); }, cq_) {}
24+
25+
AutomaticallyCreatedBackgroundThreads::
26+
~AutomaticallyCreatedBackgroundThreads() {
27+
Shutdown();
28+
}
29+
30+
void AutomaticallyCreatedBackgroundThreads::Shutdown() {
31+
cq_.Shutdown();
32+
if (runner_.joinable()) runner_.join();
33+
}
34+
35+
} // namespace internal
36+
} // namespace GOOGLE_CLOUD_CPP_NS
37+
} // namespace cloud
38+
} // namespace google
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_BACKGROUND_THREADS_IMPL_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_BACKGROUND_THREADS_IMPL_H
17+
18+
#include "google/cloud/background_threads.h"
19+
#include "google/cloud/completion_queue.h"
20+
#include <thread>
21+
22+
namespace google {
23+
namespace cloud {
24+
inline namespace GOOGLE_CLOUD_CPP_NS {
25+
namespace internal {
26+
27+
/// Assume the user has provided the background threads and use them.
28+
class CustomerSuppliedBackgroundThreads : public BackgroundThreads {
29+
public:
30+
explicit CustomerSuppliedBackgroundThreads(CompletionQueue cq)
31+
: cq_(std::move(cq)) {}
32+
~CustomerSuppliedBackgroundThreads() override = default;
33+
34+
CompletionQueue cq() const override { return cq_; }
35+
36+
private:
37+
CompletionQueue cq_;
38+
};
39+
40+
/// Create a background thread to perform background operations.
41+
class AutomaticallyCreatedBackgroundThreads : public BackgroundThreads {
42+
public:
43+
AutomaticallyCreatedBackgroundThreads();
44+
~AutomaticallyCreatedBackgroundThreads() override;
45+
46+
CompletionQueue cq() const override { return cq_; }
47+
void Shutdown();
48+
49+
private:
50+
CompletionQueue cq_;
51+
std::thread runner_;
52+
};
53+
54+
} // namespace internal
55+
} // namespace GOOGLE_CLOUD_CPP_NS
56+
} // namespace cloud
57+
} // namespace google
58+
59+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_BACKGROUND_THREADS_IMPL_H
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/internal/background_threads_impl.h"
16+
#include <gmock/gmock.h>
17+
18+
namespace google {
19+
namespace cloud {
20+
inline namespace GOOGLE_CLOUD_CPP_NS {
21+
namespace internal {
22+
namespace {
23+
24+
/// @test Verify we can create and use a CustomerSuppliedBackgroundThreads
25+
/// without impacting the completion queue
26+
TEST(CustomerSuppliedBackgroundThreads, LifecycleNoShutdown) {
27+
CompletionQueue cq;
28+
promise<void> p;
29+
std::thread t([&cq, &p] {
30+
cq.Run();
31+
p.set_value();
32+
});
33+
34+
{ CustomerSuppliedBackgroundThreads actual(cq); }
35+
36+
using ms = std::chrono::milliseconds;
37+
38+
auto has_shutdown = p.get_future();
39+
EXPECT_NE(std::future_status::ready, has_shutdown.wait_for(ms(2)));
40+
41+
auto expired = cq.MakeRelativeTimer(ms(0));
42+
EXPECT_EQ(std::future_status::ready, expired.wait_for(ms(100)));
43+
44+
cq.Shutdown();
45+
EXPECT_EQ(std::future_status::ready, has_shutdown.wait_for(ms(100)));
46+
47+
t.join();
48+
}
49+
50+
/// @test Verify that users can supply their own queue and threads.
51+
TEST(CustomerSuppliedBackgroundThreads, SharesCompletionQueue) {
52+
CompletionQueue cq;
53+
54+
CustomerSuppliedBackgroundThreads actual(cq);
55+
56+
using ms = std::chrono::milliseconds;
57+
// Verify the completion queue is shared. Scheduling work in actual.cq() works
58+
// once a thread is blocked in cq.Run(). Start that thread after scheduling
59+
// the work to avoid flaky failures where the timer expires immediately.
60+
future<std::thread::id> id = actual.cq().MakeRelativeTimer(ms(1)).then(
61+
[](future<StatusOr<std::chrono::system_clock::time_point>>) {
62+
return std::this_thread::get_id();
63+
});
64+
std::thread t([&cq] { cq.Run(); });
65+
EXPECT_EQ(std::future_status::ready, id.wait_for(ms(100)));
66+
EXPECT_EQ(t.get_id(), id.get());
67+
68+
cq.Shutdown();
69+
t.join();
70+
}
71+
72+
/// @test Verify that automatically created completion queues are usable.
73+
TEST(AutomaticallyCreatedBackgroundThreads, IsActive) {
74+
AutomaticallyCreatedBackgroundThreads actual;
75+
76+
using ms = std::chrono::milliseconds;
77+
78+
auto expired = actual.cq().MakeRelativeTimer(ms(0));
79+
EXPECT_EQ(std::future_status::ready, expired.wait_for(ms(100)));
80+
}
81+
82+
} // namespace
83+
} // namespace internal
84+
} // namespace GOOGLE_CLOUD_CPP_NS
85+
} // namespace cloud
86+
} // namespace google

0 commit comments

Comments
 (0)