Skip to content

Commit a22457a

Browse files
authored
[service-utils] Expose broker settings for usageV2 (#6120)
1 parent ae675db commit a22457a

File tree

2 files changed

+28
-3
lines changed

2 files changed

+28
-3
lines changed

.changeset/young-cherries-beg.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": patch
3+
---
4+
5+
[service-utils] Expose kafkajs config

packages/service-utils/src/node/usageV2.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import { randomUUID } from "node:crypto";
22
import { checkServerIdentity } from "node:tls";
3-
import { Kafka, type Producer } from "kafkajs";
3+
import {
4+
CompressionTypes,
5+
Kafka,
6+
type Producer,
7+
type ProducerConfig,
8+
} from "kafkajs";
49
import type { ServiceName } from "../core/services.js";
510
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
611

@@ -64,9 +69,10 @@ export class UsageV2Producer {
6469
* Connect the producer.
6570
* This must be called before calling `sendEvents()`.
6671
*/
67-
async init() {
72+
async init(configOverrides?: ProducerConfig) {
6873
this.producer = this.kafka.producer({
6974
allowAutoTopicCreation: false,
75+
...configOverrides,
7076
});
7177
await this.producer.connect();
7278
}
@@ -81,7 +87,17 @@ export class UsageV2Producer {
8187
*
8288
* @param events - The events to send.
8389
*/
84-
async sendEvents(events: UsageV2Event[]): Promise<void> {
90+
async sendEvents(
91+
events: UsageV2Event[],
92+
/**
93+
* Reference: https://kafka.js.org/docs/producing#producing-messages
94+
*/
95+
configOverrides?: {
96+
acks?: number;
97+
timeout?: number;
98+
compression?: CompressionTypes;
99+
},
100+
): Promise<void> {
85101
if (!this.producer) {
86102
throw new Error("Producer not initialized. Call `init()` first.");
87103
}
@@ -103,6 +119,10 @@ export class UsageV2Producer {
103119
messages: parsedEvents.map((event) => ({
104120
value: JSON.stringify(event),
105121
})),
122+
acks: -1, // All brokers must acknowledge
123+
timeout: 10_000, // 10 seconds
124+
compression: CompressionTypes.LZ4,
125+
...configOverrides,
106126
});
107127
}
108128

0 commit comments

Comments
 (0)