Skip to content

Commit c18915e

Browse files
committed
Forward pauses for GCS upload with unknown size
1 parent cc92ac9 commit c18915e

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

pkgs/gcloud/lib/src/storage_impl.dart

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -518,8 +518,7 @@ class _MediaUploadStreamSink implements StreamSink<List<int>> {
518518
final storage_api.Object _object;
519519
final String? _predefinedAcl;
520520
final int? _length;
521-
int _bufferLength = 0;
522-
final List<List<int>> buffer = <List<int>>[];
521+
final BytesBuilder buffer = BytesBuilder();
523522
final _controller = StreamController<List<int>>(sync: true);
524523
late StreamSubscription _subscription;
525524
late StreamController<List<int>> _resumableController;
@@ -578,14 +577,22 @@ class _MediaUploadStreamSink implements StreamSink<List<int>> {
578577
assert(_state != _stateLengthKnown);
579578
if (_state == _stateProbingLength) {
580579
buffer.add(data);
581-
_bufferLength += data.length;
582-
if (_bufferLength > _maxNormalUploadLength) {
580+
if (buffer.length > _maxNormalUploadLength) {
583581
// Start resumable upload.
584582
// TODO: Avoid using another stream-controller.
585583
_resumableController = StreamController<List<int>>(sync: true);
586-
buffer.forEach(_resumableController.add);
584+
_resumableController.add(buffer.takeBytes());
587585
_startResumableUpload(_resumableController.stream, _length);
588586
_state = _stateDecidedResumable;
587+
588+
// At this point, we're forwarding events to the synchronous controller,
589+
// so let's also forward pause and resume requests.
590+
_resumableController
591+
..onPause = _subscription.pause
592+
..onResume = _subscription.resume;
593+
// We don't have to handle `onCancel`: The upload will only cancel the
594+
// stream in case of errors, which we already handle by closing the
595+
// subscription.
589596
}
590597
} else {
591598
assert(_state == _stateDecidedResumable);
@@ -597,7 +604,7 @@ class _MediaUploadStreamSink implements StreamSink<List<int>> {
597604
if (_state == _stateProbingLength) {
598605
// As the data is already cached don't bother to wait on somebody
599606
// listening on the stream before adding the data.
600-
_startNormalUpload(Stream<List<int>>.fromIterable(buffer), _bufferLength);
607+
_startNormalUpload(Stream.value(buffer.takeBytes()), buffer.length);
601608
} else {
602609
_resumableController.close();
603610
}

pkgs/gcloud/lib/storage.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ library gcloud.storage;
5151
import 'dart:async';
5252
import 'dart:collection' show UnmodifiableListView, UnmodifiableMapView;
5353
import 'dart:convert';
54+
import 'dart:typed_data';
5455

5556
import 'package:_discoveryapis_commons/_discoveryapis_commons.dart' as commons;
5657
import 'package:googleapis/storage/v1.dart' as storage_api;

0 commit comments

Comments
 (0)