Skip to content

Commit c8c3130

Browse files
committed
Don't modify passed messages in sendOptions
1 parent 6782b07 commit c8c3130

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

ci/tests/run_perf_test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ async function main() {
9696

9797
await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
9898

99-
console.log(`Waiting 10s ms after topic creation before starting producer and consumers...`);
99+
console.log(`Waiting 10s after topic creation before starting producer and consumers...`);
100100
await new Promise(resolve => setTimeout(resolve, 10000));
101101

102102
console.log(`Starting producer and consumers...`);

lib/kafkajs/_producer.js

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -622,27 +622,32 @@ class Producer {
622622
const msgPromises = [];
623623
for (let i = 0; i < sendOptions.messages.length; i++) {
624624
const msg = sendOptions.messages[i];
625-
626-
if (!Object.hasOwn(msg, "partition") || msg.partition === null) {
627-
msg.partition = -1;
625+
let key = msg.key;
626+
let value = msg.value;
627+
let partition = msg.partition;
628+
let headers = msg.headers;
629+
let timestamp = msg.timestamp;
630+
631+
if (partition === undefined || msg.partition === null) {
632+
partition = -1;
628633
}
629634

630-
if (typeof msg.value === 'string') {
631-
msg.value = Buffer.from(msg.value);
635+
if (typeof value === 'string') {
636+
value = Buffer.from(value);
632637
}
633638

634-
if (Object.hasOwn(msg, "timestamp") && msg.timestamp) {
635-
msg.timestamp = Number(msg.timestamp);
639+
if (timestamp) {
640+
timestamp = Number(timestamp);
636641
} else {
637-
msg.timestamp = 0;
642+
timestamp = 0;
638643
}
639644

640-
msg.headers = convertToRdKafkaHeaders(msg.headers);
645+
headers = convertToRdKafkaHeaders(headers);
641646

642647
msgPromises.push(new Promise((resolve, reject) => {
643648
const opaque = { resolve, reject };
644649
try {
645-
this.#internalClient.produce(sendOptions.topic, msg.partition, msg.value, msg.key, msg.timestamp, opaque, msg.headers);
650+
this.#internalClient.produce(sendOptions.topic, partition, value, key, timestamp, opaque, headers);
646651
} catch (err) {
647652
reject(err);
648653
}

0 commit comments

Comments
 (0)