Skip to content

Commit 677354c

Browse files
feat(storage): Create OTel tracing decorator for storage:: ParallelUploadFile() (#15289)
1 parent e77b251 commit 677354c

File tree

10 files changed

+106
-9
lines changed

10 files changed

+106
-9
lines changed

google/cloud/storage/internal/connection_impl.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "google/cloud/storage/internal/connection_impl.h"
1616
#include "google/cloud/storage/internal/retry_object_read_source.h"
17+
#include "google/cloud/storage/parallel_upload.h"
1718
#include "google/cloud/internal/filesystem.h"
1819
#include "google/cloud/internal/opentelemetry.h"
1920
#include "google/cloud/internal/rest_retry_loop.h"
@@ -854,6 +855,20 @@ integrity checks using the DisableMD5Hash() and DisableCrc32cChecksum() options.
854855
return std::unique_ptr<std::istream>(std::move(source));
855856
}
856857

858+
StatusOr<ObjectMetadata> StorageConnectionImpl::ExecuteParallelUploadFile(
859+
std::vector<std::thread> threads,
860+
std::vector<ParallelUploadFileShard> shards, bool ignore_cleanup_failures) {
861+
for (auto& thread : threads) {
862+
thread.join();
863+
}
864+
auto res = shards[0].WaitForCompletion().get();
865+
auto cleanup_res = shards[0].EagerCleanup();
866+
if (!cleanup_res.ok() && !ignore_cleanup_failures) {
867+
return cleanup_res;
868+
}
869+
return res;
870+
}
871+
857872
StatusOr<ListBucketAclResponse> StorageConnectionImpl::ListBucketAcl(
858873
ListBucketAclRequest const& request) {
859874
auto const idempotency = current_idempotency_policy().IsIdempotent(request)

google/cloud/storage/internal/connection_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ class StorageConnectionImpl
104104
InsertObjectMediaRequest& request) override;
105105
StatusOr<std::unique_ptr<std::istream>> UploadFileResumable(
106106
std::string const& file_name, ResumableUploadRequest& request) override;
107+
StatusOr<ObjectMetadata> ExecuteParallelUploadFile(
108+
std::vector<std::thread> threads,
109+
std::vector<ParallelUploadFileShard> shards,
110+
bool ignore_cleanup_failures) override;
107111

108112
StatusOr<ListBucketAclResponse> ListBucketAcl(
109113
ListBucketAclRequest const& request) override;

google/cloud/storage/internal/storage_connection.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/internal/storage_connection.h"
16+
#include "google/cloud/storage/parallel_upload.h"
1617
#include <utility>
1718
#include <vector>
1819

@@ -44,6 +45,16 @@ StatusOr<CreateOrResumeResponse> CreateOrResume(
4445
std::move(response->payload)};
4546
}
4647

48+
StatusOr<ObjectMetadata> StorageConnection::ExecuteParallelUploadFile(
49+
std::vector<std::thread>, // NOLINT(performance-unnecessary-value-param)
50+
std::vector<
51+
ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param)
52+
bool) {
53+
return Status(
54+
StatusCode::kUnimplemented,
55+
"ExecuteParallelUploadFile() is not implemented by this Object");
56+
}
57+
4758
} // namespace internal
4859
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
4960
} // namespace storage

google/cloud/storage/internal/storage_connection.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@
3838
#include "google/cloud/status_or.h"
3939
#include <memory>
4040
#include <string>
41+
#include <thread>
4142
#include <vector>
4243

4344
namespace google {
4445
namespace cloud {
4546
namespace storage {
4647
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
4748
namespace internal {
49+
class ParallelUploadFileShard;
4850
class ObjectReadStreambuf;
4951

5052
/**
@@ -118,6 +120,8 @@ class StorageConnection {
118120
std::string const&, ResumableUploadRequest&) {
119121
return Status(StatusCode::kUnimplemented, "unimplemented");
120122
}
123+
virtual StatusOr<ObjectMetadata> ExecuteParallelUploadFile(
124+
std::vector<std::thread>, std::vector<ParallelUploadFileShard>, bool);
121125
///@}
122126

123127
///@{

google/cloud/storage/internal/storage_connection_test.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/internal/storage_connection.h"
16+
#include "google/cloud/storage/parallel_upload.h"
1617
#include "google/cloud/testing_util/status_matchers.h"
1718
#include <gmock/gmock.h>
1819

@@ -160,6 +161,16 @@ TEST(StorageConnectionTest, UploadFileResumableUnimplemented) {
160161
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
161162
}
162163

164+
TEST(StorageConnectionTest, ExecuteParallelUploadFileUnimplemented) {
165+
TestStorageConnection connection;
166+
std::vector<std::thread> threads;
167+
std::vector<ParallelUploadFileShard> shards;
168+
bool ignore_cleanup_failures = false;
169+
auto response = connection.ExecuteParallelUploadFile(
170+
std::move(threads), std::move(shards), ignore_cleanup_failures);
171+
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
172+
}
173+
163174
} // namespace
164175
} // namespace internal
165176
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/tracing_connection.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "google/cloud/storage/internal/tracing_connection.h"
1616
#include "google/cloud/storage/internal/tracing_object_read_source.h"
17+
#include "google/cloud/storage/parallel_upload.h"
1718
#include "google/cloud/internal/opentelemetry.h"
1819
#include <memory>
1920
#include <string>
@@ -255,6 +256,18 @@ StatusOr<std::unique_ptr<std::istream>> TracingConnection::UploadFileResumable(
255256
impl_->UploadFileResumable(file_name, request));
256257
}
257258

259+
StatusOr<storage::ObjectMetadata> TracingConnection::ExecuteParallelUploadFile(
260+
std::vector<std::thread> threads,
261+
std::vector<storage::internal::ParallelUploadFileShard> shards,
262+
bool ignore_cleanup_failures) {
263+
auto span = internal::MakeSpan(
264+
"storage::ParallelUploadFile/ExecuteParallelUploadFile");
265+
auto scope = opentelemetry::trace::Scope(span);
266+
return internal::EndSpan(*span, impl_->ExecuteParallelUploadFile(
267+
std::move(threads), std::move(shards),
268+
ignore_cleanup_failures));
269+
}
270+
258271
StatusOr<storage::internal::ListBucketAclResponse>
259272
TracingConnection::ListBucketAcl(
260273
storage::internal::ListBucketAclRequest const& request) {

google/cloud/storage/internal/tracing_connection.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_TRACING_CONNECTION_H
1717

1818
#include "google/cloud/storage/internal/storage_connection.h"
19+
#include "google/cloud/storage/parallel_upload.h"
1920
#include "google/cloud/storage/version.h"
2021
#include <memory>
2122
#include <string>
@@ -104,6 +105,10 @@ class TracingConnection : public storage::internal::StorageConnection {
104105
StatusOr<std::unique_ptr<std::istream>> UploadFileResumable(
105106
std::string const& file_name,
106107
storage::internal::ResumableUploadRequest& request) override;
108+
StatusOr<storage::ObjectMetadata> ExecuteParallelUploadFile(
109+
std::vector<std::thread> threads,
110+
std::vector<storage::internal::ParallelUploadFileShard> shards,
111+
bool ignore_cleanup_failures) override;
107112

108113
StatusOr<storage::internal::ListBucketAclResponse> ListBucketAcl(
109114
storage::internal::ListBucketAclRequest const& request) override;

google/cloud/storage/internal/tracing_connection_test.cc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,41 @@ TEST(TracingClientTest, UploadFileResumable) {
746746
"gl-cpp.status_code", code_str)))));
747747
}
748748

749+
TEST(TracingClientTest, ExecuteParallelUploadFile) {
750+
auto span_catcher = InstallSpanCatcher();
751+
auto mock = std::make_shared<MockClient>();
752+
EXPECT_CALL(*mock, ExecuteParallelUploadFile)
753+
.WillOnce(
754+
[](std::vector<
755+
std::thread>, // NOLINT(performance-unnecessary-value-param)
756+
std::vector<
757+
storage::internal::
758+
ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param)
759+
bool) {
760+
EXPECT_TRUE(ThereIsAnActiveSpan());
761+
return PermanentError();
762+
});
763+
auto under_test = TracingConnection(mock);
764+
std::vector<std::thread> threads;
765+
std::vector<storage::internal::ParallelUploadFileShard> shards;
766+
bool ignore_cleanup_failures = false;
767+
auto actual = under_test.ExecuteParallelUploadFile(
768+
std::move(threads), std::move(shards), ignore_cleanup_failures);
769+
770+
auto const code = PermanentError().code();
771+
auto const code_str = StatusCodeToString(code);
772+
auto const msg = PermanentError().message();
773+
EXPECT_THAT(actual, StatusIs(code));
774+
EXPECT_THAT(
775+
span_catcher->GetSpans(),
776+
ElementsAre(AllOf(
777+
SpanNamed("storage::ParallelUploadFile/ExecuteParallelUploadFile"),
778+
SpanHasInstrumentationScope(), SpanKindIsClient(),
779+
SpanWithStatus(opentelemetry::trace::StatusCode::kError, msg),
780+
SpanHasAttributes(
781+
OTelAttribute<std::string>("gl-cpp.status_code", code_str)))));
782+
}
783+
749784
TEST(TracingClientTest, ListBucketAcl) {
750785
auto span_catcher = InstallSpanCatcher();
751786
auto mock = std::make_shared<MockClient>();

google/cloud/storage/parallel_upload.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,6 +1190,7 @@ StatusOr<ObjectMetadata> ParallelUploadFile(
11901190
internal::IsOptionSupportedWithParallelUpload<Options...>::value,
11911191
"Provided Option not found in ParallelUploadFileSupportedOptions.");
11921192

1193+
auto connection = internal::ClientImplDetails::GetConnection(client);
11931194
auto shards = internal::CreateParallelUploadShards::Create(
11941195
std::move(client), std::move(file_name), std::move(bucket_name),
11951196
std::move(object_name), std::move(prefix),
@@ -1207,15 +1208,8 @@ StatusOr<ObjectMetadata> ParallelUploadFile(
12071208
shard.Upload();
12081209
});
12091210
}
1210-
for (auto& thread : threads) {
1211-
thread.join();
1212-
}
1213-
auto res = (*shards)[0].WaitForCompletion().get();
1214-
auto cleanup_res = (*shards)[0].EagerCleanup();
1215-
if (!cleanup_res.ok() && !ignore_cleanup_failures) {
1216-
return cleanup_res;
1217-
}
1218-
return res;
1211+
return connection->ExecuteParallelUploadFile(
1212+
std::move(threads), std::move(*shards), ignore_cleanup_failures);
12191213
}
12201214

12211215
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/testing/mock_client.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "google/cloud/storage/client.h"
1919
#include "google/cloud/storage/internal/storage_connection.h"
20+
#include "google/cloud/storage/parallel_upload.h"
2021
#include <gmock/gmock.h>
2122
#include <memory>
2223
#include <string>
@@ -108,6 +109,10 @@ class MockClient : public google::cloud::storage::internal::StorageConnection {
108109
MOCK_METHOD(StatusOr<std::unique_ptr<std::istream>>, UploadFileResumable,
109110
(std::string const&, storage::internal::ResumableUploadRequest&),
110111
(override));
112+
MOCK_METHOD(StatusOr<ObjectMetadata>, ExecuteParallelUploadFile,
113+
(std::vector<std::thread>,
114+
std::vector<storage::internal::ParallelUploadFileShard>, bool),
115+
(override));
111116

112117
MOCK_METHOD(StatusOr<internal::ListBucketAclResponse>, ListBucketAcl,
113118
(internal::ListBucketAclRequest const&), (override));

0 commit comments

Comments
 (0)