Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions packages/powersync/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ class StreamingSyncImplementation {

final Future<void> Function() uploadCrud;

// An internal controller which is used to trigger CRUD uploads internally
// e.g. when reconnecting.
// This is only a broadcast controller since the `crudLoop` method is public
// and could potentially be called multiple times externally.
final StreamController<Null> _internalCrudTriggerController =
StreamController<Null>.broadcast();

final Stream crudUpdateTriggerStream;

final StreamController<SyncStatus> _statusStreamController =
Expand Down Expand Up @@ -92,6 +99,9 @@ class StreamingSyncImplementation {
if (_safeToClose) {
_client.close();
}

await _internalCrudTriggerController.close();

// wait for completeAbort() to be called
await future;

Expand Down Expand Up @@ -144,7 +154,7 @@ class StreamingSyncImplementation {

// On error, wait a little before retrying
// When aborting, don't wait
await Future.any([Future.delayed(retryDelay), _abort!.onAbort]);
await _delayRetry();
}
}
} finally {
Expand All @@ -155,10 +165,14 @@ class StreamingSyncImplementation {
Future<void> crudLoop() async {
await uploadAllCrud();

await for (var _ in crudUpdateTriggerStream) {
if (_abort?.aborted == true) {
break;
}
// Trigger a CRUD upload whenever the upstream trigger fires
// as-well-as whenever the sync stream reconnects.
// This has the potential (in rare cases) to affect the crudThrottleTime,
// but it should not result in excessive uploads since the
// sync reconnects are also throttled.
// The stream here is closed on abort.
await for (var _ in mergeStreams(
[crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
await uploadAllCrud();
}
}
Expand All @@ -170,6 +184,13 @@ class StreamingSyncImplementation {

while (true) {
try {
// It's possible that an abort or disconnect operation could
// be followed by a `close` operation. The close would cause these
// operations, which use the DB, to throw an exception. Breaking the loop
// here prevents unnecessary potential (caught) exceptions.
if (aborted) {
break;
}
// This is the first item in the FIFO CRUD queue.
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
if (nextCrudItem != null) {
Expand All @@ -196,7 +217,7 @@ class StreamingSyncImplementation {
checkedCrudItem = null;
isolateLogger.warning('Data upload error', e, stacktrace);
_updateStatus(uploading: false, uploadError: e);
await Future.delayed(retryDelay);
await _delayRetry();
if (!isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
Expand Down Expand Up @@ -298,6 +319,9 @@ class StreamingSyncImplementation {
Future<void>? credentialsInvalidation;
bool haveInvalidated = false;

// Trigger a CRUD upload on reconnect
_internalCrudTriggerController.add(null);

await for (var line in merged) {
if (aborted) {
break;
Expand Down Expand Up @@ -465,6 +489,12 @@ class StreamingSyncImplementation {
yield parseStreamingSyncLine(line as Map<String, dynamic>);
}
}

/// Delays the standard `retryDelay` Duration, but exits early if
/// an abort has been requested.
Future<void> _delayRetry() async {
await Future.any([Future.delayed(retryDelay), _abort!.onAbort]);
}
}

/// Attempt to give a basic summary of the error for cases where the full error
Expand Down
136 changes: 136 additions & 0 deletions packages/powersync/test/connected_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
@TestOn('!browser')
// This test uses a local server which is possible to control in Web via hybrid main,
// but this makes the test significantly more complex.
import 'dart:async';

import 'package:powersync/powersync.dart';
import 'package:test/test.dart';

import 'server/sync_server/mock_sync_server.dart';
import 'streaming_sync_test.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';

final testUtils = TestUtils();

void main() {
group('connected tests', () {
late String path;
setUp(() async {
path = testUtils.dbPath();
});

tearDown(() async {
await testUtils.cleanDb(path: path);
});

createTestServer() async {
final testServer = TestHttpServerHelper();
await testServer.start();
addTearDown(() => testServer.stop());
return testServer;
}

test('should connect to mock PowerSync instance', () async {
final testServer = await createTestServer();
final connector = TestConnector(() async {
return PowerSyncCredentials(
endpoint: testServer.uri.toString(),
token: 'token not used here',
expiresAt: DateTime.now());
});

final db = PowerSyncDatabase.withFactory(
await testUtils.testFactory(path: path),
schema: defaultSchema,
maxReaders: 3);
await db.initialize();

final connectedCompleter = Completer();

db.statusStream.listen((status) {
if (status.connected) {
connectedCompleter.complete();
}
});

// Add a basic command for the test server to send
testServer.addEvent('{"token_expires_in": 3600}\n');

await db.connect(connector: connector);
await connectedCompleter.future;

expect(db.connected, isTrue);
await db.disconnect();
});

test('should trigger uploads when connection is re-established', () async {
int uploadCounter = 0;
Completer uploadTriggeredCompleter = Completer();
final testServer = await createTestServer();
final connector = TestConnector(() async {
return PowerSyncCredentials(
endpoint: testServer.uri.toString(),
token: 'token not used here',
expiresAt: DateTime.now());
}, uploadData: (database) async {
uploadCounter++;
uploadTriggeredCompleter.complete();
throw Exception('No uploads occur here');
});

final db = PowerSyncDatabase.withFactory(
await testUtils.testFactory(path: path),
schema: defaultSchema,
maxReaders: 3);
await db.initialize();

// Create an item which should trigger an upload.
await db.execute(
'INSERT INTO customers (id, name) VALUES (uuid(), ?)', ['steven']);

// Create a new completer to await the next upload
uploadTriggeredCompleter = Completer();

// Connect the PowerSync instance
final connectedCompleter = Completer();
// The first connection attempt will fail
final connectedErroredCompleter = Completer();

db.statusStream.listen((status) {
if (status.connected && !connectedCompleter.isCompleted) {
connectedCompleter.complete();
}
if (status.downloadError != null &&
!connectedErroredCompleter.isCompleted) {
connectedErroredCompleter.complete();
}
});

// The first command will not be valid, this simulates a failed connection
testServer.addEvent('asdf\n');
await db.connect(connector: connector);

// The connect operation should have triggered an upload (even though it fails to connect)
await uploadTriggeredCompleter.future;
expect(uploadCounter, equals(1));
// Create a new completer for the next iteration
uploadTriggeredCompleter = Completer();

// Connection attempt should initially fail
await connectedErroredCompleter.future;
expect(db.currentStatus.anyError, isNotNull);

// Now send a valid command. Which will result in successful connection
await testServer.clearEvents();
testServer.addEvent('{"token_expires_in": 3600}\n');
await connectedCompleter.future;
expect(db.connected, isTrue);

await uploadTriggeredCompleter.future;
expect(uploadCounter, equals(2));

await db.disconnect();
});
});
}
53 changes: 53 additions & 0 deletions packages/powersync/test/server/sync_server/mock_sync_server.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_router/shelf_router.dart';

// A basic Mock PowerSync service server which queues commands
// which clients can receive via connecting to the `/sync/stream` route.
// This assumes only one client will ever be connected at a time.
class TestHttpServerHelper {
// Use a queued stream to make tests easier.
StreamController<String> _controller = StreamController<String>();
late HttpServer _server;
Uri get uri => Uri.parse('http://localhost:${_server.port}');

Future<void> start() async {
final router = Router()
..post('/sync/stream', (Request request) async {
// Respond immediately with a stream
return Response.ok(_controller.stream.transform(utf8.encoder),
headers: {
'Content-Type': 'application/x-ndjson',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
context: {
"shelf.io.buffer_output": false
});
});

_server = await io.serve(router.call, 'localhost', 0);
print('Test server running at ${_server.address}:${_server.port}');
}

// Queue events which will be sent to connected clients.
void addEvent(String data) {
_controller.add(data);
}

// Clear events. We rely on a buffered controller here. Create a new controller
// in order to clear the buffer.
Future<void> clearEvents() async {
await _controller.close();
_controller = StreamController<String>();
}

Future<void> stop() async {
await _controller.close();
await _server.close();
}
}
9 changes: 7 additions & 2 deletions packages/powersync/test/streaming_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@ final testUtils = TestUtils();

class TestConnector extends PowerSyncBackendConnector {
final Function _fetchCredentials;
final Future<void> Function(PowerSyncDatabase)? _uploadData;

TestConnector(this._fetchCredentials);
TestConnector(this._fetchCredentials,
{Future<void> Function(PowerSyncDatabase)? uploadData})
: _uploadData = uploadData;

@override
Future<PowerSyncCredentials?> fetchCredentials() {
return _fetchCredentials();
}

@override
Future<void> uploadData(PowerSyncDatabase database) async {}
Future<void> uploadData(PowerSyncDatabase database) async {
await _uploadData?.call(database);
}
}

void main() {
Expand Down
4 changes: 3 additions & 1 deletion packages/powersync/test/upload_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ void main() {

powersync =
await testUtils.setupPowerSync(path: path, logger: testWarningLogger);
powersync.retryDelay = Duration(milliseconds: 0);
// Use a short retry delay here.
// A zero retry delay makes this test unstable, since it expects `2` error logs later.
powersync.retryDelay = Duration(milliseconds: 100);
var connector = TestConnector(credentialsCallback, uploadData);
powersync.connect(connector: connector);

Expand Down
13 changes: 7 additions & 6 deletions packages/powersync/test/utils/abstract_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ final testLogger = _makeTestLogger();

final testWarningLogger = _makeTestLogger(level: Level.WARNING);

Logger _makeTestLogger({Level level = Level.ALL}) {
final logger = Logger.detached('PowerSync Tests');
Logger _makeTestLogger({Level level = Level.ALL, String? name}) {
final logger = Logger.detached(name ?? 'PowerSync Tests');
logger.level = level;
logger.onRecord.listen((record) {
print(
Expand All @@ -53,11 +53,11 @@ Logger _makeTestLogger({Level level = Level.ALL}) {
}

abstract class AbstractTestUtils {
String get _testName => Invoker.current!.liveTest.test.name;

String dbPath() {
final test = Invoker.current!.liveTest;
var testName = test.test.name;
var testShortName =
testName.replaceAll(RegExp(r'[\s\./]'), '_').toLowerCase();
_testName.replaceAll(RegExp(r'[\s\./]'), '_').toLowerCase();
var dbName = "test-db/$testShortName.db";
return dbName;
}
Expand All @@ -74,7 +74,8 @@ abstract class AbstractTestUtils {
Future<PowerSyncDatabase> setupPowerSync(
{String? path, Schema? schema, Logger? logger}) async {
final db = PowerSyncDatabase.withFactory(await testFactory(path: path),
schema: schema ?? defaultSchema, logger: logger ?? testLogger);
schema: schema ?? defaultSchema,
logger: logger ?? _makeTestLogger(name: _testName));
await db.initialize();
return db;
}
Expand Down
Loading