Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ export interface RunnerConfig {
onConnected: () => void;
onDisconnected: () => void;
fetch: (actorId: string, request: Request) => Promise<Response>;
websocket?: (
actorId: string,
ws: any,
request: Request,
) => Promise<void>;
websocket?: (actorId: string, ws: any, request: Request) => Promise<void>;
onActorStart: (
actorId: string,
generation: number,
config: ActorConfig,
) => Promise<void>;
onActorStop: (actorId: string, generation: number) => Promise<void>;
noAutoShutdown?: boolean;
}

export interface KvListOptions {
Expand Down Expand Up @@ -221,8 +218,10 @@ export class Runner {
await this.#openPegboardWebSocket();
this.#openTunnel();

process.on("SIGTERM", this.shutdown.bind(this, false, true));
process.on("SIGINT", this.shutdown.bind(this, false, true));
if (!this.#config.noAutoShutdown) {
process.on("SIGTERM", this.shutdown.bind(this, false, true));
process.on("SIGINT", this.shutdown.bind(this, false, true));
}
}

// MARK: Shutdown
Expand Down Expand Up @@ -344,6 +343,10 @@ export class Runner {
this.#tunnel.shutdown();
console.log("Tunnel shutdown completed");
}

if (exit) {
process.exit(0);
}
}

// MARK: Networking
Expand All @@ -356,7 +359,10 @@ export class Runner {
}

get pegboardRelayUrl() {
const endpoint = this.#config.pegboardRelayEndpoint || this.#config.pegboardEndpoint || this.#config.endpoint;
const endpoint =
this.#config.pegboardRelayEndpoint ||
this.#config.pegboardEndpoint ||
this.#config.endpoint;
const wsEndpoint = endpoint
.replace("http://", "ws://")
.replace("https://", "wss://");
Expand Down Expand Up @@ -424,7 +430,7 @@ export class Runner {
key: this.#config.runnerKey,
version: this.#config.version,
totalSlots: this.#config.totalSlots,
addressesHttp: new Map(), // No addresses needed with tunnel
addressesHttp: new Map(), // No addresses needed with tunnel
addressesTcp: null,
addressesUdp: null,
lastCommandIdx:
Expand Down Expand Up @@ -499,7 +505,9 @@ export class Runner {
this.runnerId = init.runnerId;

// Store the runner lost threshold from metadata
this.#runnerLostThreshold = init.metadata?.runnerLostThreshold ? Number(init.metadata.runnerLostThreshold) : undefined;
this.#runnerLostThreshold = init.metadata?.runnerLostThreshold
? Number(init.metadata.runnerLostThreshold)
: undefined;

console.log("Received init", {
runnerId: init.runnerId,
Expand Down Expand Up @@ -616,7 +624,10 @@ export class Runner {
console.log("[RUNNER] Registering new actor with tunnel:", actorId);
this.#tunnel.registerActor(actorId);
} else {
console.error("[RUNNER] WARNING: No tunnel available to register actor:", actorId);
console.error(
"[RUNNER] WARNING: No tunnel available to register actor:",
actorId,
);
}

this.#sendActorStateUpdate(actorId, generation, "running");
Expand Down
1 change: 1 addition & 0 deletions sdks/typescript/test-runner/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"version": "0.0.0",
"type": "module",
"scripts": {
"start": "tsx src/main.ts",
"build": "tsup src/main.ts",
"check-types": "tsc --noEmit"
},
Expand Down
71 changes: 35 additions & 36 deletions sdks/typescript/test-runner/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ import type { RunnerConfig, ActorConfig } from "@rivetkit/engine-runner";
import WebSocket from "ws";
import { serve } from "@hono/node-server";

const INTERNAL_SERVER_PORT = process.env.INTERNAL_SERVER_PORT ? Number(process.env.INTERNAL_SERVER_PORT) : 5051;
const RIVET_NAMESPACE = process.env.RIVET_NAMESPACE ?? 'default';
const RIVET_RUNNER_KEY = process.env.RIVET_RUNNER_KEY ?? `key-${Math.floor(Math.random() * 10000)}`;
const RIVET_RUNNER_VERSION = process.env.RIVET_RUNNER_VERSION ? Number(process.env.RIVET_RUNNER_VERSION) : 1;
const RIVET_RUNNER_TOTAL_SLOTS = process.env.RIVET_RUNNER_TOTAL_SLOTS ? Number(process.env.RIVET_RUNNER_TOTAL_SLOTS) : 100;
const INTERNAL_SERVER_PORT = process.env.INTERNAL_SERVER_PORT
? Number(process.env.INTERNAL_SERVER_PORT)
: 5051;
const RIVET_NAMESPACE = process.env.RIVET_NAMESPACE ?? "default";
const RIVET_RUNNER_KEY =
process.env.RIVET_RUNNER_KEY ?? `key-${Math.floor(Math.random() * 10000)}`;
const RIVET_RUNNER_VERSION = process.env.RIVET_RUNNER_VERSION
? Number(process.env.RIVET_RUNNER_VERSION)
: 1;
const RIVET_RUNNER_TOTAL_SLOTS = process.env.RIVET_RUNNER_TOTAL_SLOTS
? Number(process.env.RIVET_RUNNER_TOTAL_SLOTS)
: 100;
const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://localhost:6420";

let runnerStarted = Promise.withResolvers();
Expand All @@ -20,18 +27,22 @@ const actorWebSockets = new Map<string, WebSocket>();
serve({
fetch: async (request: Request) => {
const url = new URL(request.url);
if (url.pathname == '/wait-ready') {
if (url.pathname == "/wait-ready") {
await runnerStarted.promise;
return new Response(JSON.stringify(runner?.runnerId), { status: 200 });
} else if (url.pathname == '/has-actor') {
let actorIdQuery = url.searchParams.get('actor');
let generationQuery = url.searchParams.get('generation');
let generation = generationQuery ? Number(generationQuery) : undefined;
return new Response(JSON.stringify(runner?.runnerId), {
status: 200,
});
} else if (url.pathname == "/has-actor") {
let actorIdQuery = url.searchParams.get("actor");
let generationQuery = url.searchParams.get("generation");
let generation = generationQuery
? Number(generationQuery)
: undefined;

if (!actorIdQuery || !runner?.hasActor(actorIdQuery, generation)) {
return new Response(undefined, { status: 404 });
}
} else if (url.pathname == '/shutdown') {
} else if (url.pathname == "/shutdown") {
await runner?.shutdown(true);
}

Expand All @@ -56,9 +67,11 @@ const config: RunnerConfig = {
onConnected: () => {
runnerStarted.resolve(undefined);
},
onDisconnected: () => { },
onDisconnected: () => {},
fetch: async (actorId: string, request: Request) => {
console.log(`[TEST-RUNNER] Fetch called for actor ${actorId}, URL: ${request.url}`);
console.log(
`[TEST-RUNNER] Fetch called for actor ${actorId}, URL: ${request.url}`,
);
const url = new URL(request.url);
if (url.pathname === "/ping") {
// Return the actor ID in response
Expand All @@ -68,13 +81,10 @@ const config: RunnerConfig = {
timestamp: Date.now(),
};
console.log(`[TEST-RUNNER] Returning ping response:`, responseData);
return new Response(
JSON.stringify(responseData),
{
status: 200,
headers: { "Content-Type": "application/json" },
},
);
return new Response(JSON.stringify(responseData), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}

return new Response("ok", { status: 200 });
Expand All @@ -84,33 +94,22 @@ const config: RunnerConfig = {
_generation: number,
_config: ActorConfig,
) => {
console.log(
`Actor ${_actorId} started (generation ${_generation})`,
);
console.log(`Actor ${_actorId} started (generation ${_generation})`);
startedRef.current.resolve(undefined);
},
onActorStop: async (_actorId: string, _generation: number) => {
console.log(
`Actor ${_actorId} stopped (generation ${_generation})`,
);
console.log(`Actor ${_actorId} stopped (generation ${_generation})`);
stoppedRef.current.resolve(undefined);
},
websocket: async (
actorId: string,
ws: WebSocket,
request: Request,
) => {
websocket: async (actorId: string, ws: WebSocket, request: Request) => {
console.log(`WebSocket connected for actor ${actorId}`);
websocketOpen.resolve(undefined);
actorWebSockets.set(actorId, ws);

// Echo server - send back any messages received
ws.addEventListener("message", (event) => {
const data = event.data;
console.log(
`WebSocket message from actor ${actorId}:`,
data,
);
console.log(`WebSocket message from actor ${actorId}:`, data);
ws.send(`Echo: ${data}`);
});

Expand Down
Loading