11import { existsSync } from "node:fs"
22import { fileURLToPath } from "node:url"
33import { createPollingOutboxConsumer } from "@platform/events-outbox"
4- import { createBullmqEventsPublisher } from "@platform/queue-bullmq"
4+ import {
5+ createKafkaClient ,
6+ createRedpandaEventsConsumer ,
7+ createRedpandaEventsPublisher ,
8+ loadKafkaConfig ,
9+ } from "@platform/queue-redpanda"
510import { createLogger } from "@repo/observability"
611import { config as loadDotenv } from "dotenv"
7- import { getPostgresPool , getRedisConnection } from "./clients.ts "
8- import { createEventsWorker } from "./workers/events .ts"
12+ import { Effect } from "effect "
13+ import { getPostgresPool } from "./clients .ts"
914
1015const nodeEnv = process . env . NODE_ENV || "development"
1116const envFilePath = fileURLToPath ( new URL ( `../../../.env.${ nodeEnv } ` , import . meta. url ) )
@@ -14,31 +19,75 @@ if (existsSync(envFilePath)) {
1419 loadDotenv ( { path : envFilePath , quiet : true } )
1520}
1621
17- const redisConnection = getRedisConnection ( )
1822const pgPool = getPostgresPool ( 10 )
19- const { queue : eventsQueue , worker : eventsWorker } = createEventsWorker ( redisConnection )
20- const eventsPublisher = createBullmqEventsPublisher ( { queue : eventsQueue } )
21- const outboxConsumer = createPollingOutboxConsumer (
22- {
23- pool : pgPool ,
24- pollIntervalMs : 1000 ,
25- batchSize : 100 ,
26- } ,
27- eventsPublisher ,
28- )
2923
3024const logger = createLogger ( "workers" )
3125
32- eventsWorker . on ( "ready" , ( ) => {
26+ // Simple event handler that logs events (placeholder for actual side effect processing)
27+ const eventHandler = {
28+ handle : ( event : { id : string ; event : { name : string ; organizationId : string } } ) : Effect . Effect <
29+ void ,
30+ unknown ,
31+ never
32+ > =>
33+ Effect . gen ( function * ( ) {
34+ yield * Effect . logInfo (
35+ `Processing event ${ event . id } of type ${ event . event . name } for org ${ event . event . organizationId } ` ,
36+ )
37+ } ) ,
38+ }
39+
40+ // Initialize workers asynchronously
41+ const initializeWorkers = async ( ) => {
42+ // Load Redpanda configuration
43+ const kafkaConfig = Effect . runSync ( loadKafkaConfig ( ) )
44+ const kafkaClient = createKafkaClient ( kafkaConfig )
45+
46+ // Create Redpanda publisher (async initialization)
47+ const eventsPublisher = await createRedpandaEventsPublisher ( {
48+ kafka : kafkaClient ,
49+ config : kafkaConfig ,
50+ } )
51+
52+ // Create outbox consumer (polls Postgres outbox and publishes to Redpanda)
53+ const outboxConsumer = createPollingOutboxConsumer (
54+ {
55+ pool : pgPool ,
56+ pollIntervalMs : 1000 ,
57+ batchSize : 100 ,
58+ } ,
59+ eventsPublisher ,
60+ )
61+
62+ // Create Redpanda event consumer (consumes from Redpanda and processes events)
63+ const redpandaConsumer = createRedpandaEventsConsumer ( {
64+ kafka : kafkaClient ,
65+ config : kafkaConfig ,
66+ } )
67+
68+ // Start consumers
3369 outboxConsumer . start ( )
70+ await redpandaConsumer . start ( eventHandler )
71+ logger . info ( "workers ready - outbox consumer and Redpanda consumer started" )
72+
73+ return { outboxConsumer, redpandaConsumer }
74+ }
3475
35- logger . info ( "workers ready and outbox consumer started" )
76+ const workersPromise = initializeWorkers ( ) . catch ( ( error ) => {
77+ logger . error ( "Failed to initialize workers" , error )
78+ process . exit ( 1 )
3679} )
3780
81+ // Graceful shutdown
3882process . on ( "SIGINT" , async ( ) => {
39- await outboxConsumer . stop ( )
83+ logger . info ( "shutting down workers..." )
84+ try {
85+ const { outboxConsumer, redpandaConsumer } = await workersPromise
86+ await outboxConsumer . stop ( )
87+ await redpandaConsumer . stop ( )
88+ } catch ( error ) {
89+ logger . error ( "Error during shutdown (workers may not have started)" , error )
90+ }
4091 await pgPool . end ( )
41- await eventsQueue . close ( )
42- await eventsWorker . close ( )
4392 process . exit ( 0 )
4493} )