Skip to content

Commit 3a4a622

Browse files
committed
Make ClickHouse required
1 parent f8733b3 commit 3a4a622

File tree

11 files changed

+25
-242
lines changed

11 files changed

+25
-242
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ const EnvironmentSchema = z.object({
912912
RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
913913

914914
// Clickhouse
915-
CLICKHOUSE_URL: z.string().optional(),
915+
CLICKHOUSE_URL: z.string(),
916916
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
917917
CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
918918
CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,6 @@ export class ApiRunListPresenter extends BasePresenter {
213213
options.batchId = searchParams["filter[batch]"];
214214
}
215215

216-
if (!clickhouseClient) {
217-
throw new Error("Clickhouse is not supported yet");
218-
}
219-
220216
const presenter = new NextRunListPresenter(this._prisma, clickhouseClient);
221217

222218
logger.debug("Calling RunListPresenter", { options });

apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ export class CreateBulkActionPresenter extends BasePresenter {
2424
Object.fromEntries(new URL(request.url).searchParams)
2525
);
2626

27-
if (!clickhouseClient) {
28-
throw new Error("Clickhouse client not found");
29-
}
30-
3127
const runsRepository = new RunsRepository({
3228
clickhouse: clickhouseClient,
3329
prisma: this._replica as PrismaClient,

apps/webapp/app/presenters/v3/TaskListPresenter.server.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import {
1111
type CurrentRunningStats,
1212
type DailyTaskActivity,
1313
type EnvironmentMetricsRepository,
14-
PostgrestEnvironmentMetricsRepository,
1514
} from "~/services/environmentMetricsRepository.server";
1615
import { singleton } from "~/utils/singleton";
1716
import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server";
@@ -110,13 +109,9 @@ export class TaskListPresenter {
110109
export const taskListPresenter = singleton("taskListPresenter", setupTaskListPresenter);
111110

112111
function setupTaskListPresenter() {
113-
const environmentMetricsRepository = clickhouseClient
114-
? new ClickHouseEnvironmentMetricsRepository({
115-
clickhouse: clickhouseClient,
116-
})
117-
: new PostgrestEnvironmentMetricsRepository({
118-
prisma: $replica,
119-
});
112+
const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({
113+
clickhouse: clickhouseClient,
114+
});
120115

121116
return new TaskListPresenter(environmentMetricsRepository, $replica);
122117
}

apps/webapp/app/presenters/v3/UsagePresenter.server.ts

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -124,60 +124,24 @@ async function getTaskUsageByOrganization(
124124
endOfMonth: Date,
125125
replica: PrismaClientOrTransaction
126126
) {
127-
if (clickhouseClient) {
128-
const [queryError, tasks] = await clickhouseClient.taskRuns.getTaskUsageByOrganization({
129-
startTime: startOfMonth.getTime(),
130-
endTime: endOfMonth.getTime(),
131-
organizationId,
132-
});
133-
134-
if (queryError) {
135-
throw queryError;
136-
}
137-
138-
return tasks
139-
.map((task) => ({
140-
taskIdentifier: task.task_identifier,
141-
runCount: Number(task.run_count),
142-
averageDuration: Number(task.average_duration),
143-
averageCost: Number(task.average_cost) + env.CENTS_PER_RUN / 100,
144-
totalDuration: Number(task.total_duration),
145-
totalCost: Number(task.total_cost) + Number(task.total_base_cost),
146-
}))
147-
.sort((a, b) => b.totalCost - a.totalCost);
148-
} else {
149-
return replica.$queryRaw<TaskUsageItem[]>`
150-
SELECT
151-
tr."taskIdentifier",
152-
COUNT(*) AS "runCount",
153-
AVG(tr."usageDurationMs") AS "averageDuration",
154-
SUM(tr."usageDurationMs") AS "totalDuration",
155-
AVG(tr."costInCents") / 100.0 AS "averageCost",
156-
SUM(tr."costInCents") / 100.0 AS "totalCost",
157-
SUM(tr."baseCostInCents") / 100.0 AS "totalBaseCost"
158-
FROM
159-
${sqlDatabaseSchema}."TaskRun" tr
160-
JOIN ${sqlDatabaseSchema}."Project" pr ON pr.id = tr."projectId"
161-
JOIN ${sqlDatabaseSchema}."Organization" org ON org.id = pr."organizationId"
162-
JOIN ${sqlDatabaseSchema}."RuntimeEnvironment" env ON env."id" = tr."runtimeEnvironmentId"
163-
WHERE
164-
env.type <> 'DEVELOPMENT'
165-
AND tr."createdAt" > ${startOfMonth}
166-
AND tr."createdAt" < ${endOfMonth}
167-
AND org.id = ${organizationId}
168-
GROUP BY
169-
tr."taskIdentifier";
170-
`.then((data) => {
171-
return data
172-
.map((item) => ({
173-
taskIdentifier: item.taskIdentifier,
174-
runCount: Number(item.runCount),
175-
averageDuration: Number(item.averageDuration),
176-
averageCost: Number(item.averageCost) + env.CENTS_PER_RUN / 100,
177-
totalDuration: Number(item.totalDuration),
178-
totalCost: Number(item.totalCost) + Number(item.totalBaseCost),
179-
}))
180-
.sort((a, b) => b.totalCost - a.totalCost);
181-
});
127+
const [queryError, tasks] = await clickhouseClient.taskRuns.getTaskUsageByOrganization({
128+
startTime: startOfMonth.getTime(),
129+
endTime: endOfMonth.getTime(),
130+
organizationId,
131+
});
132+
133+
if (queryError) {
134+
throw queryError;
182135
}
136+
137+
return tasks
138+
.map((task) => ({
139+
taskIdentifier: task.task_identifier,
140+
runCount: Number(task.run_count),
141+
averageDuration: Number(task.average_duration),
142+
averageCost: Number(task.average_cost) + env.CENTS_PER_RUN / 100,
143+
totalDuration: Number(task.total_duration),
144+
totalCost: Number(task.total_cost) + Number(task.total_base_cost),
145+
}))
146+
.sort((a, b) => b.totalCost - a.totalCost);
183147
}

apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,6 @@ export class ViewSchedulePresenter {
7777
? nextScheduledTimestamps(schedule.generatorExpression, schedule.timezone, new Date(), 5)
7878
: [];
7979

80-
if (!clickhouseClient) {
81-
throw new Error("Clickhouse is not supported yet");
82-
}
83-
8480
const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouseClient);
8581
const { runs } = await runPresenter.call(schedule.project.organizationId, environmentId, {
8682
projectId: schedule.project.id,

apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,6 @@ export class WaitpointPresenter extends BasePresenter {
7979
const connectedRuns: NextRunListItem[] = [];
8080

8181
if (connectedRunIds.length > 0) {
82-
if (!clickhouseClient) {
83-
throw new Error("Clickhouse is not supported yet");
84-
}
85-
8682
const runPresenter = new NextRunListPresenter(this._prisma, clickhouseClient);
8783
const { runs } = await runPresenter.call(
8884
waitpoint.environment.organizationId,

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
8181
throw new Error("Environment not found");
8282
}
8383

84-
if (!clickhouseClient) {
85-
throw new Error("Clickhouse is not supported yet");
86-
}
87-
8884
const filters = await getRunFiltersFromRequest(request);
8985

9086
const presenter = new NextRunListPresenter($replica, clickhouseClient);

apps/webapp/app/services/clickhouseInstance.server.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@ import { singleton } from "~/utils/singleton";
55
export const clickhouseClient = singleton("clickhouseClient", initializeClickhouseClient);
66

77
function initializeClickhouseClient() {
8-
if (!env.CLICKHOUSE_URL) {
9-
console.log("🗃️ Clickhouse service not enabled");
10-
return;
11-
}
12-
138
const url = new URL(env.CLICKHOUSE_URL);
149

1510
// Remove secure param

apps/webapp/app/services/environmentMetricsRepository.server.ts

Lines changed: 2 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import { ClickHouse } from "@internal/clickhouse";
2-
import { Logger, LogLevel } from "@trigger.dev/core/logger";
3-
import type { PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database";
4-
import { Prisma } from "@trigger.dev/database";
1+
import { type ClickHouse } from "@internal/clickhouse";
2+
import type { TaskRunStatus } from "@trigger.dev/database";
53
import { QUEUED_STATUSES } from "~/components/runs/v3/TaskRunStatus";
64

75
export type DailyTaskActivity = Record<string, ({ day: string } & Record<TaskRunStatus, number>)[]>;
@@ -34,147 +32,6 @@ export interface EnvironmentMetricsRepository {
3432
}): Promise<AverageDurations>;
3533
}
3634

37-
export type PostgrestEnvironmentMetricsRepositoryOptions = {
38-
prisma: PrismaClientOrTransaction;
39-
schema?: string;
40-
logger?: Logger;
41-
logLevel?: LogLevel;
42-
};
43-
44-
export class PostgrestEnvironmentMetricsRepository implements EnvironmentMetricsRepository {
45-
private readonly logger: Logger;
46-
private readonly schema: string;
47-
48-
constructor(private readonly options: PostgrestEnvironmentMetricsRepositoryOptions) {
49-
this.logger =
50-
options.logger ??
51-
new Logger("PostgrestEnvironmentMetricsRepository", options.logLevel ?? "info");
52-
this.schema = options.schema ?? "public";
53-
}
54-
55-
public async getDailyTaskActivity({
56-
environmentId,
57-
days,
58-
tasks,
59-
}: {
60-
environmentId: string;
61-
days: number;
62-
tasks: string[];
63-
}): Promise<DailyTaskActivity> {
64-
if (tasks.length === 0) {
65-
return {};
66-
}
67-
68-
const activity = await this.options.prisma.$queryRaw<
69-
{
70-
taskIdentifier: string;
71-
status: TaskRunStatus;
72-
day: Date;
73-
count: BigInt;
74-
}[]
75-
>`
76-
SELECT
77-
tr."taskIdentifier",
78-
tr."status",
79-
DATE(tr."createdAt") as day,
80-
COUNT(*)
81-
FROM
82-
${Prisma.sql([this.schema])}."TaskRun" as tr
83-
WHERE
84-
tr."taskIdentifier" IN (${Prisma.join(tasks)})
85-
AND tr."runtimeEnvironmentId" = ${environmentId}
86-
AND tr."createdAt" >= (current_date - interval '1 day' * ${days})
87-
GROUP BY
88-
tr."taskIdentifier",
89-
tr."status",
90-
day
91-
ORDER BY
92-
tr."taskIdentifier" ASC,
93-
day ASC,
94-
tr."status" ASC;`;
95-
96-
return fillInDailyTaskActivity(activity, days);
97-
}
98-
99-
public async getCurrentRunningStats({
100-
environmentId,
101-
days,
102-
tasks,
103-
}: {
104-
environmentId: string;
105-
days: number;
106-
tasks: string[];
107-
}): Promise<CurrentRunningStats> {
108-
if (tasks.length === 0) {
109-
return {};
110-
}
111-
112-
const stats = await this.options.prisma.$queryRaw<
113-
{
114-
taskIdentifier: string;
115-
status: TaskRunStatus;
116-
count: BigInt;
117-
}[]
118-
>`
119-
SELECT
120-
tr."taskIdentifier",
121-
tr.status,
122-
COUNT(*)
123-
FROM
124-
${Prisma.sql([this.schema])}."TaskRun" as tr
125-
WHERE
126-
tr."taskIdentifier" IN (${Prisma.join(tasks)})
127-
AND tr."runtimeEnvironmentId" = ${environmentId}
128-
AND tr."createdAt" >= (current_date - interval '1 day' * ${days})
129-
AND tr."status" = ANY(ARRAY[${Prisma.join([
130-
...QUEUED_STATUSES,
131-
"EXECUTING",
132-
])}]::\"TaskRunStatus\"[])
133-
GROUP BY
134-
tr."taskIdentifier",
135-
tr.status
136-
ORDER BY
137-
tr."taskIdentifier" ASC`;
138-
139-
return fillInCurrentRunningStats(stats, tasks);
140-
}
141-
142-
public async getAverageDurations({
143-
environmentId,
144-
days,
145-
tasks,
146-
}: {
147-
environmentId: string;
148-
days: number;
149-
tasks: string[];
150-
}): Promise<AverageDurations> {
151-
if (tasks.length === 0) {
152-
return {};
153-
}
154-
155-
const durations = await this.options.prisma.$queryRaw<
156-
{
157-
taskIdentifier: string;
158-
duration: Number;
159-
}[]
160-
>`
161-
SELECT
162-
tr."taskIdentifier",
163-
AVG(EXTRACT(EPOCH FROM (tr."updatedAt" - COALESCE(tr."startedAt", tr."lockedAt")))) as duration
164-
FROM
165-
${Prisma.sql([this.schema])}."TaskRun" as tr
166-
WHERE
167-
tr."taskIdentifier" IN (${Prisma.join(tasks)})
168-
AND tr."runtimeEnvironmentId" = ${environmentId}
169-
AND tr."createdAt" >= (current_date - interval '1 day' * ${days})
170-
AND tr."status" IN ('COMPLETED_SUCCESSFULLY', 'COMPLETED_WITH_ERRORS')
171-
GROUP BY
172-
tr."taskIdentifier";`;
173-
174-
return Object.fromEntries(durations.map((s) => [s.taskIdentifier, Number(s.duration)]));
175-
}
176-
}
177-
17835
export type ClickHouseEnvironmentMetricsRepositoryOptions = {
17936
clickhouse: ClickHouse;
18037
};

0 commit comments

Comments
 (0)