Skip to content

Commit 05dac5f

Browse files
authored
Merge branch 'main' into bigtable_query_snippet_2
2 parents 89e8a75 + 6d79425 commit 05dac5f

15 files changed

+331
-13
lines changed

ci/cloudbuild/dockerfiles/checkers.Dockerfile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ ARG ARCH=amd64
2323

2424
RUN dnf makecache && \
2525
dnf install -y \
26-
cargo \
2726
cmake \
2827
clang-tools-extra \
2928
diffutils \
@@ -35,7 +34,11 @@ RUN dnf makecache && \
3534
python-pip \
3635
ShellCheck
3736

38-
RUN cargo install typos-cli --version 1.24.1 --root /usr/local
37+
RUN dnf makecache && \
38+
dnf install -y \
39+
cargo
40+
41+
RUN cargo install typos-cli --locked --version 1.24.1 --root /usr/local
3942

4043
RUN curl -L -o /usr/bin/buildifier https://github.com/bazelbuild/buildtools/releases/download/v6.4.0/buildifier-linux-amd64 && \
4144
chmod 755 /usr/bin/buildifier

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ class DefaultPartialResultSetReader
119119
std::shared_ptr<OperationContext> operation_context,
120120
std::unique_ptr<internal::StreamingReadRpc<
121121
google::bigtable::v2::ExecuteQueryResponse>>
122-
reader)
122+
reader,
123+
google::bigtable::v2::ResultSetMetadata initial_metadata)
123124
: context_(std::move(context)),
124125
operation_context_(std::move(operation_context)),
125-
reader_(std::move(reader)) {}
126+
reader_(std::move(reader)),
127+
initial_metadata_(std::move(initial_metadata)) {}
126128

127129
~DefaultPartialResultSetReader() override = default;
128130

@@ -148,10 +150,22 @@ class DefaultPartialResultSetReader
148150
return true;
149151
}
150152

151-
// Ignore metadata from the stream because PartialResultSetSource already
152-
// has it set (in ExecuteQuery).
153-
// TODO(#15701): Investigate expected behavior for processing metadata.
153+
// Throw an error when there is a schema difference between
154+
// ExecuteQueryResponse and PrepareQueryResponse.
154155
if (response.has_metadata()) {
156+
std::string initial_metadata_str;
157+
std::string response_metadata_str;
158+
bool metadata_matched =
159+
initial_metadata_.SerializeToString(&initial_metadata_str) &&
160+
response.metadata().SerializeToString(&response_metadata_str) &&
161+
initial_metadata_str == response_metadata_str;
162+
if (response.metadata().ByteSizeLong() > 0 && !metadata_matched) {
163+
final_status_ = google::cloud::Status(
164+
google::cloud::StatusCode::kAborted,
165+
"Schema changed during ExecuteQuery operation");
166+
operation_context_->PostCall(*context_, final_status_);
167+
return false;
168+
}
155169
continue;
156170
}
157171

@@ -173,6 +187,7 @@ class DefaultPartialResultSetReader
173187
std::unique_ptr<
174188
internal::StreamingReadRpc<google::bigtable::v2::ExecuteQueryResponse>>
175189
reader_;
190+
google::bigtable::v2::ResultSetMetadata initial_metadata_;
176191
Status final_status_;
177192
};
178193

@@ -896,8 +911,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
896911
std::shared_ptr<OperationContext> const& operation_context) mutable
897912
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
898913
auto factory =
899-
[stub, request, tracing_enabled, tracing_options,
900-
operation_context](std::string const& resume_token) mutable {
914+
[stub, request, tracing_enabled, tracing_options, operation_context,
915+
initial_metadata = metadata](std::string const& resume_token) mutable {
901916
if (!resume_token.empty()) request.set_resume_token(resume_token);
902917
auto context = std::make_shared<grpc::ClientContext>();
903918
auto const& options = internal::CurrentOptions();
@@ -906,7 +921,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
906921
auto stream = stub->ExecuteQuery(context, options, request);
907922
std::unique_ptr<PartialResultSetReader> reader =
908923
std::make_unique<DefaultPartialResultSetReader>(
909-
std::move(context), operation_context, std::move(stream));
924+
std::move(context), operation_context, std::move(stream),
925+
std::move(initial_metadata));
910926
if (tracing_enabled) {
911927
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
912928
tracing_options);

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ using ::testing::Contains;
7777
using ::testing::ElementsAre;
7878
using ::testing::ElementsAreArray;
7979
using ::testing::Eq;
80+
using ::testing::HasSubstr;
8081
using ::testing::Matcher;
8182
using ::testing::MockFunction;
8283
using ::testing::Pair;
@@ -3695,6 +3696,92 @@ TEST_F(DataConnectionTest,
36953696
EXPECT_THAT(row2.values().at(1).get<std::string>(), IsOkAndHolds("v2"));
36963697
}
36973698

3699+
TEST_F(DataConnectionTest, ExecuteQueryFailureWithSchemaChange) {
3700+
auto factory = std::make_unique<SimpleOperationContextFactory>();
3701+
3702+
auto mock = std::make_shared<MockBigtableStub>();
3703+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
3704+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
3705+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
3706+
return CompletionQueue{fake_cq_impl};
3707+
});
3708+
auto constexpr kResultMetadataText = R"pb(
3709+
proto_schema {
3710+
columns {
3711+
name: "row_key"
3712+
type { string_type {} }
3713+
}
3714+
columns {
3715+
name: "value"
3716+
type { string_type {} }
3717+
}
3718+
}
3719+
)pb";
3720+
v2::PrepareQueryResponse pq_response;
3721+
pq_response.set_prepared_query("test-pq-id-54321");
3722+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3723+
kResultMetadataText, pq_response.mutable_metadata()));
3724+
*pq_response.mutable_valid_until() = internal::ToProtoTimestamp(
3725+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
3726+
3727+
auto constexpr kExecuteQueryResultMetadataText = R"pb(
3728+
proto_schema {
3729+
columns {
3730+
name: "row_key"
3731+
type { string_type {} }
3732+
}
3733+
columns {
3734+
name: "different_value"
3735+
type { string_type {} }
3736+
}
3737+
}
3738+
)pb";
3739+
v2::ExecuteQueryResponse eq_response;
3740+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3741+
kExecuteQueryResultMetadataText, eq_response.mutable_metadata()));
3742+
3743+
auto refresh_fn = []() {
3744+
return make_ready_future(
3745+
StatusOr<google::bigtable::v2::PrepareQueryResponse>(
3746+
Status{StatusCode::kUnimplemented, "not implemented"}));
3747+
};
3748+
EXPECT_CALL(*mock, ExecuteQuery)
3749+
.Times(3)
3750+
.WillRepeatedly(
3751+
[&](auto, auto const&,
3752+
google::bigtable::v2::ExecuteQueryRequest const& request) {
3753+
EXPECT_EQ(request.app_profile_id(), kAppProfile);
3754+
EXPECT_EQ(request.instance_name(),
3755+
"projects/test-project/instances/test-instance");
3756+
auto stream = std::make_unique<MockExecuteQueryStream>();
3757+
EXPECT_CALL(*stream, Read)
3758+
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
3759+
*r = eq_response;
3760+
return absl::nullopt;
3761+
});
3762+
return stream;
3763+
});
3764+
3765+
auto conn = TestConnection(std::move(mock), std::move(factory));
3766+
internal::OptionsSpan span(CallOptions());
3767+
Project p("test-project");
3768+
bigtable::SqlStatement statement("SELECT * FROM the-table");
3769+
bigtable::InstanceResource instance(p, "test-instance");
3770+
auto query_plan =
3771+
QueryPlan::Create(CompletionQueue(fake_cq_impl), std::move(pq_response),
3772+
std::move(refresh_fn));
3773+
auto prepared_query =
3774+
bigtable::PreparedQuery(instance, statement, std::move(query_plan));
3775+
auto bq = prepared_query.BindParameters({});
3776+
bigtable::ExecuteQueryParams params{std::move(bq)};
3777+
auto row_stream = conn->ExecuteQuery(std::move(params));
3778+
for (auto const& row : row_stream) {
3779+
EXPECT_THAT(row,
3780+
StatusIs(StatusCode::kAborted, HasSubstr("Schema changed")));
3781+
}
3782+
fake_cq_impl->SimulateCompletion(false);
3783+
}
3784+
36983785
} // namespace
36993786
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
37003787
} // namespace bigtable_internal

google/cloud/bigtable/internal/query_plan.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ std::shared_ptr<QueryPlan> QueryPlan::Create(
3535
return plan;
3636
}
3737

38+
QueryPlan::~QueryPlan() {
39+
if (refresh_timer_.valid()) refresh_timer_.cancel();
40+
}
41+
3842
void QueryPlan::Initialize() {
3943
std::unique_lock<std::mutex> lock(mu_);
4044
if (state_ == RefreshState::kDone) ScheduleRefresh(lock);

google/cloud/bigtable/internal/query_plan.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
4343
StatusOr<google::bigtable::v2::PrepareQueryResponse> response,
4444
RefreshFn fn, std::shared_ptr<Clock> clock = std::make_shared<Clock>());
4545

46+
~QueryPlan();
47+
4648
// Invalidates the current QueryPlan and triggers a refresh.
4749
void Invalidate(Status status, std::string const& invalid_query_plan_id);
4850

google/cloud/bigtable/internal/query_plan_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,11 @@ TEST(QueryPlanTest, CreateFailedPlanAndRefresh) {
229229
}
230230

231231
// TODO(#15695): For reasons not yet understood, the fedora m32 CI build has
232-
// failures not seen in m64 builds when the number of threads is "too" high.
232+
// failures not seen in m64 builds when the number of threads is "too" high.
233+
// These failures occur while trying to create more than 500 threads.
233234
constexpr int LimitNumThreadsOn32Bit(int num_threads) {
234235
#if INTPTR_MAX == INT32_MAX
235-
return std::min(num_threads, 500);
236+
return std::min(num_threads, 200);
236237
#else
237238
return num_threads;
238239
#endif

google/cloud/opentelemetry/internal/monitoring_exporter.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,41 @@ std::string FormatProjectFullName(std::string const& project) {
3131
return absl::StrCat("projects/", project);
3232
}
3333

34+
otel_internal::ResourceFilterDataFn MakeResourceFilterFn(
35+
Options const& options) {
36+
if (!options.has<otel_internal::ResourceFilterDataFnOption>()) {
37+
return nullptr;
38+
}
39+
40+
// Get the metric labels set to be excluded.
41+
auto const& excluded =
42+
options.get<otel_internal::ResourceFilterDataFnOption>();
43+
if (excluded.empty()) return nullptr;
44+
45+
// Capture by value to avoid dangling reference in the lambda.
46+
return [excluded = std::move(excluded)](std::string const& key) -> bool {
47+
return excluded.count(key) > 0;
48+
};
49+
}
50+
51+
otel_internal::MonitoredResourceFromDataFn MakeDynamicResourceFn(
52+
Options const& options, absl::optional<Project> const& project,
53+
absl::optional<google::api::MonitoredResource> const& mr_proto) {
54+
if (!options.has<otel_internal::ResourceFilterDataFnOption>()) {
55+
return nullptr;
56+
}
57+
58+
// `resource_filter_fn_` and `dynamic_resource_fn_` are meant to be used as a
59+
// pair. Here we have a filter but no dynamic function, create a default one
60+
// that returns the same project and monitored resource for all data points.
61+
auto project_id = project->project_id();
62+
auto monitored_resource = mr_proto.value_or(google::api::MonitoredResource{});
63+
return [project_id, monitored_resource](
64+
opentelemetry::sdk::metrics::PointDataAttributes const&) {
65+
return std::make_pair(project_id, monitored_resource);
66+
};
67+
}
68+
3469
} // namespace
3570

3671
MonitoringExporter::MonitoringExporter(
@@ -51,6 +86,8 @@ MonitoringExporter::MonitoringExporter(
5186
Options const& options)
5287
: MonitoringExporter(std::move(conn), nullptr, nullptr, options) {
5388
project_ = std::move(project);
89+
resource_filter_fn_ = MakeResourceFilterFn(options);
90+
dynamic_resource_fn_ = MakeDynamicResourceFn(options, project_, mr_proto_);
5491
}
5592

5693
opentelemetry::sdk::common::ExportResult MonitoringExporter::Export(

google/cloud/opentelemetry/internal/monitoring_exporter.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ using MonitoredResourceFromDataFn =
4545
// of the google::api::Metric proto.
4646
using ResourceFilterDataFn = std::function<bool(std::string const&)>;
4747

48+
// Filter resource labels. A set of OpenTelemetry resource attribute keys to
49+
// exclude from metric labels when exporting metrics.
50+
struct ResourceFilterDataFnOption {
51+
using Type = std::set<std::string>;
52+
};
53+
4854
class MonitoringExporter final
4955
: public opentelemetry::sdk::metrics::PushMetricExporter {
5056
public:

google/cloud/opentelemetry/internal/monitoring_exporter_test.cc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,40 @@ TEST(MonitoringExporter, ExportSuccess) {
159159
EXPECT_EQ(result, opentelemetry::sdk::common::ExportResult::kSuccess);
160160
}
161161

162+
TEST(MonitoringExporterTest, MakeFilterNoOption) {
163+
auto mock =
164+
std::make_shared<monitoring_v3_mocks::MockMetricServiceConnection>();
165+
Options options;
166+
167+
auto exporter = std::make_unique<MonitoringExporter>(Project("test-project"),
168+
mock, options);
169+
EXPECT_NE(exporter, nullptr);
170+
}
171+
172+
TEST(MonitoringExporterTest, MakeFilterEmptySet) {
173+
auto mock =
174+
std::make_shared<monitoring_v3_mocks::MockMetricServiceConnection>();
175+
Options options;
176+
options.set<otel_internal::ResourceFilterDataFnOption>(
177+
std::set<std::string>{});
178+
179+
auto exporter = std::make_unique<MonitoringExporter>(Project("test-project"),
180+
mock, options);
181+
EXPECT_NE(exporter, nullptr);
182+
}
183+
184+
TEST(MonitoringExporterTest, MakeFilterWithExcludedKeys) {
185+
auto mock =
186+
std::make_shared<monitoring_v3_mocks::MockMetricServiceConnection>();
187+
Options options;
188+
std::set<std::string> excluded{"service_name", "service_version"};
189+
options.set<otel_internal::ResourceFilterDataFnOption>(excluded);
190+
191+
auto exporter = std::make_unique<MonitoringExporter>(Project("test-project"),
192+
mock, options);
193+
EXPECT_NE(exporter, nullptr);
194+
}
195+
162196
} // namespace
163197
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
164198
} // namespace otel_internal

google/cloud/storage/grpc_plugin.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,27 @@ struct GrpcMetricsExportTimeoutOption {
131131
using Type = std::chrono::seconds;
132132
};
133133

134+
/**
135+
* gRPC telemetry excluded labels.
136+
*
137+
* A set of OpenTelemetry resource attribute keys to exclude from metric labels
138+
* when exporting gRPC telemetry. For example, to exclude the `service.name`
139+
* label, configure the option with `{"service_name"}`.
140+
*
141+
* @par Example: Exclude specific labels from telemetry
142+
* @code
143+
* namespace gcs_ex = google::cloud::storage_experimental;
144+
* auto client = google::cloud::storage::MakeGrpcClient(
145+
* google::cloud::Options{}
146+
* .set<gcs_ex::EnableGrpcMetricsOption>(true)
147+
* .set<gcs_ex::GrpcMetricsExcludedLabelsOption>(
148+
* std::set<std::string>{"service_name", "service_version"}));
149+
* @endcode
150+
*/
151+
struct GrpcMetricsExcludedLabelsOption {
152+
using Type = std::set<std::string>;
153+
};
154+
134155
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
135156
} // namespace storage_experimental
136157
} // namespace cloud

0 commit comments

Comments
 (0)