@@ -6,6 +6,7 @@ import { z } from "zod";
66import { TaskRunStatus } from "~/database-types" ;
77import { RunsReplicationService } from "~/services/runsReplicationService.server" ;
88import { createInMemoryTracing } from "./utils/tracing" ;
9+ import superjson from "superjson" ;
910
1011vi . setConfig ( { testTimeout : 60_000 } ) ;
1112
@@ -133,6 +134,147 @@ describe("RunsReplicationService", () => {
133134 }
134135 ) ;
135136
137+ containerTest (
138+ "should replicate runs with super json payloads to clickhouse" ,
139+ async ( { clickhouseContainer, redisOptions, postgresContainer, prisma } ) => {
140+ await prisma . $executeRawUnsafe ( `ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;` ) ;
141+
142+ const clickhouse = new ClickHouse ( {
143+ url : clickhouseContainer . getConnectionUrl ( ) ,
144+ name : "runs-replication" ,
145+ compression : {
146+ request : true ,
147+ } ,
148+ } ) ;
149+
150+ const { tracer, exporter } = createInMemoryTracing ( ) ;
151+
152+ const runsReplicationService = new RunsReplicationService ( {
153+ clickhouse,
154+ pgConnectionUrl : postgresContainer . getConnectionUri ( ) ,
155+ serviceName : "runs-replication" ,
156+ slotName : "task_runs_to_clickhouse_v1" ,
157+ publicationName : "task_runs_to_clickhouse_v1_publication" ,
158+ redisOptions,
159+ maxFlushConcurrency : 1 ,
160+ flushIntervalMs : 100 ,
161+ flushBatchSize : 1 ,
162+ leaderLockTimeoutMs : 5000 ,
163+ leaderLockExtendIntervalMs : 1000 ,
164+ ackIntervalSeconds : 5 ,
165+ tracer,
166+ } ) ;
167+
168+ await runsReplicationService . start ( ) ;
169+
170+ const organization = await prisma . organization . create ( {
171+ data : {
172+ title : "test" ,
173+ slug : "test" ,
174+ } ,
175+ } ) ;
176+
177+ const project = await prisma . project . create ( {
178+ data : {
179+ name : "test" ,
180+ slug : "test" ,
181+ organizationId : organization . id ,
182+ externalRef : "test" ,
183+ } ,
184+ } ) ;
185+
186+ const runtimeEnvironment = await prisma . runtimeEnvironment . create ( {
187+ data : {
188+ slug : "test" ,
189+ type : "DEVELOPMENT" ,
190+ projectId : project . id ,
191+ organizationId : organization . id ,
192+ apiKey : "test" ,
193+ pkApiKey : "test" ,
194+ shortcode : "test" ,
195+ } ,
196+ } ) ;
197+
198+ const date = new Date ( ) ;
199+
200+ // Now we insert a row into the table
201+ const taskRun = await prisma . taskRun . create ( {
202+ data : {
203+ friendlyId : "run_1234" ,
204+ taskIdentifier : "my-task" ,
205+ payload : superjson . stringify ( {
206+ foo : "bar" ,
207+ bigint : BigInt ( 1234 ) ,
208+ date,
209+ map : new Map ( [ [ "foo" , "bar" ] ] ) ,
210+ } ) ,
211+ payloadType : "application/super+json" ,
212+ traceId : "1234" ,
213+ spanId : "1234" ,
214+ queue : "test" ,
215+ runtimeEnvironmentId : runtimeEnvironment . id ,
216+ projectId : project . id ,
217+ organizationId : organization . id ,
218+ environmentType : "DEVELOPMENT" ,
219+ engine : "V2" ,
220+ } ,
221+ } ) ;
222+
223+ await setTimeout ( 1000 ) ;
224+
225+ // Check that the row was replicated to clickhouse
226+ const queryRuns = clickhouse . reader . query ( {
227+ name : "runs-replication" ,
228+ query : "SELECT * FROM trigger_dev.task_runs_v2" ,
229+ schema : z . any ( ) ,
230+ } ) ;
231+
232+ const [ queryError , result ] = await queryRuns ( { } ) ;
233+
234+ expect ( queryError ) . toBeNull ( ) ;
235+ expect ( result ?. length ) . toBe ( 1 ) ;
236+ expect ( result ?. [ 0 ] ) . toEqual (
237+ expect . objectContaining ( {
238+ run_id : taskRun . id ,
239+ friendly_id : taskRun . friendlyId ,
240+ task_identifier : taskRun . taskIdentifier ,
241+ environment_id : runtimeEnvironment . id ,
242+ project_id : project . id ,
243+ organization_id : organization . id ,
244+ environment_type : "DEVELOPMENT" ,
245+ engine : "V2" ,
246+ } )
247+ ) ;
248+
249+ const queryPayloads = clickhouse . reader . query ( {
250+ name : "runs-replication" ,
251+ query : "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}" ,
252+ schema : z . any ( ) ,
253+ params : z . object ( { run_id : z . string ( ) } ) ,
254+ } ) ;
255+
256+ const [ payloadQueryError , payloadResult ] = await queryPayloads ( { run_id : taskRun . id } ) ;
257+
258+ expect ( payloadQueryError ) . toBeNull ( ) ;
259+ expect ( payloadResult ?. length ) . toBe ( 1 ) ;
260+ expect ( payloadResult ?. [ 0 ] ) . toEqual (
261+ expect . objectContaining ( {
262+ run_id : taskRun . id ,
263+ payload : {
264+ data : expect . objectContaining ( {
265+ foo : "bar" ,
266+ bigint : "1234" ,
267+ date : date . toISOString ( ) ,
268+ map : [ [ "foo" , "bar" ] ] ,
269+ } ) ,
270+ } ,
271+ } )
272+ ) ;
273+
274+ await runsReplicationService . stop ( ) ;
275+ }
276+ ) ;
277+
136278 containerTest (
137279 "should not produce any handle_transaction spans when no TaskRun events are produced" ,
138280 async ( { clickhouseContainer, redisOptions, postgresContainer, prisma } ) => {
0 commit comments