Skip to content

Commit 2ce9e6a

Browse files
committed
add cache with best effort invalidation
1 parent 97e0ef7 commit 2ce9e6a

File tree

3 files changed

+103
-22
lines changed

3 files changed

+103
-22
lines changed

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { logger } from "~/services/logger.server";
2121
import { newProjectPath, organizationBillingPath } from "~/utils/pathBuilder";
2222
import { singleton } from "~/utils/singleton";
2323
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
24+
import { engine } from "~/v3/runEngine.server";
2425
import { existsSync, readFileSync } from "node:fs";
2526
import { z } from "zod";
2627
import { MachinePresetName } from "@trigger.dev/core/v3";
@@ -308,6 +309,8 @@ export async function setPlan(
308309
}
309310
case "free_connected": {
310311
if (result.accepted) {
312+
// Invalidate billing cache since plan changed
313+
engine.invalidateBillingCache(organization.id);
311314
return redirect(newProjectPath(organization, "You're on the Free plan."));
312315
} else {
313316
return redirectWithErrorMessage(
@@ -321,13 +324,17 @@ export async function setPlan(
321324
return redirect(result.checkoutUrl);
322325
}
323326
case "updated_subscription": {
327+
// Invalidate billing cache since subscription changed
328+
engine.invalidateBillingCache(organization.id);
324329
return redirectWithSuccessMessage(
325330
callerPath,
326331
request,
327332
"Subscription updated successfully."
328333
);
329334
}
330335
case "canceled_subscription": {
336+
// Invalidate billing cache since subscription was canceled
337+
engine.invalidateBillingCache(organization.id);
331338
return redirectWithSuccessMessage(callerPath, request, "Subscription canceled.");
332339
}
333340
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ export class RunEngine {
298298
runAttemptSystem: this.runAttemptSystem,
299299
machines: this.options.machines,
300300
billing: this.options.billing,
301+
redisOptions: this.options.cache?.redis ?? this.options.runLock.redis,
301302
});
302303
}
303304

@@ -1348,4 +1349,12 @@ export class RunEngine {
13481349
orgId: run.organizationId!,
13491350
}));
13501351
}
1352+
1353+
/**
1354+
* Invalidates the billing cache for an organization when their plan changes
1355+
* Runs in background and handles all errors internally
1356+
*/
1357+
invalidateBillingCache(orgId: string): void {
1358+
this.dequeueSystem.invalidateBillingCache(orgId);
1359+
}
13511360
}

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 87 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1+
import {
2+
createCache,
3+
DefaultStatefulContext,
4+
MemoryStore,
5+
Namespace,
6+
RedisCacheStore,
7+
UnkeyCache,
8+
} from "@internal/cache";
9+
import type { RedisOptions } from "@internal/redis";
110
import { startSpan } from "@internal/tracing";
2-
import { assertExhaustive, tryCatch } from "@trigger.dev/core";
11+
import { assertExhaustive } from "@trigger.dev/core";
312
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
413
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
514
import { PrismaClientOrTransaction } from "@trigger.dev/database";
@@ -11,24 +20,66 @@ import { RunEngineOptions } from "../types.js";
1120
import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
1221
import { RunAttemptSystem } from "./runAttemptSystem.js";
1322
import { SystemResources } from "./systems.js";
23+
import { ServiceValidationError } from "../errors.js";
1424

1525
export type DequeueSystemOptions = {
1626
resources: SystemResources;
1727
machines: RunEngineOptions["machines"];
1828
executionSnapshotSystem: ExecutionSnapshotSystem;
1929
runAttemptSystem: RunAttemptSystem;
2030
billing?: RunEngineOptions["billing"];
31+
redisOptions: RedisOptions;
2132
};
2233

34+
// Cache TTLs for billing information - shorter than other caches since billing can change
35+
const BILLING_FRESH_TTL = 60000 * 5; // 5 minutes
36+
const BILLING_STALE_TTL = 60000 * 10; // 10 minutes
37+
2338
export class DequeueSystem {
2439
private readonly $: SystemResources;
2540
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
2641
private readonly runAttemptSystem: RunAttemptSystem;
42+
private readonly billingCache: UnkeyCache<{
43+
billing: { isPaying: boolean };
44+
}>;
2745

2846
constructor(private readonly options: DequeueSystemOptions) {
2947
this.$ = options.resources;
3048
this.executionSnapshotSystem = options.executionSnapshotSystem;
3149
this.runAttemptSystem = options.runAttemptSystem;
50+
51+
// Initialize billing cache
52+
const ctx = new DefaultStatefulContext();
53+
const memory = new MemoryStore({ persistentMap: new Map() });
54+
const redisCacheStore = new RedisCacheStore({
55+
name: "dequeue-system",
56+
connection: {
57+
...options.redisOptions,
58+
keyPrefix: "engine:dequeue-system:cache:",
59+
},
60+
useModernCacheKeyBuilder: true,
61+
});
62+
63+
this.billingCache = createCache({
64+
billing: new Namespace<{ isPaying: boolean }>(ctx, {
65+
stores: [memory, redisCacheStore],
66+
fresh: BILLING_FRESH_TTL,
67+
stale: BILLING_STALE_TTL,
68+
}),
69+
});
70+
}
71+
72+
/**
73+
* Invalidates the billing cache for an organization when their plan changes
74+
* Runs in background and handles all errors internally
75+
*/
76+
invalidateBillingCache(orgId: string): void {
77+
this.billingCache.billing.remove(orgId).catch((error) => {
78+
this.$.logger.warn("Failed to invalidate billing cache", {
79+
orgId,
80+
error: error.message,
81+
});
82+
});
3283
}
3384

3485
/**
@@ -382,22 +433,7 @@ export class DequeueSystem {
382433
const nextAttemptNumber = currentAttemptNumber + 1;
383434

384435
// Get billing information if available
385-
let isPaying = false;
386-
if (this.options.billing?.getCurrentPlan) {
387-
const [error, planResult] = await tryCatch(
388-
this.options.billing.getCurrentPlan(orgId)
389-
);
390-
391-
if (error) {
392-
this.$.logger.error("Failed to get billing information", {
393-
orgId,
394-
runId,
395-
error: error.message,
396-
});
397-
} else {
398-
isPaying = planResult.isPaying;
399-
}
400-
}
436+
const billing = await this.#getBillingInfo({ orgId, runId });
401437

402438
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
403439
prisma,
@@ -467,11 +503,7 @@ export class DequeueSystem {
467503
project: {
468504
id: lockedTaskRun.projectId,
469505
},
470-
billing: {
471-
currentPlan: {
472-
isPaying,
473-
},
474-
},
506+
billing,
475507
} satisfies DequeuedMessage;
476508
}
477509
);
@@ -636,4 +668,37 @@ export class DequeueSystem {
636668
});
637669
});
638670
}
671+
672+
async #getBillingInfo({
673+
orgId,
674+
runId,
675+
}: {
676+
orgId: string;
677+
runId: string;
678+
}): Promise<{ currentPlan: { isPaying: boolean } }> {
679+
if (!this.options.billing?.getCurrentPlan) {
680+
return { currentPlan: { isPaying: false } };
681+
}
682+
683+
const result = await this.billingCache.billing.swr(orgId, async () => {
684+
// This is safe because options can't change at runtime
685+
const planResult = await this.options.billing!.getCurrentPlan(orgId);
686+
687+
return { isPaying: planResult.isPaying };
688+
});
689+
690+
if (result.err) {
691+
throw result.err;
692+
}
693+
694+
if (!result.val) {
695+
throw new ServiceValidationError(
696+
`Could not resolve billing information for organization ${orgId}`,
697+
undefined,
698+
{ orgId, runId }
699+
);
700+
}
701+
702+
return { currentPlan: { isPaying: result.val.isPaying } };
703+
}
639704
}

0 commit comments

Comments
 (0)