Skip to content

Commit a01e3ba

Browse files
committed
feat: kafka stress testing
1 parent af7079d commit a01e3ba

File tree

5 files changed

+840
-1
lines changed

5 files changed

+840
-1
lines changed

apps/basket/package.json

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,17 @@
1010
"test:routes": "bun test src/routes/*.test.ts",
1111
"test:utils": "bun test src/utils/*.test.ts",
1212
"test:kafka": "bun test src/lib/kafka.test.ts src/lib/producer.test.ts",
13-
"test:coverage": "bun test --coverage"
13+
"test:coverage": "bun test --coverage",
14+
"test:stress": "bun test src/lib/kafka-stress.test.ts",
15+
"stress:light": "bun run src/lib/kafka-stress-cli.ts 1000 10 100 1",
16+
"stress:medium": "bun run src/lib/kafka-stress-cli.ts 5000 10 250 1",
17+
"stress:heavy": "bun run src/lib/kafka-stress-cli.ts 10000 10 500 1",
18+
"stress:extreme": "bun run src/lib/kafka-stress-cli.ts 25000 10 1000 1",
19+
"stress:burst": "bun run src/lib/kafka-stress-cli.ts 50000 1 1000 1",
20+
"stress:sustained": "bun run src/lib/kafka-stress-cli.ts 10000 30 500 1",
21+
"stress:concurrent": "bun run src/lib/kafka-stress-cli.ts 5000 10 250 5",
22+
"stress:monitor": "bun run src/lib/kafka-monitor.ts",
23+
"stress:run": "bun run src/lib/run-stress-test.ts"
1424
},
1525
"dependencies": {
1626
"@clickhouse/client": "catalog:",
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
#!/usr/bin/env bun
2+
3+
import { Kafka } from 'kafkajs';
4+
5+
const BROKER = process.env.KAFKA_BROKERS || 'localhost:9092';
6+
const TOPIC = process.env.KAFKA_TEST_TOPIC || 'stress-test-events';
7+
const GROUP_ID = `monitor-${Date.now()}`;
8+
9+
console.log('📊 Kafka/Redpanda Stress Test Monitor');
10+
console.log(` Broker: ${BROKER}`);
11+
console.log(` Topic: ${TOPIC}`);
12+
console.log('');
13+
14+
const kafka = new Kafka({
15+
clientId: 'basket-stress-monitor',
16+
brokers: [BROKER],
17+
});
18+
19+
const consumer = kafka.consumer({
20+
groupId: GROUP_ID,
21+
sessionTimeout: 60000,
22+
});
23+
24+
let messageCount = 0;
25+
let startTime: number | null = null;
26+
let lastUpdateTime = Date.now();
27+
let lastCount = 0;
28+
let minLatency = Number.POSITIVE_INFINITY;
29+
let maxLatency = 0;
30+
let totalLatency = 0;
31+
let latencyCount = 0;
32+
33+
const printStats = () => {
34+
if (!startTime) return;
35+
36+
const now = Date.now();
37+
const elapsed = (now - startTime) / 1000;
38+
const throughput = Math.floor(messageCount / elapsed);
39+
40+
// Calculate current throughput (last second)
41+
const currentThroughput = messageCount - lastCount;
42+
lastCount = messageCount;
43+
lastUpdateTime = now;
44+
45+
// Calculate average latency
46+
const avgLatency =
47+
latencyCount > 0 ? Math.floor(totalLatency / latencyCount) : 0;
48+
49+
console.clear();
50+
console.log('📊 Real-time Kafka/Redpanda Monitor\n');
51+
console.log('┌─────────────────────────────────────────┐');
52+
console.log('│ Message Statistics │');
53+
console.log('├─────────────────────────────────────────┤');
54+
console.log(`│ Total Messages: ${messageCount.toString().padStart(16)} │`);
55+
console.log(`│ Duration: ${elapsed.toFixed(1).padStart(11)}s │`);
56+
console.log(
57+
`│ Avg Throughput: ${throughput.toString().padStart(10)} msg/s │`
58+
);
59+
console.log(
60+
`│ Current Rate: ${currentThroughput.toString().padStart(10)} msg/s │`
61+
);
62+
console.log('├─────────────────────────────────────────┤');
63+
console.log('│ Latency Stats │');
64+
console.log('├─────────────────────────────────────────┤');
65+
console.log(
66+
`│ Avg Latency: ${avgLatency.toString().padStart(12)}ms │`
67+
);
68+
console.log(
69+
`│ Min Latency: ${minLatency === Number.POSITIVE_INFINITY ? 'N/A'.padStart(12) : `${minLatency}ms`.padStart(12)} │`
70+
);
71+
console.log(`│ Max Latency: ${maxLatency.toString().padStart(12)}ms │`);
72+
console.log('└─────────────────────────────────────────┘\n');
73+
74+
// Progress bar
75+
const barWidth = 40;
76+
const progress = Math.min(messageCount / 1000, 100);
77+
const filled = Math.floor((progress / 100) * barWidth);
78+
const empty = barWidth - filled;
79+
const bar = '█'.repeat(filled) + '░'.repeat(empty);
80+
console.log(`Progress: [${bar}] ${Math.floor(progress)}%\n`);
81+
82+
console.log('Press Ctrl+C to stop monitoring');
83+
};
84+
85+
const run = async () => {
86+
await consumer.connect();
87+
console.log('✅ Connected to Kafka/Redpanda');
88+
89+
await consumer.subscribe({ topic: TOPIC, fromBeginning: false });
90+
console.log(`✅ Subscribed to topic: ${TOPIC}`);
91+
console.log('⏳ Waiting for messages...\n');
92+
93+
// Update stats every second
94+
const statsInterval = setInterval(printStats, 1000);
95+
96+
await consumer.run({
97+
eachMessage: async ({ message }) => {
98+
if (!startTime) {
99+
startTime = Date.now();
100+
}
101+
102+
messageCount++;
103+
104+
// Calculate latency (time from message creation to consumption)
105+
if (message.value) {
106+
try {
107+
const event = JSON.parse(message.value.toString());
108+
if (event.timestamp) {
109+
const latency = Date.now() - event.timestamp;
110+
if (latency < 10000) {
111+
// Ignore outliers > 10s
112+
minLatency = Math.min(minLatency, latency);
113+
maxLatency = Math.max(maxLatency, latency);
114+
totalLatency += latency;
115+
latencyCount++;
116+
}
117+
}
118+
} catch {
119+
// Ignore parse errors
120+
}
121+
}
122+
},
123+
});
124+
125+
// Cleanup on exit
126+
process.on('SIGINT', async () => {
127+
console.log('\n\n⚠️ Stopping monitor...');
128+
clearInterval(statsInterval);
129+
await consumer.disconnect();
130+
console.log('✅ Disconnected');
131+
process.exit(0);
132+
});
133+
};
134+
135+
run().catch((error) => {
136+
console.error('❌ Monitor error:', error);
137+
process.exit(1);
138+
});
139+
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
#!/usr/bin/env bun
2+
3+
import { CompressionTypes, Kafka } from 'kafkajs';
4+
import { randomUUID } from 'node:crypto';
5+
6+
const BROKER = process.env.KAFKA_BROKERS || 'localhost:9092';
7+
const TEST_TOPIC = process.env.KAFKA_TEST_TOPIC || 'stress-test-events';
8+
9+
// CLI arguments
10+
const args = process.argv.slice(2);
11+
const config = {
12+
messagesPerSecond: Number.parseInt(args[0]) || 10000,
13+
durationSeconds: Number.parseInt(args[1]) || 10,
14+
batchSize: Number.parseInt(args[2]) || 500,
15+
numProducers: Number.parseInt(args[3]) || 1,
16+
};
17+
18+
console.log('🔧 Configuration:');
19+
console.log(` Broker: ${BROKER}`);
20+
console.log(` Topic: ${TEST_TOPIC}`);
21+
console.log(` Messages/sec: ${config.messagesPerSecond}`);
22+
console.log(` Duration: ${config.durationSeconds}s`);
23+
console.log(` Batch size: ${config.batchSize}`);
24+
console.log(` Concurrent producers: ${config.numProducers}`);
25+
console.log('');
26+
27+
/**
28+
* Generate a realistic analytics event payload
29+
*/
30+
const generateEvent = (index: number) => {
31+
const eventId = randomUUID();
32+
const sessionId = randomUUID();
33+
const anonymousId = randomUUID();
34+
const clientId = `client-${Math.floor(Math.random() * 100)}`;
35+
36+
return {
37+
id: randomUUID(),
38+
client_id: clientId,
39+
event_name: 'pageview',
40+
anonymous_id: anonymousId,
41+
time: Date.now(),
42+
session_id: sessionId,
43+
event_type: 'track',
44+
event_id: eventId,
45+
session_start_time: Date.now() - Math.random() * 300000,
46+
timestamp: Date.now(),
47+
referrer: 'https://google.com',
48+
url: `https://example.com/page-${index % 100}`,
49+
path: `/page-${index % 100}`,
50+
title: `Page ${index % 100}`,
51+
ip: `192.168.${Math.floor(Math.random() * 255)}.${Math.floor(Math.random() * 255)}`,
52+
user_agent:
53+
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
54+
browser_name: 'Chrome',
55+
browser_version: '120.0.0',
56+
os_name: 'Windows',
57+
os_version: '10',
58+
device_type: 'desktop',
59+
device_brand: '',
60+
device_model: '',
61+
country: 'US',
62+
region: 'CA',
63+
city: 'San Francisco',
64+
screen_resolution: '1920x1080',
65+
viewport_size: '1920x1080',
66+
language: 'en-US',
67+
timezone: 'America/Los_Angeles',
68+
connection_type: '4g',
69+
rtt: 50,
70+
downlink: 10,
71+
time_on_page: Math.floor(Math.random() * 60000),
72+
scroll_depth: Math.floor(Math.random() * 100),
73+
interaction_count: Math.floor(Math.random() * 50),
74+
page_count: 1,
75+
utm_source: '',
76+
utm_medium: '',
77+
utm_campaign: '',
78+
utm_term: '',
79+
utm_content: '',
80+
load_time: Math.floor(Math.random() * 3000),
81+
dom_ready_time: Math.floor(Math.random() * 2000),
82+
dom_interactive: Math.floor(Math.random() * 1500),
83+
ttfb: Math.floor(Math.random() * 500),
84+
connection_time: Math.floor(Math.random() * 100),
85+
render_time: Math.floor(Math.random() * 1000),
86+
redirect_time: 0,
87+
domain_lookup_time: Math.floor(Math.random() * 50),
88+
properties: '{}',
89+
created_at: Date.now(),
90+
};
91+
};
92+
93+
/**
94+
* Create a Kafka producer optimized for maximum throughput
95+
*/
96+
const createProducer = async () => {
97+
const kafka = new Kafka({
98+
clientId: `basket-stress-test-${randomUUID()}`,
99+
brokers: [BROKER],
100+
});
101+
102+
const producer = kafka.producer({
103+
allowAutoTopicCreation: true,
104+
maxInFlightRequests: 5,
105+
idempotent: false,
106+
});
107+
108+
await producer.connect();
109+
return producer;
110+
};
111+
112+
/**
113+
* Send a batch of messages (fire-and-forget for maximum throughput)
114+
*/
115+
const sendBatch = (producer: any, batchSize: number, startIndex: number) => {
116+
const messages = [];
117+
for (let i = 0; i < batchSize; i++) {
118+
const event = generateEvent(startIndex + i);
119+
messages.push({
120+
key: event.client_id,
121+
value: JSON.stringify(event),
122+
});
123+
}
124+
125+
return producer.send({
126+
topic: TEST_TOPIC,
127+
messages,
128+
compression: CompressionTypes.GZIP,
129+
});
130+
};
131+
132+
/**
133+
* Run stress test for a single producer (fire-and-forget for max throughput)
134+
*/
135+
const runProducerStressTest = async (
136+
producerId: number,
137+
messagesPerProducer: number,
138+
targetMessagesPerSecond: number,
139+
durationSeconds: number
140+
) => {
141+
const producer = await createProducer();
142+
console.log(`✅ Producer ${producerId} connected`);
143+
144+
const batchSize = config.batchSize;
145+
const promises = [];
146+
147+
// Fire off all batches as fast as possible (fire-and-forget)
148+
// No rate limiting - let Kafka handle the backpressure
149+
for (let i = 0; i < messagesPerProducer; i += batchSize) {
150+
const batchPromise = sendBatch(producer, batchSize, i);
151+
promises.push(batchPromise);
152+
}
153+
154+
// Wait for all messages to be sent
155+
await Promise.all(promises);
156+
await producer.disconnect();
157+
158+
return messagesPerProducer;
159+
};
160+
161+
/**
162+
* Main stress test execution
163+
*/
164+
const runStressTest = async () => {
165+
const totalMessages =
166+
config.messagesPerSecond * config.durationSeconds * config.numProducers;
167+
const messagesPerProducer = Math.floor(totalMessages / config.numProducers);
168+
169+
console.log('🚀 Starting Kafka/Redpanda stress test...');
170+
console.log(`📊 Total messages: ${totalMessages.toLocaleString()}`);
171+
console.log(`📦 Messages per producer: ${messagesPerProducer.toLocaleString()}`);
172+
console.log('');
173+
174+
const startTime = Date.now();
175+
176+
// Create progress tracker
177+
const progressInterval = setInterval(() => {
178+
const elapsed = (Date.now() - startTime) / 1000;
179+
console.log(`⏱️ Elapsed time: ${elapsed.toFixed(1)}s`);
180+
}, 2000);
181+
182+
// Run all producers concurrently
183+
const producerTasks = Array.from({ length: config.numProducers }, (_, idx) =>
184+
runProducerStressTest(
185+
idx + 1,
186+
messagesPerProducer,
187+
config.messagesPerSecond,
188+
config.durationSeconds
189+
)
190+
);
191+
192+
const results = await Promise.all(producerTasks);
193+
clearInterval(progressInterval);
194+
195+
const endTime = Date.now();
196+
const duration = (endTime - startTime) / 1000;
197+
const totalSent = results.reduce((sum, count) => sum + count, 0);
198+
const actualThroughput = Math.floor(totalSent / duration);
199+
200+
console.log('');
201+
console.log('📈 Results:');
202+
console.log(` Total messages sent: ${totalSent.toLocaleString()}`);
203+
console.log(` Duration: ${duration.toFixed(2)}s`);
204+
console.log(` Throughput: ${actualThroughput.toLocaleString()} msg/sec`);
205+
console.log(
206+
` Avg per producer: ${Math.floor(actualThroughput / config.numProducers).toLocaleString()} msg/sec`
207+
);
208+
console.log('');
209+
console.log('✅ Stress test completed successfully!');
210+
};
211+
212+
// Handle graceful shutdown
213+
process.on('SIGINT', () => {
214+
console.log('\n⚠️ Interrupted, shutting down...');
215+
process.exit(0);
216+
});
217+
218+
process.on('SIGTERM', () => {
219+
console.log('\n⚠️ Terminated, shutting down...');
220+
process.exit(0);
221+
});
222+
223+
// Run the stress test
224+
runStressTest()
225+
.then(() => {
226+
process.exit(0);
227+
})
228+
.catch((error) => {
229+
console.error('❌ Stress test failed:', error);
230+
process.exit(1);
231+
});
232+

0 commit comments

Comments
 (0)