88 TaskRunStatus as TaskRunStatusEnum ,
99 TaskTriggerSource ,
1010} from "@trigger.dev/database" ;
11+ import { getConfiguredEventRepository } from "~/v3/eventRepository/index.server" ;
1112
1213// Create a schema that validates TaskRunStatus enum values
1314const TaskRunStatusSchema = z . array ( z . nativeEnum ( TaskRunStatusEnum ) ) ;
@@ -23,6 +24,8 @@ import {
2324 convertClickhouseDateTime64ToJsDate ,
2425} from "~/v3/eventRepository/clickhouseEventRepository.server" ;
2526import { kindToLevel , type LogLevel , LogLevelSchema } from "~/utils/logUtils" ;
27+ import { BasePresenter } from "~/presenters/v3/basePresenter.server" ;
28+
2629
2730export type { LogLevel } ;
2831
@@ -131,9 +134,7 @@ function decodeCursor(cursor: string): LogCursor | null {
131134}
132135
133136// Convert display level to ClickHouse kinds and statuses
134- function levelToKindsAndStatuses (
135- level : LogLevel
136- ) : { kinds ?: string [ ] ; statuses ?: string [ ] } {
137+ function levelToKindsAndStatuses ( level : LogLevel ) : { kinds ?: string [ ] ; statuses ?: string [ ] } {
137138 switch ( level ) {
138139 case "DEBUG" :
139140 return { kinds : [ "DEBUG_EVENT" , "LOG_DEBUG" ] } ;
@@ -150,7 +151,6 @@ function levelToKindsAndStatuses(
150151 }
151152}
152153
153-
154154function convertDateToNanoseconds ( date : Date ) : bigint {
155155 return BigInt ( date . getTime ( ) ) * 1_000_000n ;
156156}
@@ -168,11 +168,13 @@ function formatNanosecondsForClickhouse(ns: bigint): string {
168168 return padded . slice ( 0 , 10 ) + "." + padded . slice ( 10 ) ;
169169}
170170
171- export class LogsListPresenter {
171+ export class LogsListPresenter extends BasePresenter {
172172 constructor (
173173 private readonly replica : PrismaClientOrTransaction ,
174174 private readonly clickhouse : ClickHouse
175- ) { }
175+ ) {
176+ super ( ) ;
177+ }
176178
177179 public async call (
178180 organizationId : string ,
@@ -242,10 +244,7 @@ export class LogsListPresenter {
242244 ( search !== undefined && search !== "" ) ||
243245 ! time . isDefault ;
244246
245- const possibleTasksAsync = getAllTaskIdentifiers (
246- this . replica ,
247- environmentId
248- ) ;
247+ const possibleTasksAsync = getAllTaskIdentifiers ( this . replica , environmentId ) ;
249248
250249 const bulkActionsAsync = this . replica . bulkActionGroup . findMany ( {
251250 select : {
@@ -264,31 +263,26 @@ export class LogsListPresenter {
264263 take : 20 ,
265264 } ) ;
266265
267- const [ possibleTasks , bulkActions , displayableEnvironment ] =
268- await Promise . all ( [
269- possibleTasksAsync ,
270- bulkActionsAsync ,
271- findDisplayableEnvironment ( environmentId , userId ) ,
272- ] ) ;
273-
274- if (
275- bulkId &&
276- ! bulkActions . some ( ( bulkAction ) => bulkAction . friendlyId === bulkId )
277- ) {
278- const selectedBulkAction =
279- await this . replica . bulkActionGroup . findFirst ( {
280- select : {
281- friendlyId : true ,
282- type : true ,
283- createdAt : true ,
284- name : true ,
285- } ,
286- where : {
287- friendlyId : bulkId ,
288- projectId,
289- environmentId,
290- } ,
291- } ) ;
266+ const [ possibleTasks , bulkActions , displayableEnvironment ] = await Promise . all ( [
267+ possibleTasksAsync ,
268+ bulkActionsAsync ,
269+ findDisplayableEnvironment ( environmentId , userId ) ,
270+ ] ) ;
271+
272+ if ( bulkId && ! bulkActions . some ( ( bulkAction ) => bulkAction . friendlyId === bulkId ) ) {
273+ const selectedBulkAction = await this . replica . bulkActionGroup . findFirst ( {
274+ select : {
275+ friendlyId : true ,
276+ type : true ,
277+ createdAt : true ,
278+ name : true ,
279+ } ,
280+ where : {
281+ friendlyId : bulkId ,
282+ projectId,
283+ environmentId,
284+ } ,
285+ } ) ;
292286
293287 if ( selectedBulkAction ) {
294288 bulkActions . push ( selectedBulkAction ) ;
@@ -371,7 +365,29 @@ export class LogsListPresenter {
371365 }
372366 }
373367
374- const queryBuilder = this . clickhouse . taskEventsV2 . logsListQueryBuilder ( ) ;
368+ // Determine which store to use based on organization configuration
369+ const { store } = await getConfiguredEventRepository ( organizationId ) ;
370+
371+ // Throw error if postgres is detected
372+ if ( store === "postgres" ) {
373+ throw new ServiceValidationError (
374+ "Logs are not available for PostgreSQL event store. Please contact support."
375+ ) ;
376+ }
377+
378+ // Throw error if clickhouse v1 is detected (not supported)
379+ if ( store === "postgres" ) {
380+ throw new ServiceValidationError (
381+ "Logs are not available for Postgres event store. Please contact support."
382+ ) ;
383+ }
384+
385+ // Get the appropriate query builder based on store type
386+ const isClickhouseV2 = store === "clickhouse_v2" ;
387+
388+ const queryBuilder = isClickhouseV2
389+ ? this . clickhouse . taskEventsV2 . logsListQueryBuilder ( )
390+ : this . clickhouse . taskEvents . logsListQueryBuilder ( ) ;
375391
376392 queryBuilder . prewhere ( "environment_id = {environmentId: String}" , {
377393 environmentId,
@@ -382,12 +398,17 @@ export class LogsListPresenter {
382398 } ) ;
383399 queryBuilder . where ( "project_id = {projectId: String}" , { projectId } ) ;
384400
385- // Time filters - inserted_at in PREWHERE for partition pruning , start_time in WHERE
401+ // Time filters - inserted_at in PREWHERE only for v2 , start_time in WHERE for both
386402 if ( effectiveFrom ) {
387403 const fromNs = convertDateToNanoseconds ( effectiveFrom ) ;
388- queryBuilder . prewhere ( "inserted_at >= {insertedAtStart: DateTime64(3)}" , {
389- insertedAtStart : convertDateToClickhouseDateTime ( effectiveFrom ) ,
390- } ) ;
404+
405+ // Only use inserted_at for partition pruning if v2
406+ if ( isClickhouseV2 ) {
407+ queryBuilder . prewhere ( "inserted_at >= {insertedAtStart: DateTime64(3)}" , {
408+ insertedAtStart : convertDateToClickhouseDateTime ( effectiveFrom ) ,
409+ } ) ;
410+ }
411+
391412 queryBuilder . where ( "start_time >= {fromTime: String}" , {
392413 fromTime : formatNanosecondsForClickhouse ( fromNs ) ,
393414 } ) ;
@@ -396,9 +417,14 @@ export class LogsListPresenter {
396417 if ( effectiveTo ) {
397418 const clampedTo = effectiveTo > new Date ( ) ? new Date ( ) : effectiveTo ;
398419 const toNs = convertDateToNanoseconds ( clampedTo ) ;
399- queryBuilder . prewhere ( "inserted_at <= {insertedAtEnd: DateTime64(3)}" , {
400- insertedAtEnd : convertDateToClickhouseDateTime ( clampedTo ) ,
401- } ) ;
420+
421+ // Only use inserted_at for partition pruning if v2
422+ if ( isClickhouseV2 ) {
423+ queryBuilder . prewhere ( "inserted_at <= {insertedAtEnd: DateTime64(3)}" , {
424+ insertedAtEnd : convertDateToClickhouseDateTime ( clampedTo ) ,
425+ } ) ;
426+ }
427+
402428 queryBuilder . where ( "start_time <= {toTime: String}" , {
403429 toTime : formatNanosecondsForClickhouse ( toNs ) ,
404430 } ) ;
@@ -428,7 +454,6 @@ export class LogsListPresenter {
428454 ) ;
429455 }
430456
431-
432457 if ( levels && levels . length > 0 ) {
433458 const conditions : string [ ] = [ ] ;
434459 const params : Record < string , string [ ] > = { } ;
@@ -477,7 +502,6 @@ export class LogsListPresenter {
477502
478503 queryBuilder . where ( "NOT (kind = 'SPAN' AND status = 'PARTIAL')" ) ;
479504
480-
481505 // Cursor pagination
482506 const decodedCursor = cursor ? decodeCursor ( cursor ) : null ;
483507 if ( decodedCursor ) {
@@ -529,7 +553,7 @@ export class LogsListPresenter {
529553 try {
530554 let attributes = log . attributes as ErrorAttributes ;
531555
532- if ( attributes ?. error ?. message && typeof attributes . error . message === ' string' ) {
556+ if ( attributes ?. error ?. message && typeof attributes . error . message === " string" ) {
533557 displayMessage = attributes . error . message ;
534558 }
535559 } catch {
0 commit comments