@@ -754,11 +754,6 @@ describe("RunsReplicationService (part 2/2)", () => {
754754 expect ( queryError ) . toBeNull ( ) ;
755755 expect ( result ?. length ) . toBe ( 10 ) ;
756756
757- console . log ( "Data" , {
758- runsData,
759- result,
760- } ) ;
761-
762757 // Check a few random runs for correctness
763758 for ( let i = 0 ; i < 9 ; i ++ ) {
764759 const expected = runsData [ i ] ;
@@ -810,6 +805,17 @@ describe("RunsReplicationService (part 2/2)", () => {
810805 logger : new Logger ( "runs-replication-merge-batch" , "info" ) ,
811806 } ) ;
812807
808+ // Listen to batchFlushed events to verify merging
809+ const batchFlushedEvents : Array < {
810+ flushId : string ;
811+ taskRunInserts : any [ ] ;
812+ payloadInserts : any [ ] ;
813+ } > = [ ] ;
814+
815+ runsReplicationService . events . on ( "batchFlushed" , ( event ) => {
816+ batchFlushedEvents . push ( event ) ;
817+ } ) ;
818+
813819 await runsReplicationService . start ( ) ;
814820
815821 const organization = await prisma . organization . create ( {
@@ -842,80 +848,57 @@ describe("RunsReplicationService (part 2/2)", () => {
842848
843849 // Create a run and rapidly update it multiple times in a transaction
844850 // This should create multiple events for the same run that get merged
845- const [ taskRun ] = await prisma . $transaction ( async ( tx ) => {
846- const run = await tx . taskRun . create ( {
847- data : {
848- friendlyId : `run_merge_${ Date . now ( ) } ` ,
849- taskIdentifier : "my-task-merge" ,
850- payload : JSON . stringify ( { version : 1 } ) ,
851- payloadType : "application/json" ,
852- traceId : `merge-${ Date . now ( ) } ` ,
853- spanId : `merge-${ Date . now ( ) } ` ,
854- queue : "test-merge-batch" ,
855- runtimeEnvironmentId : runtimeEnvironment . id ,
856- projectId : project . id ,
857- organizationId : organization . id ,
858- environmentType : "DEVELOPMENT" ,
859- engine : "V2" ,
860- status : "PENDING" ,
861- } ,
862- } ) ;
863-
864- await tx . taskRun . update ( {
865- where : { id : run . id } ,
866- data : { status : "EXECUTING" , payload : JSON . stringify ( { version : 2 } ) } ,
867- } ) ;
868-
869- await tx . taskRun . update ( {
870- where : { id : run . id } ,
871- data : { status : "COMPLETED_SUCCESSFULLY" , payload : JSON . stringify ( { version : 3 } ) } ,
872- } ) ;
873-
874- return [ run ] ;
851+ const run = await prisma . taskRun . create ( {
852+ data : {
853+ friendlyId : `run_merge_${ Date . now ( ) } ` ,
854+ taskIdentifier : "my-task-merge" ,
855+ payload : JSON . stringify ( { version : 1 } ) ,
856+ payloadType : "application/json" ,
857+ traceId : `merge-${ Date . now ( ) } ` ,
858+ spanId : `merge-${ Date . now ( ) } ` ,
859+ queue : "test-merge-batch" ,
860+ runtimeEnvironmentId : runtimeEnvironment . id ,
861+ projectId : project . id ,
862+ organizationId : organization . id ,
863+ environmentType : "DEVELOPMENT" ,
864+ engine : "V2" ,
865+ status : "PENDING_VERSION" ,
866+ } ,
875867 } ) ;
876-
877- // Wait for replication
878- await setTimeout ( 2000 ) ;
879-
880- // Query ClickHouse for the run using FINAL
881- const queryRuns = clickhouse . reader . query ( {
882- name : "runs-replication-merge-batch" ,
883- query : "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}" ,
884- schema : z . any ( ) ,
885- params : z . object ( { run_id : z . string ( ) } ) ,
868+ await prisma . taskRun . update ( {
869+ where : { id : run . id } ,
870+ data : { status : "DEQUEUED" } ,
871+ } ) ;
872+ await prisma . taskRun . update ( {
873+ where : { id : run . id } ,
874+ data : { status : "EXECUTING" } ,
875+ } ) ;
876+ await prisma . taskRun . update ( {
877+ where : { id : run . id } ,
878+ data : { status : "PAUSED" } ,
879+ } ) ;
880+ await prisma . taskRun . update ( {
881+ where : { id : run . id } ,
882+ data : { status : "EXECUTING" } ,
883+ } ) ;
884+ await prisma . taskRun . update ( {
885+ where : { id : run . id } ,
886+ data : { status : "COMPLETED_SUCCESSFULLY" } ,
886887 } ) ;
887888
888- const [ queryError , result ] = await queryRuns ( { run_id : taskRun . id } ) ;
889-
890- expect ( queryError ) . toBeNull ( ) ;
891- expect ( result ?. length ) . toBe ( 1 ) ;
889+ await setTimeout ( 1000 ) ;
892890
893- // Should have the final status from the last update
894- expect ( result ?. [ 0 ] ) . toEqual (
891+ expect ( batchFlushedEvents ?. [ 0 ] . taskRunInserts ) . toHaveLength ( 2 ) ;
892+ expect ( batchFlushedEvents ?. [ 0 ] . taskRunInserts [ 0 ] ) . toEqual (
895893 expect . objectContaining ( {
896- run_id : taskRun . id ,
897- status : "COMPLETED_SUCCESSFULLY " ,
894+ run_id : run . id ,
895+ status : "PENDING_VERSION " ,
898896 } )
899897 ) ;
900-
901- // Check payload was also updated to latest version
902- const queryPayloads = clickhouse . reader . query ( {
903- name : "runs-replication-merge-batch" ,
904- query : "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}" ,
905- schema : z . any ( ) ,
906- params : z . object ( { run_id : z . string ( ) } ) ,
907- } ) ;
908-
909- const [ payloadError , payloadResult ] = await queryPayloads ( { run_id : taskRun . id } ) ;
910-
911- expect ( payloadError ) . toBeNull ( ) ;
912- expect ( payloadResult ?. length ) . toBe ( 1 ) ;
913- expect ( payloadResult ?. [ 0 ] ) . toEqual (
898+ expect ( batchFlushedEvents ?. [ 0 ] . taskRunInserts [ 1 ] ) . toEqual (
914899 expect . objectContaining ( {
915- run_id : taskRun . id ,
916- payload : expect . objectContaining ( {
917- data : { version : 3 } ,
918- } ) ,
900+ run_id : run . id ,
901+ status : "COMPLETED_SUCCESSFULLY" ,
919902 } )
920903 ) ;
921904
@@ -949,6 +932,17 @@ describe("RunsReplicationService (part 2/2)", () => {
949932 logger : new Logger ( "runs-replication-sorting" , "info" ) ,
950933 } ) ;
951934
935+ // Listen to batchFlushed events to verify sorting
936+ const batchFlushedEvents : Array < {
937+ flushId : string ;
938+ taskRunInserts : any [ ] ;
939+ payloadInserts : any [ ] ;
940+ } > = [ ] ;
941+
942+ runsReplicationService . events . on ( "batchFlushed" , ( event ) => {
943+ batchFlushedEvents . push ( event ) ;
944+ } ) ;
945+
952946 await runsReplicationService . start ( ) ;
953947
954948 // Create two organizations to test sorting by organization_id
@@ -1004,85 +998,121 @@ describe("RunsReplicationService (part 2/2)", () => {
1004998
1005999 const now = Date . now ( ) ;
10061000
1007- // Create runs in reverse alphabetical order by organization
1008- // The sorting should put org2 (org-a) before org1 (org-z)
1009- const [ run1 , run2 ] = await prisma . $transaction ( async ( tx ) => {
1010- const run1 = await tx . taskRun . create ( {
1011- data : {
1012- friendlyId : `run_sort_org_z_${ now } ` ,
1013- taskIdentifier : "my-task-sort" ,
1014- payload : JSON . stringify ( { org : "z" } ) ,
1015- payloadType : "application/json" ,
1016- traceId : `sort-z-${ now } ` ,
1017- spanId : `sort-z-${ now } ` ,
1018- queue : "test-sorting" ,
1019- runtimeEnvironmentId : env1 . id ,
1020- projectId : project1 . id ,
1021- organizationId : org1 . id ,
1022- environmentType : "DEVELOPMENT" ,
1023- engine : "V2" ,
1024- status : "PENDING" ,
1025- createdAt : new Date ( now + 100 ) , // Later timestamp
1026- } ,
1027- } ) ;
1028-
1029- const run2 = await tx . taskRun . create ( {
1030- data : {
1031- friendlyId : `run_sort_org_a_${ now } ` ,
1032- taskIdentifier : "my-task-sort" ,
1033- payload : JSON . stringify ( { org : "a" } ) ,
1034- payloadType : "application/json" ,
1035- traceId : `sort-a-${ now } ` ,
1036- spanId : `sort-a-${ now } ` ,
1037- queue : "test-sorting" ,
1038- runtimeEnvironmentId : env2 . id ,
1039- projectId : project2 . id ,
1040- organizationId : org2 . id ,
1041- environmentType : "DEVELOPMENT" ,
1042- engine : "V2" ,
1043- status : "PENDING" ,
1044- createdAt : new Date ( now ) , // Earlier timestamp
1045- } ,
1046- } ) ;
1047-
1048- return [ run1 , run2 ] ;
1001+ const run1 = await prisma . taskRun . create ( {
1002+ data : {
1003+ friendlyId : `run_sort_org_z_${ now } ` ,
1004+ taskIdentifier : "my-task-sort" ,
1005+ payload : JSON . stringify ( { org : "z" } ) ,
1006+ payloadType : "application/json" ,
1007+ traceId : `sort-z-${ now } ` ,
1008+ spanId : `sort-z-${ now } ` ,
1009+ queue : "test-sorting" ,
1010+ runtimeEnvironmentId : env1 . id ,
1011+ projectId : project1 . id ,
1012+ organizationId : org1 . id ,
1013+ environmentType : "DEVELOPMENT" ,
1014+ engine : "V2" ,
1015+ status : "PENDING" ,
1016+ createdAt : new Date ( now + 2000 ) ,
1017+ } ,
1018+ } ) ;
1019+ await prisma . taskRun . update ( {
1020+ where : { id : run1 . id } ,
1021+ data : { status : "DEQUEUED" } ,
10491022 } ) ;
10501023
1051- // Wait for replication
1052- await setTimeout ( 2000 ) ;
1053-
1054- // Query ClickHouse for both runs
1055- const queryRuns = clickhouse . reader . query ( {
1056- name : "runs-replication-sorting" ,
1057- query : `SELECT run_id, organization_id, project_id, environment_id, created_at, friendly_id
1058- FROM trigger_dev.task_runs_v2 FINAL
1059- WHERE run_id IN ({run_id_1:String}, {run_id_2:String})
1060- ORDER BY organization_id, project_id, environment_id, created_at, run_id` ,
1061- schema : z . any ( ) ,
1062- params : z . object ( { run_id_1 : z . string ( ) , run_id_2 : z . string ( ) } ) ,
1024+ await prisma . taskRun . create ( {
1025+ data : {
1026+ friendlyId : `run_sort_org_a_${ now } ` ,
1027+ taskIdentifier : "my-task-sort" ,
1028+ payload : JSON . stringify ( { org : "a" } ) ,
1029+ payloadType : "application/json" ,
1030+ traceId : `sort-a-${ now } ` ,
1031+ spanId : `sort-a-${ now } ` ,
1032+ queue : "test-sorting" ,
1033+ runtimeEnvironmentId : env2 . id ,
1034+ projectId : project2 . id ,
1035+ organizationId : org2 . id ,
1036+ environmentType : "DEVELOPMENT" ,
1037+ engine : "V2" ,
1038+ status : "PENDING" ,
1039+ createdAt : new Date ( now + 1000 ) ,
1040+ } ,
10631041 } ) ;
10641042
1065- const [ queryError , result ] = await queryRuns ( { run_id_1 : run1 . id , run_id_2 : run2 . id } ) ;
1043+ await prisma . taskRun . create ( {
1044+ data : {
1045+ friendlyId : `run_sort_org_a_${ now } _2` ,
1046+ taskIdentifier : "my-task-sort" ,
1047+ payload : JSON . stringify ( { org : "a" } ) ,
1048+ payloadType : "application/json" ,
1049+ traceId : `sort-a-${ now } ` ,
1050+ spanId : `sort-a-${ now } ` ,
1051+ queue : "test-sorting" ,
1052+ runtimeEnvironmentId : env2 . id ,
1053+ projectId : project2 . id ,
1054+ organizationId : org2 . id ,
1055+ environmentType : "DEVELOPMENT" ,
1056+ engine : "V2" ,
1057+ status : "PENDING" ,
1058+ createdAt : new Date ( now ) ,
1059+ } ,
1060+ } ) ;
10661061
1067- expect ( queryError ) . toBeNull ( ) ;
1068- expect ( result ?. length ) . toBe ( 2 ) ;
1062+ await setTimeout ( 1000 ) ;
10691063
1070- // Due to sorting, org2 (org-a) should come first even though it was created second
1071- expect ( result ?. [ 0 ] ) . toEqual (
1072- expect . objectContaining ( {
1073- run_id : run2 . id ,
1074- organization_id : org2 . id ,
1075- friendly_id : `run_sort_org_a_${ now } ` ,
1076- } )
1077- ) ;
1064+ expect ( batchFlushedEvents [ 0 ] ?. taskRunInserts . length ) . toBeGreaterThan ( 1 ) ;
1065+ expect ( batchFlushedEvents [ 0 ] ?. payloadInserts . length ) . toBeGreaterThan ( 1 ) ;
1066+
1067+ // Verify sorting order: organization_id, project_id, environment_id, created_at, run_id
1068+ for ( let i = 1 ; i < batchFlushedEvents [ 0 ] ?. taskRunInserts . length ; i ++ ) {
1069+ const prev = batchFlushedEvents [ 0 ] ?. taskRunInserts [ i - 1 ] ;
1070+ const curr = batchFlushedEvents [ 0 ] ?. taskRunInserts [ i ] ;
1071+
1072+ const prevKey = [
1073+ prev . organization_id ,
1074+ prev . project_id ,
1075+ prev . environment_id ,
1076+ prev . created_at ,
1077+ prev . run_id ,
1078+ ] ;
1079+ const currKey = [
1080+ curr . organization_id ,
1081+ curr . project_id ,
1082+ curr . environment_id ,
1083+ curr . created_at ,
1084+ curr . run_id ,
1085+ ] ;
1086+
1087+ const keysAreEqual = prevKey . every ( ( val , idx ) => val === currKey [ idx ] ) ;
1088+ if ( keysAreEqual ) {
1089+ // Also valid order
1090+ continue ;
1091+ }
1092+
1093+ // Compare tuples lexicographically
1094+ let isCorrectOrder = false ;
1095+ for ( let j = 0 ; j < prevKey . length ; j ++ ) {
1096+ if ( prevKey [ j ] < currKey [ j ] ) {
1097+ isCorrectOrder = true ;
1098+ break ;
1099+ }
1100+ if ( prevKey [ j ] > currKey [ j ] ) {
1101+ isCorrectOrder = false ;
1102+ break ;
1103+ }
1104+ // If equal, continue to next field
1105+ }
1106+
1107+ expect ( isCorrectOrder ) . toBeTruthy ( ) ;
1108+ }
10781109
1079- expect ( result ?. [ 1 ] ) . toEqual (
1080- expect . objectContaining ( {
1081- run_id : run1 . id ,
1082- organization_id : org1 . id ,
1083- friendly_id : `run_sort_org_z_${ now } ` ,
1084- } )
1085- ) ;
1110+ // Verify payloadInserts are also sorted by run_id
1111+ for ( let i = 1 ; i < batchFlushedEvents [ 0 ] ?. payloadInserts . length ; i ++ ) {
1112+ const prev = batchFlushedEvents [ 0 ] ?. payloadInserts [ i - 1 ] ;
1113+ const curr = batchFlushedEvents [ 0 ] ?. payloadInserts [ i ] ;
1114+ expect ( prev . run_id <= curr . run_id ) . toBeTruthy ( ) ;
1115+ }
10861116
10871117 await runsReplicationService . stop ( ) ;
10881118 }
0 commit comments