Skip to content

Commit ce613e5

Browse files
committed
add optional compression
1 parent 64c7451 commit ce613e5

File tree

1 file changed

+24
-9
lines changed

1 file changed

+24
-9
lines changed

packages/service-utils/src/node/usageV2.ts

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import LZ4Codec from "kafkajs-lz4";
1111
import type { ServiceName } from "../core/services.js";
1212
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
1313

14-
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
15-
1614
/**
1715
* Creates a UsageV2Producer which opens a persistent TCP connection.
1816
* This class is thread-safe so your service should re-use one instance.
@@ -30,6 +28,7 @@ export class UsageV2Producer {
3028
private kafka: Kafka;
3129
private producer: Producer | null = null;
3230
private topic: string;
31+
private compression: CompressionTypes;
3332

3433
constructor(config: {
3534
/**
@@ -44,14 +43,27 @@ export class UsageV2Producer {
4443
* The product "source" where usage is coming from.
4544
*/
4645
productName: ServiceName;
46+
/**
47+
* The compression algorithm to use.
48+
*/
49+
compression?: CompressionTypes;
4750

4851
username: string;
4952
password: string;
5053
}) {
54+
const {
55+
producerName,
56+
environment,
57+
productName,
58+
compression = CompressionTypes.LZ4,
59+
username,
60+
password,
61+
} = config;
62+
5163
this.kafka = new Kafka({
52-
clientId: `${config.producerName}-${config.environment}`,
64+
clientId: `${producerName}-${environment}`,
5365
brokers:
54-
config.environment === "production"
66+
environment === "production"
5567
? ["warpstream.thirdweb.xyz:9092"]
5668
: ["warpstream-dev.thirdweb.xyz:9092"],
5769
ssl: {
@@ -61,12 +73,16 @@ export class UsageV2Producer {
6173
},
6274
sasl: {
6375
mechanism: "plain",
64-
username: config.username,
65-
password: config.password,
76+
username,
77+
password,
6678
},
6779
});
6880

69-
this.topic = getTopicName(config.productName);
81+
this.topic = getTopicName(productName);
82+
this.compression = compression;
83+
if (compression === CompressionTypes.LZ4) {
84+
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
85+
}
7086
}
7187

7288
/**
@@ -99,7 +115,6 @@ export class UsageV2Producer {
99115
configOverrides?: {
100116
acks?: number;
101117
timeout?: number;
102-
compression?: CompressionTypes;
103118
},
104119
): Promise<void> {
105120
if (!this.producer) {
@@ -125,7 +140,7 @@ export class UsageV2Producer {
125140
})),
126141
acks: -1, // All brokers must acknowledge
127142
timeout: 10_000, // 10 seconds
128-
compression: CompressionTypes.LZ4,
143+
compression: this.compression,
129144
...configOverrides,
130145
});
131146
}

0 commit comments

Comments
 (0)