Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@
"tslint": "^6.1.3",
"tslint-config-common": "^1.6.2",
"typedoc": "^0.25.12",
"typescript": "^5.3.3"
"typescript": "^5.3.3",
"websocket": "^1.0.35"
},
"jest": {
"verbose": true,
Expand Down
119 changes: 119 additions & 0 deletions src/ws/WsFrameDecoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import {StreamingOctetReader} from '@jsonjoy.com/util/lib/buffers/StreamingOctetReader';
import {WsFrameOpcode} from './constants';
import {WsFrameDecodingError} from './errors';
import {WsCloseFrame, WsFrameHeader, WsPingFrame, WsPongFrame} from './frames';

export class WsFrameDecoder {
public readonly reader = new StreamingOctetReader();

public push(uint8: Uint8Array): void {
this.reader.push(uint8);
}

public readFrameHeader(): WsFrameHeader | undefined {
try {
const reader = this.reader;
if (reader.size() < 2) return undefined;
const b0 = reader.u8();
const b1 = reader.u8();
const fin = <0 | 1>(b0 >>> 7);
const opcode = b0 & 0b1111;
const maskBit = b1 >>> 7;
let length = b1 & 0b01111111;
if (length === 126) {
if (reader.size() < 2) return undefined;
length = (reader.u8() << 8) | reader.u8();
} else if (length === 127) {
if (reader.size() < 8) return undefined;
reader.skip(4);
length = reader.u32();
}
let mask: undefined | [number, number, number, number];
if (maskBit) {
if (reader.size() < 4) return undefined;
mask = [reader.u8(), reader.u8(), reader.u8(), reader.u8()];
}
if (opcode >= WsFrameOpcode.MIN_CONTROL_OPCODE) {
switch (opcode) {
case WsFrameOpcode.CLOSE: {
return new WsCloseFrame(fin, opcode, length, mask, 0, '');
}
case WsFrameOpcode.PING: {
if (length > 125) throw new WsFrameDecodingError();
const data = mask ? reader.bufXor(length, mask, 0) : reader.buf(length);
return new WsPingFrame(fin, opcode, length, mask, data);
}
case WsFrameOpcode.PONG: {
if (length > 125) throw new WsFrameDecodingError();
const data = mask ? reader.bufXor(length, mask, 0) : reader.buf(length);
return new WsPongFrame(fin, opcode, length, mask, data);
}
default: {
throw new WsFrameDecodingError();
}
}
}
return new WsFrameHeader(fin, opcode, length, mask);
} catch (err) {
if (err instanceof RangeError) return undefined;
throw err;
}
}

/**
* Read application data of a frame and copy it to the destination buffer.
* Receives the frame header and the number of bytes that still need to be
* copied, returns back the number of bytes that still need to be copied in
* subsequent calls.
*
* @param frame Frame header.
* @param remaining How many bytes are remaining to be copied.
* @param dst The destination buffer to write to.
* @param pos Position in the destination buffer to start writing to.
* @returns The number of bytes that still need to be copied in the next call.
*/
public readFrameData(frame: WsFrameHeader, remaining: number, dst: Uint8Array, pos: number): number {
const reader = this.reader;
const mask = frame.mask;
const readSize = Math.min(reader.size(), remaining);
if (!mask) reader.copy(readSize, dst, pos);
else {
const alreadyRead = frame.length - remaining;
reader.copyXor(readSize, dst, pos, mask, alreadyRead);
}
return remaining - readSize;
}

public copyFrameData(frame: WsFrameHeader, dst: Uint8Array, pos: number): void {
const reader = this.reader;
const mask = frame.mask;
const readSize = frame.length;
if (!mask) reader.copy(readSize, dst, pos);
else reader.copyXor(readSize, dst, pos, mask, 0);
}

/**
* Reads application data of the CLOSE frame and sets the code and reason
* properties of the frame.
*
* @param frame Close frame.
*/
public readCloseFrameData(frame: WsCloseFrame): void {
let length = frame.length;
if (length > 125) throw new WsFrameDecodingError();
let code = 0;
let reason = '';
if (length > 0) {
if (length < 2) throw new WsFrameDecodingError();
const reader = this.reader;
const mask = frame.mask;
const octet1 = reader.u8() ^ (mask ? mask[0] : 0);
const octet2 = reader.u8() ^ (mask ? mask[1] : 0);
code = (octet1 << 8) | octet2;
length -= 2;
if (length) reason = reader.utf8(length, mask ?? [0, 0, 0, 0], 2);
}
frame.code = code;
frame.reason = reason;
}
}
126 changes: 126 additions & 0 deletions src/ws/WsFrameEncoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import {Writer} from '@jsonjoy.com/util/lib/buffers/Writer';
import {WsFrameOpcode} from './constants';
import {WsFrameEncodingError} from './errors';
import type {IWriter, IWriterGrowable} from '@jsonjoy.com/util/lib/buffers';

const maskBuf = new Uint8Array(4);
const maskBufView = new DataView(maskBuf.buffer, maskBuf.byteOffset, maskBuf.byteLength);

export class WsFrameEncoder<W extends IWriter & IWriterGrowable = IWriter & IWriterGrowable> {
constructor(public readonly writer: W = new Writer() as any) {}

public encodePing(data: Uint8Array | null): Uint8Array {
this.writePing(data);
return this.writer.flush();
}

public encodePong(data: Uint8Array | null): Uint8Array {
this.writePong(data);
return this.writer.flush();
}

public encodeClose(reason: string, code = 0): Uint8Array {
this.writeClose(reason, code);
return this.writer.flush();
}

public encodeHdr(fin: 0 | 1, opcode: WsFrameOpcode, length: number, mask: number): Uint8Array {
this.writeHdr(fin, opcode, length, mask);
return this.writer.flush();
}

public encodeDataMsgHdrFast(length: number): Uint8Array {
this.writeDataMsgHdrFast(length);
return this.writer.flush();
}

public writePing(data: Uint8Array | null): void {
let length = 0;
if (data && (length = data.length)) {
this.writeHdr(1, WsFrameOpcode.PING, length, 0);
this.writer.buf(data, length);
} else {
this.writeHdr(1, WsFrameOpcode.PING, 0, 0);
}
}

public writePong(data: Uint8Array | null): void {
let length = 0;
if (data && (length = data.length)) {
this.writeHdr(1, WsFrameOpcode.PONG, length, 0);
this.writer.buf(data, length);
} else {
this.writeHdr(1, WsFrameOpcode.PONG, 0, 0);
}
}

public writeClose(reason: string, code = 0): void {
if (reason || code) {
const reasonLength = reason.length;
const length = 2 + reasonLength;
const writer = this.writer;
writer.ensureCapacity(
2 + // Frame header
2 + // Close code 2 bytes
reasonLength * 4, // Close reason, max 4 bytes per UTF-8 char
);
const lengthX = writer.x + 1;
this.writeHdr(1, WsFrameOpcode.CLOSE, length, 0);
writer.u16(code);
if (reasonLength) {
const utf8Length = writer.utf8(reason);
if (utf8Length !== reasonLength) {
if (utf8Length > 126 - 2) throw new WsFrameEncodingError();
writer.uint8[lengthX] = (writer.uint8[lengthX] & 0b10000000) | (utf8Length + 2);
}
}
} else {
this.writeHdr(1, WsFrameOpcode.CLOSE, 0, 0);
}
}

public writeHdr(fin: 0 | 1, opcode: WsFrameOpcode, length: number, mask: number): void {
const octet1 = (fin << 7) | opcode;
const maskBit = mask ? 0b10000000 : 0b00000000;
const writer = this.writer;
if (length < 126) {
const octet2 = maskBit | length;
writer.u16((octet1 << 8) | octet2);
} else if (length < 0x10000) {
const octet2 = maskBit | 126;
writer.u32(((octet1 << 8) | octet2) * 0x10000 + length);
} else {
const octet2 = maskBit | 127;
writer.u16((octet1 << 8) | octet2);
writer.u32(0);
writer.u32(length);
}
if (mask) writer.u32(mask);
}

public writeDataMsgHdrFast(length: number): void {
const writer = this.writer;
if (length < 126) {
writer.u16(0b10000010_00000000 + length);
return;
}
if (length < 0x10000) {
writer.u32(0b10000010_01111110_00000000_00000000 + length);
return;
}
writer.u16(0b10000010_01111111);
writer.u32(0);
writer.u32(length);
}

public writeBufXor(buf: Uint8Array, mask: number): void {
maskBufView.setUint32(0, mask, false);
const writer = this.writer;
const length = buf.length;
writer.ensureCapacity(length);
let x = writer.x;
const uint8 = writer.uint8;
for (let i = 0; i < length; i++) uint8[x++] = buf[i] ^ maskBuf[i & 3];
writer.x = x;
}
}
Loading