Skip to content

Commit e6d2992

Browse files
committed
Add zstd support (using zucc@next)
1 parent 187eec3 commit e6d2992

File tree

8 files changed

+357
-182
lines changed

8 files changed

+357
-182
lines changed

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "detritus-client-socket",
3-
"version": "0.4.1",
3+
"version": "0.4.2",
44
"description": "A TypeScript NodeJS library to interact with Discord's Gateway",
55
"main": "lib/index.js",
66
"types": "lib/index.d.ts",
@@ -19,7 +19,8 @@
1919
"pako": "^1.0.10",
2020
"sodium-native": "^2.4.6",
2121
"tweetnacl": "^1.0.1",
22-
"uws": "10.148.1"
22+
"uws": "10.148.1",
23+
"zucc": "^0.2.0"
2324
},
2425
"scripts": {
2526
"build": "tsc",

src/basesocket.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,14 @@ export class BaseSocket extends EventSpewer {
9494
this.pings.delete(nonce);
9595
}
9696
this.pings.clear();
97-
this.socket.removeAllListeners();
97+
98+
for (let event of Object.values(SocketEventsBase)) {
99+
// clear out all listeners but close from the socket
100+
if (event === SocketEventsBase.CLOSE) {
101+
continue;
102+
}
103+
this.socket.on(event, this.emit.bind(this, event));
104+
}
98105
this.removeAllListeners();
99106
}
100107

src/constants.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export const Package = Object.freeze({
22
URL: 'https://github.com/detritusjs/client-socket',
3-
VERSION: '0.4.1',
3+
VERSION: '0.4.2',
44
});
55

66
function normalize(object: {[key: string]: any}) {
@@ -23,6 +23,8 @@ export const CompressTypes = Object.freeze({
2323
ZSTD: 'zstd-stream',
2424
});
2525

26+
export const COMPRESS_TYPES = Object.freeze(Object.values(CompressTypes));
27+
2628
export const DEFAULT_SHARD_COUNT = 1;
2729
export const DEFAULT_SHARD_LAUNCH_DELAY = 5000;
2830
export const DEFAULT_VOICE_TIMEOUT = 30000;
@@ -211,7 +213,7 @@ export const MediaEncryptionModes = Object.freeze({
211213
XSALSA20_POLY1305: 'xsalsa20_poly1305',
212214
});
213215

214-
export const MEDIA_ENCRYPTION_MODES = Object.values(MediaEncryptionModes);
216+
export const MEDIA_ENCRYPTION_MODES = Object.freeze(Object.values(MediaEncryptionModes));
215217

216218
export const MediaOpCodes = Object.freeze({
217219
IDENTIFY: 0,
@@ -235,7 +237,7 @@ export const MediaProtocols = Object.freeze({
235237
WEBRTC: 'webrtc',
236238
});
237239

238-
export const MEDIA_PROTOCOLS = Object.values(MediaProtocols);
240+
export const MEDIA_PROTOCOLS = Object.freeze(Object.values(MediaProtocols));
239241

240242
export const MediaReceivedVideoQuality = Object.freeze({
241243
OFF: 'off',
@@ -278,8 +280,6 @@ export const SocketEventsBase = Object.freeze({
278280
OPEN: 'open',
279281
PING: 'ping',
280282
PONG: 'pong',
281-
UNEXPECTED_RESPONSE: 'unexpected-response',
282-
UPGRADE: 'upgrade',
283283
});
284284

285285
export const SocketCloseCodes = Object.freeze({
@@ -368,7 +368,7 @@ export const RTPPayloadTypes = Object.freeze({
368368
H264: 0x69,
369369
});
370370

371-
export const RTP_PAYLOAD_TYPES = Object.values(RTPPayloadTypes);
371+
export const RTP_PAYLOAD_TYPES = Object.freeze(Object.values(RTPPayloadTypes));
372372

373373
export const RTCP_HEADER_VERSION = 0x80;
374374

@@ -382,7 +382,7 @@ export const RTCPPacketTypes = Object.freeze({
382382
PSFB: 206,
383383
});
384384

385-
export const RTCP_PACKET_TYPES = Object.values(RTCPPacketTypes);
385+
export const RTCP_PACKET_TYPES = Object.freeze(Object.values(RTCPPacketTypes));
386386

387387
export const RTPHeaderExtensionOneByte = Object.freeze({
388388
HEADER: [0xBE, 0xDE],

src/decompressor.ts

Lines changed: 42 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -1,181 +1,69 @@
11
import { EventSpewer } from 'detritus-utils';
22

3-
import { InflateError } from './errors';
3+
import { CompressTypes, ZLIB_SUFFIX } from './constants';
4+
import { ZlibDecompressor, ZstdDecompressor } from './decompressors';
45

5-
const DependencyTypes = Object.freeze({
6-
PAKO: 'pako',
7-
ZLIB: 'zlib',
8-
});
96

10-
const ErrorCodes = Object.freeze({
11-
ERR_ZLIB_BINDING_CLOSED: 'ERR_ZLIB_BINDING_CLOSED',
12-
});
13-
14-
const Inflate = {
15-
flushCode: 0,
16-
module: require(DependencyTypes.ZLIB),
17-
type: DependencyTypes.ZLIB,
18-
};
19-
20-
Inflate.flushCode = Inflate.module.constants.Z_SYNC_FLUSH;
21-
22-
try {
23-
Inflate.module = require(DependencyTypes.PAKO);
24-
Inflate.type = DependencyTypes.PAKO;
25-
} catch(e) {}
7+
export interface DecompresserOptions {
8+
type: string,
9+
}
2610

2711
export class Decompressor extends EventSpewer {
28-
dataChunks: Array<Buffer>;
29-
chunks: Array<Buffer>;
30-
chunkSize: number;
31-
closed: boolean;
32-
flushing: boolean;
33-
inflate: any;
34-
suffix: Buffer;
12+
closed: boolean = false;
13+
decompressor!: ZlibDecompressor | ZstdDecompressor;
14+
type: string;
3515

36-
constructor(
37-
suffix: Buffer,
38-
chunkSize: number = 64 * 1024,
39-
) {
16+
constructor(options: DecompresserOptions) {
4017
super();
4118

42-
this.dataChunks = [];
43-
this.chunks = [];
44-
this.chunkSize = chunkSize;
45-
this.closed = false;
46-
this.flushing = false;
47-
this.inflate = null;
48-
this.suffix = suffix;
49-
this.initialize();
50-
}
51-
52-
feed(chunk: Buffer): void {
53-
if (!this.closed && this.inflate) {
54-
this.chunks.push(chunk);
55-
this.write();
56-
}
57-
}
58-
59-
close(): void {
60-
this.closed = true;
61-
this.chunks.length = 0;
62-
this.dataChunks.length = 0;
63-
this.flushing = false;
64-
switch (Inflate.type) {
65-
case DependencyTypes.ZLIB: {
66-
this.inflate.close();
67-
this.inflate.removeAllListeners('data');
19+
this.type = options.type;
20+
switch (this.type) {
21+
case CompressTypes.ZLIB: {
22+
this.decompressor = new ZlibDecompressor(Buffer.from(ZLIB_SUFFIX));
23+
this.decompressor.on('data', (data) => this.emit('data', data));
24+
this.decompressor.on('error', (error) => this.emit('error', error));
6825
}; break;
69-
}
70-
this.inflate = null;
71-
}
72-
73-
initialize(): void {
74-
switch (Inflate.type) {
75-
case DependencyTypes.PAKO: {
76-
this.inflate = new Inflate.module.Inflate({
77-
chunkSize: this.chunkSize,
78-
});
79-
}; break;
80-
case DependencyTypes.ZLIB: {
81-
this.inflate = Inflate.module.createInflate({
82-
chunkSize: this.chunkSize,
83-
flush: Inflate.flushCode,
84-
});
85-
this.inflate.on('data', this.onData.bind(this));
86-
this.inflate.on('error', this.onError.bind(this));
26+
case CompressTypes.ZSTD: {
27+
this.decompressor = new ZstdDecompressor();
28+
this.decompressor.on('data', (data) => this.emit('data', data));
29+
this.decompressor.on('error', (error) => this.emit('error', error));
8730
}; break;
8831
default: {
89-
throw new Error(`Unable to use any ${JSON.stringify(Object.values(DependencyTypes))}`);
32+
throw new Error(`Invalid Compress Type: ${this.type}`);
9033
};
9134
}
92-
93-
this.dataChunks.length = 0;
94-
this.chunks.length = 0;
95-
this.flushing = false;
96-
this.closed = false;
9735
}
9836

99-
reset(): void {
100-
this.close();
101-
this.initialize();
102-
}
103-
104-
write(): void {
105-
if (
106-
(this.closed) ||
107-
(!this.inflate) ||
108-
(!this.chunks.length) ||
109-
(this.flushing)
110-
) {
111-
return;
37+
close(): void {
38+
if (!this.closed) {
39+
this.closed = true;
40+
this.decompressor.close();
41+
this.decompressor.removeAllListeners();
42+
this.removeAllListeners();
11243
}
44+
}
11345

114-
const chunk = <Buffer> this.chunks.shift();
115-
const isEnd = (
116-
(this.suffix.length <= chunk.length) &&
117-
(chunk.slice(-this.suffix.length).equals(this.suffix))
118-
);
119-
120-
switch (Inflate.type) {
121-
case DependencyTypes.PAKO: {
122-
this.inflate.push(chunk, isEnd && Inflate.flushCode);
123-
if (isEnd) {
124-
if (this.inflate.err) {
125-
const error = new InflateError(this.inflate.msg, this.inflate.err);
126-
this.onError(error);
127-
} else {
128-
this.onData(this.inflate.result);
129-
}
130-
}
131-
}; break;
132-
case DependencyTypes.ZLIB: {
133-
this.inflate.write(chunk);
134-
if (isEnd) {
135-
this.flushing = true;
136-
this.inflate.flush(Inflate.flushCode, this.onFlush.bind(this));
137-
return;
138-
}
139-
}; break;
140-
}
141-
this.write();
46+
feed(data: Buffer): void {
47+
this.decompressor.feed(data);
14248
}
14349

144-
onData(
145-
data: any,
146-
): void {
147-
switch (Inflate.type) {
148-
case DependencyTypes.PAKO: {
149-
this.emit('data', Buffer.from(data));
150-
}; break
151-
case DependencyTypes.ZLIB: {
152-
this.dataChunks.push(<Buffer> data);
153-
}; break;
154-
}
50+
reset(): void {
51+
this.decompressor.reset();
15552
}
15653

157-
onError(
158-
error: any,
159-
): void {
160-
if (error.code === ErrorCodes.ERR_ZLIB_BINDING_CLOSED) {
161-
// zlib was flushing when we called .close on it
162-
return;
163-
}
164-
this.emit('error', error);
54+
on(event: string | symbol, listener: (...args: any[]) => void): this;
55+
on(event: 'data', listener: (data: Buffer) => any): this;
56+
on(event: 'error', listener: (error: Error) => any): this;
57+
on(event: string | symbol, listener: (...args: any[]) => void): this {
58+
super.on(event, listener);
59+
return this;
16560
}
16661

167-
onFlush(
168-
error: any,
169-
): void {
170-
if (error) {
171-
return;
172-
}
173-
if (this.dataChunks.length) {
174-
const chunk = (this.dataChunks.length === 1) ? this.dataChunks.shift() : Buffer.concat(this.dataChunks);
175-
this.dataChunks.length = 0;
176-
this.emit('data', chunk);
62+
static supported(): Array<string> {
63+
const supported: Array<string> = [CompressTypes.ZLIB];
64+
if (ZstdDecompressor.isSupported()) {
65+
supported.unshift(CompressTypes.ZSTD);
17766
}
178-
this.flushing = false;
179-
this.write();
67+
return supported;
18068
}
18169
}

src/decompressors/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './zlib';
2+
export * from './zstd';

0 commit comments

Comments
 (0)