Skip to content

Commit ccf6329

Browse files
authored
[service-utils] Add kafka producer to emit raw messages (#6170)
1 parent 821caa6 commit ccf6329

File tree

3 files changed

+167
-113
lines changed

3 files changed

+167
-113
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": patch
3+
---
4+
5+
Add raw kafka producer
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import { checkServerIdentity } from "node:tls";
2+
import { CompressionTypes, Kafka, type Producer } from "kafkajs";
3+
import { compress, decompress } from "lz4js";
4+
5+
// CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
6+
import KafkaJS from "kafkajs";
7+
const { CompressionCodecs } = KafkaJS;
8+
9+
/**
10+
* Creates a KafkaProducer which opens a persistent TCP connection.
11+
* This class is thread-safe so your service should re-use one instance.
12+
*
13+
* Example:
14+
* ```ts
15+
* kafka = new KafkaProducer(...)
16+
* await kafka.send(topic, events)
17+
* // Non-blocking:
18+
* // void kafka.send(topic, events).catch((e) => console.error(e))
19+
* ```
20+
*/
21+
export class KafkaProducer {
22+
private kafka: Kafka;
23+
private producer: Producer | null = null;
24+
private compression: CompressionTypes;
25+
26+
constructor(config: {
27+
/**
28+
* A descriptive name for your service. Example: "storage-server"
29+
*/
30+
producerName: string;
31+
/**
32+
* The environment the service is running in.
33+
*/
34+
environment: "development" | "production";
35+
/**
36+
* Whether to compress the events.
37+
*/
38+
shouldCompress?: boolean;
39+
40+
username: string;
41+
password: string;
42+
}) {
43+
const {
44+
producerName,
45+
environment,
46+
shouldCompress = true,
47+
username,
48+
password,
49+
} = config;
50+
51+
this.kafka = new Kafka({
52+
clientId: `${producerName}-${environment}`,
53+
brokers:
54+
environment === "production"
55+
? ["warpstream.thirdweb.xyz:9092"]
56+
: ["warpstream-dev.thirdweb.xyz:9092"],
57+
ssl: {
58+
checkServerIdentity(hostname, cert) {
59+
return checkServerIdentity(hostname.toLowerCase(), cert);
60+
},
61+
},
62+
sasl: {
63+
mechanism: "plain",
64+
username,
65+
password,
66+
},
67+
});
68+
69+
if (shouldCompress) {
70+
this.compression = CompressionTypes.LZ4;
71+
72+
CompressionCodecs[CompressionTypes.LZ4] = () => ({
73+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
74+
compress: (encoder: { buffer: Buffer }) => {
75+
const compressed = compress(encoder.buffer);
76+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
77+
return Buffer.from(compressed);
78+
},
79+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
80+
decompress: (buffer: Buffer) => {
81+
const decompressed = decompress(buffer);
82+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
83+
return Buffer.from(decompressed);
84+
},
85+
});
86+
} else {
87+
this.compression = CompressionTypes.None;
88+
}
89+
}
90+
91+
/**
92+
* Send messages to a Kafka topic.
93+
* This method may throw. To call this non-blocking:
94+
* @param topic
95+
* @param messages
96+
* @param configOverrides
97+
*/
98+
async send(
99+
topic: string,
100+
messages: Record<string, unknown>[],
101+
/**
102+
* Reference: https://kafka.js.org/docs/producing#producing-messages
103+
*/
104+
options?: {
105+
acks?: number;
106+
timeout?: number;
107+
allowAutoTopicCreation?: boolean;
108+
},
109+
): Promise<void> {
110+
if (!this.producer) {
111+
this.producer = this.kafka.producer({
112+
allowAutoTopicCreation: options?.allowAutoTopicCreation ?? false,
113+
});
114+
await this.producer.connect();
115+
}
116+
117+
await this.producer.send({
118+
topic,
119+
messages: messages.map((m) => ({
120+
value: JSON.stringify(m),
121+
})),
122+
compression: this.compression,
123+
acks: options?.acks ?? -1, // Default: All brokers must acknowledge
124+
timeout: options?.timeout ?? 10_000, // Default: 10 seconds
125+
});
126+
}
127+
128+
/**
129+
* Disconnects KafkaProducer.
130+
* Useful when shutting down the service to flush in-flight events.
131+
*/
132+
async disconnect() {
133+
if (this.producer) {
134+
await this.producer.disconnect();
135+
this.producer = null;
136+
}
137+
}
138+
}
Lines changed: 24 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,10 @@
11
import { randomUUID } from "node:crypto";
2-
import { checkServerIdentity } from "node:tls";
3-
import {
4-
CompressionTypes,
5-
Kafka,
6-
type Producer,
7-
type ProducerConfig,
8-
} from "kafkajs";
9-
import { compress, decompress } from "lz4js";
102
import {
113
type UsageV2Event,
124
type UsageV2Source,
135
getTopicName,
146
} from "../core/usageV2.js";
15-
16-
// CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
17-
import KafkaJS from "kafkajs";
18-
const { CompressionCodecs } = KafkaJS;
7+
import { KafkaProducer } from "./kafka.js";
198

209
/**
2110
* Creates a UsageV2Producer which opens a persistent TCP connection.
@@ -24,17 +13,14 @@ const { CompressionCodecs } = KafkaJS;
2413
* Example:
2514
* ```ts
2615
* usageV2 = new UsageV2Producer(..)
27-
* await usageV2.init()
2816
* await usageV2.sendEvents(events)
2917
* // Non-blocking:
30-
* // void usageV2.sendEvents(events).catch(console.error)
18+
* // void usageV2.sendEvents(events).catch((e) => console.error(e))
3119
* ```
3220
*/
3321
export class UsageV2Producer {
34-
private kafka: Kafka;
35-
private producer: Producer | null = null;
22+
private kafkaProducer: KafkaProducer;
3623
private topic: string;
37-
private compression: CompressionTypes;
3824

3925
constructor(config: {
4026
/**
@@ -57,124 +43,49 @@ export class UsageV2Producer {
5743
username: string;
5844
password: string;
5945
}) {
60-
const {
61-
producerName,
62-
environment,
63-
source,
64-
shouldCompress = true,
65-
username,
66-
password,
67-
} = config;
68-
69-
this.kafka = new Kafka({
70-
clientId: `${producerName}-${environment}`,
71-
brokers:
72-
environment === "production"
73-
? ["warpstream.thirdweb.xyz:9092"]
74-
: ["warpstream-dev.thirdweb.xyz:9092"],
75-
ssl: {
76-
checkServerIdentity(hostname, cert) {
77-
return checkServerIdentity(hostname.toLowerCase(), cert);
78-
},
79-
},
80-
sasl: {
81-
mechanism: "plain",
82-
username,
83-
password,
84-
},
85-
});
86-
87-
this.topic = getTopicName(source);
88-
this.compression = shouldCompress
89-
? CompressionTypes.LZ4
90-
: CompressionTypes.None;
91-
}
92-
93-
/**
94-
* Connect the producer.
95-
* This must be called before calling `sendEvents()`.
96-
*/
97-
async init(configOverrides?: ProducerConfig) {
98-
if (this.compression === CompressionTypes.LZ4) {
99-
CompressionCodecs[CompressionTypes.LZ4] = () => ({
100-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
101-
compress: (encoder: { buffer: Buffer }) => {
102-
const compressed = compress(encoder.buffer);
103-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
104-
return Buffer.from(compressed);
105-
},
106-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
107-
decompress: (buffer: Buffer) => {
108-
const decompressed = decompress(buffer);
109-
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
110-
return Buffer.from(decompressed);
111-
},
112-
});
113-
}
114-
115-
this.producer = this.kafka.producer({
116-
allowAutoTopicCreation: false,
117-
...configOverrides,
46+
this.kafkaProducer = new KafkaProducer({
47+
producerName: config.producerName,
48+
environment: config.environment,
49+
shouldCompress: config.shouldCompress,
50+
username: config.username,
51+
password: config.password,
11852
});
119-
await this.producer.connect();
53+
this.topic = getTopicName(config.source);
12054
}
12155

12256
/**
12357
* Send usageV2 events.
12458
* This method may throw. To call this non-blocking:
125-
*
126-
* ```ts
127-
* usageV2 = new UsageV2Producer(...)
128-
* void usageV2.sendEvents(events).catch(console.error)
129-
*
130-
* @param events - The events to send.
59+
* @param events
13160
*/
13261
async sendEvents(
13362
events: UsageV2Event[],
13463
/**
13564
* Reference: https://kafka.js.org/docs/producing#producing-messages
13665
*/
137-
configOverrides?: {
66+
options?: {
13867
acks?: number;
13968
timeout?: number;
69+
allowAutoTopicCreation?: boolean;
14070
},
14171
): Promise<void> {
142-
if (!this.producer) {
143-
throw new Error("Producer not initialized. Call `init()` first.");
144-
}
145-
146-
const parsedEvents = events.map((event) => {
147-
return {
148-
...event,
149-
id: event.id ?? randomUUID(),
150-
created_at: event.created_at ?? new Date(),
151-
// Remove the "team_" prefix, if any.
152-
team_id: event.team_id.startsWith("team_")
153-
? event.team_id.slice(5)
154-
: event.team_id,
155-
};
156-
});
157-
158-
await this.producer.send({
159-
topic: this.topic,
160-
messages: parsedEvents.map((event) => ({
161-
value: JSON.stringify(event),
162-
})),
163-
acks: -1, // All brokers must acknowledge
164-
timeout: 10_000, // 10 seconds
165-
compression: this.compression,
166-
...configOverrides,
167-
});
72+
const parsedEvents = events.map((event) => ({
73+
...event,
74+
id: event.id ?? randomUUID(),
75+
created_at: event.created_at ?? new Date(),
76+
// Remove the "team_" prefix, if any.
77+
team_id: event.team_id.startsWith("team_")
78+
? event.team_id.slice(5)
79+
: event.team_id,
80+
}));
81+
await this.kafkaProducer.send(this.topic, parsedEvents, options);
16882
}
16983

17084
/**
17185
* Disconnects UsageV2Producer.
17286
* Useful when shutting down the service to flush in-flight events.
17387
*/
17488
async disconnect() {
175-
if (this.producer) {
176-
await this.producer.disconnect();
177-
this.producer = null;
178-
}
89+
await this.kafkaProducer.disconnect();
17990
}
18091
}

0 commit comments

Comments
 (0)