Skip to content

Commit c0cb91b

Browse files
fix for realtime multiple subscription
1 parent 12f86d7 commit c0cb91b

File tree

3 files changed

+66
-47
lines changed

3 files changed

+66
-47
lines changed

templates/flutter/lib/src/realtime_mixin.dart.twig

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import 'dart:async';
22
import 'dart:convert';
33
import 'package:flutter/foundation.dart';
44
import 'package:web_socket_channel/web_socket_channel.dart';
5-
import 'package:web_socket_channel/status.dart';
5+
import 'package:web_socket_channel/status.dart' as status;
66
import 'exception.dart';
77
import 'realtime_subscription.dart';
88
import 'client.dart';
@@ -15,15 +15,20 @@ typedef GetFallbackCookie = String? Function();
1515

1616
mixin RealtimeMixin {
1717
late Client client;
18-
final Map<String, List<StreamController<RealtimeMessage>>> _channels = {};
18+
final Set<String> _channels = {};
1919
WebSocketChannel? _websok;
2020
String? _lastUrl;
2121
late WebSocketFactory getWebSocket;
2222
GetFallbackCookie? getFallbackCookie;
2323
int? get closeCode => _websok?.closeCode;
24+
int _subscriptionsCounter = 0;
25+
Map<int, RealtimeSubscription> _subscriptions = {};
26+
bool _notifyDone = true;
27+
StreamSubscription? _websocketSubscription;
2428

2529
Future<dynamic> _closeConnection() async {
26-
await _websok?.sink.close(normalClosure);
30+
await _websocketSubscription?.cancel();
31+
await _websok?.sink.close(status.normalClosure, 'Ending session');
2732
_lastUrl = null;
2833
}
2934

@@ -36,14 +41,16 @@ mixin RealtimeMixin {
3641
if (_lastUrl == uri.toString() && _websok?.closeCode == null) {
3742
return;
3843
}
44+
_notifyDone = false;
3945
await _closeConnection();
4046
_lastUrl = uri.toString();
4147
_websok = await getWebSocket(uri);
48+
_notifyDone = true;
4249
}
4350
debugPrint('subscription: $_lastUrl');
4451

4552
try {
46-
_websok?.stream.listen((response) {
53+
_websocketSubscription = _websok?.stream.listen((response) {
4754
final data = RealtimeResponse.fromJson(response);
4855
switch (data.type) {
4956
case 'error':
@@ -67,48 +74,45 @@ mixin RealtimeMixin {
6774
break;
6875
case 'event':
6976
final message = RealtimeMessage.fromMap(data.data);
70-
for(var channel in message.channels) {
71-
if (_channels[channel] != null) {
72-
for( var stream in _channels[channel]!) {
73-
stream.sink.add(message);
77+
for (var subscription in _subscriptions.values) {
78+
for (var channel in message.channels) {
79+
if (subscription.channels.contains(channel)) {
80+
subscription.controller.add(message);
7481
}
7582
}
7683
}
7784
break;
7885
}
7986
}, onDone: () {
80-
for (var list in _channels.values) {
81-
for (var stream in list) {
82-
stream.close();
83-
}
87+
if (!_notifyDone) return;
88+
for (var subscription in _subscriptions.values) {
89+
subscription.close();
8490
}
8591
_channels.clear();
8692
_closeConnection();
8793
}, onError: (err, stack) {
88-
for (var list in _channels.values) {
89-
for (var stream in list) {
90-
stream.sink.addError(err, stack);
91-
}
94+
for (var subscription in _subscriptions.values) {
95+
subscription.controller.addError(err, stack);
9296
}
9397
if (_websok?.closeCode != null && _websok?.closeCode != 1008) {
9498
debugPrint("Reconnecting in one second.");
9599
Future.delayed(Duration(seconds: 1), _createSocket);
96100
}
97101
});
98102
} catch (e) {
99-
if (e is {{spec.title | caseUcfirst}}Exception) {
103+
if (e is AppwriteException) {
100104
rethrow;
101105
}
102106
if (e is WebSocketChannelException) {
103-
throw {{spec.title | caseUcfirst}}Exception(e.message);
107+
throw AppwriteException(e.message);
104108
}
105-
throw {{spec.title | caseUcfirst}}Exception(e.toString());
109+
throw AppwriteException(e.toString());
106110
}
107111
}
108112

109113
Uri _prepareUri() {
110114
if (client.endPointRealtime == null) {
111-
throw {{spec.title | caseUcfirst}}Exception(
115+
throw AppwriteException(
112116
"Please set endPointRealtime to connect to realtime server");
113117
}
114118
var uri = Uri.parse(client.endPointRealtime!);
@@ -118,43 +122,49 @@ mixin RealtimeMixin {
118122
port: uri.port,
119123
queryParameters: {
120124
"project": client.config['project'],
121-
"channels[]": _channels.keys.toList(),
125+
"channels[]": _channels.toList(),
122126
},
123127
path: uri.path + "/realtime",
124128
);
125129
}
126130

127131
RealtimeSubscription subscribeTo(List<String> channels) {
128132
StreamController<RealtimeMessage> controller = StreamController.broadcast();
129-
for(var channel in channels) {
130-
if (!_channels.containsKey(channel)) {
131-
_channels[channel] = [];
132-
}
133-
_channels[channel]!.add(controller);
134-
}
133+
_channels.addAll(channels);
135134
Future.delayed(Duration.zero, () => _createSocket());
135+
int counter = _subscriptionsCounter++;
136136
RealtimeSubscription subscription = RealtimeSubscription(
137-
stream: controller.stream,
137+
controller: controller,
138+
channels: channels,
138139
close: () async {
140+
_subscriptions.remove(counter);
141+
_subscriptionsCounter--;
139142
controller.close();
140-
for(var channel in channels) {
141-
_channels[channel]!.remove(controller);
142-
if (_channels[channel]!.isEmpty) {
143-
_channels.remove(channel);
144-
}
145-
}
146-
if(_channels.isNotEmpty) {
143+
_cleanup(channels);
144+
145+
if (_channels.isNotEmpty) {
147146
await Future.delayed(Duration.zero, () => _createSocket());
148147
} else {
149148
await _closeConnection();
150149
}
151150
});
151+
_subscriptions[counter] = subscription;
152152
return subscription;
153153
}
154154

155+
void _cleanup(List<String> channels) {
156+
for (var channel in channels) {
157+
bool found = _subscriptions.values
158+
.any((subscription) => subscription.channels.contains(channel));
159+
if (!found) {
160+
_channels.remove(channel);
161+
}
162+
}
163+
}
164+
155165
void handleError(RealtimeResponse response) {
156166
if (response.data['code'] == 1008) {
157-
throw {{spec.title | caseUcfirst}}Exception(response.data["message"], response.data["code"]);
167+
throw AppwriteException(response.data["message"], response.data["code"]);
158168
} else {
159169
debugPrint("Reconnecting in one second.");
160170
Future.delayed(const Duration(seconds: 1), () {
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
1+
import 'dart:async';
2+
13
import 'realtime_message.dart';
24

35
/// Realtime Subscription
46
class RealtimeSubscription {
57
/// Stream of [RealtimeMessage]s
68
final Stream<RealtimeMessage> stream;
79

10+
final StreamController<RealtimeMessage> controller;
11+
12+
/// List of channels
13+
List<String> channels;
14+
815
/// Closes the subscription
916
final Future<void> Function() close;
1017

1118
/// Initializes a [RealtimeSubscription]
12-
RealtimeSubscription({required this.stream, required this.close});
19+
RealtimeSubscription(
20+
{required this.close, required this.channels, required this.controller})
21+
: stream = controller.stream;
1322
}

templates/flutter/test/src/realtime_subscription_test.dart.twig

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1-
import 'package:mockito/mockito.dart';
2-
import 'package:{{language.params.packageName}}/src/realtime_message.dart';
3-
import 'package:{{language.params.packageName}}/src/realtime_subscription.dart';
1+
import 'package:appwrite/src/realtime_message.dart';
2+
import 'package:appwrite/src/realtime_subscription.dart';
43
import 'package:flutter_test/flutter_test.dart';
5-
6-
class MockStream<T> extends Mock implements Stream<T> {}
7-
8-
4+
import 'dart:async';
95

106
void main() {
117
group('RealtimeSubscription', () {
12-
final mockStream = MockStream<RealtimeMessage>();
8+
final mockStream = StreamController<RealtimeMessage>.broadcast();
139
final mockCloseFunction = () async {};
14-
final subscription = RealtimeSubscription(stream: mockStream, close: mockCloseFunction);
10+
final subscription = RealtimeSubscription(
11+
controller: mockStream,
12+
close: mockCloseFunction,
13+
channels: ['documents']);
1514

1615
test('should have the correct stream and close function', () {
17-
expect(subscription.stream, equals(mockStream));
16+
expect(subscription.controller, equals(mockStream));
17+
expect(subscription.stream, equals(mockStream.stream));
1818
expect(subscription.close, equals(mockCloseFunction));
1919
});
2020
});

0 commit comments

Comments
 (0)