Skip to content

Commit 770fd84

Browse files
authored
Allow subscriptions (#85)
* Allow subscriptions * Update README * Disable debug * Update docs
1 parent 24ff3cb commit 770fd84

File tree

2 files changed

+84
-52
lines changed

2 files changed

+84
-52
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ This is a websocket client written in JavaScript that allows retrieving authenti
77
Check [the demo](https://hass-auth-demo.glitch.me/). The repository also includes an [example client](https://github.com/home-assistant/home-assistant-js-websocket/blob/master/example.html):
88

99
Clone this repository, then go to home-assistant-js-websocket folder and run the following commands:
10+
1011
```bash
1112
yarn install
1213
yarn build
@@ -291,6 +292,10 @@ Subscribe to all or specific events on the Home Assistant bus. Calls `eventCallb
291292

292293
Returns a promise that will resolve to a function that will cancel the subscription once called.
293294

295+
Subscription will be automatically re-established after a reconnect.
296+
297+
Uses `conn.subscribeMessage` under the hood.
298+
294299
##### `conn.addEventListener(eventType, listener)`
295300

296301
Listen for events on the connection. [See docs.](#automatic-reconnecting)
@@ -299,6 +304,14 @@ Listen for events on the connection. [See docs.](#automatic-reconnecting)
299304

300305
Send a message to the server. Returns a promise that resolves or rejects based on the result of the server. Special case rejection is `ERR_CONNECTION_LOST` if the connection is lost while the command is in progress.
301306

307+
##### `conn.subscribeMessage(callback, subscribeMessage)`
308+
309+
Call an endpoint in Home Assistant that creates a subscription. Calls `callback` for each item that gets received.
310+
311+
Returns a promise that will resolve to a function that will cancel the subscription once called.
312+
313+
Subscription will be automatically re-established after a reconnect.
314+
302315
## Auth API Reference
303316

304317
An instance of Auth is returned from the `getAuth` method. It has the following properties:

lib/connection.ts

Lines changed: 71 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,23 @@ type WebSocketResponse =
4949
| WebSocketResultResponse
5050
| WebSocketResultErrorResponse;
5151

52-
type SubscribeEventCommmandInFlight = {
52+
type SubscriptionUnsubscribe = () => Promise<void>;
53+
54+
interface SubscribeEventCommmandInFlight<T> {
5355
resolve: (result?: any) => void;
5456
reject: (err: any) => void;
55-
eventCallback: (ev: any) => void;
56-
eventType?: string;
57-
unsubscribe: () => Promise<void>;
58-
};
57+
callback: (ev: T) => void;
58+
subscribe: () => Promise<SubscriptionUnsubscribe>;
59+
unsubscribe: SubscriptionUnsubscribe;
60+
}
5961

6062
type CommandWithAnswerInFlight = {
6163
resolve: (result?: any) => void;
6264
reject: (err: any) => void;
6365
};
6466

6567
type CommandInFlight =
66-
| SubscribeEventCommmandInFlight
68+
| SubscribeEventCommmandInFlight<any>
6769
| CommandWithAnswerInFlight;
6870

6971
export class Connection {
@@ -112,16 +114,14 @@ export class Connection {
112114
Object.keys(oldCommands).forEach(id => {
113115
const info: CommandInFlight = oldCommands[id];
114116

115-
if ("eventCallback" in info) {
116-
this.subscribeEvents(info.eventCallback, info.eventType).then(
117-
unsub => {
118-
info.unsubscribe = unsub;
119-
// We need to resolve this in case it wasn't resolved yet.
120-
// This allows us to call subscribeEvents while we're disconnected
121-
// and recover properly.
122-
info.resolve();
123-
}
124-
);
117+
if ("subscribe" in info) {
118+
info.subscribe().then(unsub => {
119+
info.unsubscribe = unsub;
120+
// We need to resolve this in case it wasn't resolved yet.
121+
// This allows us to subscribe while we're disconnected
122+
// and recover properly.
123+
info.resolve();
124+
});
125125
}
126126
});
127127

@@ -164,39 +164,18 @@ export class Connection {
164164
this.socket.close();
165165
}
166166

167-
// eventCallback will be called when a new event fires
168-
// Returned promise resolves to an unsubscribe function.
167+
/**
168+
* Subscribe to a specific or all events.
169+
*
170+
* @param callback Callback to be called when a new event fires
171+
* @param eventType
172+
* @returns promise that resolves to an unsubscribe function
173+
*/
169174
async subscribeEvents<EventType>(
170-
eventCallback: (ev: EventType) => void,
175+
callback: (ev: EventType) => void,
171176
eventType?: string
172-
) {
173-
// Command ID that will be used
174-
const commandId = this._genCmdId();
175-
let info: SubscribeEventCommmandInFlight;
176-
177-
await new Promise((resolve, reject) => {
178-
// We store unsubscribe on info object. That way we can overwrite it in case
179-
// we get disconnected and we have to subscribe again.
180-
info = this.commands[commandId] = {
181-
resolve,
182-
reject,
183-
eventCallback: eventCallback as (ev: any) => void,
184-
eventType,
185-
unsubscribe: async () => {
186-
await this.sendMessagePromise(messages.unsubscribeEvents(commandId));
187-
delete this.commands[commandId];
188-
}
189-
};
190-
191-
try {
192-
this.sendMessage(messages.subscribeEvents(eventType), commandId);
193-
} catch (err) {
194-
// Happens when the websocket is already closing.
195-
// Don't have to handle the error, reconnect logic will pick it up.
196-
}
197-
});
198-
199-
return () => info.unsubscribe();
177+
): Promise<SubscriptionUnsubscribe> {
178+
return this.subscribeMessage(callback, messages.subscribeEvents(eventType));
200179
}
201180

202181
ping() {
@@ -224,6 +203,46 @@ export class Connection {
224203
});
225204
}
226205

206+
/**
207+
* Call a websocket command that starts a subscription on the backend.
208+
*
209+
* @param message the message to start the subscription
210+
* @param callback the callback to be called when a new item arrives
211+
* @returns promise that resolves to an unsubscribe function
212+
*/
213+
async subscribeMessage<Result>(
214+
callback: (result: Result) => void,
215+
subscribeMessage: MessageBase
216+
): Promise<SubscriptionUnsubscribe> {
217+
// Command ID that will be used
218+
const commandId = this._genCmdId();
219+
let info: SubscribeEventCommmandInFlight<Result>;
220+
221+
await new Promise((resolve, reject) => {
222+
// We store unsubscribe on info object. That way we can overwrite it in case
223+
// we get disconnected and we have to subscribe again.
224+
info = this.commands[commandId] = {
225+
resolve,
226+
reject,
227+
callback,
228+
subscribe: () => this.subscribeMessage(callback, subscribeMessage),
229+
unsubscribe: async () => {
230+
await this.sendMessagePromise(messages.unsubscribeEvents(commandId));
231+
delete this.commands[commandId];
232+
}
233+
};
234+
235+
try {
236+
this.sendMessage(subscribeMessage, commandId);
237+
} catch (err) {
238+
// Happens when the websocket is already closing.
239+
// Don't have to handle the error, reconnect logic will pick it up.
240+
}
241+
});
242+
243+
return () => info.unsubscribe();
244+
}
245+
227246
private _handleMessage(event: MessageEvent) {
228247
const message: WebSocketResponse = JSON.parse(event.data);
229248

@@ -235,8 +254,8 @@ export class Connection {
235254
case "event":
236255
const eventInfo = this.commands[
237256
message.id
238-
] as SubscribeEventCommmandInFlight;
239-
eventInfo.eventCallback(message.event);
257+
] as SubscribeEventCommmandInFlight<any>;
258+
eventInfo.callback(message.event);
240259
break;
241260

242261
case "result":
@@ -247,8 +266,8 @@ export class Connection {
247266
if (message.success) {
248267
info.resolve(message.result);
249268

250-
// Don't remove event subscriptions.
251-
if (!("eventCallback" in info)) {
269+
// Don't remove subscriptions.
270+
if (!("subscribe" in info)) {
252271
delete this.commands[message.id];
253272
}
254273
} else {
@@ -278,7 +297,7 @@ export class Connection {
278297

279298
// We don't cancel subscribeEvents commands in flight
280299
// as we will be able to recover them.
281-
if (!("eventCallback" in info)) {
300+
if (!("subscribe" in info)) {
282301
info.reject(messages.error(ERR_CONNECTION_LOST, "Connection lost"));
283302
}
284303
});

0 commit comments

Comments
 (0)