Skip to content

Commit eb506b4

Browse files
committed
feat(storage): add checksumming of bidiread messages (#78)
* feat(storage): add checksumming of bidiread messages * remove move
1 parent 3c1f49a commit eb506b4

File tree

4 files changed

+74
-4
lines changed

4 files changed

+74
-4
lines changed

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/internal/async/object_descriptor_impl.h"
16+
#include "google/cloud/storage/async/options.h"
1617
#include "google/cloud/storage/internal/async/handle_redirect_error.h"
1718
#include "google/cloud/storage/internal/async/object_descriptor_reader_tracing.h"
19+
#include "google/cloud/storage/internal/hash_function.h"
20+
#include "google/cloud/storage/internal/hash_function_impl.h"
1821
#include "google/cloud/grpc_error_delegate.h"
1922
#include "google/cloud/internal/opentelemetry.h"
2023
#include <google/rpc/status.pb.h>
24+
#include <memory>
2125
#include <utility>
2226

2327
namespace google {
@@ -53,7 +57,15 @@ absl::optional<google::storage::v2::Object> ObjectDescriptorImpl::metadata()
5357

5458
std::unique_ptr<storage_experimental::AsyncReaderConnection>
5559
ObjectDescriptorImpl::Read(ReadParams p) {
56-
auto range = std::make_shared<ReadRange>(p.start, p.limit);
60+
std::shared_ptr<storage::internal::HashFunction> hash_function =
61+
std::shared_ptr<storage::internal::HashFunction>(
62+
storage::internal::CreateNullHashFunction());
63+
if (options_.has<storage_experimental::EnableCrc32cValidationOption>()) {
64+
hash_function =
65+
std::make_shared<storage::internal::Crc32cMessageHashFunction>(
66+
storage::internal::CreateNullHashFunction());
67+
}
68+
auto range = std::make_shared<ReadRange>(p.start, p.limit, hash_function);
5769

5870
std::unique_lock<std::mutex> lk(mu_);
5971
auto const id = ++read_id_generator_;

google/cloud/storage/internal/async/read_range.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,16 @@ void ReadRange::OnRead(google::storage::v2::ObjectRangeData data) {
6969
std::unique_lock<std::mutex> lk(mu_);
7070
if (status_) return;
7171
if (data.range_end()) status_ = Status{};
72-
// TODO(#28) - verify the checksum
72+
73+
auto check_summed_data = data.mutable_checksummed_data();
7374
auto content = StealMutableContent(*data.mutable_checksummed_data());
75+
auto status =
76+
hash_function_->Update(offset_, content, check_summed_data->crc32c());
77+
if (!status.ok()) {
78+
status_ = std::move(status);
79+
return Notify(std::move(lk), ReadPayloadImpl::Make(std::move(content)));
80+
}
81+
7482
offset_ += content.size();
7583
if (limit_ != 0) limit_ -= std::min<std::int64_t>(content.size(), limit_);
7684
auto p = ReadPayloadImpl::Make(std::move(content));

google/cloud/storage/internal/async/read_range.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_READ_RANGE_H
1717

1818
#include "google/cloud/storage/async/reader_connection.h"
19+
#include "google/cloud/storage/internal/hash_function.h"
1920
#include "google/cloud/future.h"
2021
#include "google/cloud/status.h"
2122
#include "google/cloud/version.h"
2223
#include "absl/types/optional.h"
2324
#include <google/storage/v2/storage.pb.h>
2425
#include <cstdint>
26+
#include <memory>
2527
#include <mutex>
2628

2729
namespace google {
@@ -43,8 +45,12 @@ class ReadRange {
4345
using ReadResponse =
4446
storage_experimental::AsyncReaderConnection::ReadResponse;
4547

46-
ReadRange(std::int64_t offset, std::int64_t limit)
47-
: offset_(offset), limit_(limit) {}
48+
ReadRange(std::int64_t offset, std::int64_t limit,
49+
std::shared_ptr<storage::internal::HashFunction> hash_function =
50+
storage::internal::CreateNullHashFunction())
51+
: offset_(offset),
52+
limit_(limit),
53+
hash_function_(std::move(hash_function)) {}
4854

4955
bool IsDone() const;
5056

@@ -66,6 +72,7 @@ class ReadRange {
6672
absl::optional<storage_experimental::ReadPayload> payload_;
6773
absl::optional<Status> status_;
6874
absl::optional<promise<ReadResponse>> wait_;
75+
std::shared_ptr<storage::internal::HashFunction> hash_function_;
6976
};
7077

7178
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/read_range_test.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/internal/async/read_range.h"
16+
#include "google/cloud/storage/internal/hash_function.h"
17+
#include "google/cloud/storage/internal/hash_values.h"
1618
#include "google/cloud/storage/testing/canonical_errors.h"
1719
#include "google/cloud/testing_util/is_proto_equal.h"
1820
#include "google/cloud/testing_util/status_matchers.h"
21+
#include "absl/strings/cord.h"
22+
#include "absl/strings/string_view.h"
1923
#include <google/protobuf/text_format.h>
2024
#include <gmock/gmock.h>
25+
#include <gtest/gtest.h>
26+
#include <memory>
27+
#include <string>
2128

2229
namespace google {
2330
namespace cloud {
@@ -31,11 +38,30 @@ using ::google::cloud::testing_util::IsOk;
3138
using ::google::cloud::testing_util::IsProtoEqual;
3239
using ::google::cloud::testing_util::StatusIs;
3340
using ::google::protobuf::TextFormat;
41+
using ::testing::_;
42+
using ::testing::AtLeast;
3443
using ::testing::ElementsAre;
3544
using ::testing::Optional;
3645
using ::testing::ResultOf;
3746
using ::testing::VariantWith;
3847

48+
class MockHashFunction : public storage::internal::HashFunction {
49+
public:
50+
MOCK_METHOD(void, Update, (absl::string_view buffer), (override));
51+
MOCK_METHOD(Status, Update, (std::int64_t offset, absl::string_view buffer),
52+
(override));
53+
MOCK_METHOD(Status, Update,
54+
(std::int64_t offset, absl::string_view buffer,
55+
std::uint32_t buffer_crc),
56+
(override));
57+
MOCK_METHOD(Status, Update,
58+
(std::int64_t offset, absl::Cord const& buffer,
59+
std::uint32_t buffer_crc),
60+
(override));
61+
MOCK_METHOD(std::string, Name, (), (const, override));
62+
MOCK_METHOD(storage::internal::HashValues, Finish, (), (override));
63+
};
64+
3965
TEST(ReadRange, BasicLifecycle) {
4066
ReadRange actual(10000, 40);
4167
EXPECT_FALSE(actual.IsDone());
@@ -156,6 +182,23 @@ TEST(ReadRange, Queue) {
156182
EXPECT_THAT(actual.Read().get(), VariantWith<ReadPayload>(matcher));
157183
}
158184

185+
TEST(ReadRange, HashFunctionCalled) {
186+
auto hash_function = std::make_shared<MockHashFunction>();
187+
absl::Cord contents("1234567890");
188+
EXPECT_CALL(*hash_function, Update(0, contents, _)).Times(AtLeast(1));
189+
190+
ReadRange actual(0, 0, hash_function);
191+
auto data = google::storage::v2::ObjectRangeData{};
192+
auto constexpr kData0 = R"pb(
193+
checksummed_data { content: "1234567890" }
194+
read_range { read_offset: 0 read_limit: 10 read_id: 7 }
195+
range_end: false
196+
)pb";
197+
198+
EXPECT_TRUE(TextFormat::ParseFromString(kData0, &data));
199+
actual.OnRead(std::move(data));
200+
}
201+
159202
} // namespace
160203
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
161204
} // namespace storage_internal

0 commit comments

Comments
 (0)