Skip to content

Commit bfc7515

Browse files
feat(storage): Create OTel tracing decorator for Client::DownloadToFile() (#15283)
1 parent 9bea3fc commit bfc7515

File tree

9 files changed

+103
-31
lines changed

9 files changed

+103
-31
lines changed

google/cloud/storage/client.cc

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -225,39 +225,10 @@ StatusOr<ObjectMetadata> Client::UploadStreamResumable(
225225

226226
Status Client::DownloadFileImpl(internal::ReadObjectRangeRequest const& request,
227227
std::string const& file_name) {
228-
auto const* func = __func__;
229-
auto msg = [&request, &file_name, func](char const* what) {
230-
std::ostringstream os;
231-
os << func << "(" << request << ", " << file_name << "): " << what;
232-
return std::move(os).str();
233-
};
234-
235228
auto stream = ReadObjectImpl(request);
236229
if (stream.bad()) return stream.status();
237-
238-
// Open the destination file, and immediate raise an exception on failure.
239-
std::ofstream os(file_name, std::ios::binary);
240-
if (!os.is_open()) {
241-
return google::cloud::internal::InvalidArgumentError(
242-
msg("cannot open download destination file - ofstream::open()"),
243-
GCP_ERROR_INFO());
244-
}
245-
246-
auto const& current = google::cloud::internal::CurrentOptions();
247-
auto const size = current.get<DownloadBufferSizeOption>();
248-
std::unique_ptr<char[]> buffer(new char[size]);
249-
do {
250-
stream.read(buffer.get(), size);
251-
os.write(buffer.get(), stream.gcount());
252-
} while (os.good() && stream.good());
253-
os.close();
254-
if (!os.good()) {
255-
return google::cloud::internal::UnknownError(
256-
msg("cannot close download destination file - ofstream::close()"),
257-
GCP_ERROR_INFO());
258-
}
259-
if (stream.bad()) return stream.status();
260-
return Status();
230+
return connection_->DownloadStreamToFile(std::move(stream), file_name,
231+
request);
261232
}
262233

263234
std::string Client::SigningEmail(SigningAccount const& signing_account) const {

google/cloud/storage/internal/connection_impl.cc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,41 @@ integrity checks using the DisableMD5Hash() and DisableCrc32cChecksum() options.
855855
return std::unique_ptr<std::istream>(std::move(source));
856856
}
857857

858+
Status StorageConnectionImpl::DownloadStreamToFile(
859+
ObjectReadStream&& stream, std::string const& file_name,
860+
ReadObjectRangeRequest const& request) {
861+
auto const* func = __func__;
862+
auto msg = [&request, &file_name, func](char const* what) {
863+
std::ostringstream os;
864+
os << func << "(" << request << ", " << file_name << "): " << what;
865+
return std::move(os).str();
866+
};
867+
868+
// Open the destination file, and immediate raise an exception on failure.
869+
std::ofstream os(file_name, std::ios::binary);
870+
if (!os.is_open()) {
871+
return google::cloud::internal::InvalidArgumentError(
872+
msg("cannot open download destination file - ofstream::open()"),
873+
GCP_ERROR_INFO());
874+
}
875+
876+
auto const& current = google::cloud::internal::CurrentOptions();
877+
auto const size = current.get<DownloadBufferSizeOption>();
878+
std::unique_ptr<char[]> buffer(new char[size]);
879+
do {
880+
stream.read(buffer.get(), size);
881+
os.write(buffer.get(), stream.gcount());
882+
} while (os.good() && stream.good());
883+
os.close();
884+
if (!os.good()) {
885+
return google::cloud::internal::UnknownError(
886+
msg("cannot close download destination file - ofstream::close()"),
887+
GCP_ERROR_INFO());
888+
}
889+
if (stream.bad()) return stream.status();
890+
return Status();
891+
}
892+
858893
StatusOr<ObjectMetadata> StorageConnectionImpl::ExecuteParallelUploadFile(
859894
std::vector<std::thread> threads,
860895
std::vector<ParallelUploadFileShard> shards, bool ignore_cleanup_failures) {

google/cloud/storage/internal/connection_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/storage/idempotency_policy.h"
1919
#include "google/cloud/storage/internal/generic_stub.h"
2020
#include "google/cloud/storage/internal/storage_connection.h"
21+
#include "google/cloud/storage/object_read_stream.h"
2122
#include "google/cloud/storage/retry_policy.h"
2223
#include "google/cloud/storage/version.h"
2324
#include "google/cloud/internal/invocation_id_generator.h"
@@ -104,6 +105,9 @@ class StorageConnectionImpl
104105
InsertObjectMediaRequest& request) override;
105106
StatusOr<std::unique_ptr<std::istream>> UploadFileResumable(
106107
std::string const& file_name, ResumableUploadRequest& request) override;
108+
Status DownloadStreamToFile(ObjectReadStream&& stream,
109+
std::string const& file_name,
110+
ReadObjectRangeRequest const& request) override;
107111
StatusOr<ObjectMetadata> ExecuteParallelUploadFile(
108112
std::vector<std::thread> threads,
109113
std::vector<ParallelUploadFileShard> shards,

google/cloud/storage/internal/storage_connection.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "google/cloud/storage/internal/sign_blob_requests.h"
3232
#include "google/cloud/storage/oauth2/credentials.h"
3333
#include "google/cloud/storage/object_metadata.h"
34+
#include "google/cloud/storage/object_read_stream.h"
3435
#include "google/cloud/storage/service_account.h"
3536
#include "google/cloud/storage/version.h"
3637
#include "google/cloud/options.h"
@@ -120,6 +121,10 @@ class StorageConnection {
120121
std::string const&, ResumableUploadRequest&) {
121122
return Status(StatusCode::kUnimplemented, "unimplemented");
122123
}
124+
virtual Status DownloadStreamToFile(ObjectReadStream&&, std::string const&,
125+
ReadObjectRangeRequest const&) {
126+
return Status(StatusCode::kUnimplemented, "unimplemented");
127+
}
123128
virtual StatusOr<ObjectMetadata> ExecuteParallelUploadFile(
124129
std::vector<std::thread>, std::vector<ParallelUploadFileShard>, bool);
125130
///@}

google/cloud/storage/internal/storage_connection_test.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/internal/storage_connection.h"
16+
#include "google/cloud/storage/object_read_stream.h"
1617
#include "google/cloud/storage/parallel_upload.h"
1718
#include "google/cloud/testing_util/status_matchers.h"
1819
#include <gmock/gmock.h>
@@ -161,6 +162,15 @@ TEST(StorageConnectionTest, UploadFileResumableUnimplemented) {
161162
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
162163
}
163164

165+
TEST(StorageConnectionTest, DownloadStreamToFileUnimplemented) {
166+
TestStorageConnection connection;
167+
ObjectReadStream stream;
168+
ReadObjectRangeRequest request;
169+
auto response = connection.DownloadStreamToFile(std::move(stream),
170+
"test-file.txt", request);
171+
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
172+
}
173+
164174
TEST(StorageConnectionTest, ExecuteParallelUploadFileUnimplemented) {
165175
TestStorageConnection connection;
166176
std::vector<std::thread> threads;

google/cloud/storage/internal/tracing_connection.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,16 @@ StatusOr<std::unique_ptr<std::istream>> TracingConnection::UploadFileResumable(
256256
impl_->UploadFileResumable(file_name, request));
257257
}
258258

259+
Status TracingConnection::DownloadStreamToFile(
260+
storage::ObjectReadStream&& stream, std::string const& file_name,
261+
storage::internal::ReadObjectRangeRequest const& request) {
262+
auto span = internal::MakeSpan(
263+
"storage::Client::DownloadToFile/DownloadStreamToFile");
264+
auto scope = opentelemetry::trace::Scope(span);
265+
return internal::EndSpan(*span, impl_->DownloadStreamToFile(
266+
std::move(stream), file_name, request));
267+
}
268+
259269
StatusOr<storage::ObjectMetadata> TracingConnection::ExecuteParallelUploadFile(
260270
std::vector<std::thread> threads,
261271
std::vector<storage::internal::ParallelUploadFileShard> shards,

google/cloud/storage/internal/tracing_connection.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ class TracingConnection : public storage::internal::StorageConnection {
105105
StatusOr<std::unique_ptr<std::istream>> UploadFileResumable(
106106
std::string const& file_name,
107107
storage::internal::ResumableUploadRequest& request) override;
108+
Status DownloadStreamToFile(
109+
storage::ObjectReadStream&&, std::string const&,
110+
storage::internal::ReadObjectRangeRequest const&) override;
108111
StatusOr<storage::ObjectMetadata> ExecuteParallelUploadFile(
109112
std::vector<std::thread> threads,
110113
std::vector<storage::internal::ParallelUploadFileShard> shards,

google/cloud/storage/internal/tracing_connection_test.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
1616

1717
#include "google/cloud/storage/internal/tracing_connection.h"
18+
#include "google/cloud/storage/object_read_stream.h"
1819
#include "google/cloud/storage/testing/canonical_errors.h"
1920
#include "google/cloud/storage/testing/mock_client.h"
2021
#include "google/cloud/internal/opentelemetry.h"
@@ -746,6 +747,35 @@ TEST(TracingClientTest, UploadFileResumable) {
746747
"gl-cpp.status_code", code_str)))));
747748
}
748749

750+
TEST(TracingClientTest, DownloadStreamToFile) {
751+
auto span_catcher = InstallSpanCatcher();
752+
auto mock = std::make_shared<MockClient>();
753+
EXPECT_CALL(*mock, DownloadStreamToFile)
754+
.WillOnce([](auto&&, auto const&, auto const&) {
755+
EXPECT_TRUE(ThereIsAnActiveSpan());
756+
return PermanentError();
757+
});
758+
auto under_test = TracingConnection(mock);
759+
storage::ObjectReadStream stream;
760+
storage::internal::ReadObjectRangeRequest request("test-bucket",
761+
"test-object");
762+
auto actual = under_test.DownloadStreamToFile(std::move(stream),
763+
"test-file.txt", request);
764+
765+
auto const code = PermanentError().code();
766+
auto const code_str = StatusCodeToString(code);
767+
auto const msg = PermanentError().message();
768+
EXPECT_THAT(actual, StatusIs(code, msg));
769+
EXPECT_THAT(
770+
span_catcher->GetSpans(),
771+
ElementsAre(AllOf(
772+
SpanNamed("storage::Client::DownloadToFile/DownloadStreamToFile"),
773+
SpanHasInstrumentationScope(), SpanKindIsClient(),
774+
SpanWithStatus(opentelemetry::trace::StatusCode::kError, msg),
775+
SpanHasAttributes(
776+
OTelAttribute<std::string>("gl-cpp.status_code", code_str)))));
777+
}
778+
749779
TEST(TracingClientTest, ExecuteParallelUploadFile) {
750780
auto span_catcher = InstallSpanCatcher();
751781
auto mock = std::make_shared<MockClient>();

google/cloud/storage/testing/mock_client.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ class MockClient : public google::cloud::storage::internal::StorageConnection {
109109
MOCK_METHOD(StatusOr<std::unique_ptr<std::istream>>, UploadFileResumable,
110110
(std::string const&, storage::internal::ResumableUploadRequest&),
111111
(override));
112+
MOCK_METHOD(Status, DownloadStreamToFile,
113+
(ObjectReadStream&&, std::string const&,
114+
storage::internal::ReadObjectRangeRequest const&),
115+
(override));
112116
MOCK_METHOD(StatusOr<ObjectMetadata>, ExecuteParallelUploadFile,
113117
(std::vector<std::thread>,
114118
std::vector<storage::internal::ParallelUploadFileShard>, bool),

0 commit comments

Comments
 (0)