Skip to content

Commit 7a6daf7

Browse files
authored
Queue messages after suspend-reconnect (#125)
1 parent 5ece14d commit 7a6daf7

File tree

2 files changed

+64
-7
lines changed

2 files changed

+64
-7
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,11 @@ connection.suspendReconnectUntil(
137137
resolve();
138138
})
139139
);
140+
connection.suspend();
140141
```
141142

143+
When the suspend promise resolves until the connection is re-established, all messages being send will be delayed until the connection is established. If the first reconnect fails, the queued messages will be rejected.
144+
142145
#### Suspend connection
143146

144147
You can also actively close the connection and wait for a promise to resolve to reconnect again. This promise can be passed either with `suspendReconnectUntil` or with the `suspend` command itself.

lib/connection.ts

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ export class Connection {
8383
eventListeners: Map<string, ConnectionEventListener[]>;
8484
closeRequested: boolean;
8585
suspendReconnectPromise?: Promise<void>;
86+
87+
// We use this to queue messages in flight for the first reconnect
88+
// after the connection has been suspended.
89+
_queuedMessages?: Array<{
90+
resolve: () => unknown;
91+
reject?: (err: typeof ERR_CONNECTION_LOST) => unknown;
92+
}>;
8693
// @ts-ignore: incorrectly claiming it's not set in constructor.
8794
socket: HaWebSocket;
8895

@@ -132,6 +139,15 @@ export class Connection {
132139
}
133140
});
134141

142+
const queuedMessages = this._queuedMessages;
143+
144+
if (queuedMessages) {
145+
this._queuedMessages = undefined;
146+
for (const queuedMsg of queuedMessages) {
147+
queuedMsg.resolve();
148+
}
149+
}
150+
135151
this.fireEvent("ready");
136152
}
137153
}
@@ -171,12 +187,9 @@ export class Connection {
171187
this.suspendReconnectPromise = suspendPromise;
172188
}
173189

174-
suspend(suspendPromise?: Promise<void>) {
175-
if (suspendPromise) {
176-
this.suspendReconnectPromise = suspendPromise;
177-
}
190+
suspend() {
178191
if (!this.suspendReconnectPromise) {
179-
throw new Error("Can't suspend without a suspend promise");
192+
throw new Error("Suspend promise not set");
180193
}
181194
this.socket.close();
182195
}
@@ -209,6 +222,14 @@ export class Connection {
209222
console.log("Sending", message);
210223
}
211224

225+
if (this._queuedMessages) {
226+
if (commandId) {
227+
throw new Error("Cannot queue with commandId");
228+
}
229+
this._queuedMessages.push({ resolve: () => this.sendMessage(message) });
230+
return;
231+
}
232+
212233
if (!commandId) {
213234
commandId = this._genCmdId();
214235
}
@@ -219,6 +240,20 @@ export class Connection {
219240

220241
sendMessagePromise<Result>(message: MessageBase): Promise<Result> {
221242
return new Promise((resolve, reject) => {
243+
if (this._queuedMessages) {
244+
this._queuedMessages!.push({
245+
reject,
246+
resolve: async () => {
247+
try {
248+
resolve(await this.sendMessagePromise(message));
249+
} catch (err) {
250+
reject(err);
251+
}
252+
},
253+
});
254+
return;
255+
}
256+
222257
const commandId = this._genCmdId();
223258
this.commands.set(commandId, { resolve, reject });
224259
this.sendMessage(message, commandId);
@@ -236,11 +271,18 @@ export class Connection {
236271
callback: (result: Result) => void,
237272
subscribeMessage: MessageBase
238273
): Promise<SubscriptionUnsubscribe> {
239-
// Command ID that will be used
240-
const commandId = this._genCmdId();
274+
if (this._queuedMessages) {
275+
await new Promise((resolve, reject) => {
276+
this._queuedMessages!.push({ resolve, reject });
277+
});
278+
}
279+
241280
let info: SubscribeEventCommmandInFlight<Result>;
242281

243282
await new Promise((resolve, reject) => {
283+
// Command ID that will be used
284+
const commandId = this._genCmdId();
285+
244286
// We store unsubscribe on info object. That way we can overwrite it in case
245287
// we get disconnected and we have to subscribe again.
246288
info = {
@@ -348,6 +390,15 @@ export class Connection {
348390
const socket = await options.createSocket(options);
349391
this.setSocket(socket);
350392
} catch (err) {
393+
if (this._queuedMessages) {
394+
const queuedMessages = this._queuedMessages;
395+
this._queuedMessages = undefined;
396+
for (const msg of queuedMessages) {
397+
if (msg.reject) {
398+
msg.reject(ERR_CONNECTION_LOST);
399+
}
400+
}
401+
}
351402
if (err === ERR_INVALID_AUTH) {
352403
this.fireEvent("reconnect-error", err);
353404
} else {
@@ -360,6 +411,9 @@ export class Connection {
360411
if (this.suspendReconnectPromise) {
361412
await this.suspendReconnectPromise;
362413
this.suspendReconnectPromise = undefined;
414+
// For the first retry after suspend, we will queue up
415+
// all messages.
416+
this._queuedMessages = [];
363417
}
364418

365419
reconnect(0);

0 commit comments

Comments
 (0)