Skip to content

Commit 3adabeb

Browse files
committed
[service-utils] Add kafka producer to emit raw messages
1 parent 821caa6 commit 3adabeb

File tree

3 files changed

+185
-97
lines changed

3 files changed

+185
-97
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": patch
3+
---
4+
5+
Add raw kafka producer
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import { checkServerIdentity } from "node:tls";
2+
import {
3+
CompressionTypes,
4+
Kafka,
5+
type Producer,
6+
type ProducerConfig,
7+
} from "kafkajs";
8+
import { compress, decompress } from "lz4js";
9+
10+
// CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
11+
import KafkaJS from "kafkajs";
12+
const { CompressionCodecs } = KafkaJS;
13+
14+
/**
15+
* Creates a KafkaProducer which opens a persistent TCP connection.
16+
* This class is thread-safe so your service should re-use one instance.
17+
*
18+
* Example:
19+
* ```ts
20+
* usageV2 = new KafkaProducer(..)
21+
* await usageV2.init()
22+
* await usageV2.sendEvents(events)
23+
* // Non-blocking:
24+
* // void usageV2.sendEvents(events).catch(console.error)
25+
* ```
26+
*/
27+
export class KafkaProducer {
28+
private kafka: Kafka;
29+
private producer: Producer | null = null;
30+
private compression: CompressionTypes;
31+
32+
constructor(config: {
33+
/**
34+
* A descriptive name for your service. Example: "storage-server"
35+
*/
36+
producerName: string;
37+
/**
38+
* The environment the service is running in.
39+
*/
40+
environment: "development" | "production";
41+
/**
42+
* Whether to compress the events.
43+
*/
44+
shouldCompress?: boolean;
45+
46+
username: string;
47+
password: string;
48+
}) {
49+
const {
50+
producerName,
51+
environment,
52+
shouldCompress = true,
53+
username,
54+
password,
55+
} = config;
56+
57+
this.kafka = new Kafka({
58+
clientId: `${producerName}-${environment}`,
59+
brokers:
60+
environment === "production"
61+
? ["warpstream.thirdweb.xyz:9092"]
62+
: ["warpstream-dev.thirdweb.xyz:9092"],
63+
ssl: {
64+
checkServerIdentity(hostname, cert) {
65+
return checkServerIdentity(hostname.toLowerCase(), cert);
66+
},
67+
},
68+
sasl: {
69+
mechanism: "plain",
70+
username,
71+
password,
72+
},
73+
});
74+
75+
if (shouldCompress) {
76+
this.compression = CompressionTypes.LZ4;
77+
78+
CompressionCodecs[CompressionTypes.LZ4] = () => ({
79+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
80+
compress: (encoder: { buffer: Buffer }) => {
81+
const compressed = compress(encoder.buffer);
82+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
83+
return Buffer.from(compressed);
84+
},
85+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
86+
decompress: (buffer: Buffer) => {
87+
const decompressed = decompress(buffer);
88+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
89+
return Buffer.from(decompressed);
90+
},
91+
});
92+
} else {
93+
this.compression = CompressionTypes.None;
94+
}
95+
}
96+
97+
/**
98+
* Connect the producer.
99+
* This must be called before calling `sendEvents()`.
100+
*/
101+
async init(configOverrides?: ProducerConfig) {
102+
this.producer = this.kafka.producer({
103+
allowAutoTopicCreation: false,
104+
...configOverrides,
105+
});
106+
await this.producer.connect();
107+
}
108+
109+
/**
110+
* Send messages to a Kafka topic.
111+
* This method may throw. To call this non-blocking:
112+
*
113+
* ```ts
114+
* kafka = new KafkaProducer(...)
115+
* void kafka.send(topic, messages).catch(console.error)
116+
*
117+
* @param topic
118+
* @param messages
119+
* @param configOverrides
120+
*/
121+
async send(
122+
topic: string,
123+
messages: Record<string, unknown>[],
124+
/**
125+
* Reference: https://kafka.js.org/docs/producing#producing-messages
126+
*/
127+
configOverrides?: {
128+
acks?: number;
129+
timeout?: number;
130+
},
131+
): Promise<void> {
132+
if (!this.producer) {
133+
throw new Error("Producer not initialized. Call `init()` first.");
134+
}
135+
136+
await this.producer.send({
137+
topic,
138+
messages: messages.map((m) => ({
139+
value: JSON.stringify(m),
140+
})),
141+
compression: this.compression,
142+
acks: -1, // All brokers must acknowledge
143+
timeout: 10_000, // 10 seconds
144+
...configOverrides,
145+
});
146+
}
147+
148+
/**
149+
* Disconnects KafkaProducer.
150+
* Useful when shutting down the service to flush in-flight events.
151+
*/
152+
async disconnect() {
153+
if (this.producer) {
154+
await this.producer.disconnect();
155+
this.producer = null;
156+
}
157+
}
158+
}
Lines changed: 22 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,11 @@
11
import { randomUUID } from "node:crypto";
2-
import { checkServerIdentity } from "node:tls";
3-
import {
4-
CompressionTypes,
5-
Kafka,
6-
type Producer,
7-
type ProducerConfig,
8-
} from "kafkajs";
9-
import { compress, decompress } from "lz4js";
2+
import type { ProducerConfig } from "kafkajs";
103
import {
114
type UsageV2Event,
125
type UsageV2Source,
136
getTopicName,
147
} from "../core/usageV2.js";
15-
16-
// CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
17-
import KafkaJS from "kafkajs";
18-
const { CompressionCodecs } = KafkaJS;
8+
import { KafkaProducer } from "./kafka.js";
199

2010
/**
2111
* Creates a UsageV2Producer which opens a persistent TCP connection.
@@ -31,10 +21,8 @@ const { CompressionCodecs } = KafkaJS;
3121
* ```
3222
*/
3323
export class UsageV2Producer {
34-
private kafka: Kafka;
35-
private producer: Producer | null = null;
24+
private kafkaProducer: KafkaProducer;
3625
private topic: string;
37-
private compression: CompressionTypes;
3826

3927
constructor(config: {
4028
/**
@@ -57,66 +45,22 @@ export class UsageV2Producer {
5745
username: string;
5846
password: string;
5947
}) {
60-
const {
61-
producerName,
62-
environment,
63-
source,
64-
shouldCompress = true,
65-
username,
66-
password,
67-
} = config;
68-
69-
this.kafka = new Kafka({
70-
clientId: `${producerName}-${environment}`,
71-
brokers:
72-
environment === "production"
73-
? ["warpstream.thirdweb.xyz:9092"]
74-
: ["warpstream-dev.thirdweb.xyz:9092"],
75-
ssl: {
76-
checkServerIdentity(hostname, cert) {
77-
return checkServerIdentity(hostname.toLowerCase(), cert);
78-
},
79-
},
80-
sasl: {
81-
mechanism: "plain",
82-
username,
83-
password,
84-
},
48+
this.kafkaProducer = new KafkaProducer({
49+
producerName: config.producerName,
50+
environment: config.environment,
51+
shouldCompress: config.shouldCompress,
52+
username: config.username,
53+
password: config.password,
8554
});
86-
87-
this.topic = getTopicName(source);
88-
this.compression = shouldCompress
89-
? CompressionTypes.LZ4
90-
: CompressionTypes.None;
55+
this.topic = getTopicName(config.source);
9156
}
9257

9358
/**
9459
* Connect the producer.
9560
* This must be called before calling `sendEvents()`.
9661
*/
9762
async init(configOverrides?: ProducerConfig) {
98-
if (this.compression === CompressionTypes.LZ4) {
99-
CompressionCodecs[CompressionTypes.LZ4] = () => ({
100-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
101-
compress: (encoder: { buffer: Buffer }) => {
102-
const compressed = compress(encoder.buffer);
103-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
104-
return Buffer.from(compressed);
105-
},
106-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
107-
decompress: (buffer: Buffer) => {
108-
const decompressed = decompress(buffer);
109-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
110-
return Buffer.from(decompressed);
111-
},
112-
});
113-
}
114-
115-
this.producer = this.kafka.producer({
116-
allowAutoTopicCreation: false,
117-
...configOverrides,
118-
});
119-
await this.producer.connect();
63+
return this.kafkaProducer.init(configOverrides);
12064
}
12165

12266
/**
@@ -139,42 +83,23 @@ export class UsageV2Producer {
13983
timeout?: number;
14084
},
14185
): Promise<void> {
142-
if (!this.producer) {
143-
throw new Error("Producer not initialized. Call `init()` first.");
144-
}
145-
146-
const parsedEvents = events.map((event) => {
147-
return {
148-
...event,
149-
id: event.id ?? randomUUID(),
150-
created_at: event.created_at ?? new Date(),
151-
// Remove the "team_" prefix, if any.
152-
team_id: event.team_id.startsWith("team_")
153-
? event.team_id.slice(5)
154-
: event.team_id,
155-
};
156-
});
157-
158-
await this.producer.send({
159-
topic: this.topic,
160-
messages: parsedEvents.map((event) => ({
161-
value: JSON.stringify(event),
162-
})),
163-
acks: -1, // All brokers must acknowledge
164-
timeout: 10_000, // 10 seconds
165-
compression: this.compression,
166-
...configOverrides,
167-
});
86+
const parsedEvents = events.map((event) => ({
87+
...event,
88+
id: event.id ?? randomUUID(),
89+
created_at: event.created_at ?? new Date(),
90+
// Remove the "team_" prefix, if any.
91+
team_id: event.team_id.startsWith("team_")
92+
? event.team_id.slice(5)
93+
: event.team_id,
94+
}));
95+
await this.kafkaProducer.send(this.topic, parsedEvents, configOverrides);
16896
}
16997

17098
/**
17199
* Disconnects UsageV2Producer.
172100
* Useful when shutting down the service to flush in-flight events.
173101
*/
174102
async disconnect() {
175-
if (this.producer) {
176-
await this.producer.disconnect();
177-
this.producer = null;
178-
}
103+
await this.kafkaProducer.disconnect();
179104
}
180105
}

0 commit comments

Comments
 (0)