Skip to content

Commit 2ec897a

Browse files
authored
Merge pull request dart-archive/gcloud#134 from simolus3/gcs-upload-fix
Forward pauses for GCS uploads with unknown sizes
2 parents 3690e1a + d79162e commit 2ec897a

File tree

4 files changed

+21
-8
lines changed

4 files changed

+21
-8
lines changed

pkgs/gcloud/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.8.6-dev
2+
3+
- Throttle streams piped into `Bucket.write` when the size is not known
4+
beforehand.
5+
16
## 0.8.5
27

38
- Support the latest version 7.0.0 of the `googleapis` package.

pkgs/gcloud/lib/src/storage_impl.dart

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -518,8 +518,7 @@ class _MediaUploadStreamSink implements StreamSink<List<int>> {
518518
final storage_api.Object _object;
519519
final String? _predefinedAcl;
520520
final int? _length;
521-
int _bufferLength = 0;
522-
final List<List<int>> buffer = <List<int>>[];
521+
final BytesBuilder _buffer = BytesBuilder();
523522
final _controller = StreamController<List<int>>(sync: true);
524523
late StreamSubscription _subscription;
525524
late StreamController<List<int>> _resumableController;
@@ -577,15 +576,23 @@ class _MediaUploadStreamSink implements StreamSink<List<int>> {
577576
void _onData(List<int> data) {
578577
assert(_state != _stateLengthKnown);
579578
if (_state == _stateProbingLength) {
580-
buffer.add(data);
581-
_bufferLength += data.length;
582-
if (_bufferLength > _maxNormalUploadLength) {
579+
_buffer.add(data);
580+
if (_buffer.length > _maxNormalUploadLength) {
583581
// Start resumable upload.
584582
// TODO: Avoid using another stream-controller.
585583
_resumableController = StreamController<List<int>>(sync: true);
586-
buffer.forEach(_resumableController.add);
584+
_resumableController.add(_buffer.takeBytes());
587585
_startResumableUpload(_resumableController.stream, _length);
588586
_state = _stateDecidedResumable;
587+
588+
// At this point, we're forwarding events to the synchronous controller,
589+
// so let's also forward pause and resume requests.
590+
_resumableController
591+
..onPause = _subscription.pause
592+
..onResume = _subscription.resume;
593+
// We don't have to handle `onCancel`: The upload will only cancel the
594+
// stream in case of errors, which we already handle by closing the
595+
// subscription.
589596
}
590597
} else {
591598
assert(_state == _stateDecidedResumable);
@@ -597,7 +604,7 @@ class _MediaUploadStreamSink implements StreamSink<List<int>> {
597604
if (_state == _stateProbingLength) {
598605
// As the data is already cached don't bother to wait on somebody
599606
// listening on the stream before adding the data.
600-
_startNormalUpload(Stream<List<int>>.fromIterable(buffer), _bufferLength);
607+
_startNormalUpload(Stream.value(_buffer.takeBytes()), _buffer.length);
601608
} else {
602609
_resumableController.close();
603610
}

pkgs/gcloud/lib/storage.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ library gcloud.storage;
5151
import 'dart:async';
5252
import 'dart:collection' show UnmodifiableListView, UnmodifiableMapView;
5353
import 'dart:convert';
54+
import 'dart:typed_data';
5455

5556
import 'package:_discoveryapis_commons/_discoveryapis_commons.dart' as commons;
5657
import 'package:googleapis/storage/v1.dart' as storage_api;

pkgs/gcloud/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: gcloud
2-
version: 0.8.5
2+
version: 0.8.6-dev
33
description: >-
44
High level idiomatic Dart API for Google Cloud Storage, Pub-Sub and Datastore.
55
homepage: https://github.com/dart-lang/gcloud

0 commit comments

Comments
 (0)