Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class PowerSyncDatabaseImpl
sync = StreamingSyncImplementation(
adapter: storage,
credentialsCallback: connector.getCredentialsCached,
invalidCredentialsCallback: connector.fetchCredentials,
invalidCredentialsCallback: connector.prefetchCredentials,
uploadCrud: () => connector.uploadData(this),
crudUpdateTriggerStream: crudStream,
retryDelay: Duration(seconds: 3),
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync_core/lib/src/web/sync_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SyncWorkerHandle implements StreamingSync {
await connector.uploadData(database);
return (JSObject(), null);
case SyncWorkerMessageType.invalidCredentialsCallback:
final credentials = await connector.fetchCredentials();
final credentials = await connector.prefetchCredentials();
return (
credentials != null
? SerializedCredentials.from(credentials)
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync_core/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies:
# but right now we need a minimum of v2.4.6.
sqlite3: ^2.4.6
# We implement a database controller, which is an interface of sqlite3_web.
sqlite3_web: ^0.3.0
sqlite3_web: ^0.3.1
universal_io: ^2.0.0
meta: ^1.0.0
http: ^1.1.0
Expand Down
29 changes: 26 additions & 3 deletions packages/powersync_core/test/in_memory_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,26 @@ void main() {
late CommonDatabase raw;
late PowerSyncDatabase database;
late MockSyncService syncService;
late StreamingSyncImplementation syncClient;
late StreamingSync syncClient;
var credentialsCallbackCount = 0;

setUp(() async {
credentialsCallbackCount = 0;
final (client, server) = inMemoryServer();
syncService = MockSyncService();
server.mount(syncService.router.call);

factory = await testUtils.testFactory();
(raw, database) = await factory.openInMemoryDatabase();
await database.initialize();

syncClient = database.connectWithMockService(
client,
TestConnector(() async {
credentialsCallbackCount++;
return PowerSyncCredentials(
endpoint: server.url.toString(),
token: 'token not used here',
token: 'token$credentialsCallbackCount',
expiresAt: DateTime.now(),
);
}),
Expand Down Expand Up @@ -312,18 +316,37 @@ void main() {
},
);
});

test('reconnects when token expires', () async {
await waitForConnection();
expect(credentialsCallbackCount, 1);
// When the sync service says the token has expired
syncService
..addLine({'token_expires_in': 0})
..endCurrentListener();

final nextRequest = await syncService.waitForListener;
expect(nextRequest.headers['Authorization'], 'Token token2');
expect(credentialsCallbackCount, 2);
});
});
}

TypeMatcher<SyncStatus> isSyncStatus(
{Object? downloading, Object? connected, Object? hasSynced}) {
{Object? downloading,
Object? connected,
Object? connecting,
Object? hasSynced}) {
var matcher = isA<SyncStatus>();
if (downloading != null) {
matcher = matcher.having((e) => e.downloading, 'downloading', downloading);
}
if (connected != null) {
matcher = matcher.having((e) => e.connected, 'connected', connected);
}
if (connecting != null) {
matcher = matcher.having((e) => e.connecting, 'connecting', connecting);
}
if (hasSynced != null) {
matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import 'package:shelf_router/shelf_router.dart';

final class MockSyncService {
// Use a queued stream to make tests easier.
StreamController<String> _controller = StreamController<String>();
Completer<void> _listener = Completer();
StreamController<String> _controller = StreamController();
Completer<Request> _listener = Completer();

final router = Router();

MockSyncService() {
router
..post('/sync/stream', (Request request) async {
_listener.complete();
_listener.complete(request);
// Respond immediately with a stream
return Response.ok(_controller.stream.transform(utf8.encoder),
headers: {
Expand All @@ -33,7 +33,7 @@ final class MockSyncService {
});
}

Future<void> get waitForListener => _listener.future;
Future<Request> get waitForListener => _listener.future;

// Queue events which will be sent to connected clients.
void addRawEvent(String data) {
Expand All @@ -48,6 +48,12 @@ final class MockSyncService {
addLine({'token_expires_in': tokenExpiresIn});
}

void endCurrentListener() {
_controller.close();
_controller = StreamController();
_listener = Completer();
}

// Clear events. We rely on a buffered controller here. Create a new controller
// in order to clear the buffer.
Future<void> clearEvents() async {
Expand Down