Skip to content

Commit caff0e1

Browse files
authored
fix(pubsub): impersonate sa credentials CompletionQueue lifetime (#15833)
1 parent 6af1c8f commit caff0e1

File tree

8 files changed

+71
-25
lines changed

8 files changed

+71
-25
lines changed

ci/cloudbuild/builds/lib/integration.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ function integration::bazel_args() {
8484
"--test_env=GOOGLE_CLOUD_CPP_BIGTABLE_TEST_SERVICE_ACCOUNT=${GOOGLE_CLOUD_CPP_BIGTABLE_TEST_SERVICE_ACCOUNT}"
8585
"--test_env=ENABLE_BIGTABLE_ADMIN_INTEGRATION_TESTS=${ENABLE_BIGTABLE_ADMIN_INTEGRATION_TESTS:-no}"
8686

87+
# Pubsub
88+
"--test_env=GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT=${GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT}"
89+
8790
# Rest
8891
"--test_env=GOOGLE_CLOUD_CPP_REST_TEST_SIGNING_SERVICE_ACCOUNT=${GOOGLE_CLOUD_CPP_REST_TEST_SIGNING_SERVICE_ACCOUNT}"
8992

ci/etc/integration-tests-config.ps1

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ $env:GOOGLE_CLOUD_CPP_SPANNER_TEST_QUICKSTART_DATABASE="quickstart-db"
8585

8686
# Cloud Pub/Sub configuration parameters
8787
$env:GOOGLE_CLOUD_CPP_PUBSUB_TEST_QUICKSTART_TOPIC="quickstart"
88+
$env:GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT="pubsub-impersonate-test-sa@${env:GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com"
8889

8990
# Cloud Batch configuration parameters
9091
$env:GOOGLE_CLOUD_CPP_BATCH_TEST_TEMPLATE_NAME="cloud-batch-sample-template"

ci/etc/integration-tests-config.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ export GOOGLE_CLOUD_CPP_SPANNER_TEST_QUICKSTART_DATABASE="quickstart-db"
100100

101101
# Cloud Pub/Sub configuration parameters
102102
export GOOGLE_CLOUD_CPP_PUBSUB_TEST_QUICKSTART_TOPIC="quickstart"
103+
export GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT="pubsub-impersonate-test-sa@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com"
103104

104105
# Cloud Batch configuration parameters
105106
# Created using:

google/cloud/pubsub/integration_tests/subscriber_integration_test.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,22 @@ TEST_F(SubscriberIntegrationTest, PublishPullAck) {
351351
ASSERT_NO_FATAL_FAILURE(TestRoundtrip(publisher, subscriber));
352352
}
353353

354+
TEST_F(SubscriberIntegrationTest, PublishPullAckWithImpersonatedCredentials) {
355+
std::string iam_service_account =
356+
google::cloud::internal::GetEnv(
357+
"GOOGLE_CLOUD_CPP_PUBSUB_TEST_IMPERSONATED_SERVICE_ACCOUNT")
358+
.value_or("");
359+
ASSERT_FALSE(iam_service_account.empty());
360+
auto google_default_credentials = MakeGoogleDefaultCredentials();
361+
auto options = Options{}.set<UnifiedCredentialsOption>(
362+
MakeImpersonateServiceAccountCredentials(google_default_credentials,
363+
iam_service_account));
364+
auto publisher = Publisher(MakePublisherConnection(topic_, options));
365+
auto subscriber =
366+
Subscriber(MakeSubscriberConnection(subscription_, options));
367+
ASSERT_NO_FATAL_FAILURE(TestRoundtrip(publisher, subscriber));
368+
}
369+
354370
TEST_F(SubscriberIntegrationTest, FireAndForget) {
355371
std::mutex mu;
356372
std::condition_variable cv;

google/cloud/pubsub/internal/subscriber_connection_impl.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ namespace pubsub_internal {
2929
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3030

3131
SubscriberConnectionImpl::SubscriberConnectionImpl(
32-
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub)
32+
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub,
33+
std::shared_ptr<BackgroundThreads> background)
3334
: opts_(std::move(opts)),
3435
stub_(std::move(stub)),
35-
background_(internal::MakeBackgroundThreadsFactory(opts_)()),
36+
background_(std::move(background)),
3637
generator_(internal::MakeDefaultPRNG()) {}
3738

3839
SubscriberConnectionImpl::~SubscriberConnectionImpl() = default;

google/cloud/pubsub/internal/subscriber_connection_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2929
class SubscriberConnectionImpl : public pubsub::SubscriberConnection {
3030
public:
3131
explicit SubscriberConnectionImpl(
32-
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub);
32+
Options opts, std::shared_ptr<pubsub_internal::SubscriberStub> stub,
33+
std::shared_ptr<BackgroundThreads> background);
3334

3435
~SubscriberConnectionImpl() override;
3536

google/cloud/pubsub/internal/subscriber_connection_impl_test.cc

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,14 @@ using ::testing::Contains;
4545
using ::testing::HasSubstr;
4646
using ::testing::Pair;
4747
using ::testing::Property;
48+
using ::testing::Return;
4849
using ::testing::StartsWith;
4950

51+
class MockBackgroundThreads : public BackgroundThreads {
52+
public:
53+
MOCK_METHOD(CompletionQueue, cq, (), (override, const));
54+
};
55+
5056
Options MakeTestOptions(Subscription subscription, Options opts = {}) {
5157
opts.set<pubsub::SubscriptionOption>(std::move(subscription));
5258
opts.set<UnifiedCredentialsOption>(MakeInsecureCredentials());
@@ -163,8 +169,10 @@ TEST(SubscriberConnectionTest, Subscribe) {
163169
.WillRepeatedly(FakeAsyncStreamingPull);
164170

165171
CompletionQueue cq;
172+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
173+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
166174
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
167-
MakeTestOptions(subscription, cq), mock);
175+
MakeTestOptions(subscription, cq), mock, mock_background_threads);
168176
std::atomic_flag received_one{false};
169177
promise<void> waiter;
170178
auto handler = [&](Message const& m, AckHandler h) {
@@ -206,8 +214,10 @@ TEST(SubscriberConnectionTest, SubscribeOverrideSubscription) {
206214
.WillRepeatedly(MakeAsyncStreamingPullMock(s2.FullName()));
207215

208216
CompletionQueue cq;
209-
auto subscriber =
210-
std::make_shared<SubscriberConnectionImpl>(MakeTestOptions(s1, cq), mock);
217+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
218+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
219+
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
220+
MakeTestOptions(s1, cq), mock, mock_background_threads);
211221
std::atomic_flag received_one{false};
212222
promise<void> waiter;
213223
auto handler = [&](Message const& m, AckHandler h) {
@@ -250,8 +260,10 @@ TEST(SubscriberConnectionTest, ExactlyOnce) {
250260
.WillRepeatedly(FakeAsyncStreamingPull);
251261

252262
CompletionQueue cq;
263+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
264+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
253265
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
254-
MakeTestOptions(subscription, cq), mock);
266+
MakeTestOptions(subscription, cq), mock, mock_background_threads);
255267
std::atomic_flag received_one{false};
256268
promise<void> waiter;
257269
auto callback = [&](Message const& m, ExactlyOnceAckHandler h) {
@@ -294,8 +306,10 @@ TEST(SubscriberConnectionTest, ExactlyOnceOverrideSubscription) {
294306
.WillRepeatedly(MakeAsyncStreamingPullMock(s2.FullName()));
295307

296308
CompletionQueue cq;
297-
auto subscriber =
298-
std::make_shared<SubscriberConnectionImpl>(MakeTestOptions(s1, cq), mock);
309+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
310+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
311+
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
312+
MakeTestOptions(s1, cq), mock, mock_background_threads);
299313
std::atomic_flag received_one{false};
300314
promise<void> waiter;
301315
auto handler = [&](Message const& m, AckHandler h) {
@@ -359,7 +373,8 @@ TEST(SubscriberConnectionTest, StreamingPullFailure) {
359373
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
360374
pubsub_testing::MakeTestOptions(
361375
Options{}.set<pubsub::SubscriptionOption>(subscription)),
362-
mock);
376+
mock,
377+
std::make_shared<internal::AutomaticallyCreatedBackgroundThreads>());
363378
auto handler = [&](Message const&, AckHandler const&) {};
364379
google::cloud::internal::OptionsSpan span(subscriber->options());
365380
auto response = subscriber->Subscribe({handler});
@@ -401,8 +416,10 @@ TEST(SubscriberConnectionTest, Pull) {
401416
CompletionQueue cq;
402417
std::thread t([&cq] { cq.Run(); });
403418

419+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
420+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
404421
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
405-
MakeTestOptions(subscription, cq), mock);
422+
MakeTestOptions(subscription, cq), mock, mock_background_threads);
406423
google::cloud::internal::OptionsSpan span(subscriber->options());
407424
auto response = subscriber->Pull();
408425
ASSERT_STATUS_OK(response);
@@ -440,7 +457,8 @@ TEST(SubscriberConnectionTest, PullReturnsNoMessage) {
440457

441458
CompletionQueue cq;
442459
std::thread t([&cq] { cq.Run(); });
443-
460+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
461+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
444462
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
445463
MakeTestOptions(
446464
subscription,
@@ -450,7 +468,7 @@ TEST(SubscriberConnectionTest, PullReturnsNoMessage) {
450468
google::cloud::pubsub::LimitedErrorCountRetryPolicy(
451469
kNumRetries)
452470
.clone())),
453-
mock);
471+
mock, mock_background_threads);
454472
google::cloud::internal::OptionsSpan span(subscriber->options());
455473
auto response = subscriber->Pull();
456474
EXPECT_THAT(response, StatusIs(StatusCode::kUnavailable,
@@ -496,9 +514,10 @@ TEST(SubscriberConnectionTest, PullOverrideSubscription) {
496514

497515
CompletionQueue cq;
498516
std::thread t([&cq] { cq.Run(); });
499-
500-
auto subscriber =
501-
std::make_shared<SubscriberConnectionImpl>(MakeTestOptions(s1, cq), mock);
517+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
518+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
519+
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
520+
MakeTestOptions(s1, cq), mock, mock_background_threads);
502521
google::cloud::internal::OptionsSpan span(
503522
subscriber->options().set<pubsub::SubscriptionOption>(s2));
504523
auto response = subscriber->Pull();
@@ -523,9 +542,10 @@ TEST(SubscriberConnectionTest, PullPermanentFailure) {
523542

524543
CompletionQueue cq;
525544
std::thread t([&cq] { cq.Run(); });
526-
545+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
546+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
527547
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
528-
MakeTestOptions(subscription, cq), mock);
548+
MakeTestOptions(subscription, cq), mock, mock_background_threads);
529549
google::cloud::internal::OptionsSpan span(subscriber->options());
530550
auto response = subscriber->Pull();
531551
EXPECT_THAT(response,
@@ -550,9 +570,10 @@ TEST(SubscriberConnectionTest, PullTooManyTransientFailures) {
550570

551571
CompletionQueue cq;
552572
std::thread t([&cq] { cq.Run(); });
553-
573+
auto mock_background_threads = std::make_shared<MockBackgroundThreads>();
574+
ON_CALL(*mock_background_threads, cq()).WillByDefault(Return(cq));
554575
auto subscriber = std::make_shared<SubscriberConnectionImpl>(
555-
MakeTestOptions(subscription, cq), mock);
576+
MakeTestOptions(subscription, cq), mock, mock_background_threads);
556577
google::cloud::internal::OptionsSpan span(subscriber->options());
557578
auto response = subscriber->Pull();
558579
EXPECT_THAT(response,

google/cloud/pubsub/subscriber_connection.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
4040
namespace {
4141

4242
std::shared_ptr<pubsub::SubscriberConnection> ConnectionFromDecoratedStub(
43-
std::shared_ptr<pubsub_internal::SubscriberStub> stub,
44-
Options const& opts) {
43+
std::shared_ptr<pubsub_internal::SubscriberStub> stub, Options const& opts,
44+
std::shared_ptr<BackgroundThreads> background) {
4545
auto tracing_enabled = google::cloud::internal::TracingEnabled(opts);
4646
std::shared_ptr<SubscriberConnection> connection =
4747
std::make_shared<pubsub_internal::SubscriberConnectionImpl>(
48-
opts, std::move(stub));
48+
opts, std::move(stub), std::move(background));
4949
if (tracing_enabled) {
5050
connection =
5151
pubsub_internal::MakeSubscriberTracingConnection(std::move(connection));
@@ -92,7 +92,8 @@ std::shared_ptr<SubscriberConnection> MakeSubscriberConnection(
9292
auto background = internal::MakeBackgroundThreadsFactory(opts)();
9393
auto stub =
9494
pubsub_internal::MakeRoundRobinSubscriberStub(background->cq(), opts);
95-
return ConnectionFromDecoratedStub(std::move(stub), std::move(opts));
95+
return ConnectionFromDecoratedStub(std::move(stub), std::move(opts),
96+
std::move(background));
9697
}
9798

9899
std::shared_ptr<SubscriberConnection> MakeSubscriberConnection(
@@ -126,7 +127,8 @@ std::shared_ptr<pubsub::SubscriberConnection> MakeTestSubscriberConnection(
126127
auto stub = pubsub_internal::MakeTestSubscriberStub(background->cq(), opts,
127128
std::move(stubs));
128129
opts.set<pubsub::SubscriptionOption>(std::move(subscription));
129-
return pubsub::ConnectionFromDecoratedStub(std::move(stub), std::move(opts));
130+
return pubsub::ConnectionFromDecoratedStub(std::move(stub), std::move(opts),
131+
std::move(background));
130132
}
131133

132134
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)