Skip to content

Commit 1c0e2ca

Browse files
committed
fix(rivetkit): fix runner reconnection bug
1 parent 23802e1 commit 1c0e2ca

File tree

1 file changed

+28
-8
lines changed
  • engine/sdks/typescript/runner/src

1 file changed

+28
-8
lines changed

engine/sdks/typescript/runner/src/mod.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ export class Runner {
201201
#actors: Map<string, RunnerActor> = new Map();
202202

203203
// WebSocket
204-
__pegboardWebSocket?: WebSocket;
204+
#pegboardWebSocket?: WebSocket;
205205
runnerId?: string;
206206
#started: boolean = false;
207207
#shutdown: boolean = false;
@@ -529,7 +529,7 @@ export class Runner {
529529

530530
// Close WebSocket
531531
if (this.__webSocketReady()) {
532-
const pegboardWebSocket = this.__pegboardWebSocket;
532+
const pegboardWebSocket = this.#pegboardWebSocket;
533533
if (immediate) {
534534
// Stop immediately
535535
pegboardWebSocket.close(1000, "pegboard.runner_shutdown");
@@ -590,7 +590,7 @@ export class Runner {
590590
// the runner has already shut down
591591
this.log?.debug({
592592
msg: "no runner WebSocket to shutdown or already closed",
593-
readyState: this.__pegboardWebSocket?.readyState,
593+
readyState: this.#pegboardWebSocket?.readyState,
594594
});
595595
}
596596

@@ -715,8 +715,21 @@ export class Runner {
715715
protocols.push(`rivet_token.${this.config.token}`);
716716

717717
const WS = await importWebSocket();
718+
719+
// Assertion to clear previous WebSocket
720+
if (
721+
this.#pegboardWebSocket &&
722+
(this.#pegboardWebSocket.readyState === WS.CONNECTING ||
723+
this.#pegboardWebSocket.readyState === WS.OPEN)
724+
) {
725+
this.log?.error(
726+
"found duplicate pegboardWebSocket, closing previous",
727+
);
728+
this.#pegboardWebSocket.close(1000, "duplicate_websocket");
729+
}
730+
718731
const ws = new WS(this.pegboardUrl, protocols) as any as WebSocket;
719-
this.__pegboardWebSocket = ws;
732+
this.#pegboardWebSocket = ws;
720733

721734
this.log?.info({
722735
msg: "connecting",
@@ -1673,11 +1686,11 @@ export class Runner {
16731686

16741687
/** Asserts WebSocket exists and is ready. */
16751688
__webSocketReady(): this is this & {
1676-
__pegboardWebSocket: NonNullable<Runner["__pegboardWebSocket"]>;
1689+
__pegboardWebSocket: NonNullable<Runner["#pegboardWebSocket"]>;
16771690
} {
16781691
return (
1679-
!!this.__pegboardWebSocket &&
1680-
this.__pegboardWebSocket.readyState === 1
1692+
!!this.#pegboardWebSocket &&
1693+
this.#pegboardWebSocket.readyState === 1
16811694
);
16821695
}
16831696

@@ -1689,7 +1702,7 @@ export class Runner {
16891702

16901703
const encoded = protocol.encodeToServer(message);
16911704
if (this.__webSocketReady()) {
1692-
this.__pegboardWebSocket.send(encoded);
1705+
this.#pegboardWebSocket.send(encoded);
16931706
} else {
16941707
this.log?.error({
16951708
msg: "WebSocket not available or not open for sending data",
@@ -1781,6 +1794,13 @@ export class Runner {
17811794
msg: `Scheduling reconnect attempt ${this.#reconnectAttempt + 1} in ${delay}ms`,
17821795
});
17831796

1797+
if (this.#reconnectTimeout) {
1798+
this.log?.info(
1799+
"clearing previous reconnect timeout in schedule reconnect",
1800+
);
1801+
clearTimeout(this.#reconnectTimeout);
1802+
}
1803+
17841804
this.#reconnectTimeout = setTimeout(() => {
17851805
if (!this.#shutdown) {
17861806
this.#reconnectAttempt++;

0 commit comments

Comments
 (0)