Skip to content

Commit 6aad704

Browse files
committed
Merge remote-tracking branch 'origin/run-engine-2' into run-engine-batch-trigger
2 parents fb08da1 + a759bf7 commit 6aad704

File tree

9 files changed

+109
-26
lines changed

9 files changed

+109
-26
lines changed

.npmrc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
link-workspace-packages=false
22
public-hoist-pattern[]=*prisma*
3-
prefer-workspace-packages=true
3+
prefer-workspace-packages=true
4+
update-notifier=false
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5+
import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server";
6+
7+
const RequestBodySchema = z.object({
8+
name: z.string().optional(),
9+
description: z.string().optional(),
10+
projectId: z.string().optional(),
11+
makeDefault: z.boolean().optional(),
12+
});
13+
14+
export async function action({ request }: ActionFunctionArgs) {
15+
// Next authenticate the request
16+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
17+
18+
if (!authenticationResult) {
19+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
20+
}
21+
22+
const user = await prisma.user.findUnique({
23+
where: {
24+
id: authenticationResult.userId,
25+
},
26+
});
27+
28+
if (!user) {
29+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
30+
}
31+
32+
if (!user.admin) {
33+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
34+
}
35+
36+
try {
37+
const rawBody = await request.json();
38+
const { name, description, projectId, makeDefault } = RequestBodySchema.parse(rawBody ?? {});
39+
40+
const service = new WorkerGroupService();
41+
const { workerGroup, token } = await service.createWorkerGroup({
42+
name,
43+
description,
44+
});
45+
46+
if (makeDefault && projectId) {
47+
await prisma.project.update({
48+
where: {
49+
id: projectId,
50+
},
51+
data: {
52+
defaultWorkerGroupId: workerGroup.id,
53+
engine: "V2",
54+
},
55+
});
56+
}
57+
58+
return json({
59+
token,
60+
workerGroup,
61+
});
62+
} catch (error) {
63+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
64+
}
65+
}
Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import { json, TypedResponse } from "@remix-run/server-runtime";
2-
import {
3-
WorkerApiConnectRequestBody,
4-
WorkerApiConnectResponseBody,
5-
} from "@trigger.dev/worker";
2+
import { WorkerApiConnectRequestBody, WorkerApiConnectResponseBody } from "@trigger.dev/worker";
63
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
74

85
export const action = createActionWorkerApiRoute(
@@ -11,6 +8,12 @@ export const action = createActionWorkerApiRoute(
118
},
129
async ({ authenticatedWorker, body }): Promise<TypedResponse<WorkerApiConnectResponseBody>> => {
1310
await authenticatedWorker.connect(body.metadata);
14-
return json({ ok: true });
11+
return json({
12+
ok: true,
13+
workerGroup: {
14+
type: authenticatedWorker.type,
15+
name: authenticatedWorker.name,
16+
},
17+
});
1518
}
1619
);

apps/webapp/app/routes/api.v1.worker-actions.heartbeat.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
import { json, TypedResponse } from "@remix-run/server-runtime";
2-
import {
3-
WorkerApiConnectResponseBody,
4-
WorkerApiHeartbeatRequestBody,
5-
} from "@trigger.dev/worker";
2+
import { WorkerApiHeartbeatResponseBody, WorkerApiHeartbeatRequestBody } from "@trigger.dev/worker";
63
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
74

85
export const action = createActionWorkerApiRoute(
96
{
107
body: WorkerApiHeartbeatRequestBody,
118
},
12-
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiConnectResponseBody>> => {
9+
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiHeartbeatResponseBody>> => {
1310
await authenticatedWorker.heartbeatWorkerInstance();
1411
return json({ ok: true });
1512
}

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ function createRunEngine() {
2727
pollIntervalMs: env.RUN_ENGINE_WORKER_POLL_INTERVAL,
2828
},
2929
machines: {
30-
defaultMachine: defaultMachine,
30+
defaultMachine,
3131
machines: allMachines(),
3232
baseCostInCents: env.CENTS_PER_RUN,
3333
},

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import { CURRENT_UNMANAGED_DEPLOYMENT_LABEL } from "~/consts";
2525
import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
2626
import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
2727
import { fromFriendlyId } from "@trigger.dev/core/v3/apps";
28+
import { machinePresetFromName } from "~/v3/machinePresets.server";
29+
import { defaultMachine } from "@trigger.dev/platform/v3";
2830

2931
export class WorkerGroupTokenService extends WithRunEngine {
3032
private readonly tokenPrefix = "tr_wgt_";
@@ -205,6 +207,7 @@ export class WorkerGroupTokenService extends WithRunEngine {
205207
prisma: this._prisma,
206208
engine: this._engine,
207209
type: WorkerInstanceGroupType.MANAGED,
210+
name: workerGroup.name,
208211
workerGroupId: workerGroup.id,
209212
workerInstanceId: workerInstance.id,
210213
masterQueue: workerGroup.masterQueue,
@@ -240,6 +243,7 @@ export class WorkerGroupTokenService extends WithRunEngine {
240243
prisma: this._prisma,
241244
engine: this._engine,
242245
type: WorkerInstanceGroupType.UNMANAGED,
246+
name: workerGroup.name,
243247
workerGroupId: workerGroup.id,
244248
workerInstanceId: workerInstance.id,
245249
masterQueue: workerGroup.masterQueue,
@@ -479,6 +483,7 @@ export type WorkerInstanceEnv = z.infer<typeof WorkerInstanceEnv>;
479483

480484
export type AuthenticatedWorkerInstanceOptions = WithRunEngineOptions<{
481485
type: WorkerInstanceGroupType;
486+
name: string;
482487
workerGroupId: string;
483488
workerInstanceId: string;
484489
masterQueue: string;
@@ -490,20 +495,22 @@ export type AuthenticatedWorkerInstanceOptions = WithRunEngineOptions<{
490495

491496
export class AuthenticatedWorkerInstance extends WithRunEngine {
492497
readonly type: WorkerInstanceGroupType;
498+
readonly name: string;
493499
readonly workerGroupId: string;
494500
readonly workerInstanceId: string;
495501
readonly masterQueue: string;
496502
readonly environment: RuntimeEnvironment | null;
497503
readonly deploymentId?: string;
498504
readonly backgroundWorkerId?: string;
499505

500-
// FIXME
506+
// FIXME: Required for unmanaged workers
501507
readonly isLatestDeployment = true;
502508

503509
constructor(opts: AuthenticatedWorkerInstanceOptions) {
504510
super({ prisma: opts.prisma, engine: opts.engine });
505511

506512
this.type = opts.type;
513+
this.name = opts.name;
507514
this.workerGroupId = opts.workerGroupId;
508515
this.workerInstanceId = opts.workerInstanceId;
509516
this.masterQueue = opts.masterQueue;
@@ -647,12 +654,7 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
647654
isWarmStart,
648655
});
649656

650-
const defaultMachinePreset = {
651-
name: "small-1x",
652-
cpu: 1,
653-
memory: 1,
654-
centsPerMs: 0,
655-
} satisfies MachinePreset;
657+
const defaultMachinePreset = machinePresetFromName(defaultMachine);
656658

657659
const environment =
658660
this.environment ??
@@ -718,6 +720,7 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
718720
if (this.type === WorkerInstanceGroupType.MANAGED) {
719721
return {
720722
type: WorkerInstanceGroupType.MANAGED,
723+
name: this.name,
721724
workerGroupId: this.workerGroupId,
722725
workerInstanceId: this.workerInstanceId,
723726
masterQueue: this.masterQueue,
@@ -726,6 +729,7 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
726729

727730
return {
728731
type: WorkerInstanceGroupType.UNMANAGED,
732+
name: this.name,
729733
workerGroupId: this.workerGroupId,
730734
workerInstanceId: this.workerInstanceId,
731735
masterQueue: this.masterQueue,
@@ -764,12 +768,14 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
764768
export type WorkerGroupTokenAuthenticationResponse =
765769
| {
766770
type: typeof WorkerInstanceGroupType.MANAGED;
771+
name: string;
767772
workerGroupId: string;
768773
workerInstanceId: string;
769774
masterQueue: string;
770775
}
771776
| {
772777
type: typeof WorkerInstanceGroupType.UNMANAGED;
778+
name: string;
773779
workerGroupId: string;
774780
workerInstanceId: string;
775781
masterQueue: string;

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ model RuntimeEnvironment {
423423
taskRunNumberCounter TaskRunNumberCounter[]
424424
taskRunCheckpoints TaskRunCheckpoint[]
425425
waitpoints Waitpoint[]
426-
workerInstance WorkerInstance[]
426+
workerInstances WorkerInstance[]
427427
428428
@@unique([projectId, slug, orgMemberId])
429429
@@unique([projectId, shortcode])

packages/worker/src/supervisor/schemas.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ export type WorkerApiConnectRequestBody = z.infer<typeof WorkerApiConnectRequest
3333

3434
export const WorkerApiConnectResponseBody = z.object({
3535
ok: z.literal(true),
36+
workerGroup: z.object({
37+
type: z.string(),
38+
name: z.string(),
39+
}),
3640
});
3741
export type WorkerApiConnectResponseBody = z.infer<typeof WorkerApiConnectResponseBody>;
3842

packages/worker/src/supervisor/session.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,17 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
100100
extraHeaders: getDefaultWorkerHeaders(this.opts),
101101
});
102102
this.socket.on("run:notify", ({ version, run }) => {
103-
console.log("[WorkerSession] Received run notification", { version, run });
103+
console.log("[WorkerSession][WS] Received run notification", { version, run });
104104
this.emit("runNotification", { time: new Date(), run });
105105
});
106106
this.socket.on("connect", () => {
107-
console.log("[WorkerSession] Connected to platform");
107+
console.log("[WorkerSession][WS] Connected to platform");
108108
});
109109
this.socket.on("connect_error", (error) => {
110-
console.error("[WorkerSession] Connection error", { error });
110+
console.error("[WorkerSession][WS] Connection error", { error });
111111
});
112112
this.socket.on("disconnect", (reason, description) => {
113-
console.log("[WorkerSession] Disconnected from platform", { reason, description });
113+
console.log("[WorkerSession][WS] Disconnected from platform", { reason, description });
114114
});
115115
}
116116

@@ -122,10 +122,17 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
122122
});
123123

124124
if (!connect.success) {
125-
console.error("[WorkerSession] Failed to connect via HTTP client", { error: connect.error });
126-
throw new Error("[WorkerSession] Failed to connect via HTTP client");
125+
console.error("[WorkerSession][HTTP] Failed to connect", { error: connect.error });
126+
throw new Error("[WorkerSession][HTTP] Failed to connect");
127127
}
128128

129+
const { workerGroup } = connect.data;
130+
131+
console.log("[WorkerSession][HTTP] Connected to platform", {
132+
type: workerGroup.type,
133+
name: workerGroup.name,
134+
});
135+
129136
this.queueConsumer.start();
130137
this.heartbeatService.start();
131138
this.createSocket();

0 commit comments

Comments
 (0)