Skip to content

Commit 5c6c554

Browse files
authored
refactor: create ParallelUploadStateImpl (#3379)
It will be used by resumable parallel uploads too, so we want it to be accessible.
1 parent f32b972 commit 5c6c554

File tree

3 files changed

+76
-66
lines changed

3 files changed

+76
-66
lines changed
-655 Bytes
Binary file not shown.

google/cloud/storage/parallel_upload.cc

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ namespace internal {
2323
class ParallelObjectWriteStreambuf : public ObjectWriteStreambuf {
2424
public:
2525
ParallelObjectWriteStreambuf(
26-
std::shared_ptr<NonResumableParallelUploadState::Impl> state,
27-
std::size_t stream_idx,
26+
std::shared_ptr<ParallelUploadStateImpl> state, std::size_t stream_idx,
2827
std::unique_ptr<ResumableUploadSession> upload_session,
2928
std::size_t max_buffer_size,
3029
std::unique_ptr<HashValidator> hash_validator)
@@ -40,20 +39,26 @@ class ParallelObjectWriteStreambuf : public ObjectWriteStreambuf {
4039
}
4140

4241
private:
43-
std::shared_ptr<NonResumableParallelUploadState::Impl> state_;
42+
std::shared_ptr<ParallelUploadStateImpl> state_;
4443
std::size_t stream_idx_;
4544
};
4645

47-
NonResumableParallelUploadState::Impl::Impl(
48-
std::unique_ptr<ScopedDeleter> deleter, Composer composer)
46+
ParallelUploadStateImpl::ParallelUploadStateImpl(
47+
std::string destination_object_name, std::shared_ptr<ScopedDeleter> deleter,
48+
Composer composer)
4949
: deleter_(std::move(deleter)),
5050
composer_(std::move(composer)),
51+
destination_object_name_(std::move(destination_object_name)),
5152
finished_{},
5253
num_unfinished_streams_{} {}
5354

54-
NonResumableParallelUploadState::Impl::~Impl() { WaitForCompletion().wait(); }
55+
ParallelUploadStateImpl::~ParallelUploadStateImpl() {
56+
std::cout << "Trying to destroy impl" << std::endl;
57+
WaitForCompletion().wait();
58+
std::cout << "Destroyed impl" << std::endl;
59+
}
5560

56-
StatusOr<ObjectWriteStream> NonResumableParallelUploadState::Impl::CreateStream(
61+
StatusOr<ObjectWriteStream> ParallelUploadStateImpl::CreateStream(
5762
RawClient& raw_client, ResumableUploadRequest const& request) {
5863
auto session = raw_client.CreateResumableSession(request);
5964
std::unique_lock<std::mutex> lk(mu_);
@@ -69,7 +74,7 @@ StatusOr<ObjectWriteStream> NonResumableParallelUploadState::Impl::CreateStream(
6974
CreateHashValidator(request)));
7075
}
7176

72-
Status NonResumableParallelUploadState::Impl::EagerCleanup() {
77+
Status ParallelUploadStateImpl::EagerCleanup() {
7378
std::unique_lock<std::mutex> lk(mu_);
7479
if (!finished_) {
7580
return Status(StatusCode::kFailedPrecondition,
@@ -84,7 +89,7 @@ Status NonResumableParallelUploadState::Impl::EagerCleanup() {
8489
return cleanup_status_;
8590
}
8691

87-
void NonResumableParallelUploadState::Impl::Fail(Status status) {
92+
void ParallelUploadStateImpl::Fail(Status status) {
8893
std::unique_lock<std::mutex> lk(mu_);
8994
assert(!status.ok());
9095
if (!res_) {
@@ -93,7 +98,7 @@ void NonResumableParallelUploadState::Impl::Fail(Status status) {
9398
}
9499
}
95100

96-
void NonResumableParallelUploadState::Impl::StreamFinished(
101+
void ParallelUploadStateImpl::StreamFinished(
97102
std::size_t stream_idx, StatusOr<ResumableUploadResponse> const& response) {
98103
std::unique_lock<std::mutex> lk(mu_);
99104

@@ -130,8 +135,8 @@ void NonResumableParallelUploadState::Impl::StreamFinished(
130135
}
131136
}
132137

133-
future<StatusOr<ObjectMetadata>>
134-
NonResumableParallelUploadState::Impl::WaitForCompletion() const {
138+
future<StatusOr<ObjectMetadata>> ParallelUploadStateImpl::WaitForCompletion()
139+
const {
135140
std::unique_lock<std::mutex> lk(mu_);
136141

137142
if (finished_) {

google/cloud/storage/parallel_upload.h

Lines changed: 59 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,58 @@ namespace internal {
6969

7070
class ParallelObjectWriteStreambuf;
7171

72+
// Type-erased function object to execute ComposeMany with most arguments
73+
// bound.
74+
using Composer = std::function<StatusOr<ObjectMetadata>(
75+
std::vector<ComposeSourceObject> const&)>;
76+
77+
// The `ObjectWriteStream`s have to hold references to the state of
78+
// the parallel upload so that they can update it when finished and trigger
79+
// shards composition, hence `ResumableParallelUploadState` has to be
80+
// destroyed after the `ObjectWriteStream`s.
81+
// `ResumableParallelUploadState` and `ObjectWriteStream`s are passed
82+
// around by values, so we don't control their lifetime. In order to
83+
// circumvent it, we move the state to something held by a `shared_ptr`.
84+
class ParallelUploadStateImpl
85+
: public std::enable_shared_from_this<ParallelUploadStateImpl> {
86+
public:
87+
ParallelUploadStateImpl(std::string destination_object_name,
88+
std::shared_ptr<ScopedDeleter> deleter,
89+
Composer composer);
90+
~ParallelUploadStateImpl();
91+
92+
StatusOr<ObjectWriteStream> CreateStream(
93+
RawClient& raw_client, ResumableUploadRequest const& request);
94+
95+
void StreamFinished(std::size_t stream_idx,
96+
StatusOr<ResumableUploadResponse> const& response);
97+
98+
future<StatusOr<ObjectMetadata>> WaitForCompletion() const;
99+
100+
Status EagerCleanup();
101+
102+
void Fail(Status status);
103+
104+
private:
105+
mutable std::mutex mu_;
106+
// Promises made via `WaitForCompletion()`
107+
mutable std::vector<promise<StatusOr<ObjectMetadata>>> res_promises_;
108+
// Type-erased object for deleting temporary objects.
109+
std::shared_ptr<ScopedDeleter> deleter_;
110+
// Type-erased function object to execute ComposeMany with most arguments
111+
// bound.
112+
std::function<StatusOr<ObjectMetadata>(std::vector<ComposeSourceObject>)>
113+
composer_;
114+
std::string destination_object_name_;
115+
// Set when all streams are closed and composed but before cleanup.
116+
bool finished_;
117+
// Tracks how many streams are still written to.
118+
std::size_t num_unfinished_streams_;
119+
std::vector<ComposeSourceObject> to_compose_;
120+
google::cloud::optional<StatusOr<ObjectMetadata>> res_;
121+
Status cleanup_status_;
122+
};
123+
72124
struct ComposeManyApplyHelper {
73125
template <typename... Options>
74126
StatusOr<ObjectMetadata> operator()(Options&&... options) const {
@@ -168,62 +220,15 @@ class NonResumableParallelUploadState {
168220
void Fail(Status status) { return impl_->Fail(std::move(status)); }
169221

170222
private:
171-
// Type-erased function object to execute ComposeMany with most arguments
172-
// bound.
173-
using Composer = std::function<StatusOr<ObjectMetadata>(
174-
std::vector<ComposeSourceObject> const&)>;
175-
176-
// The `ObjectWriteStream`s have to hold references to the state of
177-
// the parallel upload so that they can update it when finished and trigger
178-
// shards composition, hence `NonResumableParallelUploadState` has to be
179-
// destroyed after the `ObjectWriteStream`s.
180-
// `NonResumableParallelUploadState` and `ObjectWriteStream`s are passed
181-
// around by values, so we don't control their lifetime. In order to
182-
// circumvent it, we move the state to something held by a `shared_ptr`.
183-
class Impl : public std::enable_shared_from_this<Impl> {
184-
public:
185-
Impl(std::unique_ptr<ScopedDeleter> deleter, Composer composer);
186-
~Impl();
187-
188-
StatusOr<ObjectWriteStream> CreateStream(
189-
RawClient& raw_client, ResumableUploadRequest const& request);
190-
191-
void StreamFinished(std::size_t stream_idx,
192-
StatusOr<ResumableUploadResponse> const& response);
193-
194-
future<StatusOr<ObjectMetadata>> WaitForCompletion() const;
195-
196-
Status EagerCleanup();
197-
198-
void Fail(Status status);
199-
200-
private:
201-
mutable std::mutex mu_;
202-
// Promises made via `WaitForCompletion()`
203-
mutable std::vector<promise<StatusOr<ObjectMetadata>>> res_promises_;
204-
// Type-erased object for deleting temporary objects.
205-
std::unique_ptr<ScopedDeleter> deleter_;
206-
// Type-erased function object to execute ComposeMany with most arguments
207-
// bound.
208-
std::function<StatusOr<ObjectMetadata>(std::vector<ComposeSourceObject>)>
209-
composer_;
210-
// Set when all streams are closed and composed but before cleanup.
211-
bool finished_;
212-
// Tracks how many streams are still written to.
213-
std::size_t num_unfinished_streams_;
214-
std::vector<ComposeSourceObject> to_compose_;
215-
google::cloud::optional<StatusOr<ObjectMetadata>> res_;
216-
Status cleanup_status_;
217-
};
218-
219-
NonResumableParallelUploadState(std::shared_ptr<Impl> state,
220-
std::vector<ObjectWriteStream> shards)
223+
NonResumableParallelUploadState(
224+
std::shared_ptr<ParallelUploadStateImpl> state,
225+
std::vector<ObjectWriteStream> shards)
221226
: impl_(std::move(state)), shards_(std::move(shards)) {}
222227

223-
std::shared_ptr<Impl> impl_;
228+
std::shared_ptr<ParallelUploadStateImpl> impl_;
224229
std::vector<ObjectWriteStream> shards_;
225230

226-
friend class ParallelObjectWriteStreambuf;
231+
friend class NonResumableParallelObjectWriteStreambuf;
227232
};
228233

229234
/**
@@ -306,8 +311,8 @@ NonResumableParallelUploadState::Create(Client client,
306311
}
307312
deleter->Add(*lock);
308313

309-
auto internal_state = std::make_shared<NonResumableParallelUploadState::Impl>(
310-
std::move(deleter), std::move(composer));
314+
auto internal_state = std::make_shared<ParallelUploadStateImpl>(
315+
object_name, std::move(deleter), std::move(composer));
311316
std::vector<ObjectWriteStream> streams;
312317

313318
auto upload_options = StaticTupleFilter<

0 commit comments

Comments
 (0)