@@ -2,6 +2,9 @@ import { digest } from './crypto';
22import * as secp from '@noble/secp256k1' ;
33import { Buffer } from 'buffer'
44import * as HttpApi from './HttpApi'
5+ import waitFor from './waitFor' ;
6+
7+ export const NOTIFICATIONS_RETRY_TIMEOUT = 3000
58
69export const generatePrivateKey = ( ) : Uint8Array => {
710 return secp . utils . randomPrivateKey ( )
@@ -39,11 +42,19 @@ export const broadcast = async (httpBase: string, signingKey: Uint8Array, messag
3942 return HttpApi . waitForTx ( httpBase , result . txhash )
4043}
4144
45+ export type WebSocketState =
46+ | { type : 'subscribed' , socket : WebSocket }
47+ | { type : 'unsubscribed' }
48+
4249export class KolmeClient {
4350 httpBase : string
51+ notificationsSocket : WebSocketState
52+ lastNotificationsErrorAt : number | undefined
53+ onNotificationsSocketStateChange : ( ( state : number ) => void ) | undefined
4454
4555 constructor ( httpBase : string ) {
4656 this . httpBase = httpBase ;
57+ this . notificationsSocket = { type : 'unsubscribed' } ;
4758 }
4859
4960 generatePrivateKey ( ) {
@@ -57,4 +68,80 @@ export class KolmeClient {
5768 async broadcast ( signingKey : Uint8Array , messages : unknown [ ] ) {
5869 return broadcast ( this . httpBase , signingKey , messages )
5970 }
71+
72+ onNotificationsSocketOpen ( ) {
73+ if ( this . onNotificationsSocketStateChange && this . notificationsSocket . type === 'subscribed' ) {
74+ this . onNotificationsSocketStateChange ( this . notificationsSocket . socket . readyState )
75+ }
76+ }
77+
78+ async onNotificationsSocketClosed ( onMessage : ( message : MessageEvent ) => void ) : Promise < boolean > {
79+ switch ( this . notificationsSocket . type ) {
80+ case 'subscribed' : {
81+ if ( this . onNotificationsSocketStateChange ) {
82+ this . onNotificationsSocketStateChange ( this . notificationsSocket . socket . readyState )
83+ }
84+
85+ // Make sure we're not re-trying too often
86+ if ( this . lastNotificationsErrorAt ) {
87+ const now = Date . now ( )
88+ const diff = now - this . lastNotificationsErrorAt
89+
90+ if ( diff < NOTIFICATIONS_RETRY_TIMEOUT ) {
91+ await waitFor ( NOTIFICATIONS_RETRY_TIMEOUT - diff )
92+ }
93+ }
94+
95+ this . subscribeToNotifications ( onMessage )
96+ return true
97+ }
98+ case 'unsubscribed' : {
99+ return false ;
100+ }
101+ }
102+ }
103+
104+ async subscribeToNotifications ( onMessage : ( message : MessageEvent ) => void , onReadyStateChange ? : ( state : number ) => void ) : Promise < void > {
105+ const endpoint = this . httpBase . replace ( 'https' , 'wss' )
106+
107+ if ( onReadyStateChange ) {
108+ this . onNotificationsSocketStateChange = onReadyStateChange
109+ }
110+
111+ return new Promise ( ( resolve , reject ) => {
112+ this . notificationsSocket = {
113+ type : 'subscribed' ,
114+ socket : HttpApi . subscribeToNotifications ( endpoint , {
115+ onMessage,
116+ onOpen : ( ) => {
117+ this . onNotificationsSocketOpen ( )
118+ resolve ( )
119+ } ,
120+ onClose : ( ) => {
121+ this . onNotificationsSocketClosed ( onMessage )
122+ } ,
123+ onError : ( ) => {
124+ this . lastNotificationsErrorAt = Date . now ( )
125+ reject ( )
126+ } ,
127+ } )
128+ }
129+ } )
130+ }
131+
132+ unsubscribeFromNotifications ( ) : boolean {
133+ this . onNotificationsSocketStateChange = undefined
134+
135+ switch ( this . notificationsSocket . type ) {
136+ case 'subscribed' : {
137+ this . notificationsSocket . socket . close ( )
138+ this . notificationsSocket = { type : 'unsubscribed' }
139+
140+ return true
141+ }
142+ case 'unsubscribed' : {
143+ return false
144+ }
145+ }
146+ }
60147}
0 commit comments