Skip to content

Commit 788a972

Browse files
committed
Update comments and fix small issues
1 parent 938df93 commit 788a972

File tree

4 files changed

+21
-7
lines changed

4 files changed

+21
-7
lines changed

lib/kafkajs/_common.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ const CompatibilityErrorMessages = Object.freeze({
188188
socketFactory: () =>
189189
"The socketFactory property is not supported.",
190190
logLevelName: (setLevel) =>
191-
"The log level must be one of: " + logLevel.keys().join(", ") + ", was " + setLevel,
191+
"The log level must be one of: " + Object.keys(logLevel).join(", ") + ", was " + setLevel,
192192
reauthenticationThreshold: () =>
193193
"Reauthentication threshold cannot be set, and reauthentication is automated when 80% of connections.max.reauth.ms is reached.",
194194
unsupportedKey: (key) =>

lib/kafkajs/_consumer.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,7 +1069,7 @@ class Consumer {
10691069
* Otherwise, they will be committed when commitOffsets is called without arguments.
10701070
*
10711071
* enable.auto.offset.store must be set to false to use this API.
1072-
* @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions
1072+
* @param {import("../../types/kafkajs").TopicPartitionOffset[]?} topicPartitions
10731073
*/
10741074
storeOffsets(topicPartitions) {
10751075
if (this.#state !== ConsumerState.CONNECTED) {
@@ -1099,7 +1099,7 @@ class Consumer {
10991099

11001100
/**
11011101
* Commit offsets for the given topic partitions. If topic partitions are not specified, commits all offsets.
1102-
* @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions
1102+
* @param {import("../../types/kafkajs").TopicPartitionOffset[]?} topicPartitions
11031103
* @returns {Promise<void>} a promise that resolves when the offsets have been committed.
11041104
*/
11051105
async commitOffsets(topicPartitions = null) {

lib/kafkajs/_producer.js

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,8 +501,19 @@ class Producer {
501501
}
502502

503503
/**
504-
* send(record: ProducerRecord): Promise<RecordMetadata[]>
504+
* Check if there is an ongoing transaction.
505+
*
506+
* NOTE: Since Producer itself represents a transaction, and there is no distinct
507+
* type for a transaction, this method exists on the producer.
508+
* @returns {boolean} true if there is an ongoing transaction, false otherwise.
509+
*/
510+
isActive() {
511+
return this.#ongoingTransaction;
512+
}
505513

514+
/**
515+
* Sends a record of messages to a specific topic.
516+
*
506517
* @param {import('../../types/kafkajs').ProducerRecord} sendOptions - The record to send. The keys `acks`, `timeout`, and `compression` are not used, and should not be set, rather, they should be set in the global config.
507518
* @returns {Promise<import("../../types/kafkajs").RecordMetadata[]>} Resolves with the record metadata for the messages.
508519
*/
@@ -572,15 +583,15 @@ class Producer {
572583
const topicPartitionRecordMetadata = new Map();
573584
for (const recordMetadata of recordMetadataArr) {
574585
const key = `${recordMetadata.topicName},${recordMetadata.partition}`;
575-
if (recordMetadata.baseOffset == null || !topicPartitionRecordMetadata.has(key)) {
586+
if (recordMetadata.baseOffset === null || !topicPartitionRecordMetadata.has(key)) {
576587
topicPartitionRecordMetadata.set(key, recordMetadata);
577588
continue;
578589
}
579590

580591
const currentRecordMetadata = topicPartitionRecordMetadata.get(key);
581592

582593
// Don't overwrite a null baseOffset
583-
if (currentRecordMetadata.baseOffset == null) {
594+
if (currentRecordMetadata.baseOffset === null) {
584595
continue;
585596
}
586597

@@ -598,7 +609,9 @@ class Producer {
598609
}
599610

600611
/**
601-
* sendBatch(batch: ProducerBatch): Promise<RecordMetadata[]>
612+
* Sends a record of messages to various topics.
613+
*
614+
* NOTE: This method is identical to calling send() repeatedly and waiting on all the return values together.
602615
* @param {import('../../types/kafkajs').ProducerBatch} sendOptions - The record to send. The keys `acks`, `timeout`, and `compression` are not used, and should not be set, rather, they should be set in the global config.
603616
* @returns {Promise<import("../../types/kafkajs").RecordMetadata[]>} Resolves with the record metadata for the messages.
604617
*/

test/promisified/producer/concurrentTransaction.spec.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ describe('Producer > Transactional producer', () => {
3434
producer1 = newProducer();
3535
await producer1.connect();
3636
const transaction1 = await producer1.transaction();
37+
expect(transaction1.isActive()).toBe(true);
3738
await transaction1.send({ topic: topicName, messages: [message] });
3839

3940
// Producer 2 starts with the same transactional id to cause the concurrent transactions error

0 commit comments

Comments
 (0)