1
1
import type {
2
2
ClickHouse ,
3
+ TaskEventDetailedSummaryV1Result ,
3
4
TaskEventDetailsV1Result ,
4
5
TaskEventSummaryV1Result ,
5
6
TaskEventV1Input ,
@@ -48,8 +49,10 @@ import type {
48
49
IEventRepository ,
49
50
RunPreparedEvent ,
50
51
SpanDetail ,
52
+ SpanDetailedSummary ,
51
53
SpanOverride ,
52
54
SpanSummary ,
55
+ SpanSummaryCommon ,
53
56
TraceAttributes ,
54
57
TraceDetailedSummary ,
55
58
TraceEventOptions ,
@@ -891,13 +894,6 @@ export class ClickhouseEventRepository implements IEventRepository {
891
894
queryBuilder . limit ( this . _config . maximumTraceSummaryViewCount ) ;
892
895
}
893
896
894
- const { query, params } = queryBuilder . build ( ) ;
895
-
896
- logger . debug ( "ClickhouseEventRepository.getTraceSummary query" , {
897
- query,
898
- params,
899
- } ) ;
900
-
901
897
const [ queryError , records ] = await queryBuilder . execute ( ) ;
902
898
903
899
if ( queryError ) {
@@ -908,10 +904,6 @@ export class ClickhouseEventRepository implements IEventRepository {
908
904
return ;
909
905
}
910
906
911
- logger . info ( "ClickhouseEventRepository.getTraceSummary" , {
912
- records,
913
- } ) ;
914
-
915
907
const recordsGroupedBySpanId = records . reduce ( ( acc , record ) => {
916
908
acc [ record . span_id ] = [ ...( acc [ record . span_id ] ?? [ ] ) , record ] ;
917
909
return acc ;
@@ -921,21 +913,12 @@ export class ClickhouseEventRepository implements IEventRepository {
921
913
let rootSpanId : string | undefined ;
922
914
923
915
for ( const [ spanId , spanRecords ] of Object . entries ( recordsGroupedBySpanId ) ) {
924
- logger . debug ( "ClickhouseEventRepository.getTraceSummary recordsGroupedBySpanId" , {
925
- spanId,
926
- spanRecords,
927
- } ) ;
928
-
929
916
const spanSummary = this . #mergeRecordsIntoSpanSummary( spanId , spanRecords ) ;
930
917
931
918
if ( ! spanSummary ) {
932
919
continue ;
933
920
}
934
921
935
- logger . debug ( "ClickhouseEventRepository.getTraceSummary spanSummary" , {
936
- spanSummary,
937
- } ) ;
938
-
939
922
spanSummaries . set ( spanId , spanSummary ) ;
940
923
941
924
if ( ! rootSpanId && ! spanSummary . parentId ) {
@@ -944,21 +927,13 @@ export class ClickhouseEventRepository implements IEventRepository {
944
927
}
945
928
946
929
if ( ! rootSpanId ) {
947
- logger . debug ( "ClickhouseEventRepository.getTraceSummary no rootSpanId" , {
948
- spanSummaries,
949
- } ) ;
950
-
951
930
return ;
952
931
}
953
932
954
933
const spans = Array . from ( spanSummaries . values ( ) ) ;
955
934
const rootSpan = spanSummaries . get ( rootSpanId ) ;
956
935
957
936
if ( ! rootSpan ) {
958
- logger . debug ( "ClickhouseEventRepository.getTraceSummary no rootSpan" , {
959
- spanSummaries,
960
- } ) ;
961
-
962
937
return ;
963
938
}
964
939
@@ -968,11 +943,6 @@ export class ClickhouseEventRepository implements IEventRepository {
968
943
return this . #applyAncestorOverrides( span , spanSummaries , overridesBySpanId ) ;
969
944
} ) ;
970
945
971
- logger . info ( "ClickhouseEventRepository.getTraceSummary result" , {
972
- rootSpan,
973
- spans : finalSpans ,
974
- } ) ;
975
-
976
946
return {
977
947
rootSpan,
978
948
spans : finalSpans ,
@@ -1008,13 +978,6 @@ export class ClickhouseEventRepository implements IEventRepository {
1008
978
1009
979
queryBuilder . orderBy ( "start_time ASC" ) ;
1010
980
1011
- const { query, params } = queryBuilder . build ( ) ;
1012
-
1013
- logger . debug ( "ClickhouseEventRepository.getSpan query" , {
1014
- query,
1015
- params,
1016
- } ) ;
1017
-
1018
981
const [ queryError , records ] = await queryBuilder . execute ( ) ;
1019
982
1020
983
if ( queryError ) {
@@ -1027,13 +990,20 @@ export class ClickhouseEventRepository implements IEventRepository {
1027
990
1028
991
const span = this . #mergeRecordsIntoSpanDetail( spanId , records ) ;
1029
992
1030
- logger . info ( "ClickhouseEventRepository.getSpan" , {
1031
- span,
1032
- } ) ;
1033
-
1034
993
return span ;
1035
994
}
1036
995
996
+ async getSpanOriginalRunId (
997
+ storeTable : TaskEventStoreTable ,
998
+ environmentId : string ,
999
+ spanId : string ,
1000
+ traceId : string ,
1001
+ startCreatedAt : Date ,
1002
+ endCreatedAt ?: Date
1003
+ ) : Promise < string | undefined > {
1004
+ return await originalRunIdCache . lookup ( traceId , spanId ) ;
1005
+ }
1006
+
1037
1007
#mergeRecordsIntoSpanDetail(
1038
1008
spanId : string ,
1039
1009
records : TaskEventDetailsV1Result [ ]
@@ -1132,11 +1102,11 @@ export class ClickhouseEventRepository implements IEventRepository {
1132
1102
return span ;
1133
1103
}
1134
1104
1135
- #applyAncestorOverrides(
1136
- span : SpanSummary ,
1137
- spansById : Map < string , SpanSummary > ,
1105
+ #applyAncestorOverrides< TSpanSummary extends SpanSummaryCommon > (
1106
+ span : TSpanSummary ,
1107
+ spansById : Map < string , TSpanSummary > ,
1138
1108
overridesBySpanId : Record < string , SpanOverride >
1139
- ) : SpanSummary {
1109
+ ) : TSpanSummary {
1140
1110
if ( span . data . level !== "TRACE" ) {
1141
1111
return span ;
1142
1112
}
@@ -1152,7 +1122,7 @@ export class ClickhouseEventRepository implements IEventRepository {
1152
1122
// Now we need to walk the ancestors of the span by span.parentId
1153
1123
// The first ancestor that is a TRACE span that is "closed" we will use to override the span
1154
1124
let parentSpanId : string | undefined = span . parentId ;
1155
- let overrideSpan : SpanSummary | undefined ;
1125
+ let overrideSpan : TSpanSummary | undefined ;
1156
1126
1157
1127
while ( parentSpanId ) {
1158
1128
const parentSpan = spansById . get ( parentSpanId ) ;
@@ -1176,11 +1146,11 @@ export class ClickhouseEventRepository implements IEventRepository {
1176
1146
return span ;
1177
1147
}
1178
1148
1179
- #applyAncestorToSpan(
1180
- span : SpanSummary ,
1181
- overrideSpan : SpanSummary ,
1149
+ #applyAncestorToSpan< TSpanSummary extends SpanSummaryCommon > (
1150
+ span : TSpanSummary ,
1151
+ overrideSpan : TSpanSummary ,
1182
1152
overridesBySpanId : Record < string , SpanOverride >
1183
- ) : SpanSummary {
1153
+ ) : TSpanSummary {
1184
1154
if ( overridesBySpanId [ span . id ] ) {
1185
1155
return span ;
1186
1156
}
@@ -1361,7 +1331,182 @@ export class ClickhouseEventRepository implements IEventRepository {
1361
1331
endCreatedAt ?: Date ,
1362
1332
options ?: { includeDebugLogs ?: boolean }
1363
1333
) : Promise < TraceDetailedSummary | undefined > {
1364
- throw new Error ( "ClickhouseEventRepository.getTraceDetailedSummary not implemented" ) ;
1334
+ const startCreatedAtWithBuffer = new Date ( startCreatedAt . getTime ( ) - 1000 ) ;
1335
+
1336
+ const queryBuilder = this . _clickhouse . taskEvents . traceDetailedSummaryQueryBuilder ( ) ;
1337
+
1338
+ queryBuilder . where ( "environment_id = {environmentId: String}" , { environmentId } ) ;
1339
+ queryBuilder . where ( "trace_id = {traceId: String}" , { traceId } ) ;
1340
+ queryBuilder . where ( "start_time >= {startCreatedAt: String}" , {
1341
+ startCreatedAt : convertDateToNanoseconds ( startCreatedAtWithBuffer ) . toString ( ) ,
1342
+ } ) ;
1343
+
1344
+ if ( endCreatedAt ) {
1345
+ queryBuilder . where ( "start_time <= {endCreatedAt: String}" , {
1346
+ endCreatedAt : convertDateToNanoseconds ( endCreatedAt ) . toString ( ) ,
1347
+ } ) ;
1348
+ }
1349
+
1350
+ if ( options ?. includeDebugLogs === false ) {
1351
+ queryBuilder . where ( "kind != {kind: String}" , { kind : "DEBUG_EVENT" } ) ;
1352
+ }
1353
+
1354
+ queryBuilder . orderBy ( "start_time ASC" ) ;
1355
+
1356
+ if ( this . _config . maximumTraceSummaryViewCount ) {
1357
+ queryBuilder . limit ( this . _config . maximumTraceSummaryViewCount ) ;
1358
+ }
1359
+
1360
+ const [ queryError , records ] = await queryBuilder . execute ( ) ;
1361
+
1362
+ if ( queryError ) {
1363
+ throw queryError ;
1364
+ }
1365
+
1366
+ if ( ! records ) {
1367
+ return ;
1368
+ }
1369
+
1370
+ const recordsGroupedBySpanId = records . reduce ( ( acc , record ) => {
1371
+ acc [ record . span_id ] = [ ...( acc [ record . span_id ] ?? [ ] ) , record ] ;
1372
+ return acc ;
1373
+ } , { } as Record < string , TaskEventDetailedSummaryV1Result [ ] > ) ;
1374
+
1375
+ const spanSummaries = new Map < string , SpanDetailedSummary > ( ) ;
1376
+ let rootSpanId : string | undefined ;
1377
+
1378
+ for ( const [ spanId , spanRecords ] of Object . entries ( recordsGroupedBySpanId ) ) {
1379
+ const spanSummary = this . #mergeRecordsIntoSpanDetailedSummary( spanId , spanRecords ) ;
1380
+
1381
+ if ( ! spanSummary ) {
1382
+ continue ;
1383
+ }
1384
+
1385
+ spanSummaries . set ( spanId , spanSummary ) ;
1386
+
1387
+ if ( ! rootSpanId && ! spanSummary . parentId ) {
1388
+ rootSpanId = spanId ;
1389
+ }
1390
+ }
1391
+
1392
+ if ( ! rootSpanId ) {
1393
+ return ;
1394
+ }
1395
+
1396
+ const spans = Array . from ( spanSummaries . values ( ) ) ;
1397
+
1398
+ const overridesBySpanId : Record < string , SpanOverride > = { } ;
1399
+ const spanDetailedSummaryMap = new Map < string , SpanDetailedSummary > ( ) ;
1400
+
1401
+ const finalSpans = spans . map ( ( span ) => {
1402
+ const finalSpan = this . #applyAncestorOverrides( span , spanSummaries , overridesBySpanId ) ;
1403
+ spanDetailedSummaryMap . set ( span . id , finalSpan ) ;
1404
+ return finalSpan ;
1405
+ } ) ;
1406
+
1407
+ // Second pass: build parent-child relationships
1408
+ for ( const finalSpan of finalSpans ) {
1409
+ if ( finalSpan . parentId ) {
1410
+ const parent = spanDetailedSummaryMap . get ( finalSpan . parentId ) ;
1411
+ if ( parent ) {
1412
+ parent . children . push ( finalSpan ) ;
1413
+ }
1414
+ }
1415
+ }
1416
+
1417
+ const rootSpan = spanDetailedSummaryMap . get ( rootSpanId ) ;
1418
+
1419
+ if ( ! rootSpan ) {
1420
+ return ;
1421
+ }
1422
+
1423
+ return {
1424
+ traceId,
1425
+ rootSpan,
1426
+ } ;
1427
+ }
1428
+
1429
+ #mergeRecordsIntoSpanDetailedSummary(
1430
+ spanId : string ,
1431
+ records : TaskEventDetailedSummaryV1Result [ ]
1432
+ ) : SpanDetailedSummary | undefined {
1433
+ if ( records . length === 0 ) {
1434
+ return undefined ;
1435
+ }
1436
+
1437
+ let span : SpanDetailedSummary | undefined ;
1438
+
1439
+ for ( const record of records ) {
1440
+ if ( ! span ) {
1441
+ span = {
1442
+ id : spanId ,
1443
+ parentId : record . parent_span_id ? record . parent_span_id : undefined ,
1444
+ runId : record . run_id ,
1445
+ data : {
1446
+ message : record . message ,
1447
+ taskSlug : undefined ,
1448
+ duration :
1449
+ typeof record . duration === "number" ? record . duration : Number ( record . duration ) ,
1450
+ isError : false ,
1451
+ isPartial : true , // Partial by default, can only be set to false
1452
+ isCancelled : false ,
1453
+ startTime : convertClickhouseDateTime64ToJsDate ( record . start_time ) ,
1454
+ level : kindToLevel ( record . kind ) ,
1455
+ events : [ ] ,
1456
+ } ,
1457
+ children : [ ] ,
1458
+ } ;
1459
+ }
1460
+
1461
+ if ( isLogEvent ( record . kind ) ) {
1462
+ span . data . isPartial = false ;
1463
+ span . data . isCancelled = false ;
1464
+ span . data . isError = record . status === "ERROR" ;
1465
+ }
1466
+
1467
+ const parsedMetadata = this . #parseMetadata( record . metadata ) ;
1468
+
1469
+ if (
1470
+ parsedMetadata &&
1471
+ "attemptNumber" in parsedMetadata &&
1472
+ typeof parsedMetadata . attemptNumber === "number"
1473
+ ) {
1474
+ span . data . attemptNumber = parsedMetadata . attemptNumber ;
1475
+ }
1476
+
1477
+ if ( record . kind === "ANCESTOR_OVERRIDE" || record . kind === "SPAN_EVENT" ) {
1478
+ // We need to add an event to the span
1479
+ span . data . events . push ( {
1480
+ name : record . message ,
1481
+ time : convertClickhouseDateTime64ToJsDate ( record . start_time ) ,
1482
+ properties : parsedMetadata ?? { } ,
1483
+ } ) ;
1484
+ }
1485
+
1486
+ if ( record . kind === "SPAN" ) {
1487
+ if ( record . status === "ERROR" ) {
1488
+ span . data . isError = true ;
1489
+ span . data . isPartial = false ;
1490
+ span . data . isCancelled = false ;
1491
+ } else if ( record . status === "CANCELLED" ) {
1492
+ span . data . isCancelled = true ;
1493
+ span . data . isPartial = false ;
1494
+ span . data . isError = false ;
1495
+ } else if ( record . status === "OK" ) {
1496
+ span . data . isPartial = false ;
1497
+ }
1498
+
1499
+ if ( record . status !== "PARTIAL" ) {
1500
+ span . data . duration =
1501
+ typeof record . duration === "number" ? record . duration : Number ( record . duration ) ;
1502
+ } else {
1503
+ span . data . startTime = convertClickhouseDateTime64ToJsDate ( record . start_time ) ;
1504
+ span . data . message = record . message ;
1505
+ }
1506
+ }
1507
+ }
1508
+
1509
+ return span ;
1365
1510
}
1366
1511
1367
1512
async getRunEvents (
@@ -1373,17 +1518,6 @@ export class ClickhouseEventRepository implements IEventRepository {
1373
1518
) : Promise < RunPreparedEvent [ ] > {
1374
1519
throw new Error ( "ClickhouseEventRepository.getRunEvents not implemented" ) ;
1375
1520
}
1376
-
1377
- async getSpanOriginalRunId (
1378
- storeTable : TaskEventStoreTable ,
1379
- environmentId : string ,
1380
- spanId : string ,
1381
- traceId : string ,
1382
- startCreatedAt : Date ,
1383
- endCreatedAt ?: Date
1384
- ) : Promise < string | undefined > {
1385
- return await originalRunIdCache . lookup ( traceId , spanId ) ;
1386
- }
1387
1521
}
1388
1522
1389
1523
export const convertDateToClickhouseDateTime = ( date : Date ) : string => {
0 commit comments