Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 22 additions & 27 deletions engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@
runnerId?: string;
#started: boolean = false;
#shutdown: boolean = false;
#shuttingDown: boolean = false;
#reconnectAttempt: number = 0;
#reconnectTimeout?: NodeJS.Timeout;

Expand Down Expand Up @@ -480,20 +479,19 @@
// MARK: Shutdown
async shutdown(immediate: boolean, exit: boolean = false) {
// Prevent concurrent shutdowns
if (this.#shuttingDown) {
if (this.#shutdown) {
this.log?.debug({
msg: "shutdown already in progress, ignoring",
});
return;
}
this.#shuttingDown = true;
this.#shutdown = true;

this.log?.info({
msg: "starting shutdown",
immediate,
exit,
});
this.#shutdown = true;

// Clear reconnect timeout
if (this.#reconnectTimeout) {
Expand Down Expand Up @@ -918,41 +916,34 @@
});

ws.addEventListener("close", async (ev) => {
const closeError = parseWebSocketCloseReason(ev.reason);
if (
closeError?.group === "ws" &&
closeError?.error === "eviction"
) {
this.log?.info("runner websocket evicted");

this.#config.onDisconnected(ev.code, ev.reason);

await this.shutdown(true);
} else {
if (!this.#shutdown) {
const closeError = parseWebSocketCloseReason(ev.reason);
if (
closeError?.group === "pegboard" &&
closeError?.error === "runner_shutdown"
closeError?.group === "ws" &&
closeError?.error === "eviction"
) {
this.log?.info("runner shutdown");
this.log?.info("runner websocket evicted");

this.#config.onDisconnected(ev.code, ev.reason);

await this.shutdown(true);
} else {
this.log?.warn({
msg: "runner disconnected",
code: ev.code,
reason: ev.reason.toString(),
closeError,
});
}

this.#config.onDisconnected(ev.code, ev.reason);
}
this.#config.onDisconnected(ev.code, ev.reason);
}

// Clear ack interval on close
if (this.#ackInterval) {
clearInterval(this.#ackInterval);
this.#ackInterval = undefined;
}
// Clear ack interval on close
if (this.#ackInterval) {
clearInterval(this.#ackInterval);
this.#ackInterval = undefined;
}

if (!this.#shutdown) {
// Start runner lost timeout if we have a threshold and are not shutting down
if (
!this.#runnerLostTimeout &&
Expand All @@ -977,6 +968,10 @@

// Attempt to reconnect if not stopped
this.#scheduleReconnect();
} else {
this.log?.info("websocket closed");

this.#config.onDisconnected(ev.code, ev.reason);
}
});
}
Expand Down Expand Up @@ -1166,7 +1161,7 @@
}

async #handleCommandStopActor(commandWrapper: protocol.CommandWrapper) {
const stopCommand = commandWrapper.inner

Check warning on line 1164 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedVariables

This variable stopCommand is unused.

Check warning on line 1164 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedVariables

This variable stopCommand is unused.
.val as protocol.CommandStopActor;

const actorId = commandWrapper.checkpoint.actorId;
Expand Down
Loading