Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/powersync_core/lib/src/abort_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ class AbortController {
return _abortRequested.future;
}

Future<void> get onCompletion {
return _abortCompleter.future;
}

/// Abort, and wait until aborting is complete.
Future<void> abort() async {
aborted = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import 'package:powersync_core/src/log_internal.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import 'package:powersync_core/src/open_factory/native/native_open_factory.dart';
import 'package:powersync_core/src/schema.dart';
import 'package:powersync_core/src/schema_logic.dart';
import 'package:powersync_core/src/streaming_sync.dart';
import 'package:powersync_core/src/sync_status.dart';
import 'package:sqlite_async/sqlite3_common.dart';
Expand Down Expand Up @@ -109,42 +108,55 @@ class PowerSyncDatabaseImpl
/// [logger] defaults to [autoLogger], which logs to the console in debug builds.s
PowerSyncDatabaseImpl.withDatabase(
{required this.schema, required this.database, Logger? logger}) {
if (logger != null) {
this.logger = logger;
} else {
this.logger = autoLogger;
}
this.logger = logger ?? autoLogger;
isInitialized = baseInit();
}

@override
@internal

/// Connect to the PowerSync service, and keep the databases in sync.
///
/// The connection is automatically re-opened if it fails for any reason.
///
/// Status changes are reported on [statusStream].
baseConnect(
{required PowerSyncBackendConnector connector,

/// Throttle time between CRUD operations
/// Defaults to 10 milliseconds.
required Duration crudThrottleTime,
required Future<void> Function() reconnect,
Map<String, dynamic>? params}) async {
Future<void> connectInternal({
required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required AbortController abort,
Map<String, dynamic>? params,
}) async {
await initialize();

// Disconnect if connected
await disconnect();
final disconnector = AbortController();
disconnecter = disconnector;

await isInitialized;
final dbRef = database.isolateConnectionFactory();
ReceivePort rPort = ReceivePort();

bool triedSpawningIsolate = false;
StreamSubscription<UpdateNotification>? crudUpdateSubscription;
rPort.listen((data) async {
final receiveMessages = ReceivePort();
final receiveUnhandledErrors = ReceivePort();
final receiveExit = ReceivePort();

SendPort? initPort;
final hasInitPort = Completer<void>();
final receivedIsolateExit = Completer<void>();

Future<void> waitForShutdown() async {
// Only complete the abortion signal after the isolate shuts down. This
// ensures absolutely no trace of this sync iteration remains.
if (triedSpawningIsolate) {
await receivedIsolateExit.future;
}

// Cleanup
crudUpdateSubscription?.cancel();
receiveMessages.close();
receiveUnhandledErrors.close();
receiveExit.close();

// Clear status apart from lastSyncedAt
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
abort.completeAbort();
}

Future<void> close() async {
initPort?.send(['close']);
await waitForShutdown();
}

receiveMessages.listen((data) async {
if (data is List) {
String action = data[0] as String;
if (action == "getCredentials") {
Expand All @@ -159,27 +171,20 @@ class PowerSyncDatabaseImpl
await connector.prefetchCredentials();
});
} else if (action == 'init') {
SendPort port = data[1] as SendPort;
final port = initPort = data[1] as SendPort;
hasInitPort.complete();
var crudStream =
database.onChange(['ps_crud'], throttle: crudThrottleTime);
crudUpdateSubscription = crudStream.listen((event) {
port.send(['update']);
});
disconnector.onAbort.then((_) {
port.send(['close']);
}).ignore();
} else if (action == 'uploadCrud') {
await (data[1] as PortCompleter).handle(() async {
await connector.uploadData(this);
});
} else if (action == 'status') {
final SyncStatus status = data[1] as SyncStatus;
setStatus(status);
} else if (action == 'close') {
// Clear status apart from lastSyncedAt
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
rPort.close();
crudUpdateSubscription?.cancel();
} else if (action == 'log') {
LogRecord record = data[1] as LogRecord;
logger.log(
Expand All @@ -188,8 +193,7 @@ class PowerSyncDatabaseImpl
}
});

var errorPort = ReceivePort();
errorPort.listen((message) async {
receiveUnhandledErrors.listen((message) async {
// Sample error:
// flutter: [PowerSync] WARNING: 2023-06-28 16:34:11.566122: Sync Isolate error
// flutter: [Connection closed while receiving data, #0 IOClient.send.<anonymous closure> (package:http/src/io_client.dart:76:13)
Expand All @@ -200,38 +204,38 @@ class PowerSyncDatabaseImpl
// ...
logger.severe('Sync Isolate error', message);

// Reconnect
// Use the param like this instead of directly calling connect(), to avoid recursive
// locks in some edge cases.
reconnect();
// Fatal errors are enabled, so the isolate will exit soon, causing us to
// complete the abort controller which will make the db mixin reconnect if
// necessary. There's no need to reconnect manually.
});

disconnected() {
disconnector.completeAbort();
disconnecter = null;
rPort.close();
// Clear status apart from lastSyncedAt
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
// Don't spawn isolate if this operation was cancelled already.
if (abort.aborted) {
return waitForShutdown();
}

var exitPort = ReceivePort();
exitPort.listen((message) {
receiveExit.listen((message) {
logger.fine('Sync Isolate exit');
disconnected();
receivedIsolateExit.complete();
});

if (disconnecter?.aborted == true) {
disconnected();
return;
}

Isolate.spawn(
_powerSyncDatabaseIsolate,
_PowerSyncDatabaseIsolateArgs(
rPort.sendPort, dbRef, retryDelay, clientParams),
debugName: 'PowerSyncDatabase',
onError: errorPort.sendPort,
onExit: exitPort.sendPort);
// Spawning the isolate can't be interrupted
triedSpawningIsolate = true;
await Isolate.spawn(
_syncIsolate,
_PowerSyncDatabaseIsolateArgs(
receiveMessages.sendPort, dbRef, retryDelay, clientParams),
debugName: 'Sync ${database.openFactory.path}',
onError: receiveUnhandledErrors.sendPort,
errorsAreFatal: true,
onExit: receiveExit.sendPort,
);
await hasInitPort.future;

abort.onAbort.whenComplete(close);

// Automatically complete the abort controller once the isolate exits.
unawaited(waitForShutdown());
}

/// Takes a read lock, without starting a transaction.
Expand All @@ -255,16 +259,6 @@ class PowerSyncDatabaseImpl
return database.writeLock(callback,
debugContext: debugContext, lockTimeout: lockTimeout);
}

@override
Future<void> updateSchema(Schema schema) {
if (disconnecter != null) {
throw AssertionError('Cannot update schema while connected');
}
schema.validate();
this.schema = schema;
return updateSchemaInIsolate(database, schema);
}
}

class _PowerSyncDatabaseIsolateArgs {
Expand All @@ -277,64 +271,73 @@ class _PowerSyncDatabaseIsolateArgs {
this.sPort, this.dbRef, this.retryDelay, this.parameters);
}

Future<void> _powerSyncDatabaseIsolate(
_PowerSyncDatabaseIsolateArgs args) async {
Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
final sPort = args.sPort;
ReceivePort rPort = ReceivePort();
final rPort = ReceivePort();
StreamController<String> crudUpdateController = StreamController.broadcast();
final upstreamDbClient = args.dbRef.upstreamPort.open();

CommonDatabase? db;
final Mutex mutex = args.dbRef.mutex.open();
StreamingSyncImplementation? openedStreamingSync;
StreamSubscription<void>? localUpdatesSubscription;

Future<void> shutdown() async {
await openedStreamingSync?.abort();

localUpdatesSubscription?.cancel();
db?.dispose();
crudUpdateController.close();
upstreamDbClient.close();

// The SyncSqliteConnection uses this mutex
// It needs to be closed before killing the isolate
// in order to free the mutex for other operations.
await mutex.close();
rPort.close();

// TODO: If we closed our resources properly, this wouldn't be necessary...
Isolate.current.kill();
}

rPort.listen((message) async {
if (message is List) {
String action = message[0] as String;
if (action == 'update') {
crudUpdateController.add('update');
if (!crudUpdateController.isClosed) {
crudUpdateController.add('update');
}
} else if (action == 'close') {
// The SyncSqliteConnection uses this mutex
// It needs to be closed before killing the isolate
// in order to free the mutex for other operations.
await mutex.close();
db?.dispose();
crudUpdateController.close();
upstreamDbClient.close();
// Abort any open http requests, and wait for it to be closed properly
await openedStreamingSync?.abort();
// No kill the Isolate
Isolate.current.kill();
await shutdown();
}
}
});
Isolate.current.addOnExitListener(sPort, response: const ['close']);
sPort.send(["init", rPort.sendPort]);
sPort.send(['init', rPort.sendPort]);

// Is there a way to avoid the overhead if logging is not enabled?
// This only takes effect in this isolate.
isolateLogger.level = Level.ALL;
isolateLogger.onRecord.listen((record) {
var copy = LogRecord(record.level, record.message, record.loggerName,
record.error, record.stackTrace);
sPort.send(["log", copy]);
sPort.send(['log', copy]);
});

Future<PowerSyncCredentials?> loadCredentials() async {
final r = IsolateResult<PowerSyncCredentials?>();
sPort.send(["getCredentials", r.completer]);
sPort.send(['getCredentials', r.completer]);
return r.future;
}

Future<void> invalidateCredentials() async {
final r = IsolateResult<void>();
sPort.send(["invalidateCredentials", r.completer]);
sPort.send(['invalidateCredentials', r.completer]);
return r.future;
}

Future<void> uploadCrud() async {
final r = IsolateResult<void>();
sPort.send(["uploadCrud", r.completer]);
sPort.send(['uploadCrud', r.completer]);
return r.future;
}

Expand Down Expand Up @@ -372,7 +375,7 @@ Future<void> _powerSyncDatabaseIsolate(
}
}

db!.updates.listen((event) {
localUpdatesSubscription = db!.updates.listen((event) {
updatedTables.add(event.tableName);

updateDebouncer ??=
Expand All @@ -383,7 +386,7 @@ Future<void> _powerSyncDatabaseIsolate(
// Unfortunately, this does not handle disposing while the database is opening.
// This should be rare - any uncaught error is a bug. And in most cases,
// it should occur after the database is already open.
db?.dispose();
shutdown();
throw error;
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:powersync_core/sqlite_async.dart';
import 'package:powersync_core/src/abort_controller.dart';
import 'package:powersync_core/src/database/powersync_db_mixin.dart';
import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart';
import 'powersync_database.dart';
Expand All @@ -24,6 +25,9 @@ class PowerSyncDatabaseImpl
@override
Schema get schema => throw UnimplementedError();

@override
set schema(Schema s) => throw UnimplementedError();

@override
SqliteDatabase get database => throw UnimplementedError();

Expand Down Expand Up @@ -101,20 +105,15 @@ class PowerSyncDatabaseImpl
throw UnimplementedError();
}

@override
Future<void> updateSchema(Schema schema) {
throw UnimplementedError();
}

@override
Logger get logger => throw UnimplementedError();

@override
@internal
Future<void> baseConnect(
Future<void> connectInternal(
{required PowerSyncBackendConnector connector,
required Duration crudThrottleTime,
required Future<void> Function() reconnect,
required AbortController abort,
Map<String, dynamic>? params}) {
throw UnimplementedError();
}
Expand Down
Loading
Loading