Skip to content

Commit 26bd2ee

Browse files
authored
refactor(core, ui): improve app lifecycle and ws connectivity handling (#2339)
1 parent 0033f75 commit 26bd2ee

File tree

10 files changed

+391
-735
lines changed

10 files changed

+391
-735
lines changed

packages/stream_chat/lib/src/client/client.dart

Lines changed: 37 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ class StreamChatClient {
123123
},
124124
);
125125

126+
_connectionStatusSubscription = wsConnectionStatusStream.pairwise().listen(
127+
(statusPair) {
128+
final [prevStatus, currStatus] = statusPair;
129+
return _onConnectionStatusChanged(prevStatus, currStatus);
130+
},
131+
);
132+
126133
state = ClientState(this);
127134
}
128135

@@ -220,7 +227,7 @@ class StreamChatClient {
220227
///```
221228
final LogHandlerFunction logHandlerFunction;
222229

223-
StreamSubscription<ConnectionStatus>? _connectionStatusSubscription;
230+
StreamSubscription<List<ConnectionStatus>>? _connectionStatusSubscription;
224231

225232
final _eventController = PublishSubject<Event>();
226233

@@ -238,20 +245,14 @@ class StreamChatClient {
238245
},
239246
);
240247

241-
final _wsConnectionStatusController =
242-
BehaviorSubject.seeded(ConnectionStatus.disconnected);
243-
244-
set _wsConnectionStatus(ConnectionStatus status) =>
245-
_wsConnectionStatusController.add(status);
246-
247248
/// The current status value of the [_ws] connection
248-
ConnectionStatus get wsConnectionStatus =>
249-
_wsConnectionStatusController.value;
249+
ConnectionStatus get wsConnectionStatus => _ws.connectionStatus;
250250

251251
/// This notifies the connection status of the [_ws] connection.
252252
/// Listen to this to get notified when the [_ws] tries to reconnect.
253-
Stream<ConnectionStatus> get wsConnectionStatusStream =>
254-
_wsConnectionStatusController.stream.distinct();
253+
Stream<ConnectionStatus> get wsConnectionStatusStream {
254+
return _ws.connectionStatusStream.distinct();
255+
}
255256

256257
/// Default log handler function for the [StreamChatClient] logger.
257258
static void defaultLogHandler(LogRecord record) {
@@ -447,16 +448,6 @@ class StreamChatClient {
447448
throw StreamChatError('Connection already available for ${user.id}');
448449
}
449450

450-
_wsConnectionStatus = ConnectionStatus.connecting;
451-
452-
// skipping `ws` seed connection status -> ConnectionStatus.disconnected
453-
// otherwise `client.wsConnectionStatusStream` will emit in order
454-
// 1. ConnectionStatus.disconnected -> client seed status
455-
// 2. ConnectionStatus.connecting -> client connecting status
456-
// 3. ConnectionStatus.disconnected -> ws seed status
457-
_connectionStatusSubscription =
458-
_ws.connectionStatusStream.skip(1).listen(_connectionStatusHandler);
459-
460451
try {
461452
final event = await _ws.connect(
462453
user,
@@ -479,13 +470,7 @@ class StreamChatClient {
479470
/// This will not trigger default auto-retry mechanism for reconnection.
480471
/// You need to call [openConnection] to reconnect to [_ws].
481472
void closeConnection() {
482-
if (wsConnectionStatus == ConnectionStatus.disconnected) return;
483-
484473
logger.info('Closing web-socket connection for ${state.currentUser?.id}');
485-
_wsConnectionStatus = ConnectionStatus.disconnected;
486-
487-
_connectionStatusSubscription?.cancel();
488-
_connectionStatusSubscription = null;
489474

490475
// Stop listening to events
491476
state.cancelEventSubscription();
@@ -513,19 +498,25 @@ class StreamChatClient {
513498
return _eventController.add(event);
514499
}
515500

516-
void _connectionStatusHandler(ConnectionStatus status) async {
517-
final previousState = wsConnectionStatus;
518-
final currentState = _wsConnectionStatus = status;
501+
void _onConnectionStatusChanged(
502+
ConnectionStatus prevStatus,
503+
ConnectionStatus currStatus,
504+
) async {
505+
// If the status hasn't changed, we don't need to do anything.
506+
if (prevStatus == currStatus) return;
519507

520-
if (previousState != currentState) {
521-
handleEvent(Event(
522-
type: EventType.connectionChanged,
523-
online: status == ConnectionStatus.connected,
524-
));
525-
}
508+
final wasConnected = prevStatus == ConnectionStatus.connected;
509+
final isConnected = currStatus == ConnectionStatus.connected;
510+
511+
// Notify the connection status change event
512+
handleEvent(Event(
513+
type: EventType.connectionChanged,
514+
online: isConnected,
515+
));
526516

527-
if (currentState == ConnectionStatus.connected &&
528-
previousState != ConnectionStatus.connected) {
517+
final connectionRecovered = !wasConnected && isConnected;
518+
519+
if (connectionRecovered) {
529520
// connection recovered
530521
final cids = state.channels.keys.toList(growable: false);
531522
if (cids.isNotEmpty) {
@@ -2025,6 +2016,9 @@ class StreamChatClient {
20252016
Future<void> disconnectUser({bool flushChatPersistence = false}) async {
20262017
logger.info('Disconnecting user : ${state.currentUser?.id}');
20272018

2019+
// closing web-socket connection
2020+
closeConnection();
2021+
20282022
// resetting state.
20292023
state.dispose();
20302024
state = ClientState(this);
@@ -2035,27 +2029,17 @@ class StreamChatClient {
20352029
_connectionIdManager.reset();
20362030

20372031
// closing persistence connection.
2038-
await closePersistenceConnection(flush: flushChatPersistence);
2039-
2040-
// closing web-socket connection
2041-
return closeConnection();
2032+
return closePersistenceConnection(flush: flushChatPersistence);
20422033
}
20432034

20442035
/// Call this function to dispose the client
20452036
Future<void> dispose() async {
2046-
logger.info('Disposing new StreamChatClient');
2047-
2048-
// disposing state.
2049-
state.dispose();
2050-
2051-
// closing persistence connection.
2052-
await closePersistenceConnection();
2053-
2054-
// closing web-socket connection.
2055-
closeConnection();
2037+
logger.info('Disposing StreamChatClient');
20562038

2039+
await disconnectUser();
2040+
await _ws.dispose();
20572041
await _eventController.close();
2058-
await _wsConnectionStatusController.close();
2042+
await _connectionStatusSubscription?.cancel();
20592043
}
20602044
}
20612045

packages/stream_chat/lib/src/ws/websocket.dart

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class WebSocket with TimerHelper {
4343
this.reconnectionMonitorInterval = 10,
4444
this.healthCheckInterval = 20,
4545
this.reconnectionMonitorTimeout = 40,
46+
this.maxReconnectAttempts = 6,
4647
this.queryParameters = const {},
4748
}) : _logger = logger;
4849

@@ -93,6 +94,11 @@ class WebSocket with TimerHelper {
9394
/// connection unhealthy
9495
final int reconnectionMonitorTimeout;
9596

97+
/// Maximum number of reconnection attempts before giving up.
98+
///
99+
/// Default is 6 attempts. ~30 seconds of reconnection attempts
100+
final int maxReconnectAttempts;
101+
96102
User? _user;
97103
String? _connectionId;
98104
DateTime? _lastEventAt;
@@ -105,18 +111,20 @@ class WebSocket with TimerHelper {
105111
///
106112
String? get connectionId => _connectionId;
107113

108-
final _connectionStatusController =
109-
BehaviorSubject.seeded(ConnectionStatus.disconnected);
110-
111-
set _connectionStatus(ConnectionStatus status) =>
112-
_connectionStatusController.safeAdd(status);
114+
final _connectionStatusController = BehaviorSubject.seeded(
115+
ConnectionStatus.disconnected,
116+
);
113117

114118
/// The current connection status value
115119
ConnectionStatus get connectionStatus => _connectionStatusController.value;
120+
set _connectionStatus(ConnectionStatus status) {
121+
_connectionStatusController.safeAdd(status);
122+
}
116123

117124
/// This notifies of connection status changes
118-
Stream<ConnectionStatus> get connectionStatusStream =>
119-
_connectionStatusController.stream.distinct();
125+
Stream<ConnectionStatus> get connectionStatusStream {
126+
return _connectionStatusController.stream.distinct();
127+
}
120128

121129
void _initWebSocketChannel(Uri uri) {
122130
_logger?.info('Initiating connection with $baseUrl');
@@ -250,6 +258,12 @@ class WebSocket with TimerHelper {
250258
void _reconnect({bool refreshToken = false}) async {
251259
_logger?.info('Retrying connection : $_reconnectAttempt');
252260
if (_reconnectRequestInProgress) return;
261+
262+
if (_reconnectAttempt >= maxReconnectAttempts) {
263+
_logger?.severe('Max reconnect attempts reached: $maxReconnectAttempts');
264+
return disconnect();
265+
}
266+
253267
_reconnectRequestInProgress = true;
254268

255269
_stopMonitoringEvents();
@@ -495,4 +509,14 @@ class WebSocket with TimerHelper {
495509

496510
_closeWebSocketChannel();
497511
}
512+
513+
/// Disposes the web-socket connection and releases resources
514+
Future<void> dispose() async {
515+
_logger?.info('Disposing web-socket connection');
516+
517+
_stopMonitoringEvents();
518+
_unsubscribeFromWebSocketChannel();
519+
_closeWebSocketChannel();
520+
_connectionStatusController.close();
521+
}
498522
}

packages/stream_chat/test/src/client/client_test.dart

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -149,21 +149,6 @@ void main() {
149149
}
150150
});
151151

152-
test('should throw if connection is already in progress', () async {
153-
expect(client.state.currentUser, isNull);
154-
try {
155-
await client.connectAnonymousUser();
156-
await client.openConnection();
157-
} catch (e) {
158-
expect(e, isA<StreamChatError>());
159-
final err = e as StreamChatError;
160-
expect(
161-
err.message.contains('Connection already in progress for'),
162-
isTrue,
163-
);
164-
}
165-
});
166-
167152
test('should throw if connection is already available', () async {
168153
expect(client.state.currentUser, isNull);
169154
try {
@@ -499,7 +484,6 @@ void main() {
499484
group('Client with connected user with persistence', () {
500485
const apiKey = 'test-api-key';
501486
late final api = FakeChatApi();
502-
late final ws = FakeWebSocket();
503487
late final persistence = MockPersistenceClient();
504488

505489
final user = User(id: 'test-user-id');
@@ -518,6 +502,7 @@ void main() {
518502
when(() => persistence.updateLastSyncAt(any()))
519503
.thenAnswer((_) => Future.value());
520504
when(persistence.getLastSyncAt).thenAnswer((_) async => null);
505+
final ws = FakeWebSocket();
521506
client = StreamChatClient(apiKey, chatApi: api, ws: ws)
522507
..chatPersistenceClient = persistence;
523508
await client.connectUser(user, token);
@@ -526,8 +511,8 @@ void main() {
526511
expect(client.wsConnectionStatus, ConnectionStatus.connected);
527512
});
528513

529-
tearDown(() {
530-
client.dispose();
514+
tearDown(() async {
515+
await client.dispose();
531516
});
532517

533518
group('`.sync`', () {
@@ -815,7 +800,6 @@ void main() {
815800
const apiKey = 'test-api-key';
816801
const userId = 'test-user-id';
817802
late final api = FakeChatApi();
818-
late final ws = FakeWebSocket();
819803

820804
final user = User(id: userId);
821805
final token = Token.development(user.id).rawValue;
@@ -832,15 +816,16 @@ void main() {
832816
});
833817

834818
setUp(() async {
819+
final ws = FakeWebSocket();
835820
client = StreamChatClient(apiKey, chatApi: api, ws: ws);
836821
await client.connectUser(user, token);
837822
await delay(300);
838823
expect(client.persistenceEnabled, isFalse);
839824
expect(client.wsConnectionStatus, ConnectionStatus.connected);
840825
});
841826

842-
tearDown(() {
843-
client.dispose();
827+
tearDown(() async {
828+
await client.dispose();
844829
});
845830

846831
group('`.sync`', () {
@@ -3490,22 +3475,22 @@ void main() {
34903475
group('PersistenceConnectionTests', () {
34913476
const apiKey = 'test-api-key';
34923477
late final api = FakeChatApi();
3493-
late final ws = FakeWebSocket();
34943478

34953479
final user = User(id: 'test-user-id');
34963480
final token = Token.development(user.id).rawValue;
34973481

34983482
late StreamChatClient client;
34993483

35003484
setUp(() async {
3485+
final ws = FakeWebSocket();
35013486
client = StreamChatClient(apiKey, chatApi: api, ws: ws);
35023487
expect(client.persistenceEnabled, isFalse);
35033488
});
35043489

3505-
tearDown(() {
3490+
tearDown(() async {
35063491
client.chatPersistenceClient = null;
35073492
expect(client.persistenceEnabled, isFalse);
3508-
client.dispose();
3493+
await client.dispose();
35093494
});
35103495

35113496
test('openPersistenceConnection connects the client to the user', () async {

0 commit comments

Comments
 (0)