|
28 | 28 | #include "google/cloud/storage/internal/async/reader_connection_impl.h" |
29 | 29 | #include "google/cloud/storage/internal/async/reader_connection_resume.h" |
30 | 30 | #include "google/cloud/storage/internal/async/rewriter_connection_impl.h" |
| 31 | +#include "google/cloud/storage/internal/async/write_object.h" |
31 | 32 | #include "google/cloud/storage/internal/async/write_payload_impl.h" |
32 | 33 | #include "google/cloud/storage/internal/async/writer_connection_buffered.h" |
33 | 34 | #include "google/cloud/storage/internal/async/writer_connection_finalized.h" |
@@ -292,44 +293,46 @@ AsyncConnectionImpl::ReadObjectRange(ReadObjectParams p) { |
292 | 293 | future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>> |
293 | 294 | AsyncConnectionImpl::StartAppendableObjectUpload(AppendableUploadParams p) { |
294 | 295 | auto current = internal::MakeImmutableOptions(std::move(p.options)); |
| 296 | + auto request = p.request; |
295 | 297 | std::int64_t persisted_size = 0; |
296 | 298 | auto hash_function = CreateHashFunction(*current); |
297 | | - using StreamingRpc = AsyncWriterConnectionImpl::StreamingRpc; |
| 299 | + auto retry = retry_policy(*current); |
| 300 | + auto backoff = backoff_policy(*current); |
298 | 301 | struct RequestPlaceholder {}; |
299 | 302 |
|
300 | | - auto call = [stub = stub_](CompletionQueue& cq, |
301 | | - std::shared_ptr<grpc::ClientContext> context, |
302 | | - google::cloud::internal::ImmutableOptions options, |
303 | | - RequestPlaceholder const&) |
304 | | - -> future<StatusOr<std::unique_ptr<StreamingRpc>>> { |
| 303 | + auto call = [stub = stub_, request = std::move(request)]( |
| 304 | + CompletionQueue& cq, |
| 305 | + std::shared_ptr<grpc::ClientContext> context, |
| 306 | + google::cloud::internal::ImmutableOptions options, |
| 307 | + RequestPlaceholder const&) mutable |
| 308 | + -> future<StatusOr<WriteObject::WriteResult>> { |
305 | 309 | auto rpc = |
306 | 310 | stub->AsyncBidiWriteObject(cq, std::move(context), std::move(options)); |
307 | | - auto start = rpc->Start(); |
308 | | - return start.then([rpc = std::move(rpc)](auto f) mutable { |
309 | | - if (f.get()) return make_ready_future(make_status_or(std::move(rpc))); |
310 | | - auto pending = rpc->Finish(); |
311 | | - return pending.then([rpc = std::move(rpc)](auto f) mutable { |
312 | | - rpc.reset(); |
313 | | - auto status = f.get(); |
314 | | - return StatusOr<std::unique_ptr<StreamingRpc>>(std::move(status)); |
315 | | - }); |
| 311 | + request.set_state_lookup(true); |
| 312 | + auto open = std::make_shared<WriteObject>(std::move(rpc), request); |
| 313 | + return open->Call().then([open, &request](auto f) mutable { |
| 314 | + auto response = f.get(); |
| 315 | + if (!response) { |
| 316 | + EnsureFirstMessageAppendObjectSpec(request); |
| 317 | + ApplyWriteRedirectErrors(*request.mutable_append_object_spec(), |
| 318 | + ExtractGrpcStatus(response.status())); |
| 319 | + } |
| 320 | + return response; |
316 | 321 | }); |
317 | 322 | }; |
318 | 323 |
|
319 | | - auto transform = [current, request = std::move(p.request), persisted_size, |
| 324 | + auto transform = [current, request, persisted_size, |
320 | 325 | hash = std::move(hash_function)](auto f) mutable |
321 | 326 | -> StatusOr< |
322 | 327 | std::unique_ptr<storage_experimental::AsyncWriterConnection>> { |
323 | 328 | auto rpc = f.get(); |
324 | 329 | if (!rpc) return std::move(rpc).status(); |
325 | 330 | return std::unique_ptr<storage_experimental::AsyncWriterConnection>( |
326 | 331 | std::make_unique<AsyncWriterConnectionImpl>( |
327 | | - current, std::move(request), *std::move(rpc), std::move(hash), |
328 | | - persisted_size)); |
| 332 | + current, std::move(request), std::move(rpc->stream), |
| 333 | + std::move(hash), persisted_size, false)); |
329 | 334 | }; |
330 | 335 |
|
331 | | - auto retry = retry_policy(*current); |
332 | | - auto backoff = backoff_policy(*current); |
333 | 336 | return google::cloud::internal::AsyncRetryLoop( |
334 | 337 | std::move(retry), std::move(backoff), Idempotency::kIdempotent, |
335 | 338 | cq_, std::move(call), std::move(current), RequestPlaceholder{}, |
@@ -716,7 +719,7 @@ AsyncConnectionImpl::UnbufferedUploadImpl( |
716 | 719 | return std::unique_ptr<storage_experimental::AsyncWriterConnection>( |
717 | 720 | std::make_unique<AsyncWriterConnectionImpl>( |
718 | 721 | current, std::move(request), *std::move(rpc), std::move(hash), |
719 | | - persisted_size)); |
| 722 | + persisted_size, true)); |
720 | 723 | }; |
721 | 724 |
|
722 | 725 | auto retry = retry_policy(*current); |
|
0 commit comments