Skip to content

Commit 493f468

Browse files
committed
feat: Add usageV2 support
1 parent b38604c commit 493f468

File tree

6 files changed

+290
-113
lines changed

6 files changed

+290
-113
lines changed

.changeset/neat-ads-build.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": minor
3+
---
4+
5+
feat: Add usageV2 support

packages/service-utils/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"sideEffects": false,
4747
"dependencies": {
4848
"aws4fetch": "1.0.20",
49+
"kafkajs": "^2.2.4",
4950
"zod": "3.24.1"
5051
},
5152
"devDependencies": {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import type { UsageV2Event } from "src/core/usageV2.js";
2+
3+
/**
4+
* Send events to Kafka.
5+
* This method may throw. To call this non-blocking:
6+
*
7+
* ```ts
8+
* sendUsageV2Events("production", events).catch(console.error)
9+
* ```
10+
*
11+
* @param environment - The environment the service is running in.
12+
* @param events - The events to send.
13+
*/
14+
export async function sendUsageV2Events(
15+
environment: "development" | "production",
16+
events: UsageV2Event[],
17+
): Promise<void> {
18+
const baseUrl =
19+
environment === "production"
20+
? "https://u.thirdweb.com"
21+
: "https://u.thirdweb-dev.com";
22+
23+
const resp = await fetch(`${baseUrl}/usage-v2/raw-events`, {
24+
method: "POST",
25+
headers: {
26+
"Content-Type": "application/json",
27+
},
28+
body: JSON.stringify({ events }),
29+
});
30+
31+
if (!resp.ok) {
32+
throw new Error(
33+
`[UsageV2] unexpected response ${resp.status}: ${await resp.text()}`,
34+
);
35+
}
36+
resp.body?.cancel();
37+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
export interface UsageV2Event {
2+
/**
3+
* A unique identifier for the event. Defaults to a random UUID.
4+
* Useful if your service retries sending events.
5+
*/
6+
id?: `${string}-${string}-${string}-${string}-${string}`;
7+
/**
8+
* The event timestamp. Defaults to now().
9+
*/
10+
created_at?: Date;
11+
/**
12+
* The source of the event. Example: "storage"
13+
*/
14+
source: string;
15+
/**
16+
* The action of the event. Example: "upload"
17+
*/
18+
action: string;
19+
/**
20+
* The team ID.
21+
*/
22+
team_id: string;
23+
/**
24+
* The client ID, if available.
25+
*/
26+
client_id?: string;
27+
/**
28+
* The SDK name, if available.
29+
*/
30+
sdk_name?: string;
31+
/**
32+
* The SDK platform, if available.
33+
*/
34+
sdk_platform?: string;
35+
/**
36+
* The SDK version, if available.
37+
*/
38+
sdk_version?: string;
39+
/**
40+
* The SDK OS, if available.
41+
*/
42+
sdk_os?: string;
43+
/**
44+
* The product name, if available.
45+
*/
46+
product_name?: string;
47+
/**
48+
* The product version, if available.
49+
*/
50+
product_version?: string;
51+
/**
52+
* An object of service-specific data. Example: "file_size_bytes"
53+
* It is safe to pass any new JSON-serializable data here before updating the usageV2 schema.
54+
*/
55+
data: Record<string, unknown>;
56+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import { randomUUID } from "node:crypto";
2+
import { checkServerIdentity } from "node:tls";
3+
import { Kafka, type Producer } from "kafkajs";
4+
import type { UsageV2Event } from "src/core/usageV2.js";
5+
6+
const TOPIC_USAGE_V2 = "usage_v2.raw_events";
7+
8+
/**
9+
* Creates a UsageV2Producer which opens a persistent TCP connection.
10+
* This class is thread-safe so your service should re-use one instance.
11+
*
12+
* Example:
13+
* ```ts
14+
* usageV2 = new UsageV2Producer(..)
15+
* await usageV2.init()
16+
* await usageV2.sendEvents(events)
17+
* // Non-blocking:
18+
* // void usageV2.sendEvents(events).catch(console.error)
19+
* ```
20+
*/
21+
export class UsageV2Producer {
22+
private kafka: Kafka;
23+
private producer: Producer | null = null;
24+
25+
constructor(config: {
26+
/**
27+
* A descriptive name for your service. Example: "storage-server"
28+
*/
29+
producerName: string;
30+
/**
31+
* The environment the service is running in.
32+
*/
33+
environment: "development" | "production";
34+
35+
username: string;
36+
password: string;
37+
}) {
38+
this.kafka = new Kafka({
39+
clientId: `${config.producerName}-${config.environment}`,
40+
brokers:
41+
config.environment === "production"
42+
? ["warpstream.thirdweb.xyz:9092"]
43+
: ["warpstream-dev.thirdweb.xyz:9092"],
44+
ssl: {
45+
checkServerIdentity(hostname, cert) {
46+
return checkServerIdentity(hostname.toLowerCase(), cert);
47+
},
48+
},
49+
sasl: {
50+
mechanism: "plain",
51+
username: config.username,
52+
password: config.password,
53+
},
54+
});
55+
}
56+
57+
/**
58+
* Connect the producer.
59+
* This must be called before calling `sendEvents()`.
60+
*/
61+
async init() {
62+
this.producer = this.kafka.producer({
63+
allowAutoTopicCreation: false,
64+
});
65+
await this.producer.connect();
66+
}
67+
68+
/**
69+
* Send usageV2 events.
70+
* This method may throw. To call this non-blocking:
71+
*
72+
* ```ts
73+
* usageV2 = new UsageV2Producer(...)
74+
* void usageV2.sendEvents(events).catch(console.error)
75+
*
76+
* @param events - The events to send.
77+
*/
78+
async sendEvents(events: UsageV2Event[]): Promise<void> {
79+
if (!this.producer) {
80+
throw new Error("Producer not initialized. Call `init()` first.");
81+
}
82+
83+
const parsedEvents = events.map(({ id, created_at, data, ...rest }) => {
84+
return {
85+
id: id ?? randomUUID(),
86+
created_at: created_at ?? new Date(),
87+
data: JSON.stringify(data),
88+
...rest,
89+
};
90+
});
91+
92+
await this.producer.send({
93+
topic: TOPIC_USAGE_V2,
94+
messages: parsedEvents.map((event) => ({
95+
value: JSON.stringify(event),
96+
})),
97+
});
98+
}
99+
100+
/**
101+
* Disconnects UsageV2Producer.
102+
* Useful when shutting down the service to flush in-flight events.
103+
*/
104+
async disconnect() {
105+
if (this.producer) {
106+
await this.producer.disconnect();
107+
this.producer = null;
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)