Skip to content

Commit 3be3a76

Browse files
committed
added argument to drain that provides the list of packets that are drained
1 parent e95f6ab commit 3be3a76

File tree

7 files changed

+23
-14
lines changed

7 files changed

+23
-14
lines changed

packages/engine.io/lib/socket.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ export interface SendOptions {
1414

1515
type ReadyState = "opening" | "open" | "closing" | "closed";
1616

17-
type SendCallback = (transport: Transport) => void;
17+
type SendCallback = (
18+
transport: Transport,
19+
packets: Packet[] | undefined,
20+
) => void;
1821

1922
export class Socket extends EventEmitter {
2023
/**
@@ -271,13 +274,13 @@ export class Socket extends EventEmitter {
271274
*
272275
* @private
273276
*/
274-
private onDrain() {
277+
private onDrain(packets: Packet[] | undefined) {
275278
if (this.sentCallbackFn.length > 0) {
276279
debug("executing batch send callback");
277280
const seqFn = this.sentCallbackFn.shift();
278281
if (seqFn) {
279282
for (let i = 0; i < seqFn.length; i++) {
280-
seqFn[i](this.transport);
283+
seqFn[i](this.transport, packets);
281284
}
282285
}
283286
}
@@ -525,8 +528,8 @@ export class Socket extends EventEmitter {
525528
}
526529

527530
this.transport.send(wbuf);
528-
this.emit("drain");
529-
this.server.emit("drain", this);
531+
this.emit("drain", wbuf);
532+
this.server.emit("drain", this, wbuf);
530533
}
531534
}
532535

packages/engine.io/lib/transport.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import * as parser_v3 from "./parser-v3/index";
44
import debugModule from "debug";
55
import type { IncomingMessage, ServerResponse } from "http";
66
import { Packet, RawData } from "engine.io-parser";
7+
import type * as Parser from "engine.io-parser";
78

89
const debug = debugModule("engine:transport");
910

packages/engine.io/lib/transports-uws/polling.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { createGzip, createDeflate } from "zlib";
33
import * as accepts from "accepts";
44
import debugModule from "debug";
55
import { HttpRequest, HttpResponse } from "uWebSockets.js";
6+
import { Packet } from "engine.io-parser";
67

78
const debug = debugModule("engine:polling");
89

@@ -253,7 +254,7 @@ export class Polling extends Transport {
253254
* @param {Object} packet
254255
* @private
255256
*/
256-
send(packets) {
257+
send(packets: Packet[]) {
257258
this.writable = false;
258259

259260
if (this.shouldClose) {
@@ -263,11 +264,11 @@ export class Polling extends Transport {
263264
this.shouldClose = null;
264265
}
265266

266-
const doWrite = (data) => {
267+
const doWrite = (data: string) => {
267268
const compress = packets.some((packet) => {
268269
return packet.options && packet.options.compress;
269270
});
270-
this.write(data, { compress });
271+
this.write(data, { compress, source: packets });
271272
};
272273

273274
if (this.protocol === 3) {
@@ -288,7 +289,7 @@ export class Polling extends Transport {
288289
debug('writing "%s"', data);
289290
this.doWrite(data, options, () => {
290291
this.req.cleanup();
291-
this.emit("drain");
292+
this.emit("drain", options.source);
292293
});
293294
}
294295

packages/engine.io/lib/transports-uws/websocket.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export class WebSocket extends Transport {
5555
this.socket.send(data, isBinary, compress);
5656

5757
if (isLast) {
58-
this.emit("drain");
58+
this.emit("drain", packets);
5959
this.writable = true;
6060
this.emit("ready");
6161
}

packages/engine.io/lib/transports/polling.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ export class Polling extends Transport {
229229
const compress = packets.some((packet) => {
230230
return packet.options && packet.options.compress;
231231
});
232-
this.write(data, { compress });
232+
this.write(data, { compress, source: packets });
233233
};
234234

235235
if (this.protocol === 3) {
@@ -250,7 +250,7 @@ export class Polling extends Transport {
250250
debug('writing "%s"', data);
251251
this.doWrite(data, options, () => {
252252
this.req.cleanup();
253-
this.emit("drain");
253+
this.emit("drain", options.source);
254254
});
255255
}
256256

packages/engine.io/lib/transports/websocket.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const debug = debugModule("engine:ws");
77
export class WebSocket extends Transport {
88
protected perMessageDeflate: any;
99
private socket: any;
10+
private currentPackets: Packet[] | undefined;
1011

1112
/**
1213
* WebSocket transport
@@ -44,6 +45,8 @@ export class WebSocket extends Transport {
4445
send(packets: Packet[]) {
4546
this.writable = false;
4647

48+
this.currentPackets = packets;
49+
4750
for (let i = 0; i < packets.length; i++) {
4851
const packet = packets[i];
4952
const isLast = i + 1 === packets.length;
@@ -98,8 +101,9 @@ export class WebSocket extends Transport {
98101
if (err) {
99102
this.onError("write error", err.stack);
100103
} else {
101-
this.emit("drain");
104+
this.emit("drain", this.currentPackets);
102105
this.writable = true;
106+
this.currentPackets = undefined;
103107
this.emit("ready");
104108
}
105109
};

packages/engine.io/lib/transports/webtransport.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export class WebTransport extends Transport {
6060
debug("error while writing: %s", e.message);
6161
}
6262

63-
this.emit("drain");
63+
this.emit("drain", packets);
6464
this.writable = true;
6565
this.emit("ready");
6666
}

0 commit comments

Comments
 (0)