1
1
import type { ClickHouse , TaskEventSummaryV1Result , TaskEventV1Input } from "@internal/clickhouse" ;
2
2
import { Attributes , startSpan , trace , Tracer } from "@internal/tracing" ;
3
- import { DynamicFlushScheduler } from "../dynamicFlushScheduler.server" ;
4
- import type {
5
- CompleteableTaskRun ,
6
- CreateEventInput ,
7
- EventBuilder ,
8
- ExceptionEventProperties ,
9
- IEventRepository ,
10
- RunPreparedEvent ,
11
- SpanDetail ,
12
- SpanSummary ,
13
- TaskEventRecord ,
14
- TraceAttributes ,
15
- TraceDetailedSummary ,
16
- TraceEventOptions ,
17
- TraceSummary ,
18
- } from "./eventRepository.types" ;
19
- import { tracePubSub } from "../services/tracePubSub.server" ;
20
- import type { TaskEventStoreTable } from "../taskEventStore.server" ;
21
- import { logger } from "~/services/logger.server" ;
22
- import {
23
- generateSpanId ,
24
- extractContextFromCarrier ,
25
- getNowInNanoseconds ,
26
- generateDeterministicSpanId ,
27
- calculateDurationFromStart ,
28
- generateTraceId ,
29
- parseEventsField ,
30
- convertDateToNanoseconds ,
31
- createExceptionPropertiesFromError ,
32
- } from "./common.server" ;
3
+ import { createJsonErrorObject } from "@trigger.dev/core/v3/errors" ;
33
4
import { serializeTraceparent } from "@trigger.dev/core/v3/isomorphic" ;
34
5
import {
35
6
AttemptFailedSpanEvent ,
@@ -44,11 +15,39 @@ import {
44
15
TaskEventStyle ,
45
16
TaskRunError ,
46
17
} from "@trigger.dev/core/v3/schemas" ;
18
+ import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes" ;
47
19
import { unflattenAttributes } from "@trigger.dev/core/v3/utils/flattenAttributes" ;
48
20
import { TaskEventLevel } from "@trigger.dev/database" ;
21
+ import { logger } from "~/services/logger.server" ;
22
+ import { DynamicFlushScheduler } from "../dynamicFlushScheduler.server" ;
23
+ import { tracePubSub } from "../services/tracePubSub.server" ;
24
+ import type { TaskEventStoreTable } from "../taskEventStore.server" ;
25
+ import {
26
+ calculateDurationFromStart ,
27
+ calculateDurationFromStartJsDate ,
28
+ convertDateToNanoseconds ,
29
+ createExceptionPropertiesFromError ,
30
+ extractContextFromCarrier ,
31
+ generateDeterministicSpanId ,
32
+ generateSpanId ,
33
+ generateTraceId ,
34
+ getNowInNanoseconds ,
35
+ parseEventsField ,
36
+ } from "./common.server" ;
37
+ import type {
38
+ CompleteableTaskRun ,
39
+ CreateEventInput ,
40
+ EventBuilder ,
41
+ IEventRepository ,
42
+ RunPreparedEvent ,
43
+ SpanDetail ,
44
+ SpanSummary ,
45
+ TraceAttributes ,
46
+ TraceDetailedSummary ,
47
+ TraceEventOptions ,
48
+ TraceSummary ,
49
+ } from "./eventRepository.types" ;
49
50
import { originalRunIdCache } from "./originalRunIdCache.server" ;
50
- import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes" ;
51
- import { createJsonErrorObject } from "@trigger.dev/core/v3/errors" ;
52
51
53
52
export type ClickhouseEventRepositoryConfig = {
54
53
clickhouse : ClickHouse ;
@@ -342,6 +341,7 @@ export class ClickhouseEventRepository implements IEventRepository {
342
341
private createEventToTaskEventV1InputMetadata ( event : CreateEventInput ) : string {
343
342
return JSON . stringify ( {
344
343
style : event . style ? unflattenAttributes ( event . style ) : undefined ,
344
+ attemptNumber : event . attemptNumber ,
345
345
} ) ;
346
346
}
347
347
@@ -783,7 +783,7 @@ export class ClickhouseEventRepository implements IEventRepository {
783
783
return ;
784
784
}
785
785
786
- const startTime = convertDateToNanoseconds ( cancelledAt ) ;
786
+ const startTime = convertDateToNanoseconds ( run . createdAt ) ;
787
787
const expiresAt = convertDateToClickhouseDateTime (
788
788
new Date ( run . createdAt . getTime ( ) + 30 * 24 * 60 * 60 * 1000 )
789
789
) ;
@@ -803,7 +803,9 @@ export class ClickhouseEventRepository implements IEventRepository {
803
803
kind : "SPAN" ,
804
804
status : "CANCELLED" ,
805
805
attributes : { } ,
806
- metadata : "{}" ,
806
+ metadata : JSON . stringify ( {
807
+ reason,
808
+ } ) ,
807
809
expires_at : expiresAt ,
808
810
} ;
809
811
@@ -867,7 +869,7 @@ export class ClickhouseEventRepository implements IEventRepository {
867
869
return acc ;
868
870
} , { } as Record < string , TaskEventSummaryV1Result [ ] > ) ;
869
871
870
- const spanSummaries : Record < string , SpanSummary > = { } ;
872
+ const spanSummaries = new Map < string , SpanSummary > ( ) ;
871
873
let rootSpanId : string | undefined ;
872
874
873
875
for ( const [ spanId , spanRecords ] of Object . entries ( recordsGroupedBySpanId ) ) {
@@ -886,7 +888,7 @@ export class ClickhouseEventRepository implements IEventRepository {
886
888
spanSummary,
887
889
} ) ;
888
890
889
- spanSummaries [ spanId ] = spanSummary ;
891
+ spanSummaries . set ( spanId , spanSummary ) ;
890
892
891
893
if ( ! rootSpanId && ! spanSummary . parentId ) {
892
894
rootSpanId = spanId ;
@@ -901,20 +903,120 @@ export class ClickhouseEventRepository implements IEventRepository {
901
903
return ;
902
904
}
903
905
904
- const spans = Object . values ( spanSummaries ) ;
905
- const rootSpan = spanSummaries [ rootSpanId ] ;
906
+ const spans = Array . from ( spanSummaries . values ( ) ) ;
907
+ const rootSpan = spanSummaries . get ( rootSpanId ) ;
908
+
909
+ if ( ! rootSpan ) {
910
+ logger . debug ( "ClickhouseEventRepository.getTraceSummary no rootSpan" , {
911
+ spanSummaries,
912
+ } ) ;
913
+
914
+ return ;
915
+ }
916
+
917
+ const finalSpans = spans . map ( ( span ) => {
918
+ return this . #applyAncestorOverrides( span , spanSummaries ) ;
919
+ } ) ;
906
920
907
921
logger . info ( "ClickhouseEventRepository.getTraceSummary result" , {
908
922
rootSpan,
909
- spans,
923
+ spans : finalSpans ,
910
924
} ) ;
911
925
912
926
return {
913
927
rootSpan,
914
- spans,
928
+ spans : finalSpans ,
915
929
} ;
916
930
}
917
931
932
+ #applyAncestorOverrides( span : SpanSummary , spansById : Map < string , SpanSummary > ) : SpanSummary {
933
+ if ( span . data . level !== "TRACE" ) {
934
+ return span ;
935
+ }
936
+
937
+ if ( ! span . data . isPartial ) {
938
+ return span ;
939
+ }
940
+
941
+ if ( ! span . parentId ) {
942
+ return span ;
943
+ }
944
+
945
+ // Now we need to walk the ancestors of the span by span.parentId
946
+ // The first ancestor that is a TRACE span that is "closed" we will use to override the span
947
+ let parentSpanId : string | undefined = span . parentId ;
948
+ let overrideSpan : SpanSummary | undefined ;
949
+
950
+ while ( parentSpanId ) {
951
+ const parentSpan = spansById . get ( parentSpanId ) ;
952
+
953
+ if ( ! parentSpan ) {
954
+ break ;
955
+ }
956
+
957
+ if ( parentSpan . data . level === "TRACE" && ! parentSpan . data . isPartial ) {
958
+ overrideSpan = parentSpan ;
959
+ break ;
960
+ }
961
+
962
+ parentSpanId = parentSpan . parentId ;
963
+ }
964
+
965
+ if ( overrideSpan ) {
966
+ return this . #applyAncestorToSpan( span , overrideSpan ) ;
967
+ }
968
+
969
+ return span ;
970
+ }
971
+
972
+ #applyAncestorToSpan( span : SpanSummary , overrideSpan : SpanSummary ) : SpanSummary {
973
+ const overrideEndTime = calculateEndTimeFromStartTime (
974
+ overrideSpan . data . startTime ,
975
+ overrideSpan . data . duration
976
+ ) ;
977
+
978
+ if ( overrideSpan . data . isCancelled ) {
979
+ span . data . isCancelled = true ;
980
+ span . data . isPartial = false ;
981
+ span . data . isError = false ;
982
+ span . data . duration = calculateDurationFromStartJsDate ( span . data . startTime , overrideEndTime ) ;
983
+
984
+ const cancellationEvent = overrideSpan . data . events . find (
985
+ ( event ) => event . name === "cancellation"
986
+ ) ;
987
+
988
+ if ( cancellationEvent ) {
989
+ span . data . events . push ( cancellationEvent ) ;
990
+ }
991
+ }
992
+
993
+ if ( overrideSpan . data . isError && span . data . attemptNumber ) {
994
+ const attemptFailedEvent = overrideSpan . data . events . find (
995
+ ( event ) =>
996
+ event . name === "attempt_failed" &&
997
+ event . properties . attemptNumber === span . data . attemptNumber &&
998
+ event . properties . runId === span . runId
999
+ ) as AttemptFailedSpanEvent | undefined ;
1000
+
1001
+ if ( attemptFailedEvent ) {
1002
+ span . data . isError = true ;
1003
+ span . data . isPartial = false ;
1004
+ span . data . isCancelled = false ;
1005
+ span . data . duration = calculateDurationFromStartJsDate ( span . data . startTime , overrideEndTime ) ;
1006
+ span . data . events . push ( {
1007
+ name : "exception" ,
1008
+ time : attemptFailedEvent . time ,
1009
+ properties : {
1010
+ exception : attemptFailedEvent . properties . exception ,
1011
+ } ,
1012
+ } satisfies ExceptionSpanEvent ) ;
1013
+ span . data . events . push ( attemptFailedEvent ) ;
1014
+ }
1015
+ }
1016
+
1017
+ return span ;
1018
+ }
1019
+
918
1020
#mergeRecordsIntoSpanSummary(
919
1021
spanId : string ,
920
1022
records : TaskEventSummaryV1Result [ ]
@@ -955,6 +1057,14 @@ export class ClickhouseEventRepository implements IEventRepository {
955
1057
956
1058
const parsedMetadata = this . #parseMetadata( record . metadata ) ;
957
1059
1060
+ if (
1061
+ parsedMetadata &&
1062
+ "attemptNumber" in parsedMetadata &&
1063
+ typeof parsedMetadata . attemptNumber === "number"
1064
+ ) {
1065
+ span . data . attemptNumber = parsedMetadata . attemptNumber ;
1066
+ }
1067
+
958
1068
if ( record . kind === "ANCESTOR_OVERRIDE" || record . kind === "SPAN_EVENT" ) {
959
1069
// We need to add an event to the span
960
1070
span . data . events . push ( {
@@ -986,6 +1096,7 @@ export class ClickhouseEventRepository implements IEventRepository {
986
1096
typeof record . duration === "number" ? record . duration : Number ( record . duration ) ;
987
1097
} else {
988
1098
span . data . startTime = convertClickhouseDateTime64ToJsDate ( record . start_time ) ;
1099
+ span . data . message = record . message ;
989
1100
}
990
1101
}
991
1102
}
@@ -1173,3 +1284,7 @@ function kindToLevel(kind: string): TaskEventLevel {
1173
1284
function isLogEvent ( kind : string ) : boolean {
1174
1285
return kind . startsWith ( "LOG_" ) || kind === "DEBUG_EVENT" ;
1175
1286
}
1287
+
1288
+ function calculateEndTimeFromStartTime ( startTime : Date , duration : number ) : Date {
1289
+ return new Date ( startTime . getTime ( ) + duration / 1_000_000 ) ;
1290
+ }
0 commit comments