Skip to content

Commit 6944f62

Browse files
committed
test: fix BulkWriter integration tests
1 parent f73d1d7 commit 6944f62

File tree

3 files changed

+295
-183
lines changed

3 files changed

+295
-183
lines changed

packages/googleapis_firestore/lib/src/bulk_writer.dart

Lines changed: 72 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
part of 'firestore.dart';
22

33
/// The maximum number of writes that can be in a single batch.
4-
const int _maxBatchSize = 20;
4+
const int _kMaxBatchSize = 20;
55

66
/// The maximum number of writes that can be in a batch being retried.
7-
const int _retryMaxBatchSize = 10;
7+
const int _kRetryMaxBatchSize = 10;
88

99
/// The starting maximum number of operations per second as allowed by the
1010
/// 500/50/5 rule.
@@ -184,19 +184,22 @@ class _BulkWriterOperation {
184184

185185
/// A batch used by BulkWriter for committing operations.
186186
class _BulkCommitBatch extends WriteBatch {
187-
_BulkCommitBatch(super.firestore, this.maxBatchSize) : super._();
187+
_BulkCommitBatch(super.firestore, this._maxBatchSize) : super._();
188188

189-
final int maxBatchSize;
189+
int _maxBatchSize;
190190
final Set<String> _docPaths = {};
191191
final List<_BulkWriterOperation> pendingOps = [];
192192

193+
/// Gets the current maximum batch size.
194+
int get maxBatchSize => _maxBatchSize;
195+
193196
/// Checks if this batch contains a write to the given document.
194197
bool has(DocumentReference<Object?> documentRef) {
195198
return _docPaths.contains(documentRef.path);
196199
}
197200

198201
/// Returns true if the batch is full.
199-
bool get isFull => pendingOps.length >= maxBatchSize;
202+
bool get isFull => pendingOps.length >= _maxBatchSize;
200203

201204
/// Adds an operation to this batch.
202205
void processOperation(_BulkWriterOperation op) {
@@ -211,11 +214,11 @@ class _BulkCommitBatch extends WriteBatch {
211214
/// Dynamically sets the maximum batch size for this batch.
212215
/// Used to limit retry batches to a smaller size.
213216
void setMaxBatchSize(int size) {
214-
// Only update if not already at this size and not larger than current
215-
if (size < maxBatchSize && pendingOps.length <= size) {
216-
// Create a new batch with the smaller size
217-
// This is used by retry logic to ensure batches stay under 10MiB
218-
}
217+
assert(
218+
pendingOps.length <= size,
219+
'New batch size cannot be less than the number of enqueued writes',
220+
);
221+
_maxBatchSize = size;
219222
}
220223

221224
/// Commits this batch using batchWrite API and handles individual results.
@@ -378,7 +381,7 @@ class BulkWriter {
378381

379382
// Ensure batch size doesn't exceed rate limit
380383
if (initialOpsPerSecond < _maxBatchSize) {
381-
_currentMaxBatchSize = initialOpsPerSecond;
384+
_maxBatchSize = initialOpsPerSecond;
382385
}
383386

384387
_rateLimiter = RateLimiter(
@@ -395,13 +398,14 @@ class BulkWriter {
395398
/// Rate limiter for throttling operations.
396399
late final RateLimiter _rateLimiter;
397400

398-
/// The maximum batch size (can be reduced based on rate limiting).
399-
int _currentMaxBatchSize = _maxBatchSize;
401+
/// The maximum number of writes that can be in a single batch.
402+
/// Visible for testing.
403+
int _maxBatchSize = _kMaxBatchSize;
400404

401405
/// The batch currently being filled with operations.
402406
late _BulkCommitBatch _bulkCommitBatch = _BulkCommitBatch(
403407
firestore,
404-
_currentMaxBatchSize,
408+
_maxBatchSize,
405409
);
406410

407411
/// Represents the tail of all active BulkWriter operations.
@@ -426,9 +430,6 @@ class BulkWriter {
426430
/// User-provided error callback. Returns true to retry, false otherwise.
427431
bool Function(BulkWriterError) _errorCallback = _defaultErrorCallback;
428432

429-
/// Whether a custom error handler has been set.
430-
bool _errorHandlerSet = false;
431-
432433
/// Default error callback that retries UNAVAILABLE and ABORTED up to 10 times.
433434
/// Also retries INTERNAL errors for delete operations.
434435
static bool _defaultErrorCallback(BulkWriterError error) {
@@ -484,8 +485,8 @@ class BulkWriter {
484485
/// return false; // Don't retry
485486
/// });
486487
/// ```
488+
// ignore: use_setters_to_change_properties
487489
void onWriteError(bool Function(BulkWriterError) callback) {
488-
_errorHandlerSet = true;
489490
_errorCallback = callback;
490491
}
491492

@@ -643,12 +644,8 @@ class BulkWriter {
643644
_verifyNotClosed();
644645
_scheduleCurrentBatch(flush: true);
645646

646-
// Mark the most recent operation as flushed
647-
if (_bulkCommitBatch.pendingOps.isNotEmpty) {
648-
_bulkCommitBatch.pendingOps.last.markFlushed();
649-
}
650-
651-
// Also mark buffered operations
647+
// Mark the most recent operation as flushed to ensure that the batch
648+
// containing it will be sent once it's popped from the buffer.
652649
if (_bufferedOperations.isNotEmpty) {
653650
_bufferedOperations.last.operation.markFlushed();
654651
}
@@ -700,26 +697,7 @@ class BulkWriter {
700697
successCallback: _successCallback,
701698
);
702699

703-
// Swallow the error if the developer has set an error listener. This
704-
// prevents unhandled Future errors from being thrown if a floating
705-
// BulkWriter operation fails when an error handler is specified.
706-
//
707-
// We set up the error handler BEFORE potentially triggering batch send
708-
// to ensure it's in place when errors occur.
709-
final userFuture = completer.future.then<WriteResult>(
710-
(value) => value,
711-
onError: (Object err, StackTrace stackTrace) {
712-
if (!_errorHandlerSet) {
713-
// Re-throw if no custom error handler is set
714-
// This ensures the error propagates to the caller
715-
Error.throwWithStackTrace(err, stackTrace);
716-
}
717-
// When error handler is set, return the rejected future itself
718-
// This prevents unhandled rejections while still allowing the
719-
// future to reject (so tests can await expectLater(future, throwsA(...)))
720-
return completer.future;
721-
},
722-
);
700+
final userFuture = completer.future;
723701

724702
// Advance the `_lastOperation` pointer. This ensures that `_lastOperation`
725703
// only resolves when both the previous and the current write resolve.
@@ -752,8 +730,12 @@ class BulkWriter {
752730
// Decrement pending ops count and process buffered operations on error
753731
_pendingOpsCount--;
754732
_processBufferedOperations();
755-
// Re-throw to propagate the error
756-
Error.throwWithStackTrace(err, stackTrace);
733+
// Re-throw to propagate the error with stack trace
734+
if (err is Exception || err is Error) {
735+
Error.throwWithStackTrace(err, stackTrace);
736+
} else {
737+
throw Exception(err.toString());
738+
}
757739
},
758740
);
759741
}
@@ -767,11 +749,10 @@ class BulkWriter {
767749
// Retried writes are sent with a batch size of 10 in order to guarantee
768750
// that the batch is under the 10MiB limit.
769751
if (op.backoffDuration > 0) {
770-
if (_bulkCommitBatch.pendingOps.length >= _retryMaxBatchSize) {
752+
if (_bulkCommitBatch.pendingOps.length >= _kRetryMaxBatchSize) {
771753
_scheduleCurrentBatch();
772754
}
773-
_bulkCommitBatch.setMaxBatchSize(_retryMaxBatchSize);
774-
_currentMaxBatchSize = _retryMaxBatchSize;
755+
_bulkCommitBatch.setMaxBatchSize(_kRetryMaxBatchSize);
775756
}
776757

777758
// If the current batch already contains this document, send it first
@@ -808,15 +789,14 @@ class BulkWriter {
808789

809790
/// Sends the current batch and creates a new one.
810791
void _scheduleCurrentBatch({bool flush = false}) {
811-
final batchToSend = _bulkCommitBatch;
812-
if (batchToSend.pendingOps.isEmpty) {
792+
if (_bulkCommitBatch.pendingOps.isEmpty) {
813793
return;
814794
}
815795

816-
final opsToSend = batchToSend.pendingOps.length;
796+
final batchToSend = _bulkCommitBatch;
817797

818798
// Create a new batch for future operations
819-
_bulkCommitBatch = _BulkCommitBatch(firestore, _currentMaxBatchSize);
799+
_bulkCommitBatch = _BulkCommitBatch(firestore, _maxBatchSize);
820800

821801
// Use the write with the longest backoff duration when determining backoff
822802
final highestBackoffDuration = batchToSend.pendingOps.fold<int>(
@@ -825,33 +805,48 @@ class BulkWriter {
825805
);
826806
final backoffMsWithJitter = _applyJitter(highestBackoffDuration);
827807

828-
// Chain the batch commit to the tail of operations
829-
_lastOperation = _lastOperation.then((_) async {
830-
try {
831-
// Apply backoff delay if needed
832-
if (backoffMsWithJitter > 0) {
833-
await Future<void>.delayed(
834-
Duration(milliseconds: backoffMsWithJitter),
835-
);
836-
}
808+
// Apply backoff delay if needed, then send the batch
809+
if (backoffMsWithJitter > 0) {
810+
unawaited(
811+
Future<void>.delayed(
812+
Duration(milliseconds: backoffMsWithJitter),
813+
).then((_) => _sendBatch(batchToSend, flush)),
814+
);
815+
} else {
816+
unawaited(_sendBatch(batchToSend, flush));
817+
}
818+
}
837819

838-
// Rate limit before sending
839-
await _rateLimiter.request(opsToSend);
820+
/// Sends the provided batch once the rate limiter does not require any delay.
821+
Future<void> _sendBatch(_BulkCommitBatch batch, bool flush) async {
822+
// Check if we're under the rate limit
823+
final underRateLimit = _rateLimiter.tryMakeRequest(batch.pendingOps.length);
840824

841-
await batchToSend.bulkCommit();
825+
if (underRateLimit) {
826+
// We have capacity - send the batch immediately
827+
await batch.bulkCommit();
842828

843-
// If flush was requested, schedule any remaining batches
844-
if (flush) {
845-
_scheduleCurrentBatch(flush: true);
846-
}
847-
} catch (error) {
848-
// Errors are already handled in bulkCommit()
849-
// This catch prevents unhandled rejections
829+
// If flush was requested, schedule any remaining batches
830+
if (flush) {
831+
_scheduleCurrentBatch(flush: true);
850832
}
851-
// Note: _pendingOpsCount is decremented in the .then() chain in _enqueue()
852-
// when each operation completes, not here. This ensures proper tracking
853-
// of pending operations.
854-
});
833+
} else {
834+
// We need to wait - get the delay and schedule a retry
835+
final delayMs = _rateLimiter.getNextRequestDelayMs(
836+
batch.pendingOps.length,
837+
);
838+
839+
if (delayMs > 0) {
840+
// Schedule another attempt after the delay
841+
unawaited(
842+
Future<void>.delayed(
843+
Duration(milliseconds: delayMs),
844+
).then((_) => _sendBatch(batch, flush)),
845+
);
846+
}
847+
// Note: If delayMs is -1, the request can never be fulfilled with current
848+
// capacity. This shouldn't happen in practice since batch sizes are limited.
849+
}
855850
}
856851

857852
/// Adds a 30% jitter to the provided backoff.
@@ -905,7 +900,7 @@ class BulkWriter {
905900
_bulkCommitBatch.pendingOps.isEmpty,
906901
'Cannot change batch size when there are pending operations',
907902
);
908-
_currentMaxBatchSize = size;
903+
_maxBatchSize = size;
909904
_bulkCommitBatch = _BulkCommitBatch(firestore, size);
910905
}
911906
}

packages/googleapis_firestore/lib/src/rate_limiter.dart

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,33 @@ class RateLimiter {
4747
return _availableTokens;
4848
}
4949

50+
/// Tries to make the number of operations. Returns true if the request
51+
/// succeeded and false otherwise.
52+
bool tryMakeRequest(int numOperations) {
53+
_refillTokens();
54+
if (numOperations <= _availableTokens) {
55+
_availableTokens -= numOperations;
56+
return true;
57+
}
58+
return false;
59+
}
60+
5061
/// Returns the number of ms needed to refill to the specified number of
5162
/// tokens, or 0 if capacity is already available.
52-
int _getNextRequestDelayMs(int requestTokens) {
63+
int getNextRequestDelayMs(int requestTokens) {
5364
_refillTokens();
5465

5566
if (requestTokens <= _availableTokens) {
5667
return 0;
5768
}
5869

5970
final capacity = _currentCapacity;
71+
72+
// If the request is larger than capacity, it can never be fulfilled
73+
if (capacity < requestTokens) {
74+
return -1;
75+
}
76+
6077
final tokensNeeded = requestTokens - _availableTokens;
6178
final refillTimeMs = (tokensNeeded * 1000 / capacity).ceil();
6279

@@ -80,7 +97,7 @@ class RateLimiter {
8097
/// Requests the specified number of tokens. Waits until the tokens are
8198
/// available before returning.
8299
Future<void> request(int requestTokens) async {
83-
final delayMs = _getNextRequestDelayMs(requestTokens);
100+
final delayMs = getNextRequestDelayMs(requestTokens);
84101

85102
if (delayMs > 0) {
86103
await Future<void>.delayed(Duration(milliseconds: delayMs));

0 commit comments

Comments
 (0)