@@ -2,7 +2,7 @@ import 'dart:async';
22import 'dart:convert';
33import 'package:flutter/foundation.dart';
44import 'package:web_socket_channel/web_socket_channel.dart';
5- import 'package:web_socket_channel/status.dart';
5+ import 'package:web_socket_channel/status.dart' as status ;
66import 'exception.dart';
77import 'realtime_subscription.dart';
88import 'client.dart';
@@ -15,35 +15,46 @@ typedef GetFallbackCookie = String? Function();
1515
1616mixin RealtimeMixin {
1717 late Client client;
18- final Map <String , List < StreamController < RealtimeMessage >> > _channels = {};
18+ final Set <String > _channels = {};
1919 WebSocketChannel? _websok;
2020 String? _lastUrl;
2121 late WebSocketFactory getWebSocket;
2222 GetFallbackCookie? getFallbackCookie;
2323 int? get closeCode => _websok?.closeCode;
24+ int _subscriptionsCounter = 0;
25+ Map<int , RealtimeSubscription > _subscriptions = {};
26+ bool _notifyDone = true;
27+ StreamSubscription? _websocketSubscription;
28+ bool _creatingSocket = false;
2429
2530 Future<dynamic > _closeConnection() async {
26- await _websok?.sink.close(normalClosure);
31+ await _websocketSubscription?.cancel();
32+ await _websok?.sink.close(status.normalClosure, 'Ending session');
2733 _lastUrl = null;
2834 }
2935
3036 _createSocket() async {
37+ if(_creatingSocket || _channels.isEmpty) return;
38+ _creatingSocket = true;
3139 final uri = _prepareUri();
3240 if (_websok == null) {
3341 _websok = await getWebSocket(uri);
3442 _lastUrl = uri.toString();
3543 } else {
3644 if (_lastUrl == uri.toString() && _websok?.closeCode == null) {
45+ _creatingSocket = false;
3746 return;
3847 }
48+ _notifyDone = false;
3949 await _closeConnection();
4050 _lastUrl = uri.toString();
4151 _websok = await getWebSocket(uri);
52+ _notifyDone = true;
4253 }
4354 debugPrint('subscription: $_lastUrl');
4455
4556 try {
46- _websok?.stream.listen((response) {
57+ _websocketSubscription = _websok?.stream.listen((response) {
4758 final data = RealtimeResponse.fromJson(response);
4859 switch (data.type) {
4960 case 'error':
@@ -67,28 +78,25 @@ mixin RealtimeMixin {
6778 break;
6879 case 'event':
6980 final message = RealtimeMessage.fromMap(data.data);
70- for(var channel in message.channels ) {
71- if (_channels[ channel] != null ) {
72- for( var stream in _channels[ channel]! ) {
73- stream.sink .add(message);
81+ for (var subscription in _subscriptions.values ) {
82+ for (var channel in message.channels ) {
83+ if (subscription.channels.contains( channel) ) {
84+ subscription.controller .add(message);
7485 }
7586 }
7687 }
7788 break;
7889 }
7990 }, onDone: () {
80- for (var list in _channels.values) {
81- for (var stream in list) {
82- stream.close();
83- }
91+ if (!_notifyDone || _creatingSocket) return;
92+ for (var subscription in _subscriptions.values) {
93+ subscription.close();
8494 }
8595 _channels.clear();
8696 _closeConnection();
8797 }, onError: (err, stack) {
88- for (var list in _channels.values) {
89- for (var stream in list) {
90- stream.sink.addError(err, stack);
91- }
98+ for (var subscription in _subscriptions.values) {
99+ subscription.controller.addError(err, stack);
92100 }
93101 if (_websok?.closeCode != null && _websok?.closeCode != 1008) {
94102 debugPrint("Reconnecting in one second.");
@@ -103,6 +111,8 @@ mixin RealtimeMixin {
103111 throw {{spec .title | caseUcfirst }}Exception(e.message);
104112 }
105113 throw {{spec .title | caseUcfirst }}Exception(e.toString());
114+ } finally {
115+ _creatingSocket = false;
106116 }
107117 }
108118
@@ -118,43 +128,49 @@ mixin RealtimeMixin {
118128 port: uri.port,
119129 queryParameters: {
120130 "project": client.config['project'],
121- "channels[]": _channels.keys. toList(),
131+ "channels[]": _channels.toList(),
122132 },
123133 path: uri.path + "/realtime",
124134 );
125135 }
126136
127137 RealtimeSubscription subscribeTo(List<String > channels) {
128138 StreamController<RealtimeMessage > controller = StreamController.broadcast();
129- for(var channel in channels) {
130- if (!_channels.containsKey(channel)) {
131- _channels[channel] = [];
132- }
133- _channels[channel]!.add(controller);
134- }
139+ _channels.addAll(channels);
135140 Future.delayed(Duration.zero, () => _createSocket());
141+ int id = DateTime.now().microsecondsSinceEpoch;
136142 RealtimeSubscription subscription = RealtimeSubscription(
137- stream: controller.stream,
143+ controller: controller,
144+ channels: channels,
138145 close: () async {
146+ _subscriptions.remove(id);
147+ _subscriptionsCounter--;
139148 controller.close();
140- for(var channel in channels) {
141- _channels[channel]!.remove(controller);
142- if (_channels[channel]!.isEmpty) {
143- _channels.remove(channel);
144- }
145- }
146- if(_channels.isNotEmpty) {
149+ _cleanup(channels);
150+
151+ if (_channels.isNotEmpty) {
147152 await Future.delayed(Duration.zero, () => _createSocket());
148153 } else {
149154 await _closeConnection();
150155 }
151156 });
157+ _subscriptions[id] = subscription;
152158 return subscription;
153159 }
154160
161+ void _cleanup(List<String > channels) {
162+ for (var channel in channels) {
163+ bool found = _subscriptions.values
164+ .any((subscription) => subscription.channels.contains(channel));
165+ if (!found) {
166+ _channels.remove(channel);
167+ }
168+ }
169+ }
170+
155171 void handleError(RealtimeResponse response) {
156172 if (response.data['code'] == 1008) {
157- throw {{ spec . title | caseUcfirst }}Exception (response.data["message"], response.data["code"]);
173+ throw AppwriteException (response.data["message"], response.data["code"]);
158174 } else {
159175 debugPrint("Reconnecting in one second.");
160176 Future.delayed(const Duration(seconds: 1), () {
0 commit comments