Skip to content

Commit ee7ec15

Browse files
committed
move topics to code
1 parent 19aa134 commit ee7ec15

File tree

10 files changed

+21
-30
lines changed

10 files changed

+21
-30
lines changed

.env.example

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,6 @@ LAT_KAFKA_SSL=false
7979
# Consumer Configuration
8080
LAT_KAFKA_CONSUMER_GROUP_ID=latitude-event-processors-v1
8181

82-
# Topic Configuration
83-
LAT_KAFKA_EVENTS_TOPIC=domain-events
84-
LAT_KAFKA_DLQ_TOPIC=domain-events-dlq
85-
8682
# Object Storage
8783
LAT_STORAGE_DRIVER=fs
8884
# Absolute path in your laptop

apps/workers/src/server.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ const initializeWorkers = async () => {
4646
// Create Redpanda publisher (async initialization)
4747
const eventsPublisher = await createRedpandaEventsPublisher({
4848
kafka: kafkaClient,
49-
config: kafkaConfig,
5049
})
5150

5251
// Create outbox consumer (polls Postgres outbox and publishes to Redpanda)
@@ -62,7 +61,7 @@ const initializeWorkers = async () => {
6261
// Create Redpanda event consumer (consumes from Redpanda and processes events)
6362
const redpandaConsumer = createRedpandaEventsConsumer({
6463
kafka: kafkaClient,
65-
config: kafkaConfig,
64+
groupId: kafkaConfig.groupId,
6665
})
6766

6867
// Start consumers

docker-compose.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ services:
6363
volumes:
6464
- redpanda_data:/var/lib/redpanda/data
6565
healthcheck:
66-
test: ["CMD-SHELL", "rpk cluster health --brokers localhost:29092"]
66+
test: ["CMD-SHELL", "curl -sf http://localhost:9644/v1/brokers || exit 1"]
6767
interval: 5s
6868
timeout: 10s
6969
retries: 12
@@ -75,12 +75,12 @@ services:
7575
redpanda:
7676
condition: service_healthy
7777
command:
78-
- rpk
7978
- topic
8079
- create
8180
- domain-events
8281
- domain-events-dlq
83-
- --brokers=redpanda:29092
82+
- -X
83+
- brokers=redpanda:29092
8484
- --partitions=1
8585
- --replicas=1
8686
restart: "no"

packages/platform/queue-redpanda/src/config.test.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ const ENV_KEYS = [
66
"LAT_KAFKA_BROKERS",
77
"LAT_KAFKA_CLIENT_ID",
88
"LAT_KAFKA_CONSUMER_GROUP_ID",
9-
"LAT_KAFKA_EVENTS_TOPIC",
10-
"LAT_KAFKA_DLQ_TOPIC",
119
"LAT_KAFKA_SSL",
1210
"LAT_KAFKA_SASL_USERNAME",
1311
"LAT_KAFKA_SASL_PASSWORD",
@@ -17,8 +15,6 @@ const BASE_ENV: Record<string, string> = {
1715
LAT_KAFKA_BROKERS: "localhost:9092,localhost:9093",
1816
LAT_KAFKA_CLIENT_ID: "test-client",
1917
LAT_KAFKA_CONSUMER_GROUP_ID: "test-group",
20-
LAT_KAFKA_EVENTS_TOPIC: "domain-events",
21-
LAT_KAFKA_DLQ_TOPIC: "domain-events-dlq",
2218
}
2319

2420
function setEnv(vars: Record<string, string>) {
@@ -43,8 +39,6 @@ describe("loadKafkaConfig", () => {
4339
expect(config.brokers).toEqual(["localhost:9092", "localhost:9093"])
4440
expect(config.clientId).toBe("test-client")
4541
expect(config.groupId).toBe("test-group")
46-
expect(config.eventsTopic).toBe("domain-events")
47-
expect(config.dlqTopic).toBe("domain-events-dlq")
4842
expect(config.ssl).toBeUndefined()
4943
expect(config.sasl).toBeUndefined()
5044
})

packages/platform/queue-redpanda/src/config.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ export const loadKafkaConfig = (): Effect.Effect<KafkaConfig, KafkaSaslWithoutTl
1111
const brokers = yield* parseEnv("LAT_KAFKA_BROKERS", "string")
1212
const clientId = yield* parseEnv("LAT_KAFKA_CLIENT_ID", "string")
1313
const groupId = yield* parseEnv("LAT_KAFKA_CONSUMER_GROUP_ID", "string")
14-
const eventsTopic = yield* parseEnv("LAT_KAFKA_EVENTS_TOPIC", "string")
15-
const dlqTopic = yield* parseEnv("LAT_KAFKA_DLQ_TOPIC", "string")
1614

1715
// Optional: SSL/TLS encryption
1816
const ssl = yield* parseEnvOptional("LAT_KAFKA_SSL", "boolean")
@@ -34,8 +32,6 @@ export const loadKafkaConfig = (): Effect.Effect<KafkaConfig, KafkaSaslWithoutTl
3432
clientId,
3533
brokers: brokers.split(","),
3634
groupId,
37-
eventsTopic,
38-
dlqTopic,
3935
ssl,
4036
sasl:
4137
saslUsername && saslPassword

packages/platform/queue-redpanda/src/consumer.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ import type { EventEnvelope } from "@domain/events"
22
import { Effect } from "effect"
33
import type { EachMessagePayload } from "kafkajs"
44
import { z } from "zod"
5-
import type { KafkaConfig } from "./types.ts"
5+
import { Topics } from "./topics.ts"
66

77
export interface RedpandaEventsConsumerConfig {
8-
readonly config: KafkaConfig
98
readonly kafka: import("kafkajs").Kafka
9+
readonly groupId: string
1010
}
1111

1212
export const DomainEventSchema = z.object({
@@ -34,15 +34,15 @@ const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
3434

3535
export const createRedpandaEventsConsumer = (config: RedpandaEventsConsumerConfig) => {
3636
const consumer = config.kafka.consumer({
37-
groupId: config.config.groupId,
37+
groupId: config.groupId,
3838
})
3939

4040
let isRunning = false
4141

4242
const start = async (handler: EventHandler): Promise<void> => {
4343
await consumer.connect()
4444
await consumer.subscribe({
45-
topic: config.config.eventsTopic,
45+
topic: Topics.domainEvents,
4646
})
4747

4848
isRunning = true
@@ -104,7 +104,7 @@ export const createRedpandaEventsConsumer = (config: RedpandaEventsConsumerConfi
104104
return {
105105
start,
106106
stop,
107-
pause: () => consumer.pause([{ topic: config.config.eventsTopic }]),
108-
resume: () => consumer.resume([{ topic: config.config.eventsTopic }]),
107+
pause: () => consumer.pause([{ topic: Topics.domainEvents }]),
108+
resume: () => consumer.resume([{ topic: Topics.domainEvents }]),
109109
}
110110
}

packages/platform/queue-redpanda/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import { RedpandaQueueAdapterTag } from "./types.ts"
44

55
export { createKafkaClient, createKafkaClientEffect } from "./client.ts"
66

7+
export { Topics } from "./topics.ts"
8+
export type { TopicName } from "./topics.ts"
9+
710
export {
811
createRedpandaEventsPublisher,
912
createRedpandaEventsPublisherEffect,

packages/platform/queue-redpanda/src/producer.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import type { EventEnvelope, EventsPublisher } from "@domain/events"
22
import { Data, Effect } from "effect"
33
import type { Kafka } from "kafkajs"
4+
import { Topics } from "./topics.ts"
45
import { KafkaClientError } from "./types.ts"
5-
import type { KafkaConfig } from "./types.ts"
66

77
export class RedpandaProducerError extends Data.TaggedError("RedpandaProducerError")<{
88
readonly cause: unknown
@@ -11,7 +11,6 @@ export class RedpandaProducerError extends Data.TaggedError("RedpandaProducerErr
1111

1212
export interface RedpandaEventsPublisherConfig {
1313
readonly kafka: Kafka
14-
readonly config: KafkaConfig
1514
}
1615

1716
export const mapEnvelopeToMessage = (envelope: EventEnvelope) => {
@@ -63,7 +62,7 @@ export const createRedpandaEventsPublisherEffect = (
6362
yield* Effect.tryPromise({
6463
try: () =>
6564
producer.send({
66-
topic: config.config.eventsTopic,
65+
topic: Topics.domainEvents,
6766
acks: -1,
6867
messages: [message],
6968
}),
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export const Topics = {
2+
domainEvents: "domain-events",
3+
domainEventsDlq: "domain-events-dlq",
4+
} as const
5+
6+
export type TopicName = (typeof Topics)[keyof typeof Topics]

packages/platform/queue-redpanda/src/types.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ export interface KafkaConfig {
99
readonly clientId: string
1010
readonly brokers: string[]
1111
readonly groupId: string
12-
readonly eventsTopic: string
13-
readonly dlqTopic: string
1412
readonly ssl: boolean | undefined
1513
readonly sasl: { readonly mechanism: "plain"; readonly username: string; readonly password: string } | undefined
1614
}

0 commit comments

Comments
 (0)