Skip to content

Commit 6ed2fd1

Browse files
authored
[service-utils] fix: Add LZ4 package for usageV2 compression (#6121)
1 parent a22457a commit 6ed2fd1

File tree

4 files changed

+179
-19
lines changed

4 files changed

+179
-19
lines changed

.changeset/lucky-seahorses-wink.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": patch
3+
---
4+
5+
[service-utils] import LZ4 codec

packages/service-utils/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"dependencies": {
4848
"aws4fetch": "1.0.20",
4949
"kafkajs": "2.2.4",
50+
"kafkajs-lz4": "2.0.0-beta.0",
5051
"zod": "3.24.1"
5152
},
5253
"devDependencies": {

packages/service-utils/src/node/usageV2.ts

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import { randomUUID } from "node:crypto";
22
import { checkServerIdentity } from "node:tls";
33
import {
4+
CompressionCodecs,
45
CompressionTypes,
56
Kafka,
67
type Producer,
78
type ProducerConfig,
89
} from "kafkajs";
10+
import LZ4Codec from "kafkajs-lz4";
911
import type { ServiceName } from "../core/services.js";
1012
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
1113

@@ -26,6 +28,7 @@ export class UsageV2Producer {
2628
private kafka: Kafka;
2729
private producer: Producer | null = null;
2830
private topic: string;
31+
private compression: CompressionTypes;
2932

3033
constructor(config: {
3134
/**
@@ -40,14 +43,27 @@ export class UsageV2Producer {
4043
* The product "source" where usage is coming from.
4144
*/
4245
productName: ServiceName;
46+
/**
47+
* The compression algorithm to use.
48+
*/
49+
compression?: CompressionTypes;
4350

4451
username: string;
4552
password: string;
4653
}) {
54+
const {
55+
producerName,
56+
environment,
57+
productName,
58+
compression = CompressionTypes.LZ4,
59+
username,
60+
password,
61+
} = config;
62+
4763
this.kafka = new Kafka({
48-
clientId: `${config.producerName}-${config.environment}`,
64+
clientId: `${producerName}-${environment}`,
4965
brokers:
50-
config.environment === "production"
66+
environment === "production"
5167
? ["warpstream.thirdweb.xyz:9092"]
5268
: ["warpstream-dev.thirdweb.xyz:9092"],
5369
ssl: {
@@ -57,19 +73,24 @@ export class UsageV2Producer {
5773
},
5874
sasl: {
5975
mechanism: "plain",
60-
username: config.username,
61-
password: config.password,
76+
username,
77+
password,
6278
},
6379
});
6480

65-
this.topic = getTopicName(config.productName);
81+
this.topic = getTopicName(productName);
82+
this.compression = compression;
6683
}
6784

6885
/**
6986
* Connect the producer.
7087
* This must be called before calling `sendEvents()`.
7188
*/
7289
async init(configOverrides?: ProducerConfig) {
90+
if (this.compression === CompressionTypes.LZ4) {
91+
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
92+
}
93+
7394
this.producer = this.kafka.producer({
7495
allowAutoTopicCreation: false,
7596
...configOverrides,
@@ -95,7 +116,6 @@ export class UsageV2Producer {
95116
configOverrides?: {
96117
acks?: number;
97118
timeout?: number;
98-
compression?: CompressionTypes;
99119
},
100120
): Promise<void> {
101121
if (!this.producer) {
@@ -121,7 +141,7 @@ export class UsageV2Producer {
121141
})),
122142
acks: -1, // All brokers must acknowledge
123143
timeout: 10_000, // 10 seconds
124-
compression: CompressionTypes.LZ4,
144+
compression: this.compression,
125145
...configOverrides,
126146
});
127147
}

0 commit comments

Comments
 (0)