11import { randomUUID } from "node:crypto" ;
22import { checkServerIdentity } from "node:tls" ;
33import { Kafka , type Producer } from "kafkajs" ;
4- import type { UsageV2Event } from "../core/usageV2.js" ;
5-
6- const TOPIC_USAGE_V2 = "usage_v2.raw_events" ;
4+ import type { ServiceName } from "src/core/services.js" ;
5+ import { type UsageV2Event , getTopicName } from "../core/usageV2.js" ;
76
87/**
98 * Creates a UsageV2Producer which opens a persistent TCP connection.
@@ -21,6 +20,7 @@ const TOPIC_USAGE_V2 = "usage_v2.raw_events";
2120export class UsageV2Producer {
2221 private kafka : Kafka ;
2322 private producer : Producer | null = null ;
23+ private productName : ServiceName ;
2424
2525 constructor ( config : {
2626 /**
@@ -31,6 +31,10 @@ export class UsageV2Producer {
3131 * The environment the service is running in.
3232 */
3333 environment : "development" | "production" ;
34+ /**
35+ * The product "source" where usage is coming from.
36+ */
37+ productName : ServiceName ;
3438
3539 username : string ;
3640 password : string ;
@@ -52,6 +56,8 @@ export class UsageV2Producer {
5256 password : config . password ,
5357 } ,
5458 } ) ;
59+
60+ this . productName = config . productName ;
5561 }
5662
5763 /**
@@ -82,27 +88,18 @@ export class UsageV2Producer {
8288
8389 const parsedEvents = events . map ( ( event ) => {
8490 return {
91+ ...event ,
8592 id : event . id ?? randomUUID ( ) ,
8693 created_at : event . created_at ?? new Date ( ) ,
87- source : event . source ,
88- action : event . action ,
8994 // Remove the "team_" prefix, if any.
9095 team_id : event . team_id . startsWith ( "team_" )
9196 ? event . team_id . slice ( 5 )
9297 : event . team_id ,
93- project_id : event . project_id ,
94- sdk_name : event . sdk_name ,
95- sdk_platform : event . sdk_platform ,
96- sdk_version : event . sdk_version ,
97- sdk_os : event . sdk_os ,
98- product_name : event . product_name ,
99- product_version : event . product_version ,
100- data : JSON . stringify ( event . data ) ,
10198 } ;
10299 } ) ;
103100
104101 await this . producer . send ( {
105- topic : TOPIC_USAGE_V2 ,
102+ topic : getTopicName ( this . productName ) ,
106103 messages : parsedEvents . map ( ( event ) => ( {
107104 value : JSON . stringify ( event ) ,
108105 } ) ) ,
0 commit comments