Skip to content

Commit 5d75706

Browse files
committed
use new plan type on runs as fallback during dequeue
1 parent 021591f commit 5d75706

File tree

8 files changed

+180
-94
lines changed

8 files changed

+180
-94
lines changed

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,23 @@ function createRunEngine() {
110110
const plan = await getCurrentPlan(orgId);
111111

112112
if (!plan) {
113-
return { isPaying: false };
113+
return {
114+
isPaying: false,
115+
type: "free",
116+
};
114117
}
115118

116119
if (!plan.v3Subscription) {
117-
return { isPaying: false };
120+
return {
121+
isPaying: false,
122+
type: "free",
123+
};
118124
}
119125

120-
return { isPaying: plan.v3Subscription.isPaying };
126+
return {
127+
isPaying: plan.v3Subscription.isPaying,
128+
type: plan.v3Subscription.plan?.type ?? "free",
129+
};
121130
},
122131
},
123132
});

internal-packages/cache/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ export {
33
DefaultStatefulContext,
44
Namespace,
55
type Cache as UnkeyCache,
6+
type CacheError,
67
} from "@unkey/cache";
8+
export { type Result, Ok, Err } from "@unkey/error";
79
export { MemoryStore } from "@unkey/cache/stores";
810
export { RedisCacheStore } from "./stores/redis.js";
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "TaskRun" ADD COLUMN "planType" TEXT;

internal-packages/database/prisma/schema.prisma

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,9 @@ model TaskRun {
707707
/// Run error
708708
error Json?
709709
710+
/// Organization's billing plan type (cached for fallback when billing API fails)
711+
planType String?
712+
710713
maxDurationInSeconds Int?
711714
712715
@@unique([oneTimeUseToken])
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import {
2+
createCache,
3+
DefaultStatefulContext,
4+
MemoryStore,
5+
Namespace,
6+
Ok,
7+
RedisCacheStore,
8+
type UnkeyCache,
9+
type CacheError,
10+
type Result,
11+
} from "@internal/cache";
12+
import type { RedisOptions } from "@internal/redis";
13+
import type { Logger } from "@trigger.dev/core/logger";
14+
import type { RunEngineOptions } from "./types.js";
15+
16+
// Cache TTLs for billing information - shorter than other caches since billing can change
17+
const BILLING_FRESH_TTL = 60000 * 5; // 5 minutes
18+
const BILLING_STALE_TTL = 60000 * 10; // 10 minutes
19+
20+
export type BillingPlan = {
21+
isPaying: boolean;
22+
type: "free" | "paid" | "enterprise";
23+
};
24+
25+
export type BillingCacheOptions = {
26+
billingOptions?: RunEngineOptions["billing"];
27+
redisOptions: RedisOptions;
28+
logger: Logger;
29+
};
30+
31+
export class BillingCache {
32+
private readonly cache: UnkeyCache<{
33+
currentPlan: BillingPlan;
34+
}>;
35+
private readonly logger: Logger;
36+
private readonly billingOptions?: RunEngineOptions["billing"];
37+
38+
constructor(options: BillingCacheOptions) {
39+
this.logger = options.logger;
40+
this.billingOptions = options.billingOptions;
41+
42+
// Initialize cache
43+
const ctx = new DefaultStatefulContext();
44+
const memory = new MemoryStore({ persistentMap: new Map() });
45+
const redisCacheStore = new RedisCacheStore({
46+
name: "billing-cache",
47+
connection: {
48+
...options.redisOptions,
49+
keyPrefix: "engine:billing:cache:",
50+
},
51+
useModernCacheKeyBuilder: true,
52+
});
53+
54+
this.cache = createCache({
55+
currentPlan: new Namespace<BillingPlan>(ctx, {
56+
stores: [memory, redisCacheStore],
57+
fresh: BILLING_FRESH_TTL,
58+
stale: BILLING_STALE_TTL,
59+
}),
60+
});
61+
}
62+
63+
/**
64+
* Gets the current billing plan for an organization
65+
* Returns a Result that allows the caller to handle errors and missing values
66+
*/
67+
async getCurrentPlan(orgId: string): Promise<Result<BillingPlan | undefined, CacheError>> {
68+
if (!this.billingOptions?.getCurrentPlan) {
69+
// Return a successful result with default free plan
70+
return Ok({ isPaying: false, type: "free" });
71+
}
72+
73+
return await this.cache.currentPlan.swr(orgId, async () => {
74+
// This is safe because options can't change at runtime
75+
const planResult = await this.billingOptions!.getCurrentPlan(orgId);
76+
return { isPaying: planResult.isPaying, type: planResult.type };
77+
});
78+
}
79+
80+
/**
81+
* Invalidates the billing cache for an organization when their plan changes
82+
* Runs in background and handles all errors internally
83+
*/
84+
invalidate(orgId: string): void {
85+
this.cache.currentPlan.remove(orgId).catch((error) => {
86+
this.logger.warn("Failed to invalidate billing cache", {
87+
orgId,
88+
error: error instanceof Error ? error.message : String(error),
89+
});
90+
});
91+
}
92+
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { BillingCache } from "./billingCache.js";
12
import { createRedisClient, Redis } from "@internal/redis";
23
import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
34
import { Logger } from "@trigger.dev/core/logger";
@@ -78,6 +79,8 @@ export class RunEngine {
7879
pendingVersionSystem: PendingVersionSystem;
7980
raceSimulationSystem: RaceSimulationSystem = new RaceSimulationSystem();
8081

82+
private readonly billingCache: BillingCache;
83+
8184
constructor(private readonly options: RunEngineOptions) {
8285
this.logger = options.logger ?? new Logger("RunEngine", this.options.logLevel ?? "info");
8386
this.prisma = options.prisma;
@@ -292,12 +295,18 @@ export class RunEngine {
292295
redisOptions: this.options.cache?.redis ?? this.options.runLock.redis,
293296
});
294297

298+
this.billingCache = new BillingCache({
299+
billingOptions: this.options.billing,
300+
redisOptions: this.options.cache?.redis ?? this.options.runLock.redis,
301+
logger: this.logger,
302+
});
303+
295304
this.dequeueSystem = new DequeueSystem({
296305
resources,
297306
executionSnapshotSystem: this.executionSnapshotSystem,
298307
runAttemptSystem: this.runAttemptSystem,
299308
machines: this.options.machines,
300-
billing: this.options.billing,
309+
billingCache: this.billingCache,
301310
redisOptions: this.options.cache?.redis ?? this.options.runLock.redis,
302311
});
303312
}
@@ -366,13 +375,37 @@ export class RunEngine {
366375
"trigger",
367376
async (span) => {
368377
const status = delayUntil ? "DELAYED" : "PENDING";
378+
const runId = RunId.fromFriendlyId(friendlyId);
379+
380+
// Get billing information to store with the run
381+
let planType: string | undefined;
382+
const currentPlan = await this.billingCache.getCurrentPlan(environment.organization.id);
383+
384+
if (currentPlan.err || !currentPlan.val) {
385+
// If billing lookup fails, don't block the trigger - planType will be null
386+
this.logger.warn(
387+
"Failed to get billing info during trigger, proceeding without planType",
388+
{
389+
orgId: environment.organization.id,
390+
runId,
391+
error:
392+
currentPlan.err instanceof Error
393+
? currentPlan.err.message
394+
: String(currentPlan.err),
395+
hasValue: !!currentPlan.val,
396+
}
397+
);
398+
planType = undefined;
399+
} else {
400+
planType = currentPlan.val.type;
401+
}
369402

370403
//create run
371404
let taskRun: TaskRun;
372405
try {
373406
taskRun = await prisma.taskRun.create({
374407
data: {
375-
id: RunId.fromFriendlyId(friendlyId),
408+
id: runId,
376409
engine: "V2",
377410
status,
378411
number,
@@ -431,6 +464,7 @@ export class RunEngine {
431464
scheduleInstanceId,
432465
createdAt,
433466
bulkActionGroupIds: bulkActionId ? [bulkActionId] : undefined,
467+
planType,
434468
executionSnapshots: {
435469
create: {
436470
engine: "V2",
@@ -1355,6 +1389,6 @@ export class RunEngine {
13551389
* Runs in background and handles all errors internally
13561390
*/
13571391
invalidateBillingCache(orgId: string): void {
1358-
this.dequeueSystem.invalidateBillingCache(orgId);
1392+
this.billingCache.invalidate(orgId);
13591393
}
13601394
}

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 30 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,5 @@
1-
import {
2-
createCache,
3-
DefaultStatefulContext,
4-
MemoryStore,
5-
Namespace,
6-
RedisCacheStore,
7-
UnkeyCache,
8-
} from "@internal/cache";
91
import type { RedisOptions } from "@internal/redis";
2+
import type { BillingCache } from "../billingCache.js";
103
import { startSpan } from "@internal/tracing";
114
import { assertExhaustive } from "@trigger.dev/core";
125
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
@@ -20,66 +13,25 @@ import { RunEngineOptions } from "../types.js";
2013
import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
2114
import { RunAttemptSystem } from "./runAttemptSystem.js";
2215
import { SystemResources } from "./systems.js";
23-
import { ServiceValidationError } from "../errors.js";
2416

2517
export type DequeueSystemOptions = {
2618
resources: SystemResources;
2719
machines: RunEngineOptions["machines"];
2820
executionSnapshotSystem: ExecutionSnapshotSystem;
2921
runAttemptSystem: RunAttemptSystem;
30-
billing?: RunEngineOptions["billing"];
22+
billingCache: BillingCache;
3123
redisOptions: RedisOptions;
3224
};
3325

34-
// Cache TTLs for billing information - shorter than other caches since billing can change
35-
const BILLING_FRESH_TTL = 60000 * 5; // 5 minutes
36-
const BILLING_STALE_TTL = 60000 * 10; // 10 minutes
37-
3826
export class DequeueSystem {
3927
private readonly $: SystemResources;
4028
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
4129
private readonly runAttemptSystem: RunAttemptSystem;
42-
private readonly billingCache: UnkeyCache<{
43-
billing: { isPaying: boolean };
44-
}>;
4530

4631
constructor(private readonly options: DequeueSystemOptions) {
4732
this.$ = options.resources;
4833
this.executionSnapshotSystem = options.executionSnapshotSystem;
4934
this.runAttemptSystem = options.runAttemptSystem;
50-
51-
// Initialize billing cache
52-
const ctx = new DefaultStatefulContext();
53-
const memory = new MemoryStore({ persistentMap: new Map() });
54-
const redisCacheStore = new RedisCacheStore({
55-
name: "dequeue-system",
56-
connection: {
57-
...options.redisOptions,
58-
keyPrefix: "engine:dequeue-system:cache:",
59-
},
60-
useModernCacheKeyBuilder: true,
61-
});
62-
63-
this.billingCache = createCache({
64-
billing: new Namespace<{ isPaying: boolean }>(ctx, {
65-
stores: [memory, redisCacheStore],
66-
fresh: BILLING_FRESH_TTL,
67-
stale: BILLING_STALE_TTL,
68-
}),
69-
});
70-
}
71-
72-
/**
73-
* Invalidates the billing cache for an organization when their plan changes
74-
* Runs in background and handles all errors internally
75-
*/
76-
invalidateBillingCache(orgId: string): void {
77-
this.billingCache.billing.remove(orgId).catch((error) => {
78-
this.$.logger.warn("Failed to invalidate billing cache", {
79-
orgId,
80-
error: error.message,
81-
});
82-
});
8335
}
8436

8537
/**
@@ -432,8 +384,29 @@ export class DequeueSystem {
432384
const currentAttemptNumber = lockedTaskRun.attemptNumber ?? 0;
433385
const nextAttemptNumber = currentAttemptNumber + 1;
434386

435-
// Get billing information if available
436-
const billing = await this.#getBillingInfo({ orgId, runId });
387+
// Get billing information if available, with fallback to TaskRun.planType
388+
const billingResult = await this.options.billingCache.getCurrentPlan(orgId);
389+
390+
let isPaying: boolean;
391+
if (billingResult.err || !billingResult.val) {
392+
// Fallback to stored planType on TaskRun if billing cache fails or returns no value
393+
this.$.logger.warn(
394+
"Billing cache failed or returned no value, falling back to TaskRun.planType",
395+
{
396+
orgId,
397+
runId,
398+
error:
399+
billingResult.err instanceof Error
400+
? billingResult.err.message
401+
: String(billingResult.err),
402+
currentPlan: billingResult.val,
403+
}
404+
);
405+
406+
isPaying = lockedTaskRun.planType !== null && lockedTaskRun.planType !== "free";
407+
} else {
408+
isPaying = billingResult.val.isPaying;
409+
}
437410

438411
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
439412
prisma,
@@ -503,7 +476,11 @@ export class DequeueSystem {
503476
project: {
504477
id: lockedTaskRun.projectId,
505478
},
506-
billing,
479+
billing: {
480+
currentPlan: {
481+
isPaying,
482+
},
483+
},
507484
} satisfies DequeuedMessage;
508485
}
509486
);
@@ -668,37 +645,4 @@ export class DequeueSystem {
668645
});
669646
});
670647
}
671-
672-
async #getBillingInfo({
673-
orgId,
674-
runId,
675-
}: {
676-
orgId: string;
677-
runId: string;
678-
}): Promise<{ currentPlan: { isPaying: boolean } }> {
679-
if (!this.options.billing?.getCurrentPlan) {
680-
return { currentPlan: { isPaying: false } };
681-
}
682-
683-
const result = await this.billingCache.billing.swr(orgId, async () => {
684-
// This is safe because options can't change at runtime
685-
const planResult = await this.options.billing!.getCurrentPlan(orgId);
686-
687-
return { isPaying: planResult.isPaying };
688-
});
689-
690-
if (result.err) {
691-
throw result.err;
692-
}
693-
694-
if (!result.val) {
695-
throw new ServiceValidationError(
696-
`Could not resolve billing information for organization ${orgId}`,
697-
undefined,
698-
{ orgId, runId }
699-
);
700-
}
701-
702-
return { currentPlan: { isPaying: result.val.isPaying } };
703-
}
704648
}

0 commit comments

Comments
 (0)