Skip to content

Commit f68afb1

Browse files
authored
feat: implement ParallelUploadPersistentState (#3382)
This will be needed for resumable parallel uploads.
1 parent 5c6c554 commit f68afb1

File tree

3 files changed

+245
-0
lines changed

3 files changed

+245
-0
lines changed

google/cloud/storage/parallel_upload.cc

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,100 @@ StatusOr<ObjectWriteStream> ParallelUploadStateImpl::CreateStream(
7474
CreateHashValidator(request)));
7575
}
7676

77+
std::string ParallelUploadPersistentState::ToString() const {
78+
auto json_streams = internal::nl::json::array();
79+
for (auto const& stream : streams) {
80+
json_streams.emplace_back(internal::nl::json{
81+
{"name", stream.object_name},
82+
{"resumable_session_id", stream.resumable_session_id}});
83+
}
84+
return internal::nl::json{{"streams", json_streams},
85+
{"expected_generation", expected_generation},
86+
{"destination", destination_object_name}}
87+
.dump();
88+
}
89+
90+
// Rebuilds a `ParallelUploadPersistentState` from its JSON text form.
//
// `json_rep` must be a JSON object holding:
//   - "destination": a string,
//   - "expected_generation": an integer,
//   - "streams": a non-empty array of objects, each with string members
//     "name" and "resumable_session_id".
//
// Returns the parsed state on success. Any violation of the layout above
// yields a `StatusCode::kInternal` status whose message names the first
// problem found.
StatusOr<ParallelUploadPersistentState>
ParallelUploadPersistentState::FromString(std::string const& json_rep) {
  ParallelUploadPersistentState res;

  // The third argument (`false`) turns off exceptions in the parser; a
  // malformed document is signalled through a "discarded" value instead.
  auto json = internal::nl::json::parse(json_rep, nullptr, false);
  if (json.is_discarded()) {
    return Status(StatusCode::kInternal,
                  "Parallel upload state is not a valid JSON.");
  }
  if (!json.is_object()) {
    return Status(StatusCode::kInternal,
                  "Parallel upload state is not a JSON object.");
  }
  // nl::json doesn't allow for multiple keys with the same name, so there are
  // either 0 or 1 elements with the same key.
  if (json.count("destination") != 1) {
    return Status(StatusCode::kInternal,
                  "Parallel upload state doesn't contain a 'destination'.");
  }
  auto const& destination_json = json["destination"];
  if (!destination_json.is_string()) {
    return Status(StatusCode::kInternal,
                  "Parallel upload state's 'destination' is not a string.");
  }
  res.destination_object_name = destination_json.get<std::string>();

  if (json.count("expected_generation") != 1) {
    return Status(
        StatusCode::kInternal,
        "Parallel upload state doesn't contain a 'expected_generation'.");
  }
  auto const& expected_generation_json = json["expected_generation"];
  // Require an integer: `is_number()` would also accept floating point
  // values, which cannot be stored faithfully in the `std::int64_t` field.
  if (!expected_generation_json.is_number_integer()) {
    return Status(
        StatusCode::kInternal,
        "Parallel upload state's 'expected_generation' is not a number.");
  }
  res.expected_generation = expected_generation_json.get<std::int64_t>();

  if (json.count("streams") != 1) {
    return Status(StatusCode::kInternal,
                  "Parallel upload state doesn't contain 'streams'.");
  }
  auto& streams_json = json["streams"];
  if (!streams_json.is_array()) {
    return Status(StatusCode::kInternal,
                  "Parallel upload state's 'streams' is not an array.");
  }
  for (auto& stream_json : streams_json) {
    if (!stream_json.is_object()) {
      return Status(StatusCode::kInternal,
                    "Parallel upload state's 'stream' is not an object.");
    }
    if (stream_json.count("name") != 1) {
      return Status(StatusCode::kInternal,
                    "Parallel upload state's stream doesn't contain a 'name'.");
    }
    // Bind by reference: a plain `auto` would deep-copy the JSON value.
    auto const& object_name_json = stream_json["name"];
    if (!object_name_json.is_string()) {
      return Status(StatusCode::kInternal,
                    "Parallel upload state's stream 'name' is not a string.");
    }
    if (stream_json.count("resumable_session_id") != 1) {
      return Status(StatusCode::kInternal,
                    "Parallel upload state's stream doesn't contain a "
                    "'resumable_session_id'.");
    }
    auto const& resumable_session_id_json =
        stream_json["resumable_session_id"];
    if (!resumable_session_id_json.is_string()) {
      return Status(StatusCode::kInternal,
                    "Parallel upload state's stream 'resumable_session_id' is "
                    "not a string.");
    }
    res.streams.emplace_back(ParallelUploadPersistentState::Stream{
        object_name_json.get<std::string>(),
        resumable_session_id_json.get<std::string>()});
  }
  // A state without any streams is rejected.
  if (res.streams.empty()) {
    return Status(StatusCode::kInternal,
                  "Parallel upload state's stream doesn't contain any streams");
  }
  return res;
}
170+
77171
Status ParallelUploadStateImpl::EagerCleanup() {
78172
std::unique_lock<std::mutex> lk(mu_);
79173
if (!finished_) {

google/cloud/storage/parallel_upload.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,21 @@ class ParallelObjectWriteStreambuf;
7474
using Composer = std::function<StatusOr<ObjectMetadata>(
7575
std::vector<ComposeSourceObject> const&)>;
7676

77+
struct ParallelUploadPersistentState {
78+
struct Stream {
79+
std::string object_name;
80+
std::string resumable_session_id;
81+
};
82+
83+
std::string ToString() const;
84+
static StatusOr<ParallelUploadPersistentState> FromString(
85+
std::string const& json_rep);
86+
87+
std::string destination_object_name;
88+
std::int64_t expected_generation;
89+
std::vector<Stream> streams;
90+
};
91+
7792
// The `ObjectWriteStream`s have to hold references to the state of
7893
// the parallel upload so that they can update it when finished and trigger
7994
// shards composition, hence `ResumableParallelUploadState` has to be

google/cloud/storage/parallel_uploads_test.cc

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,142 @@ TEST_F(ParallelUploadTest, CleanupFailureIsNotIgnored) {
924924
ASSERT_FALSE(object_metadata);
925925
}
926926

927+
// Text that cannot be parsed as JSON at all is rejected.
TEST(ParallelUploadPersistentState, NotJson) {
  std::string const input = "blah";
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("not a valid JSON"));
}
933+
934+
// Valid JSON whose root is a string rather than an object is rejected.
TEST(ParallelUploadPersistentState, RootNotOject) {
  std::string const input = "\"blah\"";
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("not a JSON object"));
}
940+
941+
// An object lacking the "destination" key is rejected.
TEST(ParallelUploadPersistentState, NoDestination) {
  std::string const input = internal::nl::json{{"a", "b"}}.dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("doesn't contain a 'destination'"));
}
949+
950+
// A numeric "destination" is rejected; it has to be a string.
TEST(ParallelUploadPersistentState, DestinationNotAString) {
  std::string const input = internal::nl::json{{"destination", 2}}.dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("'destination' is not a string"));
}
958+
959+
// An object lacking the "expected_generation" key is rejected.
TEST(ParallelUploadPersistentState, NoGeneration) {
  std::string const input = internal::nl::json{{"destination", "b"}}.dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(),
              HasSubstr("doesn't contain a 'expected_generation'"));
}
967+
968+
// Despite the test's name, this case supplies a string where a number is
// required for "expected_generation" and expects it to be rejected.
TEST(ParallelUploadPersistentState, GenerationNotAString) {
  std::string const input =
      internal::nl::json{{"destination", "dest"},
                         {"expected_generation", "blah"}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(),
              HasSubstr("'expected_generation' is not a number"));
}
977+
978+
// An object lacking the "streams" key is rejected.
TEST(ParallelUploadPersistentState, NoStreams) {
  std::string const input =
      internal::nl::json{{"destination", "dest"}, {"expected_generation", 1}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("doesn't contain 'streams'"));
}
986+
987+
// A numeric "streams" is rejected; it has to be an array.
TEST(ParallelUploadPersistentState, StreamsNotArray) {
  std::string const input =
      internal::nl::json{
          {"destination", "dest"}, {"expected_generation", 1}, {"streams", 5}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("is not an array"));
}
995+
996+
// A "streams" array whose element is a number, not an object, is rejected.
TEST(ParallelUploadPersistentState, StreamNotObject) {
  std::string const input =
      internal::nl::json{
          {"destination", "dest"}, {"expected_generation", 1}, {"streams", {5}}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("'stream' is not an object"));
}
1004+
1005+
// A stream object without a "name" member is rejected.
TEST(ParallelUploadPersistentState, StreamHasNoName) {
  std::string const input =
      internal::nl::json{{"destination", "dest"},
                         {"expected_generation", 1},
                         {"streams", {internal::nl::json::object()}}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("stream doesn't contain a 'name'"));
}
1016+
1017+
// A stream whose "name" is a number rather than a string is rejected.
TEST(ParallelUploadPersistentState, StreamNameNotString) {
  std::string const input =
      internal::nl::json{{"destination", "dest"},
                         {"expected_generation", 1},
                         {"streams", {{{"name", 1}}}}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("stream 'name' is not a string"));
}
1028+
1029+
// A stream object without a "resumable_session_id" member is rejected.
TEST(ParallelUploadPersistentState, StreamHasNoSessionId) {
  std::string const input =
      internal::nl::json{{"destination", "dest"},
                         {"expected_generation", 1},
                         {"streams", {{{"name", "abc"}}}}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(),
              HasSubstr("stream doesn't contain a 'resumable_session_id'"));
}
1040+
1041+
// A stream whose "resumable_session_id" is a number, not a string, is
// rejected.
TEST(ParallelUploadPersistentState, StreamSessionIdNotString) {
  std::string const input =
      internal::nl::json{
          {"destination", "dest"},
          {"expected_generation", 1},
          {"streams", {{{"name", "abc"}, {"resumable_session_id", 123}}}}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(),
              HasSubstr("'resumable_session_id' is not a string"));
}
1052+
1053+
// An otherwise well-formed state with an empty "streams" array is rejected.
TEST(ParallelUploadPersistentState, StreamsEmpty) {
  std::string const input =
      internal::nl::json{{"destination", "dest"},
                         {"expected_generation", 1},
                         {"streams", internal::nl::json::array()}}
          .dump();
  auto const parsed = ParallelUploadPersistentState::FromString(input);
  EXPECT_FALSE(parsed.ok());
  auto const& status = parsed.status();
  EXPECT_EQ(StatusCode::kInternal, status.code());
  EXPECT_THAT(status.message(), HasSubstr("doesn't contain any streams"));
}
9271063
} // namespace
9281064
} // namespace internal
9291065
} // namespace STORAGE_CLIENT_NS

0 commit comments

Comments
 (0)