Skip to content

Commit a528fcf

Browse files
authored
feat(pubsub): support x-goog-user-project (#8456)
Change the metadata decorators to inject `x-goog-user-project` from the current option span. For the admin APIs, use the per-call options to configure a span in the `*Client` class. For `pubsub::Publisher` and `pubsub::Subscriber` it is less clear which call "triggers" an RPC, and far less clear how to merge options from multiple RPCs. I have decided to use the per-connection options in this case. We can always add more fine-grained options later, but this takes care of the common case.
1 parent faecca1 commit a528fcf

10 files changed

+710
-193
lines changed

google/cloud/pubsub/internal/default_batch_sink.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ namespace pubsub_internal {
2222
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2323

2424
DefaultBatchSink::DefaultBatchSink(std::shared_ptr<PublisherStub> stub,
25-
CompletionQueue cq, Options const& opts)
26-
: stub_(std::move(stub)),
27-
cq_(std::move(cq)),
28-
retry_policy_(opts.get<pubsub::RetryPolicyOption>()->clone()),
29-
backoff_policy_(opts.get<pubsub::BackoffPolicyOption>()->clone()) {}
25+
CompletionQueue cq, Options opts)
26+
: stub_(std::move(stub)), cq_(std::move(cq)), options_(std::move(opts)) {}
3027

3128
future<StatusOr<google::pubsub::v1::PublishResponse>>
3229
DefaultBatchSink::AsyncPublish(google::pubsub::v1::PublishRequest request) {
30+
internal::OptionsSpan span(options_);
31+
3332
auto& stub = stub_;
3433
return internal::AsyncRetryLoop(
35-
retry_policy_->clone(), backoff_policy_->clone(),
34+
options_.get<pubsub::RetryPolicyOption>()->clone(),
35+
options_.get<pubsub::BackoffPolicyOption>()->clone(),
3636
Idempotency::kIdempotent, cq_,
3737
[stub](CompletionQueue& cq, std::unique_ptr<grpc::ClientContext> context,
3838
google::pubsub::v1::PublishRequest const& request) {

google/cloud/pubsub/internal/default_batch_sink.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3232
class DefaultBatchSink : public BatchSink {
3333
public:
3434
static std::shared_ptr<DefaultBatchSink> Create(
35-
std::shared_ptr<PublisherStub> stub, CompletionQueue cq,
36-
Options const& opts) {
35+
std::shared_ptr<PublisherStub> stub, CompletionQueue cq, Options opts) {
3736
return std::shared_ptr<DefaultBatchSink>(
38-
new DefaultBatchSink(std::move(stub), std::move(cq), opts));
37+
new DefaultBatchSink(std::move(stub), std::move(cq), std::move(opts)));
3938
}
4039

4140
~DefaultBatchSink() override = default;
@@ -47,12 +46,11 @@ class DefaultBatchSink : public BatchSink {
4746

4847
private:
4948
DefaultBatchSink(std::shared_ptr<PublisherStub> stub, CompletionQueue cq,
50-
Options const& opts);
49+
Options opts);
5150

5251
std::shared_ptr<PublisherStub> stub_;
5352
CompletionQueue cq_;
54-
std::unique_ptr<pubsub::RetryPolicy const> retry_policy_;
55-
std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy_;
53+
Options options_;
5654
};
5755

5856
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/pubsub/internal/publisher_metadata.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ void PublisherMetadata::SetMetadata(grpc::ClientContext& context,
9595
std::string const& request_params) {
9696
context.AddMetadata("x-goog-request-params", request_params);
9797
context.AddMetadata("x-goog-api-client", x_goog_api_client_);
98+
auto const& options = internal::CurrentOptions();
99+
if (options.has<UserProjectOption>()) {
100+
context.AddMetadata("x-goog-user-project",
101+
options.get<UserProjectOption>());
102+
}
98103
}
99104

100105
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)