
Commit e1bd8e8

Authored by PratRanj07, emasab, milindl, Claimundefine, rayokota
List consumer group offsets (#49)
* 1st commit
* 2nd commit
* 3rd commit
* Added tests and examples
* Formatting
* Formatting
* little change
* some small changes
* changes requested
* requested changes
* requested changes
* name change
* indentation
* indentation
* Add data-governance to code owners for schema registry clients (#52)
* Add data-governance to code owners for schema registry clients
* Fix ownership
* Fix deprecation warning
* Separate eachMessage and eachBatch internal consume loop
* Update performance example with more cases
* Add per-partition cache with global expiry
* Add per-partition cache expiry logic
* Allow cache to disburse multiple messages at once
* Add per-partition concurrency
* Add partition level concurrency to faux-eachBatch
* Create persistent workers for per-partition concurrency, prevents excessive Promise spawning
* Fix tests for Per Partition Concurrency
* Add message set capability to message cache
* Add naive batching (without resolution handling)
* Add batch staleness, resolution, and offset management to eachBatch
* Update tests for true eachBatch. Also cleans up some excessive parametrization and separates tests into files so they can be parallelized better.
* Remove debug-only properties
* Update MIGRATION.md for eachBatch
* Bump version
* Fix linting and Makefile issues (#2)
* Add single-offset-storage method for performance. In promisified mode, offsets are stored after every single message. Calling into FFI is already not cheap, and on top of that the offsetsStore method does a lot of allocations because it needs to make a vector, etc. This method change yields a benefit of 8%. Also reduces dynamic casts by storing a reference to the rdkafka consumer.
* Remove jshint and expand eslint coverage. Many changes within, but only cosmetic: 1. remove jshint as it's rather unmaintained and we don't need 2 linters; 2. update eslint to latest possible version; 3. update eslint coverage to include old API and promisified tests; 4. update makefile accordingly; 5. fix lint errors.
* Update Makefile, update cpplint to latest version
* Comply with cpplint
* Add SchemaRegistryClient, RestService, and testing (#1)
* Add SchemaRegistryClient, RestService, and testing
* Add new Makefile for schema registry
* Merging
* Revert to throwing exceptions
* Add mock client for testing
* Remove testing artifacts
* Fix flaky e2e tests (#54)
* fixing empty message and key
* indentation
* 1st commit
* changes
* changes
* alignment
* Preset fix (#6)
* Fix jest to unblock tests
* Add git history
* Do not modify RegExps which don't start with a ^. 1. Adds a test and also fixes some flakiness in the subscribe test. 2. Fixes typing issue for assignment()
* Fix argument mutation in run, pause and resume
* Dekregistry client (#67)
* Add Mock Schema Registry Client (#9) (#66)
* Add mock client for testing
* Remove testing artifacts
* Dekregistry client (#10)
* Add mock client for testing
* Remove testing artifacts
* Add dekregistry client, mock dekregistry client (Co-authored-by: Robert Yokota <[email protected]>)
* Add clientConfig, baseUrl retry, RestError, encodeURIComponent (#12) (#68)
* Add clientConfig, baseUrl retry, RestError
* refactor such that RestService takes in necessary dependencies
* Update tsconfig.json (#69)
* Fix broken tests (#70) (Co-authored-by: Robert Yokota <[email protected]>)
* Add commitCb method (#59) to avoid blocking while committing and return a Promise without having to call consume()
* Fix eslint config (#71)
* Add eslint rules (#72)
* Add some ts eslint rules
* Fix makefile
* First cut at JavaScript serdes (#73)
* First cut at serdes
* Checkpoint - no errs
* Minor cleanup
* Add siv
* Fix eslint errs
* Minor cleanup
* Minor cleanup
* Fix configs
* Fix execute calls
* Clean up public, compat levels
* Fix test
* Incorporate review feedback
* Add assign/unassign within rebalance callbacks. Also remove onPartitionsAssigned and onPartitionsRevoked.
* Add performance benchmarking script modes and README
* Add performance benchmarking script modes and README
* Fix createTopics return
* Add topic creation to benchmarks
* Remove needless batch size (msgs) 1
* Add performance example to semaphore
* Clean up the perf runner script
* Add confluent debian repo for performance benchmark
* Remove store from promisified API
* Add binding level debug logging and client name to logs
* Add binding level debug logging and client name to logs
* Add C++ changes for binding-level logging
* Fix typo in script name
* First cut at Data Contract rules (#77)
* First cut at encryption rules (#74)
* First cut at encryption rules
* Add tests
* Clean up package.json
* Clean up package.json
* Add kms clients
* Minor fix
* First cut at additional serde tests (#75)
* First cut at additional serde tests
* Add JSON test
* Add protobuf ref test
* Add format params
* Checkpoint
* Add coverage to gitignore
* Remove coverage
* Minor fix
* Minor fix
* Avro ref test
* Add json ref test
* Add nested tests
* First cut at Data Contract rules
* Remove CEL executors for now
* Minor refactor to use RuleRegistry
* Move DEK registry under encryption
* Clean up package.json
* Add CSFLE test with logical type
* Minor fix
* Add CEL executors
* Revert "Add CEL executors" (reverts commit 850c3de)
* Minor fixes
* Minor fixes
* Separate SR into a different workspace (#78) (Co-authored-by: Milind L <[email protected]>)
* Refactor to always use a barrier for pending operation (#26). Readers-writer lock with reentrant calls. Refactor to always use a barrier for pending operations to avoid problems or regressions with async pause or resume calls that will be introduced later. Uses linked lists for the cache. Removes heap-js code. Test name improvement to run single parametric tests. Seek signature and add partitionsConsumedConcurrently to the ConsumerRunConfig type definition. Make the final max poll interval double the configured value so even the last message processed before cache cleanup can take that time to process. Fix to restart the max poll interval timer on fetch. Mark batch stale after a cache clear was requested and the max poll interval is reached before it's cleared. Add assignmentLost function to the rebalance callback. Fix to nextN size, version with max.poll.interval.ms applied to each message or batch (only for messages after cache reset). Performance test, removing outliers. Start performance timer from first message received after resuming.
* Schemaregistry rebase (#33) (#80)
* rebase dev_early_access
* Add OAuth Support to Rest Service (#25)
* Add OAuth client support
* Add optional chaining for token
* Fix merge conflict
* add simple-oauth2 dependency
* Add Docker environment for integration tests (#34) (#81)
* Add docker files
* Add docker env for integ tests
* Fix log level config in light of binding logs
* Also add rule for no trailing spaces and enforce it
* Fix log level config in light of binding logs
* Remove consumerGroupId argument from sendOffsets and add tests (#82)
* Performance measurement improvements: CKJS performance improved through fetch.queue.backoff.ms
* Admin examples for available APIs (#84)
* Fix listGroups segfault when passing an undefined matchConsumerGroupStates (#85)
* Add more unit tests; minor fixes for KMS clients (#86)
* Minor fixes for KMS clients
* Add JSON 2020-12 test
* Bump version to 0.1.17-devel
* Add complex encryption tests (#89)
* Add index.ts (#91)
* Enhance HighLevelProducer to take schema serializers (#92)
* Add auth features (#47) (#94)
* Add auth features
* Update schemaregistry/rest-service.ts
* Minor changes
* Minor changes (missed a few) (Co-authored-by: Robert Yokota <[email protected]>)
* Add more JSON Schema validation tests (#95)
* Move ts-jest to dev dependencies (#96)
* Add JSON integration tests (#46) (#97)
* Add JSON integration tests
* remove random
* Unsubscribe before disconnecting to mitigate hangs on destroy (#98) (Co-authored-by: Emanuele Sabellico <[email protected]>)
* Pass creds to DEK Registry client (#99)
* Bump version to 0.2.0 and drop -devel (#100)
* Remove mandatory basic or bearer auth credentials (#57) (#101)
* Add build script and readme (#104) (Co-authored-by: claimundefine <[email protected]>)
* Add license (#105)
* Add license
* Clean up scripts
* Add clearLatestCaches/clearCaches API, fix test to call clearLatestCaches (#102)
* Add clearLatestCaches for use in tests
* Minor cleanup
* Minor cleanup
* Add avro integration tests (#56) (#106)
* Add tsdoc (#107)
* Add tsdocs
* More tsdoc fixes
* Enhance docs (#108)
* Update kafkajs README and sr example
* Minor formatting
* Update schemaregistry README (#109)
* Add restService interfaces to exported types (#110) (Co-authored-by: claimundefine <[email protected]>)
* Rename DekClient to avoid conflict with Client (#112)
* Minor cleanup
* Rename DekClient
* Schemaregistry examples (#69) (#113)
* Add restService interfaces to exported types
* Bugfix for rest service and oauth
* Add schemaregistry examples workspace with avro, json, and csfle examples (#70) (#114)
* Add schemaregistry examples workspace with avro, json, and csfle examples
* change file name
* Change schemaregistry name
* Add extra descriptor for constants
* Add content type by default
* Change to basic auth for most examples
* bugfix integ tests for registering -value (#71) (#115)
* bugfix integ tests for registering -value
* Add autoRegister and useLatestVersion tests
* Bump version to v0.2.1 (#116)
* Update version to 0.2.1 for EA release (#72) (#117)
* Add Kafka Oauth implementation (#74) (#119)
* Upgrade librdkafka to v2.6.0 (#120)
* Upgrade librdkafka to v2.6.0
* Add install-from-source
* Dynamic linking for tests
* Speed up librdkafka build
* Bump version to 0.3.0-RC1 and: (#122) change software version to be without v as other clients; use relative dev or example dependencies to avoid changing version in multiple places; updates to RELEASE section
* v0.3.0 (#126)
* npmignore more files and fix to the update versions script
* Improved Releasing section
* Minor optimization to reduce schema ID lookups (#123)
* v0.3.0-RC2 (#127)
* v0.3.0 final release (#128)
* Changelog entry for #123
* Fix header conversion in eachBatch (#130)
* 1st commit
* 2nd commit
* 3rd commit
* changes requested
* requested changes
* required Changes
* remove unnecessary changes
* indentation and unnecessary changes
* indentation
* comment removed
* comment added
* changelog entry
* Changed topic partition js to c conversion structure
* refactoring
* Requested changes
* final changes

Co-authored-by: Emanuele Sabellico <[email protected]>
Co-authored-by: Milind L <[email protected]>
Co-authored-by: Milind L <[email protected]>
Co-authored-by: Justin Wang <[email protected]>
Co-authored-by: claimundefine <[email protected]>
Co-authored-by: Robert Yokota <[email protected]>
1 parent 0cc42af commit e1bd8e8

File tree

14 files changed: +1046 -3 lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions

@@ -1,10 +1,11 @@
- # confluent-kafka-javascript v0.3.1
+ # confluent-kafka-javascript v0.4.0

- v0.3.1 is a limited availability maintenance release. It is supported for all usage.
+ v0.4.0 is a limited availability feature release. It is supported for all usage.

  ## Enhancements

  1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130).
+ 2. Add support for an Admin API to list a consumer group's offsets (#49).


  # confluent-kafka-javascript v0.3.0

MIGRATION.md

Lines changed: 2 additions & 0 deletions

@@ -331,6 +331,8 @@ The admin-client only has support for a limited subset of methods, with more to
  * The `describeGroups` method is supported with additional `timeout` and `includeAuthorizedOperations` options.
    A number of additional properties have been added to the returned groups.
  * The `deleteGroups` method is supported with an additional `timeout` option.
+ * The `fetchOffsets` method is supported with additional `timeout` and
+   `requireStableOffsets` options, but the `resolveOffsets` option is not yet supported.

  ### Using the Schema Registry
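For reference, here is a minimal sketch of how the newly supported method might be called from the promisified admin client. The group and topic names are placeholders, and the resolved shape follows the implementation added in lib/kafkajs/_admin.js in this commit.

// Minimal sketch (inside an async function, with a connected admin client
// obtained via kafka.admin() and admin.connect()).
const offsets = await admin.fetchOffsets({
  groupId: 'my-group',               // placeholder group id
  topics: ['my-topic'],              // strings = all partitions; or [{ topic, partitions: [0, 1] }]
  requireStableOffsets: false,       // ask the broker for transaction-committed offsets when true
  timeout: 5000,                     // request timeout in milliseconds
});
// Resolves to: [{ topic, partitions: [{ partition, offset, metadata, leaderEpoch, error }] }]
console.log(JSON.stringify(offsets, null, 2));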
Lines changed: 96 additions & 0 deletions

@@ -0,0 +1,96 @@
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function fetchOffsets() {
  const args = parseArgs({
    allowPositionals: true,
    options: {
      'bootstrap-servers': {
        type: 'string',
        short: 'b',
        default: 'localhost:9092',
      },
      'timeout': {
        type: 'string',
        short: 'm',
        default: '5000',
      },
      'require-stable-offsets': {
        type: 'boolean',
        short: 'r',
        default: false,
      },
    },
  });

  const {
    'bootstrap-servers': bootstrapServers,
    timeout,
    'require-stable-offsets': requireStableOffsets,
  } = args.values;

  const [groupId, ...rest] = args.positionals;

  if (!groupId) {
    console.error('Group ID is required');
    process.exit(1);
  }

  const kafka = new Kafka({
    kafkaJS: {
      brokers: [bootstrapServers],
    },
  });

  const admin = kafka.admin();
  await admin.connect();

  try {
    // Parse topics and partitions from remaining arguments
    const topicInput = parseTopicsAndPartitions(rest);

    // Fetch offsets for the specified consumer group
    const offsets = await admin.fetchOffsets({
      groupId: groupId,
      topics: topicInput,
      requireStableOffsets,
      timeout: Number(timeout),
    });

    console.log(`Offsets for Consumer Group "${groupId}":`, JSON.stringify(offsets, null, 2));
  } catch (err) {
    console.error('Error fetching consumer group offsets:', err);
  } finally {
    await admin.disconnect();
  }
}

// Helper function to parse topics and partitions from arguments
function parseTopicsAndPartitions(args) {
  if (args.length === 0) return undefined;

  const topicInput = [];
  let i = 0;

  while (i < args.length) {
    const topic = args[i];
    i++;

    const partitions = [];
    while (i < args.length && !isNaN(args[i])) {
      partitions.push(Number(args[i]));
      i++;
    }

    // Add topic with partitions (or an empty array if no partitions specified)
    if (partitions.length > 0) {
      topicInput.push({ topic, partitions });
    } else {
      topicInput.push(topic); // Add as a string if no partitions specified
    }
  }

  return topicInput;
}

fetchOffsets();
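If saved as a script (the file name is not shown in this view, so fetch-offsets.js is only a placeholder), an invocation might look like:

node fetch-offsets.js -b localhost:9092 -r my-group topic-a 0 1 topic-b

Positional arguments after the group ID are handled by parseTopicsAndPartitions: a topic name followed by numbers is queried only for those partitions (topic-a, partitions 0 and 1 here), while a topic with no trailing numbers (topic-b) is passed as a plain string, meaning all of its partitions.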

lib/admin.js

Lines changed: 47 additions & 0 deletions

@@ -437,3 +437,50 @@ AdminClient.prototype.listTopics = function (options, cb) {
    }
  });
};

/**
 * List offsets for topic partition(s) for consumer group(s).
 *
 * @param {import("../../types/rdkafka").ListGroupOffsets} listGroupOffsets - The list of groupId, partitions to fetch offsets for.
 *        If partitions is null, list offsets for all partitions in the group.
 * @param {number?} options.timeout - The request timeout in milliseconds.
 *        May be unset (default: 5000)
 * @param {boolean?} options.requireStableOffsets - Whether broker should return stable offsets
 *        (transaction-committed). (default: false)
 *
 * @param {function} cb - The callback to be executed when finished.
 */
AdminClient.prototype.listConsumerGroupOffsets = function (listGroupOffsets, options, cb) {

  if (!this._isConnected) {
    throw new Error('Client is disconnected');
  }

  if (!listGroupOffsets[0].groupId) {
    throw new Error('groupId must be provided');
  }

  if (!Object.hasOwn(options, 'timeout')) {
    options.timeout = 5000;
  }

  if (!Object.hasOwn(options, 'requireStableOffsets')) {
    options.requireStableOffsets = false;
  }

  this._client.listConsumerGroupOffsets(listGroupOffsets, options, function (err, offsets) {
    if (err) {
      if (cb) {
        cb(LibrdKafkaError.create(err));
      }
      return;
    }

    if (cb) {
      cb(null, offsets);
    }
  });
};
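For context, here is a minimal sketch of how the callback-based method above might be invoked on a connected, non-promisified AdminClient. The group and topic names are placeholders and are not part of this commit.

// Assumes `admin` is an already-connected AdminClient from the callback API.
// Passing partitions: null lists offsets for all partitions in the group.
admin.listConsumerGroupOffsets(
  [{ groupId: 'my-group', partitions: [{ topic: 'my-topic', partition: 0 }] }],
  { timeout: 5000, requireStableOffsets: false },
  function (err, offsets) {
    if (err) {
      console.error('listConsumerGroupOffsets failed:', err);
      return;
    }
    // `offsets` contains one result per group, each with an error field and
    // an array of per-partition offsets.
    console.log(offsets);
  }
);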

lib/kafkajs/_admin.js

Lines changed: 117 additions & 0 deletions

@@ -417,6 +417,123 @@ class Admin {
      });
    });
  }

  /**
   * Fetch the offsets for topic partition(s) for consumer group(s).
   *
   * @param {string} options.groupId - The group ID to fetch offsets for.
   * @param {import("../../types/kafkajs").TopicInput} options.topics - The topics to fetch offsets for.
   * @param {boolean} options.resolveOffsets - not yet implemented
   * @param {number?} options.timeout - The request timeout in milliseconds.
   *        May be unset (default: 5000)
   * @param {boolean?} options.requireStableOffsets - Whether broker should return stable offsets
   *        (transaction-committed). (default: false)
   *
   * @returns {Promise<Array<topic: string, partitions: import('../../types/kafkajs').FetchOffsetsPartition>>}
   */
  async fetchOffsets(options = {}) {
    if (this.#state !== AdminState.CONNECTED) {
      throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
    }

    if (Object.hasOwn(options, "resolveOffsets")) {
      throw new error.KafkaJSError("resolveOffsets is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
    }

    const { groupId, topics } = options;

    if (!groupId) {
      throw new error.KafkaJSError("groupId is required.", { code: error.ErrorCodes.ERR__INVALID_ARG });
    }

    let partitions = null;
    let originalTopics = null;

    /*
      If the input is a list of topic strings, the user expects us to
      fetch offsets for all partitions of all the input topics. In
      librdkafka, we can only fetch offsets by topic partitions, or else
      we can fetch all of them. Thus, we must fetch offsets for all topic
      partitions (by setting partitions to null) and filter by the topic strings later.
    */
    if (topics && Array.isArray(topics)) {
      if (typeof topics[0] === 'string') {
        originalTopics = topics;
        partitions = null;
      } else if (typeof topics[0] === 'object' && Array.isArray(topics[0].partitions)) {
        partitions = topics.flatMap(topic => topic.partitions.map(partition => ({
          topic: topic.topic,
          partition
        })));
      } else {
        throw new error.KafkaJSError("Invalid topics format.", { code: error.ErrorCodes.ERR__INVALID_ARG });
      }
    }

    const listGroupOffsets = [{
      groupId,
      partitions
    }];

    return new Promise((resolve, reject) => {
      this.#internalClient.listConsumerGroupOffsets(listGroupOffsets, options, (err, offsets) => {
        if (err) {
          reject(createKafkaJsErrorFromLibRdKafkaError(err));
        } else {
          /**
           * Offsets is an array of group results, each containing a group id,
           * an error and an array of partitions.
           * We need to convert it to the required format of an array of topics, each
           * containing an array of partitions.
           */
          const topicPartitionMap = new Map();

          if (offsets.length !== 1) {
            reject(new error.KafkaJSError("Unexpected number of group results."));
            return;
          }

          const groupResult = offsets[0];

          if (groupResult.error) {
            reject(createKafkaJsErrorFromLibRdKafkaError(groupResult.error));
            return;
          }

          // Traverse the partitions and group them by topic
          groupResult.partitions.forEach(partitionObj => {
            const { topic, partition, offset, leaderEpoch, metadata, error } = partitionObj;
            const fetchOffsetsPartition = {
              partition: partition,
              offset: String(offset),
              metadata: metadata || null,
              leaderEpoch: leaderEpoch || null,
              error: error || null
            };

            // Group partitions by topic
            if (!topicPartitionMap.has(topic)) {
              topicPartitionMap.set(topic, []);
            }
            topicPartitionMap.get(topic).push(fetchOffsetsPartition);
          });

          // Convert the map back to the desired array format
          let convertedOffsets = Array.from(topicPartitionMap, ([topic, partitions]) => ({
            topic,
            partitions
          }));

          if (originalTopics !== null) {
            convertedOffsets = convertedOffsets.filter(convertedOffset => originalTopics.includes(convertedOffset.topic));
          }
          resolve(convertedOffsets);
        }
      });
    });
  }
}

module.exports = {
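To clarify the conversion done in the promise callback above, here is a sketch of assumed input and output shapes; the values and names are made up for illustration, and librdkafka's actual result objects may carry additional fields.

// Assumed shape of what the internal client returns: one result per group,
// with a flat list of partitions across topics.
const groupResults = [{
  groupId: 'my-group',
  error: null,
  partitions: [
    { topic: 'topic-a', partition: 0, offset: 42, leaderEpoch: 5, metadata: null, error: null },
    { topic: 'topic-a', partition: 1, offset: 17, leaderEpoch: 5, metadata: null, error: null },
    { topic: 'topic-b', partition: 0, offset: 7, leaderEpoch: null, metadata: null, error: null },
  ],
}];

// fetchOffsets regroups these by topic and stringifies offsets, resolving to:
// [
//   { topic: 'topic-a', partitions: [
//       { partition: 0, offset: '42', metadata: null, leaderEpoch: 5, error: null },
//       { partition: 1, offset: '17', metadata: null, leaderEpoch: 5, error: null } ] },
//   { topic: 'topic-b', partitions: [
//       { partition: 0, offset: '7', metadata: null, leaderEpoch: null, error: null } ] },
// ]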
