@@ -2,12 +2,11 @@ import { z } from "zod";
22import { type ClickHouse , type LogsListResult } from "@internal/clickhouse" ;
33import { MachinePresetName } from "@trigger.dev/core/v3" ;
44import {
5- type PrismaClient ,
65 type PrismaClientOrTransaction ,
76 type TaskRunStatus ,
87 TaskRunStatus as TaskRunStatusEnum ,
9- TaskTriggerSource ,
108} from "@trigger.dev/database" ;
9+ import { getConfiguredEventRepository } from "~/v3/eventRepository/index.server" ;
1110
1211// Create a schema that validates TaskRunStatus enum values
1312const TaskRunStatusSchema = z . array ( z . nativeEnum ( TaskRunStatusEnum ) ) ;
@@ -18,11 +17,12 @@ import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
1817import { getAllTaskIdentifiers } from "~/models/task.server" ;
1918import { RunsRepository } from "~/services/runsRepository/runsRepository.server" ;
2019import { ServiceValidationError } from "~/v3/services/baseService.server" ;
20+ import { kindToLevel , type LogLevel , LogLevelSchema } from "~/utils/logUtils" ;
21+ import { BasePresenter } from "~/presenters/v3/basePresenter.server" ;
2122import {
2223 convertDateToClickhouseDateTime ,
2324 convertClickhouseDateTime64ToJsDate ,
2425} from "~/v3/eventRepository/clickhouseEventRepository.server" ;
25- import { kindToLevel , type LogLevel , LogLevelSchema } from "~/utils/logUtils" ;
2626
2727export type { LogLevel } ;
2828
@@ -131,9 +131,7 @@ function decodeCursor(cursor: string): LogCursor | null {
131131}
132132
133133// Convert display level to ClickHouse kinds and statuses
134- function levelToKindsAndStatuses (
135- level : LogLevel
136- ) : { kinds ?: string [ ] ; statuses ?: string [ ] } {
134+ function levelToKindsAndStatuses ( level : LogLevel ) : { kinds ?: string [ ] ; statuses ?: string [ ] } {
137135 switch ( level ) {
138136 case "DEBUG" :
139137 return { kinds : [ "DEBUG_EVENT" , "LOG_DEBUG" ] } ;
@@ -150,7 +148,6 @@ function levelToKindsAndStatuses(
150148 }
151149}
152150
153-
154151function convertDateToNanoseconds ( date : Date ) : bigint {
155152 return BigInt ( date . getTime ( ) ) * 1_000_000n ;
156153}
@@ -168,11 +165,13 @@ function formatNanosecondsForClickhouse(ns: bigint): string {
168165 return padded . slice ( 0 , 10 ) + "." + padded . slice ( 10 ) ;
169166}
170167
171- export class LogsListPresenter {
168+ export class LogsListPresenter extends BasePresenter {
172169 constructor (
173170 private readonly replica : PrismaClientOrTransaction ,
174171 private readonly clickhouse : ClickHouse
175- ) { }
172+ ) {
173+ super ( undefined , replica ) ;
174+ }
176175
177176 public async call (
178177 organizationId : string ,
@@ -242,10 +241,7 @@ export class LogsListPresenter {
242241 ( search !== undefined && search !== "" ) ||
243242 ! time . isDefault ;
244243
245- const possibleTasksAsync = getAllTaskIdentifiers (
246- this . replica ,
247- environmentId
248- ) ;
244+ const possibleTasksAsync = getAllTaskIdentifiers ( this . replica , environmentId ) ;
249245
250246 const bulkActionsAsync = this . replica . bulkActionGroup . findMany ( {
251247 select : {
@@ -264,31 +260,26 @@ export class LogsListPresenter {
264260 take : 20 ,
265261 } ) ;
266262
267- const [ possibleTasks , bulkActions , displayableEnvironment ] =
268- await Promise . all ( [
269- possibleTasksAsync ,
270- bulkActionsAsync ,
271- findDisplayableEnvironment ( environmentId , userId ) ,
272- ] ) ;
273-
274- if (
275- bulkId &&
276- ! bulkActions . some ( ( bulkAction ) => bulkAction . friendlyId === bulkId )
277- ) {
278- const selectedBulkAction =
279- await this . replica . bulkActionGroup . findFirst ( {
280- select : {
281- friendlyId : true ,
282- type : true ,
283- createdAt : true ,
284- name : true ,
285- } ,
286- where : {
287- friendlyId : bulkId ,
288- projectId,
289- environmentId,
290- } ,
291- } ) ;
263+ const [ possibleTasks , bulkActions , displayableEnvironment ] = await Promise . all ( [
264+ possibleTasksAsync ,
265+ bulkActionsAsync ,
266+ findDisplayableEnvironment ( environmentId , userId ) ,
267+ ] ) ;
268+
269+ if ( bulkId && ! bulkActions . some ( ( bulkAction ) => bulkAction . friendlyId === bulkId ) ) {
270+ const selectedBulkAction = await this . replica . bulkActionGroup . findFirst ( {
271+ select : {
272+ friendlyId : true ,
273+ type : true ,
274+ createdAt : true ,
275+ name : true ,
276+ } ,
277+ where : {
278+ friendlyId : bulkId ,
279+ projectId,
280+ environmentId,
281+ } ,
282+ } ) ;
292283
293284 if ( selectedBulkAction ) {
294285 bulkActions . push ( selectedBulkAction ) ;
@@ -371,7 +362,22 @@ export class LogsListPresenter {
371362 }
372363 }
373364
374- const queryBuilder = this . clickhouse . taskEventsV2 . logsListQueryBuilder ( ) ;
365+ // Determine which store to use based on organization configuration
366+ const { store } = await getConfiguredEventRepository ( organizationId ) ;
367+
368+ // Throw error if postgres is detected
369+ if ( store === "postgres" ) {
370+ throw new ServiceValidationError (
371+ "Logs are not available for PostgreSQL event store. Please contact support."
372+ ) ;
373+ }
374+
375+ // Get the appropriate query builder based on store type
376+ const isClickhouseV2 = store === "clickhouse_v2" ;
377+
378+ const queryBuilder = isClickhouseV2
379+ ? this . clickhouse . taskEventsV2 . logsListQueryBuilder ( )
380+ : this . clickhouse . taskEvents . logsListQueryBuilder ( ) ;
375381
376382 queryBuilder . prewhere ( "environment_id = {environmentId: String}" , {
377383 environmentId,
@@ -382,12 +388,17 @@ export class LogsListPresenter {
382388 } ) ;
383389 queryBuilder . where ( "project_id = {projectId: String}" , { projectId } ) ;
384390
385- // Time filters - inserted_at in PREWHERE for partition pruning , start_time in WHERE
391+ // Time filters - inserted_at in PREWHERE only for v2 , start_time in WHERE for both
386392 if ( effectiveFrom ) {
387393 const fromNs = convertDateToNanoseconds ( effectiveFrom ) ;
388- queryBuilder . prewhere ( "inserted_at >= {insertedAtStart: DateTime64(3)}" , {
389- insertedAtStart : convertDateToClickhouseDateTime ( effectiveFrom ) ,
390- } ) ;
394+
395+ // Only use inserted_at for partition pruning if v2
396+ if ( isClickhouseV2 ) {
397+ queryBuilder . prewhere ( "inserted_at >= {insertedAtStart: DateTime64(3)}" , {
398+ insertedAtStart : convertDateToClickhouseDateTime ( effectiveFrom ) ,
399+ } ) ;
400+ }
401+
391402 queryBuilder . where ( "start_time >= {fromTime: String}" , {
392403 fromTime : formatNanosecondsForClickhouse ( fromNs ) ,
393404 } ) ;
@@ -396,9 +407,14 @@ export class LogsListPresenter {
396407 if ( effectiveTo ) {
397408 const clampedTo = effectiveTo > new Date ( ) ? new Date ( ) : effectiveTo ;
398409 const toNs = convertDateToNanoseconds ( clampedTo ) ;
399- queryBuilder . prewhere ( "inserted_at <= {insertedAtEnd: DateTime64(3)}" , {
400- insertedAtEnd : convertDateToClickhouseDateTime ( clampedTo ) ,
401- } ) ;
410+
411+ // Only use inserted_at for partition pruning if v2
412+ if ( isClickhouseV2 ) {
413+ queryBuilder . prewhere ( "inserted_at <= {insertedAtEnd: DateTime64(3)}" , {
414+ insertedAtEnd : convertDateToClickhouseDateTime ( clampedTo ) ,
415+ } ) ;
416+ }
417+
402418 queryBuilder . where ( "start_time <= {toTime: String}" , {
403419 toTime : formatNanosecondsForClickhouse ( toNs ) ,
404420 } ) ;
@@ -428,7 +444,6 @@ export class LogsListPresenter {
428444 ) ;
429445 }
430446
431-
432447 if ( levels && levels . length > 0 ) {
433448 const conditions : string [ ] = [ ] ;
434449 const params : Record < string , string [ ] > = { } ;
@@ -477,7 +492,6 @@ export class LogsListPresenter {
477492
478493 queryBuilder . where ( "NOT (kind = 'SPAN' AND status = 'PARTIAL')" ) ;
479494
480-
481495 // Cursor pagination
482496 const decodedCursor = cursor ? decodeCursor ( cursor ) : null ;
483497 if ( decodedCursor ) {
@@ -525,11 +539,11 @@ export class LogsListPresenter {
525539 let displayMessage = log . message ;
526540
527541 // For error logs with status ERROR, try to extract error message from attributes
528- if ( log . status === "ERROR" && log . attributes ) {
542+ if ( log . status === "ERROR" && log . attributes_text ) {
529543 try {
530- let attributes = log . attributes as ErrorAttributes ;
544+ const attributes = JSON . parse ( log . attributes_text ) as ErrorAttributes ;
531545
532- if ( attributes ?. error ?. message && typeof attributes . error . message === ' string' ) {
546+ if ( attributes ?. error ?. message && typeof attributes . error . message === " string" ) {
533547 displayMessage = attributes . error . message ;
534548 }
535549 } catch {
0 commit comments