Skip to content

Commit 97e0ef7

Browse files
committed
add billing info to dequeued message w/o cache
1 parent 42cd14a commit 97e0ef7

File tree

6 files changed

+53
-2
lines changed

6 files changed

+53
-2
lines changed

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ class ManagedSupervisor {
247247
nextAttemptNumber: message.run.attemptNumber,
248248
snapshotId: message.snapshot.id,
249249
snapshotFriendlyId: message.snapshot.friendlyId,
250+
isPaidTier: message.billing?.currentPlan.isPaying ?? false,
250251
});
251252

252253
// Disabled for now

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { RunEngine } from "@internal/run-engine";
22
import { $replica, prisma } from "~/db.server";
33
import { env } from "~/env.server";
4-
import { defaultMachine } from "~/services/platform.v3.server";
4+
import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server";
55
import { singleton } from "~/utils/singleton";
66
import { allMachines } from "./machinePresets.server";
77
import { meter, tracer } from "./tracer.server";
@@ -105,6 +105,21 @@ function createRunEngine() {
105105
SUSPENDED: env.RUN_ENGINE_TIMEOUT_SUSPENDED,
106106
},
107107
retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS,
108+
billing: {
109+
getCurrentPlan: async (orgId: string) => {
110+
const plan = await getCurrentPlan(orgId);
111+
112+
if (!plan) {
113+
return { isPaying: false };
114+
}
115+
116+
if (!plan.v3Subscription) {
117+
return { isPaying: false };
118+
}
119+
120+
return { isPaying: plan.v3Subscription.isPaying };
121+
},
122+
},
108123
});
109124

110125
return engine;

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ export class RunEngine {
297297
executionSnapshotSystem: this.executionSnapshotSystem,
298298
runAttemptSystem: this.runAttemptSystem,
299299
machines: this.options.machines,
300+
billing: this.options.billing,
300301
});
301302
}
302303

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { startSpan } from "@internal/tracing";
2-
import { assertExhaustive } from "@trigger.dev/core";
2+
import { assertExhaustive, tryCatch } from "@trigger.dev/core";
33
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
44
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
55
import { PrismaClientOrTransaction } from "@trigger.dev/database";
@@ -17,6 +17,7 @@ export type DequeueSystemOptions = {
1717
machines: RunEngineOptions["machines"];
1818
executionSnapshotSystem: ExecutionSnapshotSystem;
1919
runAttemptSystem: RunAttemptSystem;
20+
billing?: RunEngineOptions["billing"];
2021
};
2122

2223
export class DequeueSystem {
@@ -380,6 +381,24 @@ export class DequeueSystem {
380381
const currentAttemptNumber = lockedTaskRun.attemptNumber ?? 0;
381382
const nextAttemptNumber = currentAttemptNumber + 1;
382383

384+
// Get billing information if available
385+
let isPaying = false;
386+
if (this.options.billing?.getCurrentPlan) {
387+
const [error, planResult] = await tryCatch(
388+
this.options.billing.getCurrentPlan(orgId)
389+
);
390+
391+
if (error) {
392+
this.$.logger.error("Failed to get billing information", {
393+
orgId,
394+
runId,
395+
error: error.message,
396+
});
397+
} else {
398+
isPaying = planResult.isPaying;
399+
}
400+
}
401+
383402
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
384403
prisma,
385404
{
@@ -448,6 +467,11 @@ export class DequeueSystem {
448467
project: {
449468
id: lockedTaskRun.projectId,
450469
},
470+
billing: {
471+
currentPlan: {
472+
isPaying,
473+
},
474+
},
451475
} satisfies DequeuedMessage;
452476
}
453477
);

internal-packages/run-engine/src/engine/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ export type RunEngineOptions = {
3030
machines: Record<string, MachinePreset>;
3131
baseCostInCents: number;
3232
};
33+
billing?: {
34+
getCurrentPlan: (orgId: string) => Promise<{ isPaying: boolean }>;
35+
};
3336
queue: {
3437
redis: RedisOptions;
3538
shardCount?: number;

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,5 +261,12 @@ export const DequeuedMessage = z.object({
261261
project: z.object({
262262
id: z.string(),
263263
}),
264+
billing: z
265+
.object({
266+
currentPlan: z.object({
267+
isPaying: z.boolean(),
268+
}),
269+
})
270+
.optional(),
264271
});
265272
export type DequeuedMessage = z.infer<typeof DequeuedMessage>;

0 commit comments

Comments
 (0)