-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathwebsocket-util.ts
More file actions
162 lines (144 loc) · 4.61 KB
/
websocket-util.ts
File metadata and controls
162 lines (144 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import WebSocket from 'isomorphic-ws';
import { WsRequestOperationKucoin } from '../../types/websockets/ws-api.js';
import { MessageEventLike } from '../../types/websockets/ws-events.js';
/**
 * Registry of the WebSocket connection keys used by this module.
 *
 * Should be one WS key per unique URL. Every property maps to a string
 * literal identical to its own name, and `as const` preserves those literal
 * types so that the `WsKey` union can be derived directly from this object.
 */
export const WS_KEY_MAP = {
// NOTE(review): presumably the V1 public/private streams per product, judging
// by the names only - confirm against the URL map that consumes these keys.
spotPublicV1: 'spotPublicV1',
spotPrivateV1: 'spotPrivateV1',
futuresPublicV1: 'futuresPublicV1',
futuresPrivateV1: 'futuresPrivateV1',
/** Dedicated V2 (Pro) connection for push of public spot market data */
spotPublicProV2: 'spotPublicProV2',
/** Dedicated V2 (Pro) connection for push of public futures market data */
futuresPublicProV2: 'futuresPublicProV2',
/** Shared (spot & futures) V2 (Pro) connection for all private data */
privateProV2: 'privateProV2',
// The two keys below are the ones isWSAPIWsKey() classifies as WS API keys.
wsApiSpotV1: 'wsApiSpotV1',
wsApiFuturesV1: 'wsApiFuturesV1',
} as const;
/**
 * This is used to differentiate between each of the available websocket streams.
 *
 * Derived as the union of every value in WS_KEY_MAP, so adding an entry there
 * extends this type automatically.
 */
export type WsKey = (typeof WS_KEY_MAP)[keyof typeof WS_KEY_MAP];
/**
 * Subset of WsKey limited to the two WS API connection keys (futures and
 * spot). This is the type that isWSAPIWsKey() narrows to.
 */
export type WSAPIWsKey =
| typeof WS_KEY_MAP.wsApiFuturesV1
| typeof WS_KEY_MAP.wsApiSpotV1;
/**
 * Normalised internal format for a request (subscribe/unsubscribe/etc) on a topic, with optional parameters.
 *
 * - Topic: the topic this event is for
 * - Payload: the parameters to include, optional. E.g. auth requires key + sign. Some topics allow configurable parameters.
 *
 * @typeParam TWSTopic - string (or string-literal union) type of the topic; defaults to `string`.
 * @typeParam TWSPayload - type of the optional payload; defaults to `any`.
 */
export interface WsTopicRequest<
TWSTopic extends string = string,
TWSPayload = any,
> {
/** The topic this request refers to. */
topic: TWSTopic;
/** Optional parameters sent along with the topic; may be omitted entirely. */
payload?: TWSPayload;
}
/**
 * Conveniently allow users to request a topic either as string topics or objects (containing string topic + params)
 *
 * Note that the bare-string alternative is plain `string`, not `TWSTopic`, so
 * only the object form is constrained to the topic type parameter. Unlike
 * WsTopicRequest, `TWSTopic` has no default here and must be supplied.
 *
 * @typeParam TWSTopic - topic type used by the object form.
 * @typeParam TWSPayload - payload type used by the object form; defaults to `any`.
 */
export type WsTopicRequestOrStringTopic<
TWSTopic extends string,
TWSPayload = any,
> = WsTopicRequest<TWSTopic, TWSPayload> | string;
/**
 * #305: ws.terminate() is undefined in browsers.
 *
 * Forcibly terminates the socket when the implementation exposes a
 * `terminate()` method (node.js). When it does not (e.g. browsers), the call
 * is a no-op unless `fallbackToClose` is set, in which case `close()` is
 * invoked instead, provided the object actually has one.
 *
 * Never throws because of a missing method: a falsy `ws`, or an object with
 * neither `terminate()` nor `close()`, is simply ignored.
 *
 * @param ws - The socket (or socket-like object) to shut down. May be omitted.
 * @param fallbackToClose - If true, call `close()` when `terminate()` is unavailable.
 * @returns `true` only when `terminate()` was called; `false` in every other
 *   case, including when the `close()` fallback was used.
 */
export function safeTerminateWs(
  ws?: WebSocket | any,
  fallbackToClose?: boolean,
): boolean {
  if (!ws) {
    return false;
  }

  if (typeof ws.terminate === 'function') {
    ws.terminate();
    return true;
  }

  // Guard the fallback too: `ws` is effectively `any`, so close() is not
  // guaranteed to exist and a "safe" helper must not throw a TypeError here.
  if (fallbackToClose && typeof ws.close === 'function') {
    ws.close();
  }

  return false;
}
/**
 * Builds the primary key under which a pending WS API promise is stored.
 *
 * The key is made from properties present in every request & reply: the
 * connection key and the request id, separated by an underscore.
 *
 * The counterpart to this is in resolveEmittableEvents
 *
 * @param wsKey - Key of the connection the request is sent on.
 * @param requestEvent - The outgoing WS API request; only its `id` is read.
 * @returns The reference string `<wsKey>_<id>`.
 */
export function getPromiseRefForWSAPIRequest(
  wsKey: WsKey,
  requestEvent: WsRequestOperationKucoin<string>,
): string {
  const { id } = requestEvent;
  // Array#join is used deliberately (not a template literal): it renders a
  // null/undefined element as an empty string.
  return [wsKey, id].join('_');
}
/**
 * Type guard telling whether a connection key belongs to one of the WS API
 * connections (futures or spot).
 *
 * @param wsKey - Any known connection key.
 * @returns `true` (narrowing to WSAPIWsKey) for the two WS API keys, else `false`.
 */
export function isWSAPIWsKey(wsKey: WsKey): wsKey is WSAPIWsKey {
  switch (wsKey) {
    case WS_KEY_MAP.wsApiFuturesV1:
    case WS_KEY_MAP.wsApiSpotV1:
      return true;
    default:
      return false;
  }
}
/**
 * Type guard for message events whose payload is a Node.js Buffer.
 *
 * @param msg - Arbitrary value to inspect.
 * @returns `true` only when `msg` is a non-null object whose `type` is
 *   `'message'` and whose `data` is a Buffer.
 */
export function isBufferMessageEvent(
  msg: unknown,
): msg is MessageEventLike<Buffer> {
  if (msg === null || typeof msg !== 'object') {
    return false;
  }

  const candidate = msg as MessageEventLike;
  // Check the event type first so `data` is only read for message events.
  if (candidate['type'] !== 'message') {
    return false;
  }

  return Buffer.isBuffer(candidate['data']);
}
/**
 * Fast heuristic that guesses whether a binary payload is actually plaintext
 * JSON, by looking at its first byte only.
 *
 * @param data - Buffer or ArrayBuffer-like payload; may be null/undefined.
 * @returns `true` when the first byte is `{`, `[` or `"`; `false` for
 *   missing or empty input and for any other leading byte.
 */
export function bufferLooksLikeText(
  data?: Buffer | ArrayBufferLike | null,
): boolean {
  if (!data) {
    return false;
  }

  const bytes = Buffer.isBuffer(data)
    ? data
    : Buffer.from(data as ArrayBufferLike);

  if (bytes.length === 0) {
    return false;
  }

  // '{' '[' or '"' are common JSON prefixes
  switch (bytes[0]) {
    case 0x7b: // {
    case 0x5b: // [
    case 0x22: // "
      return true;
    default:
      return false;
  }
}
/**
 * Converts an incoming message event into one whose `data` is a string.
 *
 * Three cases are handled, in order:
 * 1. `data` is already a string: passed through unchanged.
 * 2. `data` looks like plaintext (see bufferLooksLikeText): decoded as UTF-8.
 * 3. Otherwise: inflated with a `deflate-raw` DecompressionStream and read
 *    back as text.
 *
 * @param event - The received message event.
 * @returns A shallow copy of `event` with `type` set to `'message'` and
 *   `data` replaced by the resulting string.
 */
export async function decompressMessageEvent(
  event: MessageEventLike<Buffer<ArrayBufferLike>>,
): Promise<MessageEventLike<any>> {
  const payload = event.data;

  if (typeof payload === 'string') {
    return { ...event, type: 'message', data: payload };
  }

  // Some KuCoin streams (notably newer spot public v2) send JSON in a binary
  // frame without compression. If the payload already looks like UTF-8 text,
  // skip decompression and just decode it so the message can be processed.
  if (bufferLooksLikeText(payload)) {
    const text = Buffer.from(payload as any).toString('utf8');
    return { ...event, type: 'message', data: text };
  }

  const inflater = new DecompressionStream('deflate-raw');

  let source: ReadableStream | null = new Response(payload).body;
  if (!source) {
    // NOTE(review): presumably a fallback for runtimes where Response#body is
    // not available; wraps the bytes in a single-chunk stream instead.
    const bytes = new Uint8Array(payload);
    source = new ReadableStream({
      start(controller) {
        controller.enqueue(bytes);
        controller.close();
      },
    });
  }

  const inflated: ReadableStream = source.pipeThrough(inflater);
  const text = await new Response(inflated).text();

  return { ...event, type: 'message', data: text };
}