Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions packages/service-utils/src/node/usageV2.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import { randomUUID } from "node:crypto";
import { checkServerIdentity } from "node:tls";
import * as KafkaJS from "kafkajs";
import {
CompressionTypes,
Kafka,
type Producer,
type ProducerConfig,
} from "kafkajs";
import LZ4Codec from "kafkajs-lz4";
import type { ServiceName } from "../core/services.js";
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";

// Import KafkaJS with CJS pattern (source: https://github.com/tulios/kafkajs/issues/1391)
import KafkaJS from "kafkajs";
const { CompressionCodecs } = KafkaJS;

/**
* Creates a UsageV2Producer which opens a persistent TCP connection.
* This class is thread-safe so your service should re-use one instance.
Expand All @@ -19,10 +28,10 @@ import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
* ```
*/
export class UsageV2Producer {
private kafka: KafkaJS.Kafka;
private producer: KafkaJS.Producer | null = null;
private kafka: Kafka;
private producer: Producer | null = null;
private topic: string;
private compression: KafkaJS.CompressionTypes;
private compression: CompressionTypes;

constructor(config: {
/**
Expand All @@ -40,7 +49,7 @@ export class UsageV2Producer {
/**
* The compression algorithm to use.
*/
compression?: KafkaJS.CompressionTypes;
compression?: CompressionTypes;

username: string;
password: string;
Expand All @@ -49,12 +58,12 @@ export class UsageV2Producer {
producerName,
environment,
productName,
compression = KafkaJS.CompressionTypes.LZ4,
compression = CompressionTypes.LZ4,
username,
password,
} = config;

this.kafka = new KafkaJS.Kafka({
this.kafka = new Kafka({
clientId: `${producerName}-${environment}`,
brokers:
environment === "production"
Expand All @@ -80,10 +89,9 @@ export class UsageV2Producer {
* Connect the producer.
* This must be called before calling `sendEvents()`.
*/
async init(configOverrides?: KafkaJS.ProducerConfig) {
if (this.compression === KafkaJS.CompressionTypes.LZ4) {
KafkaJS.CompressionCodecs[KafkaJS.CompressionTypes.LZ4] =
new LZ4Codec().codec;
async init(configOverrides?: ProducerConfig) {
if (this.compression === CompressionTypes.LZ4) {
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
}

this.producer = this.kafka.producer({
Expand Down
Loading