Skip to content

Commit fa4b763

Browse files
authored
refactor(pubsub): move SubscriberConnectionImpl to header (#10291)
This will make it easier (I think) to test new functions for blocking pull.
1 parent a4130df commit fa4b763

File tree

8 files changed

+326
-187
lines changed

8 files changed

+326
-187
lines changed

google/cloud/pubsub/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ add_library(
102102
internal/streaming_subscription_batch_source.h
103103
internal/subscriber_auth_decorator.cc
104104
internal/subscriber_auth_decorator.h
105+
internal/subscriber_connection_impl.cc
106+
internal/subscriber_connection_impl.h
105107
internal/subscriber_logging_decorator.cc
106108
internal/subscriber_logging_decorator.h
107109
internal/subscriber_metadata_decorator.cc
@@ -275,6 +277,7 @@ function (google_cloud_cpp_pubsub_client_define_tests)
275277
internal/sequential_batch_sink_test.cc
276278
internal/session_shutdown_manager_test.cc
277279
internal/streaming_subscription_batch_source_test.cc
280+
internal/subscriber_connection_impl_test.cc
278281
internal/subscription_concurrency_control_test.cc
279282
internal/subscription_lease_management_test.cc
280283
internal/subscription_message_queue_test.cc

google/cloud/pubsub/google_cloud_cpp_pubsub.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ google_cloud_cpp_pubsub_hdrs = [
5252
"internal/session_shutdown_manager.h",
5353
"internal/streaming_subscription_batch_source.h",
5454
"internal/subscriber_auth_decorator.h",
55+
"internal/subscriber_connection_impl.h",
5556
"internal/subscriber_logging_decorator.h",
5657
"internal/subscriber_metadata_decorator.h",
5758
"internal/subscriber_round_robin_decorator.h",
@@ -121,6 +122,7 @@ google_cloud_cpp_pubsub_srcs = [
121122
"internal/session_shutdown_manager.cc",
122123
"internal/streaming_subscription_batch_source.cc",
123124
"internal/subscriber_auth_decorator.cc",
125+
"internal/subscriber_connection_impl.cc",
124126
"internal/subscriber_logging_decorator.cc",
125127
"internal/subscriber_metadata_decorator.cc",
126128
"internal/subscriber_round_robin_decorator.cc",
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsub/internal/subscriber_connection_impl.h"
16+
#include "google/cloud/pubsub/internal/subscription_session.h"
17+
18+
namespace google {
19+
namespace cloud {
20+
namespace pubsub_internal {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
23+
SubscriberConnectionImpl::SubscriberConnectionImpl(
24+
pubsub::Subscription subscription, Options opts,
25+
std::shared_ptr<pubsub_internal::SubscriberStub> stub)
26+
: subscription_(std::move(subscription)),
27+
opts_(std::move(opts)),
28+
stub_(std::move(stub)),
29+
background_(internal::MakeBackgroundThreadsFactory(opts_)()),
30+
generator_(internal::MakeDefaultPRNG()) {}
31+
32+
SubscriberConnectionImpl::~SubscriberConnectionImpl() = default;
33+
34+
future<Status> SubscriberConnectionImpl::Subscribe(SubscribeParams p) {
35+
return CreateSubscriptionSession(
36+
subscription_, google::cloud::internal::CurrentOptions(), stub_,
37+
background_->cq(), MakeClientId(), std::move(p.callback));
38+
}
39+
40+
future<Status> SubscriberConnectionImpl::ExactlyOnceSubscribe(
41+
ExactlyOnceSubscribeParams p) {
42+
return CreateSubscriptionSession(
43+
subscription_, google::cloud::internal::CurrentOptions(), stub_,
44+
background_->cq(), MakeClientId(), std::move(p.callback));
45+
}
46+
47+
Options SubscriberConnectionImpl::options() { return opts_; }
48+
49+
std::string SubscriberConnectionImpl::MakeClientId() {
50+
std::lock_guard<std::mutex> lk(mu_);
51+
auto constexpr kLength = 32;
52+
auto constexpr kChars = "abcdefghijklmnopqrstuvwxyz0123456789";
53+
return internal::Sample(generator_, kLength, kChars);
54+
}
55+
56+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
57+
} // namespace pubsub_internal
58+
} // namespace cloud
59+
} // namespace google
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SUBSCRIBER_CONNECTION_IMPL_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SUBSCRIBER_CONNECTION_IMPL_H
17+
18+
#include "google/cloud/pubsub/subscriber_connection.h"
19+
#include "google/cloud/version.h"
20+
21+
namespace google {
22+
namespace cloud {
23+
namespace pubsub_internal {
24+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
25+
26+
class SubscriberConnectionImpl : public pubsub::SubscriberConnection {
27+
public:
28+
explicit SubscriberConnectionImpl(
29+
pubsub::Subscription subscription, Options opts,
30+
std::shared_ptr<pubsub_internal::SubscriberStub> stub);
31+
32+
~SubscriberConnectionImpl() override;
33+
34+
future<Status> Subscribe(SubscribeParams p) override;
35+
36+
future<Status> ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p) override;
37+
38+
Options options() override;
39+
40+
private:
41+
std::string MakeClientId();
42+
43+
pubsub::Subscription const subscription_;
44+
Options const opts_;
45+
std::shared_ptr<pubsub_internal::SubscriberStub> stub_;
46+
std::shared_ptr<BackgroundThreads> background_;
47+
std::mutex mu_;
48+
internal::DefaultPRNG generator_;
49+
};
50+
51+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
52+
} // namespace pubsub_internal
53+
} // namespace cloud
54+
} // namespace google
55+
56+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_SUBSCRIBER_CONNECTION_IMPL_H
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsub/internal/subscriber_connection_impl.h"
16+
#include "google/cloud/pubsub/ack_handler.h"
17+
#include "google/cloud/pubsub/exactly_once_ack_handler.h"
18+
#include "google/cloud/pubsub/internal/defaults.h"
19+
#include "google/cloud/pubsub/testing/fake_streaming_pull.h"
20+
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
21+
#include "google/cloud/pubsub/testing/test_retry_policies.h"
22+
#include "google/cloud/credentials.h"
23+
#include "google/cloud/testing_util/status_matchers.h"
24+
#include <gmock/gmock.h>
25+
#include <atomic>
26+
27+
namespace google {
28+
namespace cloud {
29+
namespace pubsub_internal {
30+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
31+
namespace {
32+
33+
using ::google::cloud::pubsub::AckHandler;
34+
using ::google::cloud::pubsub::ExactlyOnceAckHandler;
35+
using ::google::cloud::pubsub::Message;
36+
using ::google::cloud::pubsub::Subscription;
37+
using ::google::cloud::pubsub_testing::FakeAsyncStreamingPull;
38+
using ::google::cloud::testing_util::StatusIs;
39+
using ::testing::AtLeast;
40+
using ::testing::Contains;
41+
using ::testing::HasSubstr;
42+
using ::testing::StartsWith;
43+
44+
Options MakeTestOptions(Options opts = {}) {
45+
opts.set<UnifiedCredentialsOption>(MakeInsecureCredentials());
46+
opts = pubsub_internal::DefaultSubscriberOptions(
47+
pubsub_testing::MakeTestOptions(std::move(opts)));
48+
// The CI scripts set an environment variable that overrides this option. We
49+
// are not interested in this behavior for this test.
50+
opts.unset<google::cloud::UserProjectOption>();
51+
return opts;
52+
}
53+
54+
Options MakeTestOptions(CompletionQueue const& cq) {
55+
return MakeTestOptions(Options{}.set<GrpcCompletionQueueOption>(cq));
56+
}
57+
58+
TEST(SubscriberConnectionTest, Basic) {
59+
auto const subscription = Subscription("test-project", "test-subscription");
60+
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
61+
EXPECT_CALL(*mock, AsyncModifyAckDeadline)
62+
.WillRepeatedly([](google::cloud::CompletionQueue&,
63+
std::unique_ptr<grpc::ClientContext>,
64+
google::pubsub::v1::ModifyAckDeadlineRequest const&) {
65+
return make_ready_future(Status{});
66+
});
67+
EXPECT_CALL(*mock, AsyncAcknowledge)
68+
.WillOnce([](google::cloud::CompletionQueue&,
69+
std::unique_ptr<grpc::ClientContext>,
70+
google::pubsub::v1::AcknowledgeRequest const& request) {
71+
EXPECT_THAT(request.ack_ids(), Contains("test-ack-id-0"));
72+
return make_ready_future(Status{});
73+
});
74+
EXPECT_CALL(*mock, AsyncStreamingPull)
75+
.Times(AtLeast(1))
76+
.WillRepeatedly(FakeAsyncStreamingPull);
77+
78+
CompletionQueue cq;
79+
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
80+
subscription, MakeTestOptions(cq), mock);
81+
std::atomic_flag received_one{false};
82+
promise<void> waiter;
83+
auto handler = [&](Message const& m, AckHandler h) {
84+
if (received_one.test_and_set()) return;
85+
EXPECT_THAT(m.message_id(), StartsWith("test-message-id-"));
86+
ASSERT_NO_FATAL_FAILURE(std::move(h).ack());
87+
waiter.set_value();
88+
};
89+
std::thread t([&cq] { cq.Run(); });
90+
google::cloud::internal::OptionsSpan span(subscriber->options());
91+
auto response = subscriber->Subscribe({handler});
92+
waiter.get_future().wait();
93+
response.cancel();
94+
ASSERT_STATUS_OK(response.get());
95+
// We need to explicitly cancel any pending timers (some of which may be quite
96+
// long) left by the subscription.
97+
cq.CancelAll();
98+
cq.Shutdown();
99+
t.join();
100+
}
101+
102+
TEST(SubscriberConnectionTest, ExactlyOnce) {
103+
auto const subscription = Subscription("test-project", "test-subscription");
104+
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
105+
EXPECT_CALL(*mock, AsyncModifyAckDeadline)
106+
.WillRepeatedly([](google::cloud::CompletionQueue&,
107+
std::unique_ptr<grpc::ClientContext>,
108+
google::pubsub::v1::ModifyAckDeadlineRequest const&) {
109+
return make_ready_future(Status{});
110+
});
111+
EXPECT_CALL(*mock, AsyncAcknowledge)
112+
.WillOnce([](google::cloud::CompletionQueue&,
113+
std::unique_ptr<grpc::ClientContext>,
114+
google::pubsub::v1::AcknowledgeRequest const& request) {
115+
EXPECT_THAT(request.ack_ids(), Contains("test-ack-id-0"));
116+
return make_ready_future(
117+
Status{StatusCode::kUnknown, "test-only-unknown"});
118+
});
119+
EXPECT_CALL(*mock, AsyncStreamingPull)
120+
.Times(AtLeast(1))
121+
.WillRepeatedly(FakeAsyncStreamingPull);
122+
123+
CompletionQueue cq;
124+
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
125+
subscription, MakeTestOptions(cq), mock);
126+
std::atomic_flag received_one{false};
127+
promise<void> waiter;
128+
auto callback = [&](Message const& m, ExactlyOnceAckHandler h) {
129+
if (received_one.test_and_set()) return;
130+
EXPECT_THAT(m.message_id(), StartsWith("test-message-id-"));
131+
auto status = std::move(h).ack().get();
132+
EXPECT_THAT(status, StatusIs(StatusCode::kUnknown, "test-only-unknown"));
133+
waiter.set_value();
134+
};
135+
std::thread t([&cq] { cq.Run(); });
136+
google::cloud::internal::OptionsSpan span(subscriber->options());
137+
auto response = subscriber->ExactlyOnceSubscribe({callback});
138+
waiter.get_future().wait();
139+
response.cancel();
140+
ASSERT_STATUS_OK(response.get());
141+
// We need to explicitly cancel any pending timers (some of which may be quite
142+
// long) left by the subscription.
143+
cq.CancelAll();
144+
cq.Shutdown();
145+
t.join();
146+
}
147+
148+
TEST(SubscriberConnectionTest, PullFailure) {
149+
auto const subscription = Subscription("test-project", "test-subscription");
150+
auto const expected = Status(StatusCode::kPermissionDenied, "uh-oh");
151+
152+
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
153+
EXPECT_CALL(*mock, AsyncModifyAckDeadline)
154+
.WillRepeatedly([](google::cloud::CompletionQueue&,
155+
std::unique_ptr<grpc::ClientContext>,
156+
google::pubsub::v1::ModifyAckDeadlineRequest const&) {
157+
return make_ready_future(Status{});
158+
});
159+
EXPECT_CALL(*mock, AsyncAcknowledge)
160+
.WillRepeatedly([](google::cloud::CompletionQueue&,
161+
std::unique_ptr<grpc::ClientContext>,
162+
google::pubsub::v1::AcknowledgeRequest const&) {
163+
return make_ready_future(Status{});
164+
});
165+
EXPECT_CALL(*mock, AsyncStreamingPull)
166+
.Times(AtLeast(1))
167+
.WillRepeatedly([](google::cloud::CompletionQueue const& cq,
168+
std::unique_ptr<grpc::ClientContext>) {
169+
using TimerFuture =
170+
future<StatusOr<std::chrono::system_clock::time_point>>;
171+
using us = std::chrono::microseconds;
172+
173+
auto start_response = [q = cq]() mutable {
174+
return q.MakeRelativeTimer(us(10)).then(
175+
[](TimerFuture) { return false; });
176+
};
177+
auto finish_response = [q = cq]() mutable {
178+
return q.MakeRelativeTimer(us(10)).then([](TimerFuture) {
179+
return Status{StatusCode::kPermissionDenied, "uh-oh"};
180+
});
181+
};
182+
183+
auto stream = absl::make_unique<pubsub_testing::MockAsyncPullStream>();
184+
EXPECT_CALL(*stream, Start).WillOnce(start_response);
185+
EXPECT_CALL(*stream, Finish).WillOnce(finish_response);
186+
return stream;
187+
});
188+
189+
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
190+
subscription, pubsub_testing::MakeTestOptions(), mock);
191+
auto handler = [&](Message const&, AckHandler const&) {};
192+
google::cloud::internal::OptionsSpan span(subscriber->options());
193+
auto response = subscriber->Subscribe({handler});
194+
EXPECT_THAT(response.get(),
195+
StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
196+
}
197+
198+
} // namespace
199+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
200+
} // namespace pubsub_internal
201+
} // namespace cloud
202+
} // namespace google

google/cloud/pubsub/pubsub_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pubsub_client_unit_tests = [
3333
"internal/sequential_batch_sink_test.cc",
3434
"internal/session_shutdown_manager_test.cc",
3535
"internal/streaming_subscription_batch_source_test.cc",
36+
"internal/subscriber_connection_impl_test.cc",
3637
"internal/subscription_concurrency_control_test.cc",
3738
"internal/subscription_lease_management_test.cc",
3839
"internal/subscription_message_queue_test.cc",

0 commit comments

Comments
 (0)