Skip to content

Commit 34cfb67

Browse files
authored
List offsets api implemented (#156)
* describeTopics api implemented * List offsets api implemented * reverting some indentation changes * comment * relative changes * reverting * requested changes * requested changes * Change log changes
1 parent 72abfd2 commit 34cfb67

File tree

16 files changed

+761
-9
lines changed

16 files changed

+761
-9
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# confluent-kafka-javascript v1.0.0
2+
3+
v1.0.0 is a feature release. It is supported for all usage.
4+
5+
## Enhancements
6+
7+
1. Add support for an Admin API to fetch topic offsets (#156).
8+
9+
110
# confluent-kafka-javascript v0.6.1
211

312
v0.6.1 is a limited availability maintenance release. It is supported for all usage.

MIGRATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,8 @@ The admin-client only has support for a limited subset of methods, with more to
337337
and `operationTimeout` option.
338338
* The `fetchTopicMetadata` method is supported with additional `timeout`
339339
and `includeAuthorizedOperations` option. Fetching for all topics is not advisable.
340+
* The `fetchTopicOffsets` method is supported with additional `timeout`
341+
and `isolationLevel` option.
340342
341343
### Using the Schema Registry
342344
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
const { Kafka, IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

/**
 * Example: fetch the earliest/latest offsets for every partition of a topic
 * using the Admin API.
 *
 * Usage:
 *   node fetchTopicOffsets.js <topic> [-b brokers] [-t timeoutMs] [-i 0|1]
 */
async function fetchOffsets() {
  // Parse command-line arguments
  const args = parseArgs({
    allowPositionals: true,
    options: {
      'bootstrap-servers': {
        type: 'string',
        short: 'b',
        default: 'localhost:9092',
      },
      'timeout': {
        type: 'string',
        short: 't',
        default: '5000',
      },
      'isolation-level': {
        type: 'string',
        short: 'i',
        default: '0', // Default to '0' for read_uncommitted
      },
    },
  });

  const {
    'bootstrap-servers': bootstrapServers,
    timeout,
    'isolation-level': isolationLevel,
  } = args.values;

  const [topic] = args.positionals;

  if (!topic) {
    console.error('Topic name is required');
    process.exit(1);
  }

  // Validate the timeout up front so a bad value fails fast instead of
  // silently passing NaN to the admin client.
  const timeoutMs = Number(timeout);
  if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    console.error('Invalid timeout. Provide a positive number of milliseconds.');
    process.exit(1);
  }

  // Determine the isolation level
  let isolationLevelValue;
  if (isolationLevel === '0') {
    isolationLevelValue = IsolationLevel.READ_UNCOMMITTED;
  } else if (isolationLevel === '1') {
    isolationLevelValue = IsolationLevel.READ_COMMITTED;
  } else {
    console.error('Invalid isolation level. Use 0 for READ_UNCOMMITTED or 1 for READ_COMMITTED.');
    process.exit(1);
  }

  const kafka = new Kafka({
    kafkaJS: {
      // Accept a comma-separated broker list, e.g. "host1:9092,host2:9092"
      // (the original hard-coded a single-element list).
      brokers: bootstrapServers
        .split(',')
        .map((b) => b.trim())
        .filter((b) => b.length > 0),
    },
  });

  const admin = kafka.admin();
  await admin.connect();

  try {
    // Fetch offsets for the specified topic
    const offsets = await admin.fetchTopicOffsets(
      topic,
      {
        isolationLevel: isolationLevelValue, // Use determined isolation level
        timeout: timeoutMs,
      });

    console.log(`Offsets for topic "${topic}":`, JSON.stringify(offsets, null, 2));
  } catch (err) {
    console.error('Error fetching topic offsets:', err);
  } finally {
    await admin.disconnect();
  }
}

fetchOffsets();

lib/admin.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,52 @@ const AclOperationTypes = Object.seal({
3838
IDEMPOTENT_WRITE: 12,
3939
});
4040

41+
/**
 * Isolation levels usable when listing offsets.
 * @enum {number}
 * @readonly
 * @memberof RdKafka
 */
const IsolationLevel = {
  READ_UNCOMMITTED: 0,
  READ_COMMITTED: 1,
};

/**
 * Define an OffsetSpec to list offsets at.
 * Either a timestamp can be used, or else, one of the special, pre-defined values
 * (EARLIEST, LATEST, MAX_TIMESTAMP) can be used while passing an OffsetSpec to listOffsets.
 * @param {number} timestamp - The timestamp to list offsets at.
 * @constructor
 */
function OffsetSpec(timestamp) {
  this.timestamp = timestamp;
}

// Sentinel specs: the negative timestamps below select special broker-side
// lookups rather than a timestamp-based search.

/**
 * Specific OffsetSpec value used to retrieve the offset with the largest timestamp of a partition
 * as message timestamps can be specified client side this may not match
 * the log end offset returned by OffsetSpec.LATEST.
 */
OffsetSpec.MAX_TIMESTAMP = new OffsetSpec(-3);

/**
 * Special OffsetSpec value denoting the earliest offset for a topic partition.
 */
OffsetSpec.EARLIEST = new OffsetSpec(-2);

/**
 * Special OffsetSpec value denoting the latest offset for a topic partition.
 */
OffsetSpec.LATEST = new OffsetSpec(-1);
79+
4180
module.exports = {
4281
create: createAdminClient,
4382
createFrom: createAdminClientFrom,
4483
ConsumerGroupStates,
4584
AclOperationTypes,
85+
IsolationLevel: Object.freeze(IsolationLevel),
86+
OffsetSpec,
4687
};
4788

4889
var Client = require('./client');
@@ -609,3 +650,45 @@ AdminClient.prototype.describeTopics = function (topics, options, cb) {
609650
}
610651
});
611652
};
653+
654+
/**
 * List offsets for topic partition(s).
 *
 * @param {Array<{topic: string, partition: number, offset: OffsetSpec}>} partitions - The list of partitions to fetch offsets for.
 * @param {any?} options
 * @param {number?} options.timeout - The request timeout in milliseconds.
 *                                    May be unset (default: 5000)
 * @param {RdKafka.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.
 *                                                           (default: READ_UNCOMMITTED)
 * @param {function} cb - The callback to be executed when finished.
 * @throws {Error} if the client is disconnected.
 */
AdminClient.prototype.listOffsets = function (partitions, options, cb) {
  if (!this._isConnected) {
    throw new Error('Client is disconnected');
  }

  // Apply defaults onto a fresh object so the caller's options object is
  // never mutated (the previous implementation wrote the defaults back
  // into the object the caller passed in).
  options = Object.assign(
    { timeout: 5000, isolationLevel: IsolationLevel.READ_UNCOMMITTED },
    options);

  this._client.listOffsets(partitions, options, function (err, offsets) {
    if (err) {
      if (cb) {
        cb(LibrdKafkaError.create(err));
      }
      return;
    }

    if (cb) {
      cb(null, offsets);
    }
  });
};

lib/kafkajs/_admin.js

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const { OffsetSpec } = require('../admin');
12
const RdKafka = require('../rdkafka');
23
const { kafkaJSToRdKafkaConfig,
34
createKafkaJsErrorFromLibRdKafkaError,
@@ -10,6 +11,7 @@ const { kafkaJSToRdKafkaConfig,
1011
severityToLogLevel,
1112
} = require('./_common');
1213
const error = require('./_error');
14+
const { hrtime } = require('process');
1315

1416
/**
1517
* NOTE: The Admin client is currently in an experimental state with many
@@ -666,10 +668,129 @@ class Admin {
666668
});
667669
});
668670
}
671+
672+
/**
673+
* List offsets for the specified topic partition(s).
674+
*
675+
* @param {string} topic - The topic to fetch offsets for.
676+
* @param {object?} options
677+
* @param {number?} options.timeout - The request timeout in milliseconds.
678+
* May be unset (default: 5000)
679+
* @param {KafkaJS.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.
680+
* (default: READ_UNCOMMITTED)
681+
*
682+
* @returns {Promise<Array<{partition: number, offset: string, high: string; low: string}>>}
683+
*/
684+
async fetchTopicOffsets(topic, options = {}) {
685+
if (this.#state !== AdminState.CONNECTED) {
686+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
687+
}
688+
689+
if (!Object.hasOwn(options, 'timeout')) {
690+
options.timeout = 5000;
691+
}
692+
693+
let topicData;
694+
let startTime, endTime, timeTaken;
695+
696+
try {
697+
// Measure time taken for fetchTopicMetadata
698+
startTime = hrtime.bigint();
699+
topicData = await this.fetchTopicMetadata({ topics: [topic], timeout: options.timeout });
700+
endTime = hrtime.bigint();
701+
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
702+
703+
// Adjust timeout for the next request
704+
options.timeout -= timeTaken;
705+
if (options.timeout <= 0) {
706+
throw new error.KafkaJSError("Timeout exceeded while fetching topic metadata.", { code: error.ErrorCodes.ERR__TIMED_OUT });
707+
}
708+
} catch (err) {
709+
throw new createKafkaJsErrorFromLibRdKafkaError(err);
710+
}
711+
712+
const partitionIds = topicData.flatMap(topic =>
713+
topic.partitions.map(partition => partition.partitionId)
714+
);
715+
716+
const topicPartitionOffsetsLatest = partitionIds.map(partitionId => ({
717+
topic,
718+
partition: partitionId,
719+
offset: OffsetSpec.LATEST
720+
}));
721+
722+
const topicPartitionOffsetsEarliest = partitionIds.map(partitionId => ({
723+
topic,
724+
partition: partitionId,
725+
offset: OffsetSpec.EARLIEST
726+
}));
727+
728+
try {
729+
// Measure time taken for listOffsets (latest)
730+
startTime = hrtime.bigint();
731+
const latestOffsets = await this.#listOffsets(topicPartitionOffsetsLatest, options);
732+
endTime = hrtime.bigint();
733+
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
734+
735+
// Adjust timeout for the next request
736+
options.timeout -= timeTaken;
737+
if (options.timeout <= 0) {
738+
throw new error.KafkaJSError("Timeout exceeded while fetching latest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
739+
}
740+
741+
// Measure time taken for listOffsets (earliest)
742+
startTime = hrtime.bigint();
743+
const earliestOffsets = await this.#listOffsets(topicPartitionOffsetsEarliest, options);
744+
endTime = hrtime.bigint();
745+
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
746+
747+
// Adjust timeout for the next request
748+
options.timeout -= timeTaken;
749+
if (options.timeout <= 0) {
750+
throw new error.KafkaJSError("Timeout exceeded while fetching earliest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
751+
}
752+
753+
const combinedResults = partitionIds.map(partitionId => {
754+
const latest = latestOffsets.find(offset => offset.partition === partitionId);
755+
const earliest = earliestOffsets.find(offset => offset.partition === partitionId);
756+
757+
return {
758+
partition: partitionId,
759+
offset: latest.offset.toString(),
760+
high: latest.offset.toString(),
761+
low: earliest.offset.toString()
762+
};
763+
});
764+
765+
return combinedResults;
766+
} catch (err) {
767+
throw createKafkaJsErrorFromLibRdKafkaError(err);
768+
}
769+
}
770+
771+
#listOffsets(partitionOffsets, options) {
772+
return new Promise((resolve, reject) => {
773+
this.#internalClient.listOffsets(partitionOffsets, options, (err, offsets) => {
774+
if (err) {
775+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
776+
} else {
777+
resolve(offsets);
778+
}
779+
});
780+
});
781+
}
669782
}
670783

671784
module.exports = {
672785
Admin,
673786
ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates,
674-
AclOperationTypes: RdKafka.AdminClient.AclOperationTypes
787+
AclOperationTypes: RdKafka.AdminClient.AclOperationTypes,
788+
/**
789+
* A list of isolation levels.
790+
* @enum {number}
791+
* @readonly
792+
* @memberof KafkaJS
793+
* @see RdKafka.IsolationLevel
794+
*/
795+
IsolationLevel: RdKafka.AdminClient.IsolationLevel
675796
};

lib/kafkajs/_kafka.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const { Producer, CompressionTypes } = require('./_producer');
22
const { Consumer, PartitionAssigners } = require('./_consumer');
3-
const { Admin, ConsumerGroupStates, AclOperationTypes } = require('./_admin');
3+
const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
44
const error = require('./_error');
55
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
66

@@ -87,4 +87,5 @@ module.exports = {
8787
PartitionAssignors: PartitionAssigners,
8888
CompressionTypes,
8989
ConsumerGroupStates,
90-
AclOperationTypes };
90+
AclOperationTypes,
91+
IsolationLevel};

lib/rdkafka.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,8 @@ module.exports = {
3131
Topic: Topic,
3232
features: features,
3333
librdkafkaVersion: lib.librdkafkaVersion,
34+
IsolationLevel: Admin.IsolationLevel,
35+
OffsetSpec: Admin.OffsetSpec,
36+
ConsumerGroupStates: Admin.ConsumerGroupStates,
37+
AclOperationTypes: Admin.AclOperationTypes,
3438
};

0 commit comments

Comments
 (0)