Skip to content

Commit f32c128

Browse files
committed
remove init()
1 parent 3adabeb commit f32c128

File tree

2 files changed

+17
-51
lines changed

2 files changed

+17
-51
lines changed

packages/service-utils/src/node/kafka.ts

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
import { checkServerIdentity } from "node:tls";
2-
import {
3-
CompressionTypes,
4-
Kafka,
5-
type Producer,
6-
type ProducerConfig,
7-
} from "kafkajs";
2+
import { CompressionTypes, Kafka, type Producer } from "kafkajs";
83
import { compress, decompress } from "lz4js";
94

105
// CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
@@ -17,11 +12,10 @@ const { CompressionCodecs } = KafkaJS;
1712
*
1813
* Example:
1914
* ```ts
20-
* usageV2 = new KafkaProducer(..)
21-
* await usageV2.init()
22-
* await usageV2.sendEvents(events)
15+
* kafka = new KafkaProducer(...)
16+
* await kafka.send(topic, events)
2317
* // Non-blocking:
24-
* // void usageV2.sendEvents(events).catch(console.error)
18+
* // void kafka.send(topic, events).catch((e) => console.error(e))
2519
* ```
2620
*/
2721
export class KafkaProducer {
@@ -94,26 +88,9 @@ export class KafkaProducer {
9488
}
9589
}
9690

97-
/**
98-
* Connect the producer.
99-
* This must be called before calling `sendEvents()`.
100-
*/
101-
async init(configOverrides?: ProducerConfig) {
102-
this.producer = this.kafka.producer({
103-
allowAutoTopicCreation: false,
104-
...configOverrides,
105-
});
106-
await this.producer.connect();
107-
}
108-
10991
/**
11092
* Send messages to a Kafka topic.
11193
* This method may throw. To call this non-blocking:
112-
*
113-
* ```ts
114-
* kafka = new KafkaProducer(...)
115-
* void kafka.send(topic, messages).catch(console.error)
116-
*
11794
* @param topic
11895
* @param messages
11996
* @param configOverrides
@@ -124,13 +101,17 @@ export class KafkaProducer {
124101
/**
125102
* Reference: https://kafka.js.org/docs/producing#producing-messages
126103
*/
127-
configOverrides?: {
104+
options?: {
128105
acks?: number;
129106
timeout?: number;
107+
allowAutoTopicCreation?: boolean;
130108
},
131109
): Promise<void> {
132110
if (!this.producer) {
133-
throw new Error("Producer not initialized. Call `init()` first.");
111+
this.producer = this.kafka.producer({
112+
allowAutoTopicCreation: options?.allowAutoTopicCreation ?? false,
113+
});
114+
await this.producer.connect();
134115
}
135116

136117
await this.producer.send({
@@ -139,9 +120,8 @@ export class KafkaProducer {
139120
value: JSON.stringify(m),
140121
})),
141122
compression: this.compression,
142-
acks: -1, // All brokers must acknowledge
143-
timeout: 10_000, // 10 seconds
144-
...configOverrides,
123+
acks: options?.acks ?? -1, // Default: All brokers must acknowledge
124+
timeout: options?.timeout ?? 10_000, // Default: 10 seconds
145125
});
146126
}
147127

packages/service-utils/src/node/usageV2.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { randomUUID } from "node:crypto";
2-
import type { ProducerConfig } from "kafkajs";
32
import {
43
type UsageV2Event,
54
type UsageV2Source,
@@ -14,10 +13,9 @@ import { KafkaProducer } from "./kafka.js";
1413
* Example:
1514
* ```ts
1615
* usageV2 = new UsageV2Producer(..)
17-
* await usageV2.init()
1816
* await usageV2.sendEvents(events)
1917
* // Non-blocking:
20-
* // void usageV2.sendEvents(events).catch(console.error)
18+
* // void usageV2.sendEvents(events).catch((e) => console.error(e))
2119
* ```
2220
*/
2321
export class UsageV2Producer {
@@ -55,32 +53,20 @@ export class UsageV2Producer {
5553
this.topic = getTopicName(config.source);
5654
}
5755

58-
/**
59-
* Connect the producer.
60-
* This must be called before calling `sendEvents()`.
61-
*/
62-
async init(configOverrides?: ProducerConfig) {
63-
return this.kafkaProducer.init(configOverrides);
64-
}
65-
6656
/**
6757
* Send usageV2 events.
6858
* This method may throw. To call this non-blocking:
69-
*
70-
* ```ts
71-
* usageV2 = new UsageV2Producer(...)
72-
* void usageV2.sendEvents(events).catch(console.error)
73-
*
74-
* @param events - The events to send.
59+
* @param events
7560
*/
7661
async sendEvents(
7762
events: UsageV2Event[],
7863
/**
7964
* Reference: https://kafka.js.org/docs/producing#producing-messages
8065
*/
81-
configOverrides?: {
66+
options?: {
8267
acks?: number;
8368
timeout?: number;
69+
allowAutoTopicCreation?: boolean;
8470
},
8571
): Promise<void> {
8672
const parsedEvents = events.map((event) => ({
@@ -92,7 +78,7 @@ export class UsageV2Producer {
9278
? event.team_id.slice(5)
9379
: event.team_id,
9480
}));
95-
await this.kafkaProducer.send(this.topic, parsedEvents, configOverrides);
81+
await this.kafkaProducer.send(this.topic, parsedEvents, options);
9682
}
9783

9884
/**

0 commit comments

Comments
 (0)