Skip to content

Commit 8b14e0d

Browse files
Merge pull request #75 from hyperledger/ws-connect
Allow custom app logic to trigger when a websocket connects/reconnects
2 parents 6fa9555 + e396f5d commit 8b14e0d

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

lib/firefly.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ import {
7979
FireFlyDeleteOptions,
8080
FireFlyTokenApprovalFilter,
8181
FireFlyTokenApprovalResponse,
82+
FireFlyWebSocketConnectCallback,
8283
} from './interfaces';
8384
import { FireFlyWebSocket, FireFlyWebSocketCallback } from './websocket';
8485
import HttpBase, { mapConfig } from './http';
@@ -598,6 +599,7 @@ export default class FireFly extends HttpBase {
598599
subscriptions: string | string[] | FireFlySubscriptionBase,
599600
callback: FireFlyWebSocketCallback,
600601
socketOptions?: WebSocket.ClientOptions | http.ClientRequestArgs,
602+
afterConnect?: FireFlyWebSocketConnectCallback,
601603
): FireFlyWebSocket {
602604
const options: FireFlyWebSocketOptions = {
603605
host: this.options.websocket.host,
@@ -609,6 +611,7 @@ export default class FireFly extends HttpBase {
609611
reconnectDelay: this.options.websocket.reconnectDelay,
610612
heartbeatInterval: this.options.websocket.heartbeatInterval,
611613
socketOptions: socketOptions,
614+
afterConnect: afterConnect,
612615
};
613616

614617
const handler: FireFlyWebSocketCallback = (socket, event) => {

lib/interfaces.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ export interface FireFlyOptions extends FireFlyOptionsInput {
6161
};
6262
}
6363

64+
export interface FireFlyWebSocketSender {
65+
send: (json: JSON) => void;
66+
}
67+
68+
export interface FireFlyWebSocketConnectCallback {
69+
(sender: FireFlyWebSocketSender): void | Promise<void>;
70+
}
71+
6472
export interface FireFlyWebSocketOptions {
6573
host: string;
6674
namespace: string;
@@ -72,6 +80,7 @@ export interface FireFlyWebSocketOptions {
7280
reconnectDelay: number;
7381
heartbeatInterval: number;
7482
socketOptions?: WebSocket.ClientOptions | http.ClientRequestArgs;
83+
afterConnect?: FireFlyWebSocketConnectCallback;
7584
}
7685

7786
// Namespace

lib/websocket.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class FireFlyWebSocket {
2626
private readonly logger = new Logger(FireFlyWebSocket.name);
2727

2828
private socket?: WebSocket;
29-
private closed = false;
29+
private closed? = () => {};
3030
private pingTimer?: NodeJS.Timeout;
3131
private disconnectTimer?: NodeJS.Timeout;
3232
private reconnectTimer?: NodeJS.Timeout;
@@ -61,7 +61,7 @@ export class FireFlyWebSocket {
6161
auth,
6262
handshakeTimeout: this.options.heartbeatInterval,
6363
}));
64-
this.closed = false;
64+
this.closed = undefined;
6565

6666
socket
6767
.on('open', () => {
@@ -83,13 +83,17 @@ export class FireFlyWebSocket {
8383
);
8484
this.logger.log(`Started listening on subscription ${this.options.namespace}:${name}`);
8585
}
86+
if (this.options?.afterConnect !== undefined) {
87+
this.options.afterConnect(this);
88+
}
8689
})
8790
.on('error', (err) => {
8891
this.logger.error('Error', err.stack);
8992
})
9093
.on('close', () => {
9194
if (this.closed) {
9295
this.logger.log('Closed');
96+
this.closed(); // do this after all logging
9397
} else {
9498
this.disconnectDetected = true;
9599
this.reconnect('Closed by peer');
@@ -156,6 +160,12 @@ export class FireFlyWebSocket {
156160
}
157161
}
158162

163+
send(json: JSON) {
164+
if (this.socket !== undefined) {
165+
this.socket.send(JSON.stringify(json));
166+
}
167+
}
168+
159169
ack(event: FireFlyEventDelivery) {
160170
if (this.socket !== undefined && event.id !== undefined) {
161171
this.socket.send(
@@ -168,15 +178,18 @@ export class FireFlyWebSocket {
168178
}
169179
}
170180

171-
close() {
172-
this.closed = true;
181+
async close(wait?: boolean): Promise<void> {
182+
const closedPromise = new Promise<void>(resolve => {
183+
this.closed = resolve;
184+
});
173185
this.clearPingTimers();
174186
if (this.socket) {
175187
try {
176188
this.socket.close();
177189
} catch (e: any) {
178190
this.logger.warn(`Failed to clean up websocket: ${e.message}`);
179191
}
192+
if (wait) await closedPromise;
180193
this.socket = undefined;
181194
}
182195
}

0 commit comments

Comments
 (0)