
Commit 3099517

Delete Records API implemented (#141)
* delete records api implemented
* requested changes
* changelog change
* minor changes
* minor changes
* minor changes
1 parent 253629d commit 3099517

File tree

14 files changed (+631, -8 lines)

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
@@ -1,3 +1,12 @@
+# confluent-kafka-javascript v0.5.0
+
+v0.5.0 is a limited availability feature release. It is supported for all usage.
+
+## Enhancements
+
+1. Add support for an Admin API to delete records (#141).
+
+
 # confluent-kafka-javascript v0.4.0
 
 v0.4.0 is a limited availability feature release. It is supported for all usage.

MIGRATION.md

Lines changed: 2 additions & 0 deletions
@@ -333,6 +333,8 @@ The admin-client only has support for a limited subset of methods, with more to
 * The `deleteGroups` method is supported with an additional `timeout` option.
 * The `fetchOffsets` method is supported with additional `timeout` and
   `requireStableOffsets` option but `resolveOffsets` option is not yet supported.
+* The `deleteTopicRecords` method is supported with additional `timeout`
+  and `operationTimeout` options.
 
 ### Using the Schema Registry
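For illustration, a minimal sketch of the new method with both options spelled out (broker, topic, and offset values are placeholders; assumes a connected KafkaJS-style admin client inside an async context):

  // Delete records in partition 0 of "my-topic" up to, but not including, offset 100.
  await admin.deleteTopicRecords({
    topic: 'my-topic',
    partitions: [{ partition: 0, offset: '100' }],
    timeout: 5000,           // request timeout in ms (default: 5000)
    operationTimeout: 60000, // broker-side operation timeout in ms (default: 60000)
  });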
deleteTopicRecords.js

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
+const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
+const { parseArgs } = require('node:util');
+
+async function deleteTopicRecords() {
+  const args = parseArgs({
+    allowPositionals: true,
+    options: {
+      'bootstrap-servers': {
+        type: 'string',
+        short: 'b',
+        default: 'localhost:9092',
+      },
+      'timeout': {
+        type: 'string',
+        short: 't',
+        default: '5000',
+      },
+      'operation-timeout': {
+        type: 'string',
+        short: 'o',
+        default: '60000',
+      },
+    },
+  });
+
+  const {
+    'bootstrap-servers': bootstrapServers,
+    timeout,
+    'operation-timeout': operationTimeout,
+  } = args.values;
+
+  const [topic, ...rest] = args.positionals;
+
+  if (!topic || rest.length % 2 !== 0) {
+    console.error("Usage: node deleteTopicRecords.js --bootstrap-servers <servers> --timeout <timeout> --operation-timeout <operation-timeout> <topic> <partition offset ...>");
+    process.exit(1);
+  }
+
+  const kafka = new Kafka({
+    kafkaJS: {
+      brokers: [bootstrapServers],
+    },
+  });
+
+  const admin = kafka.admin();
+  await admin.connect();
+
+  try {
+    // Parse partitions and offsets, ensuring pairs of partition and offset are provided
+    const partitionsInput = parsePartitionsAndOffsets(rest);
+
+    // Delete records for the specified topic and partitions
+    const result = await admin.deleteTopicRecords({
+      topic: topic,
+      partitions: partitionsInput,
+      timeout: Number(timeout),
+      operationTimeout: Number(operationTimeout),
+    });
+
+    console.log(`Records deleted for Topic "${topic}":`, JSON.stringify(result, null, 2));
+  } catch (err) {
+    console.error("Error deleting topic records:", err);
+  } finally {
+    await admin.disconnect();
+  }
+}
+
+// Helper function to parse partitions and offsets from arguments
+function parsePartitionsAndOffsets(args) {
+  const partitions = [];
+  for (let i = 0; i < args.length; i += 2) {
+    const partition = parseInt(args[i]);
+    const offset = args[i + 1];
+    if (isNaN(partition) || isNaN(parseInt(offset))) {
+      console.error("Partition and offset should be numbers and provided in pairs.");
+      process.exit(1);
+    }
+    partitions.push({ partition, offset });
+  }
+  return partitions;
+}
+
+deleteTopicRecords();
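To run the example above, pass the topic followed by partition/offset pairs (topic name and offsets here are placeholders):

  node deleteTopicRecords.js -b localhost:9092 my-topic 0 100 1 200

This deletes records below offset 100 in partition 0 and below offset 200 in partition 1 of "my-topic".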

lib/admin.js

Lines changed: 50 additions & 0 deletions
@@ -461,6 +461,9 @@ AdminClient.prototype.listConsumerGroupOffsets = function (listGroupOffsets, opt
     throw new Error('groupId must be provided');
   }
 
+  if (!options) {
+    options = {};
+  }
 
   if (!Object.hasOwn(options, 'timeout')) {
     options.timeout = 5000;
@@ -484,3 +487,50 @@
   });
 };
 
+/**
+ * Deletes records (messages) in topic partitions older than the offsets provided.
+ * Provide Topic.OFFSET_END or -1 as offset to delete all records.
+ *
+ * @param {import("../../types/rdkafka").TopicPartitionOffset[]} delRecords - The list of topic partitions
+ *        and offsets to delete records up to.
+ * @param {number} options.operationTimeout - The operation timeout in milliseconds.
+ *        May be unset (default: 60000)
+ * @param {number?} options.timeout - The request timeout in milliseconds.
+ *        May be unset (default: 5000)
+ *
+ * @param {function} cb - The callback to be executed when finished.
+ */
+AdminClient.prototype.deleteRecords = function (delRecords, options, cb) {
+  if (!this._isConnected) {
+    throw new Error('Client is disconnected');
+  }
+
+  if (!options) {
+    options = {};
+  }
+
+  if (!Object.hasOwn(options, 'timeout')) {
+    options.timeout = 5000;
+  }
+
+  if (!Object.hasOwn(options, 'operationTimeout')) {
+    options.operationTimeout = 60000;
+  }
+
+  if (!Array.isArray(delRecords) || delRecords.length === 0) {
+    throw new Error('delRecords must be a non-empty array');
+  }
+
+  this._client.deleteRecords(delRecords, options, function (err, results) {
+    if (err) {
+      if (cb) {
+        cb(LibrdKafkaError.create(err));
+      }
+      return;
+    }
+
+    if (cb) {
+      cb(null, results);
+    }
+  });
+};
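A hedged sketch of calling the callback-style API above, assuming the node-rdkafka-compatible AdminClient.create() factory this library exposes (client id, broker, and topic are placeholders):

  const Kafka = require('@confluentinc/kafka-javascript');

  const admin = Kafka.AdminClient.create({
    'client.id': 'delete-records-demo',       // placeholder client id
    'metadata.broker.list': 'localhost:9092', // placeholder broker
  });

  admin.deleteRecords(
    [
      { topic: 'my-topic', partition: 0, offset: 100 }, // delete records below offset 100
      { topic: 'my-topic', partition: 1, offset: -1 },  // -1 (or Topic.OFFSET_END) deletes all records
    ],
    { timeout: 5000, operationTimeout: 60000 },
    (err, results) => {
      if (err) console.error('deleteRecords failed:', err);
      else console.log('deleteRecords results:', results);
      admin.disconnect();
    });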

lib/kafkajs/_admin.js

Lines changed: 55 additions & 0 deletions
@@ -534,6 +534,61 @@ class Admin {
       });
     });
   }
+
+  /**
+   * Deletes records (messages) in topic partitions older than the offsets provided.
+   * Provide -1 as offset to delete all records.
+   *
+   * @param {string} options.topic - The topic to delete records from.
+   * @param {import("../../types/kafkajs").SeekEntry[]} options.partitions - The partitions and offsets to delete records up to.
+   * @param {number} options.operationTimeout - The operation timeout in milliseconds.
+   *        May be unset (default: 60000)
+   * @param {number?} options.timeout - The request timeout in milliseconds.
+   *        May be unset (default: 5000)
+   *
+   * @returns {Promise<import('../../types/kafkajs').DeleteRecordsResult[]>}
+   */
+  async deleteTopicRecords(options = {}) {
+    if (this.#state !== AdminState.CONNECTED) {
+      throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
+    }
+
+    if (!Object.hasOwn(options, 'topic') || !Object.hasOwn(options, 'partitions') || !Array.isArray(options.partitions)) {
+      throw new error.KafkaJSError("Options must include 'topic' and 'partitions', and 'partitions' must be an array.", { code: error.ErrorCodes.ERR__INVALID_ARG });
+    }
+
+    const { topic, partitions } = options;
+
+    // Create an array of TopicPartitionOffset objects
+    const topicPartitionOffsets = [];
+
+    for (const partition of partitions) {
+      if (partition.offset === null || partition.offset === undefined) {
+        throw new error.KafkaJSError("Each partition must have a valid offset.", { code: error.ErrorCodes.ERR__INVALID_ARG });
+      }
+
+      const offset = +partition.offset;
+      if (isNaN(offset)) {
+        throw new error.KafkaJSError("Offset must be a valid number.", { code: error.ErrorCodes.ERR__INVALID_ARG });
+      }
+
+      topicPartitionOffsets.push({
+        topic,
+        partition: partition.partition,
+        offset: offset,
+      });
+    }
+
+    return new Promise((resolve, reject) => {
+      this.#internalClient.deleteRecords(topicPartitionOffsets, options, (err, results) => {
+        if (err) {
+          reject(createKafkaJsErrorFromLibRdKafkaError(err));
+        } else {
+          resolve(results);
+        }
+      });
+    });
+  }
 }
 
 module.exports = {
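Note the normalization step above: each entry's offset is coerced with +partition.offset before being handed to the underlying client, so string offsets (the usual KafkaJS SeekEntry form) and plain numbers are both accepted. A small sketch of equivalent inputs (topic name is a placeholder; assumes an async context):

  await admin.deleteTopicRecords({
    topic: 'my-topic',
    partitions: [
      { partition: 0, offset: '42' }, // string offset, KafkaJS SeekEntry style
      { partition: 1, offset: 42 },   // numeric offset, same effect after coercion
      { partition: 2, offset: -1 },   // -1 deletes all records in the partition
    ],
  });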

src/admin.cc

Lines changed: 135 additions & 4 deletions
@@ -115,6 +115,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "createTopic", NodeCreateTopic);
   Nan::SetPrototypeMethod(tpl, "deleteTopic", NodeDeleteTopic);
   Nan::SetPrototypeMethod(tpl, "createPartitions", NodeCreatePartitions);
+  Nan::SetPrototypeMethod(tpl, "deleteRecords", NodeDeleteRecords);
 
   // Consumer group related operations
   Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups);
@@ -743,6 +744,74 @@ Baton AdminClient::ListConsumerGroupOffsets(
   }
 }
 
+Baton AdminClient::DeleteRecords(rd_kafka_DeleteRecords_t **del_records,
+                                 size_t del_records_cnt,
+                                 int operation_timeout_ms, int timeout_ms,
+                                 rd_kafka_event_t **event_response) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  {
+    scoped_shared_write_lock lock(m_connection_lock);
+    if (!IsConnected()) {
+      return Baton(RdKafka::ERR__STATE);
+    }
+
+    // Make admin options to establish that we are deleting records
+    rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
+        m_client->c_ptr(), RD_KAFKA_ADMIN_OP_DELETERECORDS);
+
+    char errstr[512];
+    rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout(
+        options, timeout_ms, errstr, sizeof(errstr));
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+      return Baton(static_cast<RdKafka::ErrorCode>(err), errstr);
+    }
+
+    err = rd_kafka_AdminOptions_set_operation_timeout(
+        options, operation_timeout_ms, errstr, sizeof(errstr));
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+      return Baton(static_cast<RdKafka::ErrorCode>(err), errstr);
+    }
+
+    // Create queue just for this operation.
+    rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
+
+    rd_kafka_DeleteRecords(m_client->c_ptr(), del_records,
+                           del_records_cnt, options, rkqu);
+
+    // Poll for an event by type in that queue.
+    // DON'T destroy the event. It is the out parameter, and ownership is
+    // the caller's.
+    *event_response =
+        PollForEvent(rkqu, RD_KAFKA_EVENT_DELETERECORDS_RESULT, timeout_ms);
+
+    // Destroy the queue since we are done with it.
+    rd_kafka_queue_destroy(rkqu);
+
+    // Destroy the options we just made because we polled already.
+    rd_kafka_AdminOptions_destroy(options);
+
+    // If we got no response from that operation, this is a failure,
+    // likely due to a timeout.
+    if (*event_response == NULL) {
+      return Baton(RdKafka::ERR__TIMED_OUT);
+    }
+
+    // Now we can get the error code from the event.
+    if (rd_kafka_event_error(*event_response)) {
+      // If we had a special error code, get out of here with it.
+      const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response);
+      return Baton(static_cast<RdKafka::ErrorCode>(errcode));
+    }
+
+    // At this point, event_response contains the result, which needs
+    // to be parsed/converted by the caller.
+    return Baton(RdKafka::ERR_NO_ERROR);
+  }
+}
+
 void AdminClient::ActivateDispatchers() {
   // Listen to global config
   m_gconfig->listen();
@@ -1147,10 +1216,7 @@ NAN_METHOD(AdminClient::NodeListConsumerGroupOffsets) {
   }
 
   // Now process the second argument: options (timeout and requireStableOffsets)
-  v8::Local<v8::Object> options = Nan::New<v8::Object>();
-  if (info.Length() > 2 && info[1]->IsObject()) {
-    options = info[1].As<v8::Object>();
-  }
+  v8::Local<v8::Object> options = info[1].As<v8::Object>();
 
   bool require_stable_offsets =
       GetParameter<bool>(options, "requireStableOffsets", false);
@@ -1167,4 +1233,69 @@ NAN_METHOD(AdminClient::NodeListConsumerGroupOffsets) {
                                      require_stable_offsets, timeout_ms));
 }
 
+/**
+ * Delete Records.
+ */
+NAN_METHOD(AdminClient::NodeDeleteRecords) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 3 || !info[2]->IsFunction()) {
+    return Nan::ThrowError("Need to specify a callback");
+  }
+
+  if (!info[0]->IsArray()) {
+    return Nan::ThrowError(
+        "Must provide array containing 'TopicPartitionOffset' objects");
+  }
+
+  if (!info[1]->IsObject()) {
+    return Nan::ThrowError("Must provide 'options' object");
+  }
+
+  // Get the list of TopicPartitions to delete records from
+  // and convert it into an rd_kafka_DeleteRecords_t array.
+  v8::Local<v8::Array> delete_records_list = info[0].As<v8::Array>();
+
+  if (delete_records_list->Length() == 0) {
+    return Nan::ThrowError("Must provide at least one TopicPartitionOffset");
+  }
+
+  /**
+   * The ownership of this is taken by
+   * Workers::AdminClientDeleteRecords and freeing it is also handled
+   * by that class.
+   */
+  rd_kafka_DeleteRecords_t **delete_records =
+      static_cast<rd_kafka_DeleteRecords_t **>(
+          malloc(sizeof(rd_kafka_DeleteRecords_t *) * 1));
+
+  rd_kafka_topic_partition_list_t *partitions =
+      Conversion::TopicPartition::TopicPartitionv8ArrayToTopicPartitionList(
+          delete_records_list, true);
+  if (partitions == NULL) {
+    return Nan::ThrowError(
+        "Failed to convert objects in delete records list, provide proper "
+        "TopicPartitionOffset objects");
+  }
+  delete_records[0] = rd_kafka_DeleteRecords_new(partitions);
+
+  rd_kafka_topic_partition_list_destroy(partitions);
+
+  // Now process the second argument: options (timeout and operation_timeout)
+  v8::Local<v8::Object> options = info[1].As<v8::Object>();
+
+  int operation_timeout_ms =
+      GetParameter<int64_t>(options, "operation_timeout", 60000);
+  int timeout_ms = GetParameter<int64_t>(options, "timeout", 5000);
+
+  // Create the final callback object
+  v8::Local<v8::Function> cb = info[2].As<v8::Function>();
+  Nan::Callback *callback = new Nan::Callback(cb);
+  AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This());
+
+  // Queue the worker to process the delete records request asynchronously
+  Nan::AsyncQueueWorker(new Workers::AdminClientDeleteRecords(
+      callback, client, delete_records, 1, operation_timeout_ms, timeout_ms));
+}
+
 } // namespace NodeKafka
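The native path above separates the two timeouts: rd_kafka_AdminOptions_set_request_timeout bounds how long PollForEvent waits on the per-operation queue for the RD_KAFKA_EVENT_DELETERECORDS_RESULT event, while rd_kafka_AdminOptions_set_operation_timeout is forwarded to the broker to bound the server-side deletion. A hedged sketch of how that surfaces to a caller of the JavaScript layers above (topic and values are placeholders):

  admin.deleteRecords(
    [{ topic: 'my-topic', partition: 0, offset: 100 }],
    {
      timeout: 5000,           // request timeout: local wait for the result event
      operationTimeout: 60000, // operation timeout: broker-side completion bound
    },
    (err, results) => { /* inspect per-partition results or error */ });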
