@@ -23,36 +23,40 @@ mixin RealtimeMixin {
2323 int? get closeCode => _websok?.closeCode;
2424 Map<int , RealtimeSubscription > _subscriptions = {};
2525 bool _notifyDone = true;
26+ bool _reconnect = true;
27+ int _retries = 0;
2628 StreamSubscription? _websocketSubscription;
2729 bool _creatingSocket = false;
2830
2931 Future<dynamic > _closeConnection() async {
3032 await _websocketSubscription?.cancel();
3133 await _websok?.sink.close(status.normalClosure, 'Ending session');
3234 _lastUrl = null;
35+ _retries = 0;
36+ _reconnect = false;
3337 }
3438
3539 _createSocket() async {
3640 if(_creatingSocket || _channels.isEmpty) return;
3741 _creatingSocket = true;
3842 final uri = _prepareUri();
39- if (_websok == null) {
40- _websok = await getWebSocket(uri);
41- _lastUrl = uri.toString();
42- } else {
43- if (_lastUrl == uri.toString() && _websok?.closeCode == null) {
44- _creatingSocket = false;
45- return;
46- }
47- _notifyDone = false;
48- await _closeConnection();
49- _lastUrl = uri.toString();
50- _websok = await getWebSocket(uri);
51- _notifyDone = true;
52- }
53- debugPrint('subscription: $_lastUrl');
54-
5543 try {
44+ if (_websok == null || _websok?.closeCode != null) {
45+ _websok = await getWebSocket(uri);
46+ _lastUrl = uri.toString();
47+ } else {
48+ if (_lastUrl == uri.toString() && _websok?.closeCode == null) {
49+ _creatingSocket = false;
50+ return;
51+ }
52+ _notifyDone = false;
53+ await _closeConnection();
54+ _lastUrl = uri.toString();
55+ _websok = await getWebSocket(uri);
56+ _notifyDone = true;
57+ }
58+ debugPrint('subscription: $_lastUrl');
59+ _retries = 0;
5660 _websocketSubscription = _websok?.stream.listen((response) {
5761 final data = RealtimeResponse.fromJson(response);
5862 switch (data.type) {
@@ -87,34 +91,44 @@ mixin RealtimeMixin {
8791 break;
8892 }
8993 }, onDone: () {
90- final subscriptions = List.from(_subscriptions.values);
91- for (var subscription in subscriptions) {
92- subscription.close();
93- }
94- _channels.clear();
95- _closeConnection();
94+ _retry();
9695 }, onError: (err, stack) {
9796 for (var subscription in _subscriptions.values) {
9897 subscription.controller.addError(err, stack);
9998 }
100- if (_websok?.closeCode != null && _websok?.closeCode != 1008) {
101- debugPrint("Reconnecting in one second.");
102- Future.delayed(Duration(seconds: 1), _createSocket);
103- }
99+ _retry();
104100 });
105101 } catch (e) {
106102 if (e is {{spec .title | caseUcfirst }}Exception) {
107103 rethrow;
108104 }
109- if (e is WebSocketChannelException) {
110- throw {{spec .title | caseUcfirst }}Exception(e.message);
111- }
112- throw {{spec .title | caseUcfirst }}Exception(e.toString());
105+ debugPrint(e.toString());
106+ _retry();
113107 } finally {
114108 _creatingSocket = false;
115109 }
116110 }
117111
112+ void _retry() async {
113+ if (!_reconnect || _websok?.closeCode == status.policyViolation) {
114+ _reconnect = true;
115+ return;
116+ }
117+ _retries++;
118+ debugPrint("Reconnecting in ${_getTimeout()} seconds.");
119+ Future.delayed(Duration(seconds: _getTimeout()), _createSocket);
120+ }
121+
122+ int _getTimeout() {
123+ return _retries < 5
124+ ? 1
125+ : _retries < 15
126+ ? 5
127+ : _retries < 100
128+ ? 10
129+ : 60;
130+ }
131+
118132 Uri _prepareUri() {
119133 if (client.endPointRealtime == null) {
120134 throw {{spec .title | caseUcfirst }}Exception(
@@ -167,13 +181,10 @@ mixin RealtimeMixin {
167181 }
168182
169183 void handleError(RealtimeResponse response) {
170- if (response.data['code'] == 1008 ) {
171- throw AppwriteException (response.data["message"], response.data["code"]);
184+ if (response.data['code'] == status.policyViolation ) {
185+ throw {{ spec . title | caseUcfirst }}Exception (response.data["message"], response.data["code"]);
172186 } else {
173- debugPrint("Reconnecting in one second.");
174- Future.delayed(const Duration(seconds: 1), () {
175- _createSocket();
176- });
187+ _retry();
177188 }
178189 }
179190}
0 commit comments