diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index c460de30..ad0886a1 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -517,7 +517,8 @@ class StreamingSyncImplementation implements StreamingSync { } final uri = credentials.endpointUri('sync/stream'); - final request = http.Request('POST', uri); + final request = + http.AbortableRequest('POST', uri, abortTrigger: _abort!.onAbort); request.headers['Content-Type'] = 'application/json'; request.headers['Authorization'] = "Token ${credentials.token}"; request.headers['Accept'] = diff --git a/packages/powersync_core/pubspec.yaml b/packages/powersync_core/pubspec.yaml index 9ffae6ad..7213180a 100644 --- a/packages/powersync_core/pubspec.yaml +++ b/packages/powersync_core/pubspec.yaml @@ -16,7 +16,7 @@ dependencies: sqlite3_web: ^0.3.2 universal_io: ^2.0.0 meta: ^1.0.0 - http: ^1.4.0 + http: ^1.5.0 uuid: ^4.2.0 async: ^2.10.0 logging: ^1.1.1 diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 57e36b0c..e4ab531b 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -7,6 +7,8 @@ import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite3_common.dart'; import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/protocol.dart'; +import 'package:shelf/shelf.dart'; +import 'package:shelf_router/shelf_router.dart'; import 'package:test/test.dart'; import 'bucket_storage_test.dart'; @@ -63,7 +65,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { void createSyncClient({Schema? schema}) { final (client, server) = inMemoryServer(); - server.mount(syncService.router.call); + server.mount((req) => syncService.router(req)); final thisSyncClient = syncClient = database.connectWithMockService( client, @@ -937,6 +939,46 @@ void _declareTests(String name, SyncOptions options, bool bson) { expect(await query.next, 'from server'); }); + + group('abort', () { + test('during connect', () async { + final requestStarted = Completer(); + + syncService.router = Router() + ..post('/sync/stream', expectAsync1((Request request) async { + requestStarted.complete(); + + // emulate a network that never connects + await Completer().future; + })); + + syncClient.streamingSync(); + await requestStarted.future; + expect(database.currentStatus, isSyncStatus(connecting: true)); + + await syncClient.abort(); + expect(database.currentStatus.anyError, isNull); + }); + + test('during stream', () async { + final status = await waitForConnection(); + syncService.addLine({ + 'checkpoint': { + 'last_op_id': '0', + 'buckets': [ + { + 'bucket': 'bkt', + 'checksum': 0, + } + ], + }, + }); + await expectLater(status, emits(isSyncStatus(downloading: true))); + + await syncClient.abort(); + expect(database.currentStatus.anyError, isNull); + }); + }); }); } diff --git a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart index 19d7e137..278f014e 100644 --- a/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart +++ b/packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart @@ -14,7 +14,7 @@ final class MockSyncService { StreamController(); Completer _listener = Completer(); - final router = Router(); + var router = Router(); Object? Function() writeCheckpoint = () { return { 'data': {'write_checkpoint': '10'} diff --git a/packages/powersync_core/test/utils/in_memory_http.dart b/packages/powersync_core/test/utils/in_memory_http.dart index 61550e3c..a35d6a09 100644 --- a/packages/powersync_core/test/utils/in_memory_http.dart +++ b/packages/powersync_core/test/utils/in_memory_http.dart @@ -35,6 +35,11 @@ final class _MockServer implements shelf.Server { Future handleRequest( BaseRequest request, ByteStream body) async { + final cancellationFuture = switch (request) { + Abortable(:final abortTrigger) => abortTrigger, + _ => null, + }; + if (_handler case final endpoint?) { final shelfRequest = shelf.Request( request.method, @@ -42,10 +47,17 @@ final class _MockServer implements shelf.Server { headers: request.headers, body: body, ); - final shelfResponse = await endpoint(shelfRequest); + + final shelfResponse = await Future.any([ + Future.sync(() => endpoint(shelfRequest)), + if (cancellationFuture != null) + cancellationFuture.then((_) { + throw RequestAbortedException(); + }), + ]); return StreamedResponse( - shelfResponse.read(), + shelfResponse.read().injectCancellation(cancellationFuture), shelfResponse.statusCode, headers: shelfResponse.headers, ); @@ -54,3 +66,36 @@ final class _MockServer implements shelf.Server { } } } + +extension on Stream { + Stream injectCancellation(Future? token) { + if (token == null) { + return this; + } + + return Stream.multi( + (listener) { + final subscription = listen( + listener.addSync, + onError: listener.addErrorSync, + onDone: listener.closeSync, + ); + + listener + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + + token.whenComplete(() { + if (!listener.isClosed) { + listener + ..addErrorSync(RequestAbortedException()) + ..closeSync(); + subscription.cancel(); + } + }); + }, + isBroadcast: isBroadcast, + ); + } +}