1- import { type ClickHouse , type ClickhouseQueryBuilder } from "@internal/clickhouse" ;
1+ import { type ClickHouse } from "@internal/clickhouse" ;
22import { type Tracer } from "@internal/tracing" ;
33import { type Logger , type LogLevel } from "@trigger.dev/core/logger" ;
44import { MachinePresetName } from "@trigger.dev/core/v3" ;
55import { BulkActionId , RunId } from "@trigger.dev/core/v3/isomorphic" ;
6- import { Prisma , TaskRunStatus } from "@trigger.dev/database" ;
6+ import { type Prisma , TaskRunStatus } from "@trigger.dev/database" ;
77import parseDuration from "parse-duration" ;
88import { z } from "zod" ;
99import { timeFilters } from "~/components/runs/v3/SharedFilters" ;
1010import { type PrismaClient } from "~/db.server" ;
11- import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server" ;
12- import { PostgresRunsRepository } from "./postgresRunsRepository.server" ;
1311import { FEATURE_FLAG , makeFlags } from "~/v3/featureFlags.server" ;
1412import { startActiveSpan } from "~/v3/tracer.server" ;
1513import { logger } from "../logger.server" ;
14+ import { ClickHouseRunsRepository } from "./clickhouseRunsRepository.server" ;
15+ import { PostgresRunsRepository } from "./postgresRunsRepository.server" ;
1616
1717export type RunsRepositoryOptions = {
1818 clickhouse : ClickHouse ;
@@ -121,6 +121,7 @@ export class RunsRepository implements IRunsRepository {
121121 private readonly clickHouseRunsRepository : ClickHouseRunsRepository ;
122122 private readonly postgresRunsRepository : PostgresRunsRepository ;
123123 private readonly defaultRepository : "clickhouse" | "postgres" ;
124+ private readonly logger : Logger ;
124125
125126 constructor (
126127 private readonly options : RunsRepositoryOptions & {
@@ -130,6 +131,7 @@ export class RunsRepository implements IRunsRepository {
130131 this . clickHouseRunsRepository = new ClickHouseRunsRepository ( options ) ;
131132 this . postgresRunsRepository = new PostgresRunsRepository ( options ) ;
132133 this . defaultRepository = options . defaultRepository ?? "clickhouse" ;
134+ this . logger = options . logger ?? logger ;
133135 }
134136
135137 get name ( ) {
@@ -163,11 +165,38 @@ export class RunsRepository implements IRunsRepository {
163165 return startActiveSpan (
164166 "runsRepository.listRunIds" ,
165167 async ( ) => {
166- return repository . listRunIds ( options ) ;
168+ try {
169+ return await repository . listRunIds ( options ) ;
170+ } catch ( error ) {
171+ // If ClickHouse fails, retry with Postgres
172+ if ( repository . name === "clickhouse" ) {
173+ this . logger ?. warn ( "ClickHouse failed, retrying with Postgres" , { error } ) ;
174+ return startActiveSpan (
175+ "runsRepository.listRunIds.fallback" ,
176+ async ( ) => {
177+ return await this . postgresRunsRepository . listRunIds ( options ) ;
178+ } ,
179+ {
180+ attributes : {
181+ "repository.name" : "postgres" ,
182+ "fallback.reason" : "clickhouse_error" ,
183+ "fallback.error" : error instanceof Error ? error . message : String ( error ) ,
184+ organizationId : options . organizationId ,
185+ projectId : options . projectId ,
186+ environmentId : options . environmentId ,
187+ } ,
188+ }
189+ ) ;
190+ }
191+ throw error ;
192+ }
167193 } ,
168194 {
169195 attributes : {
170196 "repository.name" : repository . name ,
197+ organizationId : options . organizationId ,
198+ projectId : options . projectId ,
199+ environmentId : options . environmentId ,
171200 } ,
172201 }
173202 ) ;
@@ -184,11 +213,38 @@ export class RunsRepository implements IRunsRepository {
184213 return startActiveSpan (
185214 "runsRepository.listRuns" ,
186215 async ( ) => {
187- return repository . listRuns ( options ) ;
216+ try {
217+ return await repository . listRuns ( options ) ;
218+ } catch ( error ) {
219+ // If ClickHouse fails, retry with Postgres
220+ if ( repository . name === "clickhouse" ) {
221+ this . logger ?. warn ( "ClickHouse failed, retrying with Postgres" , { error } ) ;
222+ return startActiveSpan (
223+ "runsRepository.listRuns.fallback" ,
224+ async ( ) => {
225+ return await this . postgresRunsRepository . listRuns ( options ) ;
226+ } ,
227+ {
228+ attributes : {
229+ "repository.name" : "postgres" ,
230+ "fallback.reason" : "clickhouse_error" ,
231+ "fallback.error" : error instanceof Error ? error . message : String ( error ) ,
232+ organizationId : options . organizationId ,
233+ projectId : options . projectId ,
234+ environmentId : options . environmentId ,
235+ } ,
236+ }
237+ ) ;
238+ }
239+ throw error ;
240+ }
188241 } ,
189242 {
190243 attributes : {
191244 "repository.name" : repository . name ,
245+ organizationId : options . organizationId ,
246+ projectId : options . projectId ,
247+ environmentId : options . environmentId ,
192248 } ,
193249 }
194250 ) ;
@@ -199,11 +255,38 @@ export class RunsRepository implements IRunsRepository {
199255 return startActiveSpan (
200256 "runsRepository.countRuns" ,
201257 async ( ) => {
202- return repository . countRuns ( options ) ;
258+ try {
259+ return await repository . countRuns ( options ) ;
260+ } catch ( error ) {
261+ // If ClickHouse fails, retry with Postgres
262+ if ( repository . name === "clickhouse" ) {
263+ this . logger ?. warn ( "ClickHouse failed, retrying with Postgres" , { error } ) ;
264+ return startActiveSpan (
265+ "runsRepository.countRuns.fallback" ,
266+ async ( ) => {
267+ return await this . postgresRunsRepository . countRuns ( options ) ;
268+ } ,
269+ {
270+ attributes : {
271+ "repository.name" : "postgres" ,
272+ "fallback.reason" : "clickhouse_error" ,
273+ "fallback.error" : error instanceof Error ? error . message : String ( error ) ,
274+ organizationId : options . organizationId ,
275+ projectId : options . projectId ,
276+ environmentId : options . environmentId ,
277+ } ,
278+ }
279+ ) ;
280+ }
281+ throw error ;
282+ }
203283 } ,
204284 {
205285 attributes : {
206286 "repository.name" : repository . name ,
287+ organizationId : options . organizationId ,
288+ projectId : options . projectId ,
289+ environmentId : options . environmentId ,
207290 } ,
208291 }
209292 ) ;
0 commit comments