Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/stream_chat/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## Upcoming

🐞 Fixed

- Improved sync reliability and error handling with enhanced `lastSyncAt` initialization, 400
error recovery, and automatic flushing of stale persistence data after 30 days of inactivity.

## 9.16.0

🐞 Fixed
Expand Down
66 changes: 33 additions & 33 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:dio/dio.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
Expand Down Expand Up @@ -186,9 +187,6 @@ class StreamChatClient {

late final RetryPolicy _retryPolicy;

/// the last dateTime at the which all the channels were synced
DateTime? _lastSyncedAt;

/// The retry policy options getter
RetryPolicy get retryPolicy => _retryPolicy;

Expand Down Expand Up @@ -519,25 +517,17 @@ class StreamChatClient {

if (connectionRecovered) {
// connection recovered
final cids = state.channels.keys.toList(growable: false);
final cids = [...state.channels.keys.toSet()];
if (cids.isNotEmpty) {
await queryChannelsOnline(
filter: Filter.in_('cid', cids),
paginationParams: const PaginationParams(limit: 30),
);
if (persistenceEnabled) {
await sync(cids: cids, lastSyncAt: _lastSyncedAt);
}
} else {
// channels are empty, assuming it's a fresh start
// and making sure `lastSyncAt` is initialized
if (persistenceEnabled) {
final lastSyncAt = await chatPersistenceClient?.getLastSyncAt();
if (lastSyncAt == null) {
await chatPersistenceClient?.updateLastSyncAt(DateTime.now());
}
}

// Sync the persistence client if available
if (persistenceEnabled) await sync(cids: cids);
}

handleEvent(Event(
type: EventType.connectionRecovered,
online: true,
Expand Down Expand Up @@ -569,34 +559,45 @@ class StreamChatClient {
Future<void> sync({List<String>? cids, DateTime? lastSyncAt}) {
return _syncLock.synchronized(() async {
final channels = cids ?? await chatPersistenceClient?.getChannelCids();
if (channels == null || channels.isEmpty) {
return;
}
if (channels == null || channels.isEmpty) return;

final syncAt = lastSyncAt ?? await chatPersistenceClient?.getLastSyncAt();
if (syncAt == null) {
return;
logger.info('Fresh sync start: lastSyncAt initialized to now.');
return chatPersistenceClient?.updateLastSyncAt(DateTime.now());
}

try {
logger.info('Syncing events since $syncAt for channels: $channels');

final res = await _chatApi.general.sync(channels, syncAt);
final events = res.events
..sort((a, b) => a.createdAt.compareTo(b.createdAt));
final events = res.events.sorted(
(a, b) => a.createdAt.compareTo(b.createdAt),
);

for (final event in events) {
logger.fine('event.type: ${event.type}');
final messageText = event.message?.text;
if (messageText != null) {
logger.fine('event.message.text: $messageText');
}
logger.fine('Syncing event: ${event.type}');
handleEvent(event);
}

final now = DateTime.now();
_lastSyncedAt = now;
chatPersistenceClient?.updateLastSyncAt(now);
} catch (e, stk) {
logger.severe('Error during sync', e, stk);
final updatedSyncAt = events.lastOrNull?.createdAt ?? DateTime.now();
return chatPersistenceClient?.updateLastSyncAt(updatedSyncAt);
} catch (error, stk) {
// If we got a 400 error, it means that either the sync time is too
// old or the channel list is too long or too many events need to be
// synced. In this case, we should just flush the persistence client
// and start over.
if (error is StreamChatNetworkError && error.statusCode == 400) {
logger.warning(
'Failed to sync events due to stale or oversized state. '
'Resetting the persistence client to enable a fresh start.',
);

await chatPersistenceClient?.flush();
return chatPersistenceClient?.updateLastSyncAt(DateTime.now());
}

logger.warning('Error syncing events', error, stk);
}
});
}
Expand Down Expand Up @@ -2071,7 +2072,6 @@ class StreamChatClient {
// resetting state.
state.dispose();
state = ClientState(this);
_lastSyncedAt = null;

// resetting credentials.
_tokenManager.reset();
Expand Down
3 changes: 3 additions & 0 deletions packages/stream_chat/lib/src/db/chat_persistence_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ abstract class ChatPersistenceClient {
/// If [flush] is true, the data will also be deleted
Future<void> disconnect({bool flush = false});

/// Clears all the data stored in the persistence client.
Future<void> flush();

/// Get stored replies by messageId
Future<List<Message>> getReplies(
String parentId, {
Expand Down
66 changes: 66 additions & 0 deletions packages/stream_chat/test/src/client/client_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ignore_for_file: avoid_redundant_argument_values

import 'package:mocktail/mocktail.dart';
import 'package:stream_chat/src/core/http/token.dart';
import 'package:stream_chat/stream_chat.dart';
Expand Down Expand Up @@ -3590,5 +3592,69 @@ void main() {
);
},
);

group('Sync Method Tests', () {
test(
'should retrieve data from persistence client and sync successfully',
() async {
final cids = ['channel1', 'channel2'];
final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1));
final fakeClient = FakePersistenceClient(
channelCids: cids,
lastSyncAt: lastSyncAt,
);

client.chatPersistenceClient = fakeClient;
when(() => api.general.sync(cids, lastSyncAt)).thenAnswer(
(_) async => SyncResponse()..events = [],
);

await client.sync();

verify(() => api.general.sync(cids, lastSyncAt)).called(1);

final newLastSyncAt = await fakeClient.getLastSyncAt();
expect(newLastSyncAt?.isAfter(lastSyncAt), isTrue);
},
);

test('should set lastSyncAt on first sync when null', () async {
final fakeClient = FakePersistenceClient(
channelCids: ['channel1'],
lastSyncAt: null,
);

client.chatPersistenceClient = fakeClient;

await client.sync();

expectLater(fakeClient.getLastSyncAt(), completion(isNotNull));
verifyNever(() => api.general.sync(any(), any()));
});

test('should flush persistence client on 400 error', () async {
final cids = ['channel1'];
final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1));
final fakeClient = FakePersistenceClient(
channelCids: cids,
lastSyncAt: lastSyncAt,
);

client.chatPersistenceClient = fakeClient;
when(() => api.general.sync(cids, lastSyncAt)).thenThrow(
StreamChatNetworkError.raw(
code: 4,
statusCode: 400,
message: 'Too many events',
),
);

await client.sync();

expect(await fakeClient.getChannelCids(), isEmpty); // Should be flushed

verify(() => api.general.sync(cids, lastSyncAt)).called(1);
});
});
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class TestPersistenceClient extends ChatPersistenceClient {
@override
Future<void> disconnect({bool flush = false}) => throw UnimplementedError();

@override
Future<void> flush() => throw UnimplementedError();

@override
Future<ChannelModel?> getChannelByCid(String cid) async =>
ChannelModel(cid: cid);
Expand Down
57 changes: 57 additions & 0 deletions packages/stream_chat/test/src/fakes.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,63 @@ class FakeTokenManager extends Fake implements TokenManager {

class FakeMultiPartFile extends Fake implements MultipartFile {}

/// Fake persistence client for testing persistence client reliability features
class FakePersistenceClient extends Fake implements ChatPersistenceClient {
FakePersistenceClient({
DateTime? lastSyncAt,
List<String>? channelCids,
}) : _lastSyncAt = lastSyncAt,
_channelCids = channelCids ?? [];

String? _userId;
bool _isConnected = false;
DateTime? _lastSyncAt;
List<String> _channelCids;

// Track method calls for testing
int connectCallCount = 0;
int disconnectCallCount = 0;

@override
bool get isConnected => _isConnected;

@override
String? get userId => _userId;

@override
Future<void> connect(String userId) async {
_userId = userId;
_isConnected = true;
connectCallCount++;
}

@override
Future<void> disconnect({bool flush = false}) async {
if (flush) await this.flush();

_userId = null;
_isConnected = false;
disconnectCallCount++;
}

@override
Future<void> flush() async {
_lastSyncAt = null;
_channelCids = [];
}

@override
Future<DateTime?> getLastSyncAt() async => _lastSyncAt;

@override
Future<void> updateLastSyncAt(DateTime lastSyncAt) async {
_lastSyncAt = lastSyncAt;
}

@override
Future<List<String>> getChannelCids() async => _channelCids;
}

class FakeChatApi extends Fake implements StreamChatApi {
UserApi? _user;

Expand Down
33 changes: 24 additions & 9 deletions packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Upcoming

✅ Added

- Added support for `client.flush()` method to clear database.

## 9.16.0

- Updated `stream_chat` dependency to [`9.16.0`](https://pub.dev/packages/stream_chat/changelog).
Expand Down Expand Up @@ -110,7 +116,8 @@

## 7.2.0-hotfix.1

- Updated `stream_chat` dependency to [`7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog).
- Updated `stream_chat` dependency to [
`7.2.0-hotfix.1`](https://pub.dev/packages/stream_chat/changelog).

## 7.2.0

Expand All @@ -131,7 +138,8 @@
## 7.0.0

- Updated minimum supported `SDK` version to Flutter 3.13/Dart 3.1
- 🛑 **BREAKING** Removed deprecated `getChannelStates.sort` parameter. Use `getChannelStates.channelStateSort` instead.
- 🛑 **BREAKING** Removed deprecated `getChannelStates.sort` parameter. Use
`getChannelStates.channelStateSort` instead.

## 6.10.0

Expand All @@ -148,7 +156,8 @@

## 6.7.0

- [[#1683]](https://github.com/GetStream/stream-chat-flutter/issues/1683) Fixed SqliteException no such column `messages.state`.
- [[#1683]](https://github.com/GetStream/stream-chat-flutter/issues/1683) Fixed SqliteException no
such column `messages.state`.
- Updated `stream_chat` dependency to [`6.7.0`](https://pub.dev/packages/stream_chat/changelog).

## 6.6.0
Expand All @@ -169,12 +178,14 @@

## 6.2.0

- Added support for `StreamChatPersistenceClient.isConnected` for checking if the client is connected to the database.
- Added support for `StreamChatPersistenceClient.isConnected` for checking if the client is
connected to the database.
- [[#1422]](https://github.com/GetStream/stream-chat-flutter/issues/1422) Removed default values
from `UserEntity` `createdAt` and `updatedAt` fields.
- Updated `stream_chat` dependency to [`6.2.0`](https://pub.dev/packages/stream_chat/changelog).
- Added support for `StreamChatPersistenceClient.openPersistenceConnection`
and `StreamChatPersistenceClient.closePersistenceConnection` for opening and closing the database connection.
and `StreamChatPersistenceClient.closePersistenceConnection` for opening and closing the database
connection.

## 6.1.0

Expand Down Expand Up @@ -202,7 +213,8 @@

## 5.0.0-beta.1

- Updated `stream_chat` dependency to [`5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog).
- Updated `stream_chat` dependency to [
`5.0.0-beta.1`](https://pub.dev/packages/stream_chat/changelog).

## 4.4.0

Expand Down Expand Up @@ -234,7 +246,8 @@

## 4.0.0-beta.0

- Updated `stream_chat` dependency to [`4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog).
- Updated `stream_chat` dependency to [
`4.0.0-beta.0`](https://pub.dev/packages/stream_chat/changelog).

## 3.1.0

Expand All @@ -243,8 +256,10 @@
## 3.0.0

- Updated `stream_chat` dependency to [`3.0.0`](https://pub.dev/packages/stream_chat/changelog).
- [[#604]](https://github.com/GetStream/stream-chat-flutter/issues/604) Fix cascade deletion by enabling `pragma foreign_keys`.
- Added a new table `PinnedMessageReactions` and dao `PinnedMessageReactionDao` specifically for pinned messages.
- [[#604]](https://github.com/GetStream/stream-chat-flutter/issues/604) Fix cascade deletion by
enabling `pragma foreign_keys`.
- Added a new table `PinnedMessageReactions` and dao `PinnedMessageReactionDao` specifically for
pinned messages.

## 2.2.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,13 @@ class StreamChatPersistenceClient extends ChatPersistenceClient {
return db!.transaction(() => super.updateChannelStates(channelStates));
}

@override
Future<void> flush() {
assert(_debugIsConnected, '');
_logger.info('flush');
return db!.flush();
}

@override
Future<void> disconnect({bool flush = false}) async {
_logger.info('disconnect');
Expand Down
Loading