Skip to content

Commit 5ad049f

Browse files
committed
Enable s/publish
1 parent 926fca4 commit 5ad049f

File tree

3 files changed

+162
-53
lines changed

3 files changed

+162
-53
lines changed

js/lib/metrics.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ function writeFinalResults(
150150
rttHistogram,
151151
perSecondStats
152152
) {
153-
const duration = (end - start);
153+
const duration = (end - start)/1000;
154154
const messageRate = totalMessages / duration;
155155

156156
console.log('#################################################');

js/lib/publisher.js

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ async function publisherRoutine(
77
dataSize,
88
client,
99
isRunningRef,
10-
totalMessagesRef
10+
totalMessagesRef,
11+
rateLimiter
1112
) {
1213
if (verbose) {
1314
console.log(
@@ -18,25 +19,46 @@ async function publisherRoutine(
1819
}
1920

2021
const payload = !measureRTT ? 'A'.repeat(dataSize) : '';
22+
const duplicatedClient = client.duplicate(); // Create a duplicated connection for this publisher
2123

22-
while (isRunningRef.value) {
23-
let msg = payload;
24-
if (measureRTT) {
25-
msg = Date.now();
26-
}
24+
try {
25+
while (isRunningRef.value) {
26+
for (const channel of channels) {
27+
try {
28+
// Apply rate limiting if configured
29+
if (rateLimiter) {
30+
await rateLimiter.removeTokens(1);
31+
}
32+
33+
let msg = payload;
34+
if (measureRTT) {
35+
msg = Date.now().toString();
36+
}
2737

28-
for (const channel of channels) {
29-
try {
30-
if (mode === 'spublish') {
31-
await client.spublish(channel, msg);
32-
} else {
33-
await client.publish(channel, msg);
38+
if (mode === 'spublish') {
39+
await duplicatedClient.spublish(channel, msg);
40+
} else {
41+
await duplicatedClient.publish(channel, msg);
42+
}
43+
totalMessagesRef.value++;
44+
} catch (err) {
45+
console.error(`Error publishing to channel ${channel}:`, err);
3446
}
35-
totalMessagesRef.value++;
36-
} catch (err) {
37-
console.error(`Error publishing to channel ${channel}:`, err);
3847
}
3948
}
49+
} finally {
50+
// Clean shutdown - disconnect the client
51+
if (verbose) {
52+
console.log(`Publisher ${clientName} shutting down...`);
53+
}
54+
try {
55+
duplicatedClient.disconnect();
56+
if (verbose) {
57+
console.log(`Publisher ${clientName} disconnected successfully`);
58+
}
59+
} catch (err) {
60+
console.error(`Error disconnecting publisher ${clientName}:`, err);
61+
}
4062
}
4163
}
4264

js/lib/redisManager.js

Lines changed: 124 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@ async function runBenchmark(argv) {
5151
let clients = [];
5252
let nodeAddresses = [];
5353
let slotClientMap = new Map();
54+
let cluster = null;
5455
console.log(`Using ${argv['slot-refresh-interval']} slot-refresh-interval`);
5556
console.log(`Using ${argv['redis-timeout']} redis-timeout`);
5657

5758
if (argv['oss-cluster-api-distribute-subscribers']) {
58-
const cluster = new Redis.Cluster(
59+
cluster = new Redis.Cluster(
5960
[
6061
{
6162
host: argv.host,
@@ -98,10 +99,12 @@ async function runBenchmark(argv) {
9899
}
99100

100101
nodeAddresses.push(`${ip}:${port}`);
101-
console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes`);
102+
clients.push(client);
102103
}
104+
console.log(`Cluster mode - using ${nodeAddresses.length} unique nodes`);
103105
} else {
104106
const client = new Redis(redisOptions);
107+
clients.push(client);
105108
// Redis Cluster hash slots range: 0 - 16383
106109
for (let slot = 0; slot <= 16383; slot++) {
107110
slotClientMap.set(slot, client);
@@ -114,7 +117,7 @@ async function runBenchmark(argv) {
114117
const totalChannels = argv['channel-maximum'] - argv['channel-minimum'] + 1;
115118
const totalSubscriptions = totalChannels * argv['subscribers-per-channel'];
116119
const totalExpectedMessages = totalSubscriptions * argv.messages;
117-
const subscriptionsPerNode = Math.ceil(totalSubscriptions / clients.length);
120+
const subscriptionsPerNode = Math.ceil(totalSubscriptions / nodeAddresses.length);
118121

119122
if (argv['pool-size'] === 0) {
120123
redisOptions.connectionPoolSize = subscriptionsPerNode;
@@ -131,7 +134,60 @@ async function runBenchmark(argv) {
131134

132135
const promises = [];
133136

134-
if (argv.mode.includes('subscribe')) {
137+
138+
if (argv.mode.includes('publish')) {
139+
// Run publishers
140+
totalPublishersRef.value = argv.clients;
141+
console.log(`Starting ${argv.clients} publishers in ${argv.mode} mode`);
142+
143+
for (let clientId = 1; clientId <= argv.clients; clientId++) {
144+
const channels = [];
145+
const numChannels = pickChannelCount(argv);
146+
147+
for (let i = 0; i < numChannels; i++) {
148+
const channelId = randomChannel(argv);
149+
const channelName = `${argv['subscriber-prefix']}${channelId}`;
150+
channels.push(channelName);
151+
}
152+
153+
const publisherName = `publisher#${clientId}`;
154+
let client;
155+
156+
if (argv.mode === 'spublish' && argv['oss-cluster-api-distribute-subscribers']) {
157+
// For sharded publish in cluster mode, get the appropriate client for the first channel
158+
const slot = clusterKeySlot(channels[0]);
159+
client = slotClientMap.get(slot);
160+
} else {
161+
// For regular publish or non-cluster, round-robin assignment
162+
client = clients[clientId % clients.length];
163+
}
164+
165+
if (argv.verbose) {
166+
console.log(`Publisher ${clientId} targeting channels ${channels}`);
167+
}
168+
169+
promises.push(
170+
publisherRoutine(
171+
publisherName,
172+
channels,
173+
argv.mode,
174+
argv['measure-rtt-latency'],
175+
argv.verbose,
176+
argv['data-size'],
177+
client,
178+
isRunningRef,
179+
totalMessagesRef
180+
)
181+
);
182+
183+
totalConnectsRef.value++;
184+
185+
if (clientId % 100 === 0) {
186+
console.log(`Created ${clientId} publishers so far.`);
187+
}
188+
}
189+
} else if (argv.mode.includes('subscribe')) {
190+
// Only run subscribers
135191
if (argv['subscribers-placement-per-channel'] === 'dense') {
136192
for (let clientId = 1; clientId <= argv.clients; clientId++) {
137193
const channels = [];
@@ -175,7 +231,7 @@ async function runBenchmark(argv) {
175231
totalSubscribedRef,
176232
totalConnectsRef,
177233
argv.verbose,
178-
argv.cliens
234+
argv.clients
179235
)
180236
);
181237
}
@@ -185,39 +241,70 @@ async function runBenchmark(argv) {
185241
process.exit(1);
186242
}
187243

188-
const { startTime, now, perSecondStats } = await updateCLI(
189-
argv['client-update-tick'],
190-
argv.messages > 0 ? totalExpectedMessages : 0,
191-
argv['test-time'],
192-
argv['measure-rtt-latency'],
193-
argv.mode,
194-
isRunningRef,
195-
totalMessagesRef,
196-
totalConnectsRef,
197-
totalSubscribedRef,
198-
totalPublishersRef,
199-
messageRateTs,
200-
rttAccumulator,
201-
rttHistogram,
202-
() => {} // no-op, outputResults is handled after await
203-
);
244+
try {
245+
const { startTime, now, perSecondStats } = await updateCLI(
246+
argv['client-update-tick'],
247+
argv.messages > 0 ? totalExpectedMessages : 0,
248+
argv['test-time'],
249+
argv['measure-rtt-latency'],
250+
argv.mode,
251+
isRunningRef,
252+
totalMessagesRef,
253+
totalConnectsRef,
254+
totalSubscribedRef,
255+
totalPublishersRef,
256+
messageRateTs,
257+
rttAccumulator,
258+
rttHistogram,
259+
() => {} // no-op, outputResults is handled after await
260+
);
204261

205-
// Wait for all routines to finish
206-
await Promise.all(promises);
207-
208-
// THEN output final results
209-
writeFinalResults(
210-
startTime,
211-
now,
212-
argv,
213-
argv.mode,
214-
totalMessagesRef.value,
215-
totalSubscribedRef.value,
216-
messageRateTs,
217-
rttAccumulator,
218-
rttHistogram,
219-
perSecondStats
220-
);
262+
// Wait for all routines to finish
263+
console.log('Waiting for all clients to shut down cleanly...');
264+
await Promise.all(promises);
265+
266+
// THEN output final results
267+
writeFinalResults(
268+
startTime,
269+
now,
270+
argv,
271+
argv.mode,
272+
totalMessagesRef.value,
273+
totalSubscribedRef.value,
274+
messageRateTs,
275+
rttAccumulator,
276+
rttHistogram,
277+
perSecondStats
278+
);
279+
} finally {
280+
// Clean shutdown of primary clients
281+
console.log('Shutting down primary Redis connections...');
282+
283+
// Close cluster client if it exists
284+
if (cluster) {
285+
try {
286+
await cluster.quit();
287+
console.log('Cluster client disconnected successfully');
288+
} catch (err) {
289+
console.error('Error disconnecting cluster client:', err);
290+
}
291+
}
292+
293+
// Close all standalone clients
294+
const disconnectPromises = clients.map(async (client, i) => {
295+
try {
296+
await client.quit();
297+
if (argv.verbose) {
298+
console.log(`Node client #${i} disconnected successfully`);
299+
}
300+
} catch (err) {
301+
console.error(`Error disconnecting node client #${i}:`, err);
302+
}
303+
});
304+
305+
await Promise.all(disconnectPromises);
306+
console.log('All Redis connections closed');
307+
}
221308

222309
// cleanly exit the process once done
223310
process.exit(0);

0 commit comments

Comments
 (0)