Skip to content

Commit 3de36d5

Browse files
committed
added a disable and enable method on the auto flusher and use that in e2e and unit tests where applicable
1 parent a4fffa5 commit 3de36d5

File tree

4 files changed

+56
-10
lines changed

4 files changed

+56
-10
lines changed

packages/kinesis/aws_kinesis_datastreams/lib/src/impl/record_client.dart

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,22 @@ class RecordClient {
168168
_scheduler.disable();
169169
}
170170

171+
/// Enables automatic flush operations.
172+
///
173+
/// When enabled, the scheduler will trigger flush operations at the
174+
/// configured interval. This does not affect manual flush() calls.
175+
void enableAutoFlush() {
176+
_scheduler.enable();
177+
}
178+
179+
/// Disables automatic flush operations.
180+
///
181+
/// When disabled, the scheduler will not trigger automatic flushes.
182+
/// Manual flush() calls and record() calls still work normally.
183+
void disableAutoFlush() {
184+
_scheduler.disable();
185+
}
186+
171187
/// Closes the client and releases all resources.
172188
Future<void> close() async {
173189
_closed = true;

packages/kinesis/aws_kinesis_datastreams/lib/src/kinesis_data_streams.dart

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,26 @@ class KinesisDataStreams {
215215
_recordClient.disable();
216216
}
217217

218+
/// Enables automatic flush operations.
219+
///
220+
/// When enabled, records will be automatically flushed at the interval
221+
/// specified in [KinesisDataStreamsOptions.flushStrategy].
222+
///
223+
/// This does not affect manual [flush] calls or [record] operations.
224+
void enableAutoFlush() {
225+
_recordClient.enableAutoFlush();
226+
}
227+
228+
/// Disables automatic flush operations.
229+
///
230+
/// When disabled, records will not be automatically flushed.
231+
/// You must call [flush] manually to send records to Kinesis.
232+
///
233+
/// This does not affect manual [flush] calls or [record] operations.
234+
void disableAutoFlush() {
235+
_recordClient.disableAutoFlush();
236+
}
237+
218238
/// Closes the client and releases all resources.
219239
///
220240
/// After closing:

packages/kinesis/aws_kinesis_datastreams/test/e2e/kinesis_e2e_test.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ void main() {
6060
credentialsProvider: const AWSCredentialsProvider(credentials),
6161
storagePath: tempDir.path,
6262
options: const KinesisDataStreamsOptions(
63-
flushStrategy: KinesisDataStreamsInterval(
64-
interval: Duration(minutes: 5), // Disable auto-flush for tests
65-
),
6663
maxRetries: 3,
6764
),
6865
);
66+
67+
// Disable auto-flush for controlled testing
68+
client.disableAutoFlush();
6969
});
7070

7171
tearDown(() async {

packages/kinesis/aws_kinesis_datastreams/test/record_client_test.dart

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -448,18 +448,28 @@ void main() {
448448
expect(client.isClosed, isTrue);
449449
});
450450

451-
test('ignores record calls after close', () async {
451+
test('throws ClientClosedException on record after close', () async {
452452
await client.close();
453453

454-
await client.record(
455-
KinesisRecord(
456-
data: Uint8List.fromList([1]),
457-
partitionKey: 'pk',
458-
streamName: 'stream',
454+
expect(
455+
() => client.record(
456+
KinesisRecord(
457+
data: Uint8List.fromList([1]),
458+
partitionKey: 'pk',
459+
streamName: 'stream',
460+
),
459461
),
462+
throwsA(isA<ClientClosedException>()),
460463
);
464+
});
461465

462-
// Can't check storage after close, but no exception should be thrown
466+
test('throws ClientClosedException on flush after close', () async {
467+
await client.close();
468+
469+
expect(
470+
() => client.flush(),
471+
throwsA(isA<ClientClosedException>()),
472+
);
463473
});
464474
});
465475
});

0 commit comments

Comments
 (0)