From 0ef0bed34a6dace65446202409abc4539b420630 Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Wed, 4 Jun 2025 18:48:51 +0800 Subject: [PATCH 1/8] feat(kv): enhance namespaced key-value store operations and handlers --- .../automq/shell/stream/ClientKVClient.java | 79 +++++++ .../org/apache/kafka/clients/admin/Admin.java | 43 ++++ .../admin/DeleteNamespacedKVOptions.java | 15 ++ .../admin/DeleteNamespacedKVResult.java | 22 ++ .../kafka/clients/admin/ForwardingAdmin.java | 15 ++ .../clients/admin/GetNamespacedKVOptions.java | 4 + .../clients/admin/GetNamespacedKVResult.java | 21 ++ .../kafka/clients/admin/KafkaAdminClient.java | 90 ++++++++ .../admin/NamespacedKVRecordsToDelete.java | 40 ++++ .../clients/admin/PutNamespacedKVOptions.java | 24 +++ .../clients/admin/PutNamespacedKVResult.java | 34 +++ .../internals/DeleteNamespacedKVHandler.java | 89 ++++++++ .../internals/GetNamespacedKVHandler.java | 95 +++++++++ .../internals/NamespacedKVRecordsToGet.java | 38 ++++ .../internals/NamespacedKVRecordsToPut.java | 39 ++++ .../internals/PutNamespacedKVHandler.java | 87 ++++++++ .../errors/InvalidKVRecordEpochException.java | 8 + .../apache/kafka/common/protocol/Errors.java | 2 + .../common/message/DeleteKVsRequest.json | 16 +- .../common/message/DeleteKVsResponse.json | 11 +- .../common/message/GetKVsRequest.json | 10 +- .../common/message/GetKVsResponse.json | 16 +- .../common/message/PutKVsRequest.json | 16 +- .../common/message/PutKVsResponse.json | 10 +- .../clients/admin/KafkaAdminClientTest.java | 32 ++- .../kafka/clients/admin/MockAdminClient.java | 15 ++ .../log/stream/s3/ControllerKVClient.java | 194 ++++++++++++++++++ .../kafka/log/streamaspect/MemoryClient.java | 79 +++++++ .../kafka/controller/QuorumController.java | 8 +- .../controller/stream/KVControlManager.java | 83 +++++++- .../resources/common/metadata/KVRecord.json | 16 +- .../common/metadata/RemoveKVRecord.json | 8 +- .../controller/ClusterControlManagerTest.java | 8 +- .../java/com/automq/stream/api/KVClient.java | 34 +++ .../java/com/automq/stream/api/KeyValue.java | 71 +++++++ .../automq/stream/utils/KVRecordUtils.java | 11 + .../server/common/automq/AutoMQVersion.java | 15 +- 37 files changed, 1364 insertions(+), 34 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/NamespacedKVRecordsToDelete.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteNamespacedKVHandler.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToGet.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToPut.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/InvalidKVRecordEpochException.java create mode 100644 s3stream/src/main/java/com/automq/stream/utils/KVRecordUtils.java diff --git a/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java b/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java index e213db5cfd..d4f7d4f60b 100644 --- a/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java +++ b/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java @@ -39,6 +39,7 @@ import com.automq.shell.metrics.S3MetricsExporter; import com.automq.stream.api.KeyValue; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,32 @@ public KeyValue.Value getKV(String key) throws IOException { throw code.exception(); } + public ValueAndEpoch getKV(String key, String namespace) throws IOException { + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ClientKVClient]: Get KV: {} Namespace: {}", key, namespace); + } + + GetKVsRequestData data = new GetKVsRequestData() + .setGetKeyRequests(List.of(new GetKVsRequestData.GetKVRequest().setKey(key).setNamespace(namespace))); + + long now = Time.SYSTEM.milliseconds(); + ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()), + new GetKVsRequest.Builder(data), now, true, 3000, null); + + ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM); + GetKVsResponseData responseData = (GetKVsResponseData) response.responseBody().data(); + + Errors code = Errors.forCode(responseData.errorCode()); + if (Objects.requireNonNull(code) == Errors.NONE) { + return ValueAndEpoch.of( + responseData.getKVResponses().get(0).value(), + responseData.getKVResponses().get(0).epoch()); + } + + throw code.exception(); + } + public KeyValue.Value putKV(String key, byte[] value) throws IOException { long now = Time.SYSTEM.milliseconds(); @@ -119,6 +146,32 @@ public KeyValue.Value putKV(String key, byte[] value) throws IOException { throw code.exception(); } + public ValueAndEpoch putKV(String key, byte[] value, String namespace, long epoch) throws IOException { + long now = Time.SYSTEM.milliseconds(); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ClientKVClient]: put KV: {}", key); + } + + PutKVsRequestData data = new PutKVsRequestData() + .setPutKVRequests(List.of(new PutKVsRequestData.PutKVRequest().setKey(key).setValue(value).setNamespace(namespace).setEpoch(epoch))); + + ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()), + new PutKVsRequest.Builder(data), now, true, 3000, null); + + ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM); + PutKVsResponseData responseData = (PutKVsResponseData) response.responseBody().data(); + + Errors code = Errors.forCode(responseData.errorCode()); + if (Objects.requireNonNull(code) == Errors.NONE) { + return ValueAndEpoch.of( + responseData.putKVResponses().get(0).value(), + responseData.putKVResponses().get(0).epoch()); + } + + throw code.exception(); + } + public KeyValue.Value deleteKV(String key) throws IOException { long now = Time.SYSTEM.milliseconds(); @@ -142,4 +195,30 @@ public KeyValue.Value deleteKV(String key) throws IOException { throw code.exception(); } + + public ValueAndEpoch deleteKV(String key, String namespace, long epoch) throws IOException { + long now = Time.SYSTEM.milliseconds(); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ClientKVClient]: Delete KV: {}", key); + } + + DeleteKVsRequestData data = new DeleteKVsRequestData() + .setDeleteKVRequests(List.of(new DeleteKVsRequestData.DeleteKVRequest().setKey(key).setNamespace(namespace).setEpoch(epoch))); + + ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()), + new DeleteKVsRequest.Builder(data), now, true, 3000, null); + + ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM); + DeleteKVsResponseData responseData = (DeleteKVsResponseData) response.responseBody().data(); + + Errors code = Errors.forCode(responseData.errorCode()); + if (Objects.requireNonNull(code) == Errors.NONE) { + return ValueAndEpoch.of( + responseData.deleteKVResponses().get(0).value(), + responseData.deleteKVResponses().get(0).epoch()); + } + + throw code.exception(); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index a5e4e910b3..4dfffe238f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1739,6 +1739,49 @@ default ListClientMetricsResourcesResult listClientMetricsResources() { * @return {@link UpdateGroupResult} */ UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options); + + GetNamespacedKVResult getNamespacedKV( + Optional> partitions, + String namespace, + String key, + String value, + GetNamespacedKVOptions options + ); + + /** + * Put a key-value pair in the namespaced KV store. + * @param partitions + * @param namespace + * @param key + * @param value + * @param options + * @return + */ + PutNamespacedKVResult putNamespacedKV( + Optional> partitions, + String namespace, + String key, + String value, + PutNamespacedKVOptions options + ); + + /** + * Delete a key-value pair in the namespaced KV store. + * @param partitions + * @param namespace + * @param key + * @param options + * @return + */ + DeleteNamespacedKVResult deleteNamespacedKV( + Optional> partitions, + String namespace, + String key, + DeleteNamespacedKVOptions options + ); + + + // AutoMQ inject end /** diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java new file mode 100644 index 0000000000..b15bbca77e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java @@ -0,0 +1,15 @@ +package org.apache.kafka.clients.admin; + +public class DeleteNamespacedKVOptions extends AbstractOptions { + + private long ifMatchEpoch = 0L; + + public DeleteNamespacedKVOptions ifMatchEpoch(long epoch) { + this.ifMatchEpoch = epoch; + return this; + } + + public long ifMatchEpoch() { + return ifMatchEpoch; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java new file mode 100644 index 0000000000..a20b6c7af0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java @@ -0,0 +1,22 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + +public class DeleteNamespacedKVResult extends AbstractOptions { + + private final Map> futures; + + public DeleteNamespacedKVResult(Map> futures) { + this.futures = futures; + } + + /** + * Return a future which succeeds if the put operation is successful. + */ + public KafkaFuture>> all() { + return KafkaFuture.completedFuture(futures); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 8822553259..6d59f8d154 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -320,5 +320,20 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, return delegate.updateGroup(groupId, groupSpec, options); } + @Override + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, String value, GetNamespacedKVOptions options) { + return delegate.getNamespacedKV(partitions, namespace, key, value, options); + } + + @Override + public PutNamespacedKVResult putNamespacedKV(Optional> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) { + return delegate.putNamespacedKV(partitions, namespace, key, value, options); + } + + @Override + public DeleteNamespacedKVResult deleteNamespacedKV(Optional> partitions, String namespace, String key, DeleteNamespacedKVOptions options) { + return delegate.deleteNamespacedKV(partitions, namespace, key, options); + } + // AutoMQ inject end } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java new file mode 100644 index 0000000000..d21d9e3755 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java @@ -0,0 +1,4 @@ +package org.apache.kafka.clients.admin; + +public class GetNamespacedKVOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java new file mode 100644 index 0000000000..05dd67a138 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java @@ -0,0 +1,21 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +public class GetNamespacedKVResult { + + private final Map> futures; + + public GetNamespacedKVResult(Map> futures) { + this.futures = futures; + } + + public KafkaFuture>> all() throws ExecutionException, InterruptedException { + return KafkaFuture.completedFuture(futures); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 12202776ac..7e343673fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -47,14 +47,19 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler; +import org.apache.kafka.clients.admin.internals.DeleteNamespacedKVHandler; import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler; import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeProducersHandler; import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler; import org.apache.kafka.clients.admin.internals.FenceProducersHandler; +import org.apache.kafka.clients.admin.internals.GetNamespacedKVHandler; import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.ListOffsetsHandler; import org.apache.kafka.clients.admin.internals.ListTransactionsHandler; +import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToGet; +import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToPut; +import org.apache.kafka.clients.admin.internals.PutNamespacedKVHandler; import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler; import org.apache.kafka.clients.admin.internals.UpdateGroupHandler; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -132,6 +137,7 @@ import org.apache.kafka.common.message.DeleteAclsResponseData; import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult; import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl; +import org.apache.kafka.common.message.DeleteKVsRequestData; import org.apache.kafka.common.message.DeleteTopicsRequestData; import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; @@ -152,6 +158,8 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName; import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.GetKVsRequestData; +import org.apache.kafka.common.message.GetKVsResponseData; import org.apache.kafka.common.message.GetNextNodeIdRequestData; import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; @@ -160,6 +168,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.PutKVsRequestData; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.UnregisterBrokerRequestData; @@ -266,6 +275,7 @@ import org.slf4j.Logger; +import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.time.Duration; @@ -4866,6 +4876,86 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, return new UpdateGroupResult(future.get(CoordinatorKey.byGroupId(groupId))); } + @Override + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, String value, GetNamespacedKVOptions options) { + Set targetPartitions = partitions.orElseThrow(() -> + new IllegalArgumentException("Partitions cannot be empty") + ); + + NamespacedKVRecordsToGet.Builder recordsToGetBuilder = NamespacedKVRecordsToGet.newBuilder(); + for (TopicPartition tp : targetPartitions) { + GetKVsRequestData.GetKVRequest kvRequest = new GetKVsRequestData.GetKVRequest() + .setKey(key) + .setNamespace(namespace); + + recordsToGetBuilder.addRecord(tp, kvRequest); + } + + NamespacedKVRecordsToGet recordsToGet = recordsToGetBuilder.build(); + GetNamespacedKVHandler handler = new GetNamespacedKVHandler(logContext, recordsToGet); + SimpleAdminApiFuture future = GetNamespacedKVHandler.newFuture(targetPartitions); + + invokeDriver(handler, future, options.timeoutMs); + + return new GetNamespacedKVResult(future.all()); + } + + @Override + public PutNamespacedKVResult putNamespacedKV(Optional> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) { + Set targetPartitions = partitions.orElseThrow(() -> + new IllegalArgumentException("Partitions cannot be empty") + ); + + NamespacedKVRecordsToPut.Builder recordsToPutBuilder = NamespacedKVRecordsToPut.newBuilder(); + for (TopicPartition tp : targetPartitions) { + PutKVsRequestData.PutKVRequest kvRequest = new PutKVsRequestData.PutKVRequest() + .setKey(key) + .setValue(value.getBytes(StandardCharsets.UTF_8)) + .setNamespace(namespace) + .setOverwrite(options.overwrite()) + .setEpoch(options.ifMatchEpoch()); + + recordsToPutBuilder.addRecord(tp, kvRequest); + } + + NamespacedKVRecordsToPut recordsToPut = recordsToPutBuilder.build(); + + PutNamespacedKVHandler handler = new PutNamespacedKVHandler(logContext, recordsToPut); + SimpleAdminApiFuture future = PutNamespacedKVHandler.newFuture(targetPartitions); + + invokeDriver(handler, future, options.timeoutMs); + + return new PutNamespacedKVResult(future.all()); + } + + + @Override + public DeleteNamespacedKVResult deleteNamespacedKV(Optional> partitions, String namespace, String key, DeleteNamespacedKVOptions options) { + + Set targetPartitions = partitions.orElseThrow(() -> + new IllegalArgumentException("Partitions cannot be empty") + ); + + NamespacedKVRecordsToDelete.Builder recordsToDeleteBuilder = NamespacedKVRecordsToDelete.newBuilder(); + for (TopicPartition tp : targetPartitions) { + DeleteKVsRequestData.DeleteKVRequest kvRequest = new DeleteKVsRequestData.DeleteKVRequest() + .setKey(key) + .setNamespace(namespace) + .setEpoch(options.ifMatchEpoch()); + + recordsToDeleteBuilder.addRecord(tp, kvRequest); + } + + NamespacedKVRecordsToDelete recordsToDelete = recordsToDeleteBuilder.build(); + + DeleteNamespacedKVHandler handler = new DeleteNamespacedKVHandler(logContext, recordsToDelete); + SimpleAdminApiFuture future = DeleteNamespacedKVHandler.newFuture(targetPartitions); + + invokeDriver(handler, future, options.timeoutMs); + + return new DeleteNamespacedKVResult(future.all()); + } + private void invokeDriver( AdminApiHandler handler, AdminApiFuture future, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NamespacedKVRecordsToDelete.java b/clients/src/main/java/org/apache/kafka/clients/admin/NamespacedKVRecordsToDelete.java new file mode 100644 index 0000000000..0db8e27990 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NamespacedKVRecordsToDelete.java @@ -0,0 +1,40 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NamespacedKVRecordsToDelete { + + private final Map> recordsByPartition; + + public NamespacedKVRecordsToDelete(Map> recordsByPartition) { + this.recordsByPartition = recordsByPartition; + } + + public static NamespacedKVRecordsToDelete.Builder newBuilder() { + return new NamespacedKVRecordsToDelete.Builder(); + } + + public Map> recordsByPartition() { + return recordsByPartition; + } + + public static class Builder { + private final Map> records = new HashMap<>(); + + public NamespacedKVRecordsToDelete.Builder addRecord(TopicPartition partition, DeleteKVRequest request) { + records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request); + return this; + } + + public NamespacedKVRecordsToDelete build() { + return new NamespacedKVRecordsToDelete(records); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVOptions.java new file mode 100644 index 0000000000..06d20eefe3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVOptions.java @@ -0,0 +1,24 @@ +package org.apache.kafka.clients.admin; + +public class PutNamespacedKVOptions extends AbstractOptions { + + private boolean overwrite = false; + private long ifMatchEpoch = 0L; + + public PutNamespacedKVOptions overwrite(boolean overwrite) { + this.overwrite = overwrite; + return this; + } + + public PutNamespacedKVOptions ifMatchEpoch(long epoch) { + this.ifMatchEpoch = epoch; + return this; + } + + public boolean overwrite() { + return overwrite; + } + public long ifMatchEpoch() { + return ifMatchEpoch; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java new file mode 100644 index 0000000000..dc1f05fd8e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java @@ -0,0 +1,34 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + + +public class PutNamespacedKVResult { + + private final Map> futures; + + /** + * Return a future which succeeds only if all the records deletions succeed. + */ + public PutNamespacedKVResult(Map> futures) { + this.futures = futures; + } + + /** + * Return a future which succeeds only if all the records deletions succeed. + */ + public KafkaFuture>> all() { + return KafkaFuture.completedFuture(futures); + } + +// /** +// * Return a future which succeeds if the put operation is successful. +// */ +// public Map> all() { +// return futures; +// } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteNamespacedKVHandler.java new file mode 100644 index 0000000000..27e2a43fc3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteNamespacedKVHandler.java @@ -0,0 +1,89 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.NamespacedKVRecordsToDelete; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DeleteKVsRequestData; +import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest; +import org.apache.kafka.common.message.DeleteKVsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.s3.DeleteKVsRequest; +import org.apache.kafka.common.requests.s3.DeleteKVsResponse; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DeleteNamespacedKVHandler extends AdminApiHandler.Batched { + + private final Logger logger; + private final NamespacedKVRecordsToDelete recordsToDelete; + private final AdminApiLookupStrategy lookupStrategy; + + public DeleteNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToDelete recordsToDelete) { + this.logger = logContext.logger(PutNamespacedKVHandler.class); + this.recordsToDelete = recordsToDelete; + this.lookupStrategy = new PartitionLeaderStrategy(logContext); + } + + @Override + AbstractRequest.Builder buildBatchedRequest(int brokerId, Set partitions) { + Map> filteredRecords = new HashMap<>(); + for (TopicPartition partition : partitions) { + if (recordsToDelete.recordsByPartition().containsKey(partition)) { + filteredRecords.put(partition, recordsToDelete.recordsByPartition().get(partition)); + } + } + + DeleteKVsRequestData requestData = new DeleteKVsRequestData(); + List allRequests = new ArrayList<>(); + filteredRecords.values().forEach(allRequests::addAll); + requestData.setDeleteKVRequests(allRequests); + + return new DeleteKVsRequest.Builder(requestData); + } + + @Override + public String apiName() { + return "DeleteKVs"; + } + + @Override + public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { + DeleteKVsResponse deleteResponse = (DeleteKVsResponse) response; + DeleteKVsResponseData responseData = deleteResponse.data(); + final Map completed = new HashMap<>(); + final Map failed = new HashMap<>(); + + partitions.forEach(partition -> { + Errors error = Errors.forCode(responseData.errorCode()); + if (error != Errors.NONE) { + failed.put(partition, error.exception()); + } else { + completed.put(partition, null); + } + }); + + return new ApiResult<>(completed, failed, Collections.emptyList()); + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return this.lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture newFuture( + Set partitions + ) { + return AdminApiFuture.forKeys(new HashSet<>(partitions)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java new file mode 100644 index 0000000000..ee151df011 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java @@ -0,0 +1,95 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.GetKVsRequestData; +import org.apache.kafka.common.message.GetKVsResponseData; +import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.s3.GetKVsRequest; +import org.apache.kafka.common.requests.s3.GetKVsResponse; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class GetNamespacedKVHandler extends AdminApiHandler.Batched { + private final Logger logger; + private final NamespacedKVRecordsToGet recordsToGet; + private final AdminApiLookupStrategy lookupStrategy; + private final List orderedPartitions; + + public GetNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToGet recordsToGet) { + this.logger = logContext.logger(PutNamespacedKVHandler.class); + this.recordsToGet = recordsToGet; + this.lookupStrategy = new PartitionLeaderStrategy(logContext); + this.orderedPartitions = new ArrayList<>(recordsToGet.recordsByPartition().keySet()); + } + + @Override + AbstractRequest.Builder buildBatchedRequest(int brokerId, Set partitions) { + + GetKVsRequestData requestData = new GetKVsRequestData(); + for (TopicPartition tp : orderedPartitions) { + if (partitions.contains(tp)) { + requestData.getKeyRequests().addAll( + recordsToGet.recordsByPartition().get(tp) + ); + } + } + + return new GetKVsRequest.Builder(requestData); + } + + @Override + public String apiName() { + return "GetKVs"; + } + + @Override + public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { + + GetKVsResponseData data = ((GetKVsResponse) response).data(); + Map completed = new LinkedHashMap<>(); + Map failed = new HashMap<>(); + List responses = data.getKVResponses(); + int responseIndex = 0; + for (TopicPartition tp : orderedPartitions) { + if (!partitions.contains(tp)) { + continue; + } + if (responseIndex >= responses.size()) { + failed.put(tp, new IllegalStateException("Missing response for partition")); + continue; + } + GetKVResponse resp = responses.get(responseIndex++); + if (resp.errorCode() == Errors.NONE.code()) { + completed.put(tp, resp); + } else { + failed.put(tp, Errors.forCode(resp.errorCode()).exception()); + } + } + return new ApiResult<>(completed, failed, Collections.emptyList()); + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return this.lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture newFuture( + Set partitions + ) { + return AdminApiFuture.forKeys(new HashSet<>(partitions)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToGet.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToGet.java new file mode 100644 index 0000000000..39ec6aa04e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToGet.java @@ -0,0 +1,38 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.GetKVsRequestData.GetKVRequest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NamespacedKVRecordsToGet { + + private final Map> recordsByPartition; + + public NamespacedKVRecordsToGet(Map> recordsByPartition) { + this.recordsByPartition = recordsByPartition; + } + + public static NamespacedKVRecordsToGet.Builder newBuilder() { + return new NamespacedKVRecordsToGet.Builder(); + } + + public static class Builder { + private final Map> records = new HashMap<>(); + public Builder addRecord(TopicPartition tp, GetKVRequest req) { + records.computeIfAbsent(tp, k -> new ArrayList<>()).add(req); + return this; + } + + public NamespacedKVRecordsToGet build() { + return new NamespacedKVRecordsToGet(records); + } + } + + public Map> recordsByPartition() { + return recordsByPartition; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToPut.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToPut.java new file mode 100644 index 0000000000..28b34266bd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToPut.java @@ -0,0 +1,39 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NamespacedKVRecordsToPut { + + private final Map> recordsByPartition; + + private NamespacedKVRecordsToPut(Map> recordsByPartition) { + this.recordsByPartition = recordsByPartition; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public Map> recordsByPartition() { + return recordsByPartition; + } + + public static class Builder { + private final Map> records = new HashMap<>(); + + public Builder addRecord(TopicPartition partition, PutKVRequest request) { + records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request); + return this; + } + + public NamespacedKVRecordsToPut build() { + return new NamespacedKVRecordsToPut(records); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java new file mode 100644 index 0000000000..260a4454d6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java @@ -0,0 +1,87 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.PutKVsRequestData; +import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest; +import org.apache.kafka.common.message.PutKVsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.s3.PutKVsRequest; +import org.apache.kafka.common.requests.s3.PutKVsResponse; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class PutNamespacedKVHandler extends AdminApiHandler.Batched { + private final Logger logger; + private final NamespacedKVRecordsToPut recordsToPut; + private final AdminApiLookupStrategy lookupStrategy; + + public PutNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToPut recordsToPut) { + this.logger = logContext.logger(PutNamespacedKVHandler.class); + this.recordsToPut = recordsToPut; + this.lookupStrategy = new PartitionLeaderStrategy(logContext); + } + + @Override + protected AbstractRequest.Builder buildBatchedRequest(int brokerId, Set partitions) { + Map> filteredRecords = new HashMap<>(); + for (TopicPartition partition : partitions) { + if (recordsToPut.recordsByPartition().containsKey(partition)) { + filteredRecords.put(partition, recordsToPut.recordsByPartition().get(partition)); + } + } + + PutKVsRequestData requestData = new PutKVsRequestData(); + List allRequests = new ArrayList<>(); + filteredRecords.values().forEach(allRequests::addAll); + requestData.setPutKVRequests(allRequests); + + return new PutKVsRequest.Builder(requestData); + } + + @Override + public String apiName() { + return "PutKVs"; + } + + @Override + public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { + PutKVsResponse putResponse = (PutKVsResponse) response; + PutKVsResponseData responseData = putResponse.data(); + final Map completed = new HashMap<>(); + final Map failed = new HashMap<>(); + + partitions.forEach(partition -> { + Errors error = Errors.forCode(responseData.errorCode()); + if (error != Errors.NONE) { + failed.put(partition, error.exception()); + } else { + completed.put(partition, null); + } + }); + + return new ApiResult<>(completed, failed, Collections.emptyList()); + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return this.lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture newFuture( + Set partitions + ) { + return AdminApiFuture.forKeys(new HashSet<>(partitions)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidKVRecordEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidKVRecordEpochException.java new file mode 100644 index 0000000000..4b0f190b8d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidKVRecordEpochException.java @@ -0,0 +1,8 @@ +package org.apache.kafka.common.errors; + +public class InvalidKVRecordEpochException extends ApiException { + + public InvalidKVRecordEpochException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index b886a24a1f..c33ade3cf8 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.InvalidFetchSessionEpochException; import org.apache.kafka.common.errors.InvalidFetchSizeException; import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.InvalidKVRecordEpochException; import org.apache.kafka.common.errors.InvalidPartitionsException; import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.InvalidPrincipalTypeException; @@ -440,6 +441,7 @@ public enum Errors { NODE_LOCKED(515, "The node is locked", NodeLockedException::new), OBJECT_NOT_COMMITED(516, "The object is not commited.", ObjectNotCommittedException::new), STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new), + INVALID_KV_RECORD_EPOCH(600, "The KV record epoch is invalid.", InvalidKVRecordEpochException::new), // AutoMQ inject end INVALID_RECORD_STATE(121, "The record state is invalid. The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new), diff --git a/clients/src/main/resources/common/message/DeleteKVsRequest.json b/clients/src/main/resources/common/message/DeleteKVsRequest.json index 2cf9190ce6..0c054e20ba 100644 --- a/clients/src/main/resources/common/message/DeleteKVsRequest.json +++ b/clients/src/main/resources/common/message/DeleteKVsRequest.json @@ -21,7 +21,7 @@ "broker" ], "name": "DeleteKVsRequest", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -35,8 +35,20 @@ "type": "string", "versions": "0+", "about": "Key is the key to delete" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/DeleteKVsResponse.json b/clients/src/main/resources/common/message/DeleteKVsResponse.json index eae0e2fbb5..d5493591bb 100644 --- a/clients/src/main/resources/common/message/DeleteKVsResponse.json +++ b/clients/src/main/resources/common/message/DeleteKVsResponse.json @@ -17,7 +17,7 @@ "apiKey": 511, "type": "response", "name": "DeleteKVsResponse", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -50,8 +50,15 @@ "versions": "0+", "nullableVersions": "0+", "about": "Value" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } + ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/GetKVsRequest.json b/clients/src/main/resources/common/message/GetKVsRequest.json index e04d138bd9..192df1b6ef 100644 --- a/clients/src/main/resources/common/message/GetKVsRequest.json +++ b/clients/src/main/resources/common/message/GetKVsRequest.json @@ -21,7 +21,7 @@ "broker" ], "name": "GetKVsRequest", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -35,8 +35,14 @@ "type": "string", "versions": "0+", "about": "Key is the key to get" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/GetKVsResponse.json b/clients/src/main/resources/common/message/GetKVsResponse.json index d602c08c46..7b32f42b67 100644 --- a/clients/src/main/resources/common/message/GetKVsResponse.json +++ b/clients/src/main/resources/common/message/GetKVsResponse.json @@ -17,7 +17,7 @@ "apiKey": 509, "type": "response", "name": "GetKVsResponse", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -50,8 +50,20 @@ "versions": "0+", "nullableVersions": "0+", "about": "Value" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/PutKVsRequest.json b/clients/src/main/resources/common/message/PutKVsRequest.json index 041c25dc84..2976c5e900 100644 --- a/clients/src/main/resources/common/message/PutKVsRequest.json +++ b/clients/src/main/resources/common/message/PutKVsRequest.json @@ -21,7 +21,7 @@ "broker" ], "name": "PutKVsRequest", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -47,8 +47,20 @@ "type": "bool", "versions": "0+", "about": "overwrite put kv" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/PutKVsResponse.json b/clients/src/main/resources/common/message/PutKVsResponse.json index 067a867018..90187c434c 100644 --- a/clients/src/main/resources/common/message/PutKVsResponse.json +++ b/clients/src/main/resources/common/message/PutKVsResponse.json @@ -17,7 +17,7 @@ "apiKey": 510, "type": "response", "name": "PutKVsResponse", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -49,8 +49,14 @@ "type": "bytes", "versions": "0+", "about": "Value" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 34c2722d46..b95b26c5be 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -6948,7 +6948,7 @@ public void testDescribeLogDirsPartialFailure() throws Exception { assertNotNull(result.descriptions().get(1).get()); } } - + @Test public void testDescribeReplicaLogDirsWithNonExistReplica() throws Exception { int brokerId = 0; @@ -6967,7 +6967,7 @@ public void testDescribeReplicaLogDirsWithNonExistReplica() throws Exception { DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(tpr1, tpr2)); Map> values = result.values(); - + assertEquals(logDir, values.get(tpr1).get().getCurrentReplicaLogDir()); assertNull(values.get(tpr1).get().getFutureReplicaLogDir()); assertEquals(offsetLag, values.get(tpr1).get().getCurrentReplicaOffsetLag()); @@ -7668,6 +7668,34 @@ public void testClientInstanceIdNoTelemetryReporterRegistered() { admin.close(); } + @Test + public void testPutKV_NewKey() { + + Properties props = new Properties(); + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + + KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props); + + PutNamespacedKVOptions options = new PutNamespacedKVOptions(); + options.overwrite(true); + options.ifMatchEpoch(0L); + Set multiPartitions = new HashSet<>(Arrays.asList( + new TopicPartition("test-topic", 0), + new TopicPartition("test-topic", 1))); + + PutNamespacedKVResult putNamespacedKVResult = admin.putNamespacedKV(Optional.of(multiPartitions), "namespace1", "key1", "value1", options); + Map> map; + try { + map = putNamespacedKVResult.all().get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + + assertNotNull(map); + } + private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) { return new UnregisterBrokerResponse(new UnregisterBrokerResponseData() .setErrorCode(error.code()) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index aea08dcd34..b0cdab67af 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1450,6 +1450,21 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, throw new UnsupportedOperationException(); } + @Override + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, String value, GetNamespacedKVOptions options) { + throw new UnsupportedOperationException(); + } + + @Override + public PutNamespacedKVResult putNamespacedKV(Optional> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) { + throw new UnsupportedOperationException(); + } + + @Override + public DeleteNamespacedKVResult deleteNamespacedKV(Optional> partitions, String namespace, String key, DeleteNamespacedKVOptions options) { + throw new UnsupportedOperationException(); + } + // AutoMQ inject end } diff --git a/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java b/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java index 0bf3416765..afff759d07 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java +++ b/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java @@ -44,7 +44,9 @@ import com.automq.stream.api.KVClient; import com.automq.stream.api.KeyValue; import com.automq.stream.api.KeyValue.Key; +import com.automq.stream.api.KeyValue.KeyAndNamespace; import com.automq.stream.api.KeyValue.Value; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,4 +249,196 @@ public Builder toRequestBuilder() { this.requestSender.send(task); return future; } + + @Override + public CompletableFuture putNamespacedKVIfAbsent(KeyValue keyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}", keyValue); + } + PutKVRequest request = new PutKVRequest() + .setKey(keyValue.key().get()) + .setValue(keyValue.value().get().array()) + .setNamespace(keyValue.namespace()) + .setEpoch(keyValue.epoch()); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.PUT_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new PutKVsRequest.Builder( + new PutKVsRequestData() + ).addSubRequest(request); + } + }; + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}, result: {}", keyValue, response); + } + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + case KEY_EXIST: + LOGGER.warn("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, key already exist", keyValue, code); + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + default: + LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, retry later", keyValue, code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } + + @Override + public CompletableFuture putNamespacedKV(KeyValue keyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}", keyValue); + } + PutKVRequest request = new PutKVRequest() + .setKey(keyValue.key().get()) + .setValue(keyValue.value().get().array()) + .setNamespace(keyValue.namespace()) + .setEpoch(keyValue.epoch()) + .setOverwrite(true); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.PUT_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new PutKVsRequest.Builder( + new PutKVsRequestData() + ).addSubRequest(request); + } + }; + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}, result: {}", keyValue, response); + } + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + default: + LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV: {}, code: {}, retry later", keyValue, code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } + + @Override + public CompletableFuture getNamespacedKV(KeyAndNamespace keyAndNamespace) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Get KV: {}, Namespace: {}", keyAndNamespace.key(), keyAndNamespace.namespace()); + } + GetKVRequest request = new GetKVRequest() + .setKey(keyAndNamespace.key().get()) + .setNamespace(keyAndNamespace.namespace()); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + GetKVsRequest.Builder realBuilder = (GetKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.GET_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new GetKVsRequest.Builder( + new GetKVsRequestData() + ).addSubRequest(request); + } + }; + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + ValueAndEpoch val = ValueAndEpoch.of(response.value(), response.epoch()); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Get Namespaced KV: {}, result: {}", keyAndNamespace.key(), response); + } + return ResponseHandleResult.withSuccess(val); + default: + LOGGER.error("[ControllerKVClient]: Failed to Get Namespaced KV: {}, code: {}, retry later", keyAndNamespace.key(), code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } + + @Override + public CompletableFuture delNamespacedKV(KeyValue keyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}", keyValue.key()); + } + DeleteKVRequest request = new DeleteKVRequest() + .setKey(keyValue.key().get()); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + DeleteKVsRequest.Builder realBuilder = (DeleteKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.DELETE_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new DeleteKVsRequest.Builder( + new DeleteKVsRequestData() + ).addSubRequest(request); + } + }; + + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}, result: {}", keyValue.key(), response); + } + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + case KEY_NOT_EXIST: + LOGGER.info("[ControllerKVClient]: Delete Namespaced KV: {}, result: KEY_NOT_EXIST", keyValue.key()); + return ResponseHandleResult.withSuccess(ValueAndEpoch.of((ByteBuffer) null, 0L)); + default: + LOGGER.error("[ControllerKVClient]: Failed to Delete Namespaced KV: {}, code: {}, retry later", keyValue.key(), code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } } diff --git a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java index 75074f5b77..f417bee422 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java +++ b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java @@ -19,6 +19,7 @@ package kafka.log.streamaspect; + import com.automq.stream.DefaultRecordBatch; import com.automq.stream.RecordBatchWithContextWrapper; import com.automq.stream.api.AppendResult; @@ -28,7 +29,9 @@ import com.automq.stream.api.KVClient; import com.automq.stream.api.KeyValue; import com.automq.stream.api.KeyValue.Key; +import com.automq.stream.api.KeyValue.KeyAndNamespace; import com.automq.stream.api.KeyValue.Value; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import com.automq.stream.api.OpenStreamOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; @@ -51,6 +54,9 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey; +import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH; + public class MemoryClient implements Client { private final StreamClient streamClient = new StreamClientImpl(); private final KVClient kvClient = new KVClientImpl(); @@ -180,6 +186,7 @@ public void shutdown() { public static class KVClientImpl implements KVClient { private final Map store = new ConcurrentHashMap<>(); + private final Map keyMetadataMap = new ConcurrentHashMap<>(); @Override public CompletableFuture putKV(KeyValue keyValue) { @@ -202,5 +209,77 @@ public CompletableFuture getKV(Key key) { public CompletableFuture delKV(Key key) { return CompletableFuture.completedFuture(Value.of(store.remove(key.get()))); } + + @Override + public CompletableFuture putNamespacedKVIfAbsent(KeyValue keyValue) { + String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0; + if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) { + return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception()); + } + long newEpoch = System.currentTimeMillis(); + + ByteBuffer value = store.putIfAbsent(key, keyValue.value().get().duplicate()); + if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) { + keyMetadataMap.putIfAbsent(keyValue.key().get(), new KeyMetadata(keyValue.namespace(), newEpoch)); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch)); + } + + @Override + public CompletableFuture putNamespacedKV(KeyValue keyValue) { + String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0; + if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) { + return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception()); + } + long newEpoch = System.currentTimeMillis(); + + ByteBuffer value = store.put(key, keyValue.value().get().duplicate()); + if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) { + keyMetadataMap.put(keyValue.key().get(), new KeyMetadata(keyValue.namespace(), newEpoch)); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch)); + } + + @Override + public CompletableFuture getNamespacedKV(KeyAndNamespace keyAndNamespace) { + String key = buildCompositeKey(keyAndNamespace.namespace(), keyAndNamespace.key().get()); + KeyMetadata keyMetadata = null; + if (keyAndNamespace.namespace() != null && !keyAndNamespace.namespace().isEmpty()) { + keyMetadata = keyMetadataMap.get(key); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(store.get(key), keyMetadata != null ? keyMetadata.getEpoch() : 0L)); + } + + @Override + public CompletableFuture delNamespacedKV(KeyValue keyValue) { + String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0; + if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) { + return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception()); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(store.remove(key), currentEpoch)); + } + + private static class KeyMetadata { + private final long epoch; + private final String namespace; + public KeyMetadata(String namespace, long epoch) { + this.namespace = namespace; + this.epoch = epoch; + } + + public long getEpoch() { + return epoch; + } + + public String getNamespace() { + return namespace; + } + } } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 28795589ef..0ff7af0251 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2051,9 +2051,6 @@ private QuorumController( this.time = time; this.controllerMetrics = controllerMetrics; this.snapshotRegistry = new SnapshotRegistry(logContext); - // AutoMQ for Kafka inject start - this.kvControlManager = new KVControlManager(snapshotRegistry, logContext); - // AutoMQ for Kafka inject end this.deferredEventQueue = new DeferredEventQueue(logContext); this.deferredUnstableEventQueue = new DeferredEventQueue(logContext); this.offsetControl = new OffsetControlManager.Builder(). @@ -2090,6 +2087,9 @@ private QuorumController( setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION). setClusterFeatureSupportDescriber(clusterSupportDescriber). build(); + // AutoMQ for Kafka inject start + this.kvControlManager = new KVControlManager(snapshotRegistry, logContext, featureControl); + // AutoMQ for Kafka inject end this.clusterControl = new ClusterControlManager.Builder(). setLogContext(logContext). setClusterId(clusterId). @@ -2165,7 +2165,7 @@ private QuorumController( this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext, this.s3ObjectControlManager, clusterControl, featureControl, replicationControl); this.topicDeletionManager = new TopicDeletionManager(snapshotRegistry, this, streamControlManager, kvControlManager); - this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager)); + this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager), featureControl); this.extension = extension.apply(this); // set the nodeControlManager here to avoid circular dependency diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java index fd6ef712e4..23a18f7a9b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.RemoveKVRecord; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; +import org.apache.kafka.controller.FeatureControlManager; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; @@ -39,6 +40,8 @@ import java.util.List; import java.util.Map; +import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey; +import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH; import static org.apache.kafka.common.protocol.Errors.KEY_EXIST; import static org.apache.kafka.common.protocol.Errors.KEY_NOT_EXIST; @@ -47,30 +50,54 @@ public class KVControlManager { private final SnapshotRegistry registry; private final Logger log; private final TimelineHashMap kv; + private final TimelineHashMap keyMetadataMap; + private final FeatureControlManager featureControl; - public KVControlManager(SnapshotRegistry registry, LogContext logContext) { + public KVControlManager(SnapshotRegistry registry, LogContext logContext, FeatureControlManager featureControl) { this.registry = registry; this.log = logContext.logger(KVControlManager.class); this.kv = new TimelineHashMap<>(registry, 0); + this.keyMetadataMap = new TimelineHashMap<>(registry, 0); + this.featureControl = featureControl; } public GetKVResponse getKV(GetKVRequest request) { - String key = request.key(); + String key = buildCompositeKey(request.namespace(), request.key()); byte[] value = kv.containsKey(key) ? kv.get(key).array() : null; + KeyMetadata keyMetadata = null; + if (request.namespace() != null && !request.namespace().isEmpty()) { + keyMetadata = keyMetadataMap.get(key); + } + return new GetKVResponse() - .setValue(value); + .setValue(value) + .setNamespace(request.namespace()) + .setEpoch(keyMetadata != null ? keyMetadata.getEpoch() : 0L); } public ControllerResult putKV(PutKVRequest request) { - String key = request.key(); + String key = buildCompositeKey(request.namespace(), request.key()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0; + if (request.epoch() > 0 && request.epoch() != currentEpoch) { + return ControllerResult.of(Collections.emptyList(), + new PutKVResponse() + .setErrorCode(INVALID_KV_RECORD_EPOCH.code()) + .setEpoch(currentEpoch)); + } + + long newEpoch = System.currentTimeMillis(); ByteBuffer value = kv.get(key); if (value == null || request.overwrite()) { // generate kv record ApiMessageAndVersion record = new ApiMessageAndVersion(new KVRecord() .setKeyValues(Collections.singletonList(new KeyValue() .setKey(key) - .setValue(request.value()))), (short) 0); - return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value())); + .setValue(request.value()) + .setNamespace(request.namespace()) + .setEpoch(newEpoch))), + featureControl.autoMQVersion().namespacedKVRecordVersion()); + return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value()).setEpoch(newEpoch)); } // exist and not allow overwriting return ControllerResult.of(Collections.emptyList(), new PutKVResponse() @@ -79,14 +106,25 @@ public ControllerResult putKV(PutKVRequest request) { } public ControllerResult deleteKV(DeleteKVRequest request) { + String key = buildCompositeKey(request.namespace(), request.key()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0; + if (request.epoch() > 0 && request.epoch() != currentEpoch) { + return ControllerResult.of(Collections.emptyList(), + new DeleteKVResponse() + .setErrorCode(INVALID_KV_RECORD_EPOCH.code()) + .setEpoch(currentEpoch)); + } log.trace("DeleteKVRequestData: {}", request); DeleteKVResponse resp = new DeleteKVResponse(); ByteBuffer value = kv.get(request.key()); if (value != null) { // generate remove-kv record ApiMessageAndVersion record = new ApiMessageAndVersion(new RemoveKVRecord() - .setKeys(Collections.singletonList(request.key())), (short) 0); - return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array())); + .setKeys(Collections.singletonList(request.key())) + .setNamepsace(request.namespace()), + featureControl.autoMQVersion().namespacedKVRecordVersion()); + return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()).setEpoch(currentEpoch)); } return ControllerResult.of(Collections.emptyList(), resp.setErrorCode(KEY_NOT_EXIST.code())); } @@ -94,18 +132,43 @@ public ControllerResult deleteKV(DeleteKVRequest request) { public void replay(KVRecord record) { List keyValues = record.keyValues(); for (KeyValue keyValue : keyValues) { - kv.put(keyValue.key(), ByteBuffer.wrap(keyValue.value())); + String key = buildCompositeKey(keyValue.namespace(), keyValue.key()); + kv.put(key, ByteBuffer.wrap(keyValue.value())); + if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) { + keyMetadataMap.put(key, new KeyMetadata(keyValue.namespace(), keyValue.epoch())); + } } } public void replay(RemoveKVRecord record) { List keys = record.keys(); for (String key : keys) { - kv.remove(key); + String compositeKey = buildCompositeKey(record.namepsace(), key); + kv.remove(compositeKey); + if (record.namepsace() != null && !record.namepsace().isEmpty()) { + keyMetadataMap.remove(compositeKey); + } } } public Map kv() { return kv; } + + private static class KeyMetadata { + private final long epoch; + private final String namespace; + public KeyMetadata(String namespace, long epoch) { + this.namespace = namespace; + this.epoch = epoch; + } + + public long getEpoch() { + return epoch; + } + + public String getNamespace() { + return namespace; + } + } } diff --git a/metadata/src/main/resources/common/metadata/KVRecord.json b/metadata/src/main/resources/common/metadata/KVRecord.json index 62b70c2ceb..0e2166d79a 100644 --- a/metadata/src/main/resources/common/metadata/KVRecord.json +++ b/metadata/src/main/resources/common/metadata/KVRecord.json @@ -17,7 +17,7 @@ "apiKey": 516, "type": "metadata", "name": "KVRecord", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -37,8 +37,20 @@ "type": "bytes", "versions": "0+", "about": "Value" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/metadata/src/main/resources/common/metadata/RemoveKVRecord.json b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json index 049d939be2..48a693d5f2 100644 --- a/metadata/src/main/resources/common/metadata/RemoveKVRecord.json +++ b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json @@ -25,6 +25,12 @@ "type": "[]string", "versions": "0+", "about": "Keys" + }, + { + "name": "Namepsace", + "type": "string", + "versions": "1+", + "about": "Namepsace" } ] -} \ No newline at end of file +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 4b9564a1f1..7572871c19 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -74,6 +74,7 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; +import static org.apache.kafka.controller.FeatureControlManagerTest.features; import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -728,7 +729,12 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) { public void testReusableNodeIds() { MockTime time = new MockTime(0, 0, 0); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext()); + FeatureControlManager featureControlManager = new FeatureControlManager.Builder(). + setQuorumFeatures(features("foo", 1, 2)). + setSnapshotRegistry(snapshotRegistry). + setMetadataVersion(MetadataVersion.IBP_3_3_IV0). + build(); + KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext(), featureControlManager); FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, diff --git a/s3stream/src/main/java/com/automq/stream/api/KVClient.java b/s3stream/src/main/java/com/automq/stream/api/KVClient.java index a3e2152ebc..31786ca115 100644 --- a/s3stream/src/main/java/com/automq/stream/api/KVClient.java +++ b/s3stream/src/main/java/com/automq/stream/api/KVClient.java @@ -20,7 +20,9 @@ package com.automq.stream.api; import com.automq.stream.api.KeyValue.Key; +import com.automq.stream.api.KeyValue.KeyAndNamespace; import com.automq.stream.api.KeyValue.Value; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import java.util.concurrent.CompletableFuture; @@ -36,6 +38,14 @@ public interface KVClient { */ CompletableFuture putKVIfAbsent(KeyValue keyValue); + /** + * Put namespaced key value if key not exist, return current key value namespace epoch after putting. + * + * @param keyValue {@link KeyValue} k-v pair namespace and epoch + * @return async put result. {@link ValueAndEpoch} current value and epoch after putting. + */ + CompletableFuture putNamespacedKVIfAbsent(KeyValue keyValue); + /** * Put key value, overwrite if key exist, return current key value after putting. * @@ -44,6 +54,14 @@ public interface KVClient { */ CompletableFuture putKV(KeyValue keyValue); + /** + * Put key value, overwrite if key exist, return current key value namespace epoch after putting. + * + * @param keyValue {@link KeyValue} k-v pair namespace and epoch + * @return async put result. {@link ValueAndEpoch} current value and epoch after putting. + */ + CompletableFuture putNamespacedKV(KeyValue keyValue); + /** * Get value by key. * @@ -52,6 +70,14 @@ public interface KVClient { */ CompletableFuture getKV(Key key); + /** + * Get value by key. + * + * @param keyAndNamespace key and namespace. + * @return async get result. {@link ValueAndEpoch} value and epoch, null if key not exist. + */ + CompletableFuture getNamespacedKV(KeyAndNamespace keyAndNamespace); + /** * Delete key value by key. If key not exist, return null. * @@ -59,4 +85,12 @@ public interface KVClient { * @return async delete result. {@link Value} deleted value, null if key not exist. */ CompletableFuture delKV(Key key); + + /** + * Delete key value by key. If key not exist, return null. + * + * @param keyValue k-v pair namespace and epoch + * @return async delete result. {@link ValueAndEpoch} deleted value and epoch, null if key not exist. + */ + CompletableFuture delNamespacedKV(KeyValue keyValue); } diff --git a/s3stream/src/main/java/com/automq/stream/api/KeyValue.java b/s3stream/src/main/java/com/automq/stream/api/KeyValue.java index 7722120c48..0d17bf131f 100644 --- a/s3stream/src/main/java/com/automq/stream/api/KeyValue.java +++ b/s3stream/src/main/java/com/automq/stream/api/KeyValue.java @@ -25,16 +25,31 @@ public class KeyValue { private final Key key; private final Value value; + private final String namespace; + private final long epoch; private KeyValue(Key key, Value value) { this.key = key; this.value = value; + this.namespace = null; + this.epoch = 0L; + } + + public KeyValue(Key key, Value value, String namespace, long epoch) { + this.key = key; + this.value = value; + this.namespace = namespace; + this.epoch = epoch; } public static KeyValue of(String key, ByteBuffer value) { return new KeyValue(Key.of(key), Value.of(value)); } + public static KeyValue of(String key, ByteBuffer value, String namespace, long epoch) { + return new KeyValue(Key.of(key), Value.of(value), namespace, epoch); + } + public Key key() { return key; } @@ -43,6 +58,14 @@ public Value value() { return value; } + public String namespace() { + return namespace; + } + + public long epoch() { + return epoch; + } + @Override public boolean equals(Object o) { if (this == o) @@ -154,4 +177,52 @@ public String toString() { '}'; } } + + public static class KeyAndNamespace { + private final Key key; + private final String namespace; + + public KeyAndNamespace(Key key, String namespace) { + this.key = key; + this.namespace = namespace; + } + + public Key key() { + return key; + } + + public String namespace() { + return namespace; + } + + public static KeyAndNamespace of(String key, String namespace) { + return new KeyAndNamespace(Key.of(key), namespace); + } + } + + public static class ValueAndEpoch { + private final Value value; + private final long epoch; + + public ValueAndEpoch(Value value, long epoch) { + this.value = value; + this.epoch = epoch; + } + + public Value value() { + return value; + } + + public long epoch() { + return epoch; + } + + public static ValueAndEpoch of(byte[] value, long epoch) { + return new ValueAndEpoch(Value.of(value), epoch); + } + + public static ValueAndEpoch of(ByteBuffer value, long epoch) { + return new ValueAndEpoch(Value.of(value), epoch); + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/KVRecordUtils.java b/s3stream/src/main/java/com/automq/stream/utils/KVRecordUtils.java new file mode 100644 index 0000000000..0e8a136085 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/utils/KVRecordUtils.java @@ -0,0 +1,11 @@ +package com.automq.stream.utils; + +public class KVRecordUtils { + + public static String buildCompositeKey(String namespace, String key) { + if (namespace == null || namespace.isEmpty()) { + return key; + } + return namespace + "/" + key; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java index 082a96e0a2..4f9f9979de 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java @@ -31,10 +31,12 @@ public enum AutoMQVersion { // Support object bucket index // Support huge cluster // Support node registration - V2((short) 3); + V2((short) 3), + // Support kv record namespace + V3((short) 4); public static final String FEATURE_NAME = "automq.version"; - public static final AutoMQVersion LATEST = V2; + public static final AutoMQVersion LATEST = V3; private final short level; private final Version s3streamVersion; @@ -125,6 +127,14 @@ public short streamObjectRecordVersion() { } } + public short namespacedKVRecordVersion() { + if (isAtLeast(V3)) { + return 1; + } else { + return 0; + } + } + public Version s3streamVersion() { return s3streamVersion; } @@ -139,6 +149,7 @@ private Version mapS3StreamVersion(short automqVersion) { case 2: return Version.V0; case 3: + case 4: return Version.V1; default: throw new IllegalArgumentException("Unknown AutoMQVersion level: " + automqVersion); From 9a3b426eddb340f97f9bdbc15f98fe0e2b4076d9 Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Wed, 4 Jun 2025 20:00:36 +0800 Subject: [PATCH 2/8] fix(build): remove unnecessary featureControl parameter from NodeControlManager initialization --- .../main/java/org/apache/kafka/controller/QuorumController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 0ff7af0251..3ffdbb27fe 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2165,7 +2165,7 @@ private QuorumController( this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext, this.s3ObjectControlManager, clusterControl, featureControl, replicationControl); this.topicDeletionManager = new TopicDeletionManager(snapshotRegistry, this, streamControlManager, kvControlManager); - this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager), featureControl); + this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoManager(clusterControl, streamControlManager)); this.extension = extension.apply(this); // set the nodeControlManager here to avoid circular dependency From ab077eca8577ee40031740e8a7f48c707bb77155 Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Wed, 4 Jun 2025 20:24:35 +0800 Subject: [PATCH 3/8] feat(kv): simplify key handling in KVControlManager and add namespaced read/write tests --- .../controller/stream/KVControlManager.java | 10 ++--- .../controller/KVControlManagerTest.java | 42 ++++++++++++++++++- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java index 23a18f7a9b..a2306866ed 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java @@ -132,10 +132,9 @@ public ControllerResult deleteKV(DeleteKVRequest request) { public void replay(KVRecord record) { List keyValues = record.keyValues(); for (KeyValue keyValue : keyValues) { - String key = buildCompositeKey(keyValue.namespace(), keyValue.key()); - kv.put(key, ByteBuffer.wrap(keyValue.value())); + kv.put(keyValue.key(), ByteBuffer.wrap(keyValue.value())); if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) { - keyMetadataMap.put(key, new KeyMetadata(keyValue.namespace(), keyValue.epoch())); + keyMetadataMap.put(keyValue.key(), new KeyMetadata(keyValue.namespace(), keyValue.epoch())); } } } @@ -143,10 +142,9 @@ public void replay(KVRecord record) { public void replay(RemoveKVRecord record) { List keys = record.keys(); for (String key : keys) { - String compositeKey = buildCompositeKey(record.namepsace(), key); - kv.remove(compositeKey); + kv.remove(key); if (record.namepsace() != null && !record.namepsace().isEmpty()) { - keyMetadataMap.remove(compositeKey); + keyMetadataMap.remove(key); } } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java index 7f21670a1f..30a1535fc4 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.KVControlManager; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; @@ -41,6 +42,7 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.kafka.controller.FeatureControlManagerTest.features; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -54,7 +56,12 @@ public class KVControlManagerTest { public void setUp() { LogContext logContext = new LogContext(); SnapshotRegistry registry = new SnapshotRegistry(logContext); - this.manager = new KVControlManager(registry, logContext); + FeatureControlManager featureControlManager = new FeatureControlManager.Builder(). + setQuorumFeatures(features("foo", 1, 2)). + setSnapshotRegistry(registry). + setMetadataVersion(MetadataVersion.IBP_3_3_IV0). + build(); + this.manager = new KVControlManager(registry, logContext, featureControlManager); } @Test @@ -113,6 +120,39 @@ public void testBasicReadWrite() { assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode()); } + @Test + public void testNamespacedReadWrite() { + ControllerResult result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1".getBytes()) + .setNamespace("__automq_test") + .setEpoch(0)); + assertEquals(1, result.records().size()); + assertEquals(Errors.NONE.code(), result.response().errorCode()); + assertEquals("value1", new String(result.response().value())); + replay(manager, result.records()); + + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1-1".getBytes()) + .setNamespace("__automq_test") + .setEpoch(0)); + assertEquals(0, result.records().size()); + assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode()); + assertEquals("value1", new String(result.response().value())); + + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1-2".getBytes()) + .setNamespace("__automq_test") + .setEpoch(0) + .setOverwrite(true)); + assertEquals(1, result.records().size()); + assertEquals(Errors.NONE.code(), result.response().errorCode()); + assertEquals("value1-2", new String(result.response().value())); + replay(manager, result.records()); + } + private void replay(KVControlManager manager, List records) { List messages = records.stream().map(x -> x.message()) .collect(Collectors.toList()); From e709fb9cbd072ebd900f33cf46ad334c2decd9ec Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Thu, 5 Jun 2025 11:22:45 +0800 Subject: [PATCH 4/8] feat(kv): test put and delete epoch validation --- .../controller/stream/KVControlManager.java | 4 +- .../controller/KVControlManagerTest.java | 122 +++++++++++++++++- 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java index a2306866ed..d61f829598 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java @@ -117,11 +117,11 @@ public ControllerResult deleteKV(DeleteKVRequest request) { } log.trace("DeleteKVRequestData: {}", request); DeleteKVResponse resp = new DeleteKVResponse(); - ByteBuffer value = kv.get(request.key()); + ByteBuffer value = kv.get(key); if (value != null) { // generate remove-kv record ApiMessageAndVersion record = new ApiMessageAndVersion(new RemoveKVRecord() - .setKeys(Collections.singletonList(request.key())) + .setKeys(Collections.singletonList(key)) .setNamepsace(request.namespace()), featureControl.autoMQVersion().namespacedKVRecordVersion()); return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()).setEpoch(currentEpoch)); diff --git a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java index 30a1535fc4..664ec476df 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java @@ -42,9 +42,11 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH; import static org.apache.kafka.controller.FeatureControlManagerTest.features; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(40) @Tag("S3Unit") @@ -126,7 +128,8 @@ public void testNamespacedReadWrite() { .setKey("key1") .setValue("value1".getBytes()) .setNamespace("__automq_test") - .setEpoch(0)); +// .setEpoch(0) + ); assertEquals(1, result.records().size()); assertEquals(Errors.NONE.code(), result.response().errorCode()); assertEquals("value1", new String(result.response().value())); @@ -136,7 +139,8 @@ public void testNamespacedReadWrite() { .setKey("key1") .setValue("value1-1".getBytes()) .setNamespace("__automq_test") - .setEpoch(0)); +// .setEpoch(0) + ); assertEquals(0, result.records().size()); assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode()); assertEquals("value1", new String(result.response().value())); @@ -145,12 +149,124 @@ public void testNamespacedReadWrite() { .setKey("key1") .setValue("value1-2".getBytes()) .setNamespace("__automq_test") - .setEpoch(0) +// .setEpoch(0) .setOverwrite(true)); assertEquals(1, result.records().size()); assertEquals(Errors.NONE.code(), result.response().errorCode()); assertEquals("value1-2", new String(result.response().value())); replay(manager, result.records()); + + GetKVResponse result2 = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__automq_test")); + assertEquals("value1-2", new String(result2.value())); + + result2 = manager.getKV(new GetKVRequest() + .setKey("key2") + .setNamespace("__automq_test")); + assertNull(result2.value()); + + ControllerResult result3 = manager.deleteKV(new DeleteKVRequest() + .setKey("key2") + .setNamespace("__automq_test")); + assertEquals(0, result3.records().size()); + assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode()); + + result3 = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__automq_test") +// .setEpoch(0) + ); + assertEquals(1, result3.records().size()); + assertEquals(Errors.NONE.code(), result3.response().errorCode()); + assertEquals("value1-2", new String(result3.response().value())); + replay(manager, result3.records()); + // key1 is deleted + result2 = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__automq_test")); + assertNull(result2.value()); + + result3 = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__automq_test")); + assertEquals(0, result3.records().size()); + assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode()); + } + + @Test + public void testPutWithEpochValidation() { + ControllerResult result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(0)); + assertEquals(1, result.records().size()); + assertEquals(Errors.NONE.code(), result.response().errorCode()); + long initialEpoch = result.response().epoch(); + assertTrue(initialEpoch > 0); + replay(manager, result.records()); + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value2".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(initialEpoch - 1) + .setOverwrite(true)); + assertEquals(0, result.records().size()); + assertEquals(INVALID_KV_RECORD_EPOCH.code(), result.response().errorCode()); + assertEquals(initialEpoch, result.response().epoch()); + // without overwrite, should fail + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value2".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(initialEpoch)); + assertEquals(0, result.records().size()); + assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode()); + // with overwrite, should succeed + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value2".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(initialEpoch) + .setOverwrite(true)); + assertEquals(1, result.records().size()); + assertEquals(Errors.NONE.code(), result.response().errorCode()); + long newEpoch = result.response().epoch(); + assertTrue(newEpoch > initialEpoch); + replay(manager, result.records()); + GetKVResponse readResp = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__epoch_test")); + assertEquals(newEpoch, readResp.epoch()); + } + + @Test + public void testDeleteWithEpochValidation() { + ControllerResult putResult = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1".getBytes()) + .setNamespace("__epoch_test")); + long initialEpoch = putResult.response().epoch(); + replay(manager, putResult.records()); + ControllerResult delResult = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__epoch_test") + .setEpoch(initialEpoch - 1)); + assertEquals(0, delResult.records().size()); + assertEquals(INVALID_KV_RECORD_EPOCH.code(), delResult.response().errorCode()); + assertEquals(initialEpoch, delResult.response().epoch()); + delResult = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__epoch_test") + .setEpoch(initialEpoch)); + assertEquals(1, delResult.records().size()); + assertEquals(Errors.NONE.code(), delResult.response().errorCode()); + replay(manager, delResult.records()); + GetKVResponse readResp = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__epoch_test")); + assertNull(readResp.value()); } private void replay(KVControlManager manager, List records) { From 28045f41a79df32fdd32864a9cc93c044d23df06 Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Thu, 5 Jun 2025 11:23:30 +0800 Subject: [PATCH 5/8] feat(kv): test put and delete epoch validation --- .../org/apache/kafka/controller/KVControlManagerTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java index 664ec476df..ac34141b38 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java @@ -128,7 +128,6 @@ public void testNamespacedReadWrite() { .setKey("key1") .setValue("value1".getBytes()) .setNamespace("__automq_test") -// .setEpoch(0) ); assertEquals(1, result.records().size()); assertEquals(Errors.NONE.code(), result.response().errorCode()); @@ -139,7 +138,6 @@ public void testNamespacedReadWrite() { .setKey("key1") .setValue("value1-1".getBytes()) .setNamespace("__automq_test") -// .setEpoch(0) ); assertEquals(0, result.records().size()); assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode()); @@ -149,7 +147,6 @@ public void testNamespacedReadWrite() { .setKey("key1") .setValue("value1-2".getBytes()) .setNamespace("__automq_test") -// .setEpoch(0) .setOverwrite(true)); assertEquals(1, result.records().size()); assertEquals(Errors.NONE.code(), result.response().errorCode()); @@ -175,7 +172,6 @@ public void testNamespacedReadWrite() { result3 = manager.deleteKV(new DeleteKVRequest() .setKey("key1") .setNamespace("__automq_test") -// .setEpoch(0) ); assertEquals(1, result3.records().size()); assertEquals(Errors.NONE.code(), result3.response().errorCode()); From 36a804bb1d48e31a30eab564af7b2f8d73a2c1e3 Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Thu, 5 Jun 2025 19:05:02 +0800 Subject: [PATCH 6/8] chore(kv): correct namespace spelling in KVControlManager and RemoveKVRecord --- .../apache/kafka/controller/stream/KVControlManager.java | 4 ++-- .../src/main/resources/common/metadata/RemoveKVRecord.json | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java index d61f829598..a3e006e5b3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java @@ -122,7 +122,7 @@ public ControllerResult deleteKV(DeleteKVRequest request) { // generate remove-kv record ApiMessageAndVersion record = new ApiMessageAndVersion(new RemoveKVRecord() .setKeys(Collections.singletonList(key)) - .setNamepsace(request.namespace()), + .setNamespace(request.namespace()), featureControl.autoMQVersion().namespacedKVRecordVersion()); return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()).setEpoch(currentEpoch)); } @@ -143,7 +143,7 @@ public void replay(RemoveKVRecord record) { List keys = record.keys(); for (String key : keys) { kv.remove(key); - if (record.namepsace() != null && !record.namepsace().isEmpty()) { + if (record.namespace() != null && !record.namespace().isEmpty()) { keyMetadataMap.remove(key); } } diff --git a/metadata/src/main/resources/common/metadata/RemoveKVRecord.json b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json index 48a693d5f2..dc0253e5cf 100644 --- a/metadata/src/main/resources/common/metadata/RemoveKVRecord.json +++ b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json @@ -17,7 +17,7 @@ "apiKey": 517, "type": "metadata", "name": "RemoveKVRecord", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -27,10 +27,10 @@ "about": "Keys" }, { - "name": "Namepsace", + "name": "Namespace", "type": "string", "versions": "1+", - "about": "Namepsace" + "about": "Namespace" } ] } From 901306838890ad9c2ab78544cadcb77332acdcbe Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Thu, 5 Jun 2025 22:19:31 +0800 Subject: [PATCH 7/8] feat(kv): refactor getNamespacedKV methods to remove value parameter and update response handling --- .../org/apache/kafka/clients/admin/Admin.java | 1 - .../admin/DeleteNamespacedKVResult.java | 3 - .../kafka/clients/admin/ForwardingAdmin.java | 4 +- .../kafka/clients/admin/KafkaAdminClient.java | 5 +- .../clients/admin/PutNamespacedKVResult.java | 21 ++----- .../internals/GetNamespacedKVHandler.java | 4 +- .../internals/PutNamespacedKVHandler.java | 57 +++++++++++-------- .../kafka/clients/admin/MockAdminClient.java | 2 +- 8 files changed, 46 insertions(+), 51 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 4dfffe238f..befd1dd892 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1744,7 +1744,6 @@ GetNamespacedKVResult getNamespacedKV( Optional> partitions, String namespace, String key, - String value, GetNamespacedKVOptions options ); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java index a20b6c7af0..0438a051e4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java @@ -13,9 +13,6 @@ public DeleteNamespacedKVResult(Map> futures) this.futures = futures; } - /** - * Return a future which succeeds if the put operation is successful. - */ public KafkaFuture>> all() { return KafkaFuture.completedFuture(futures); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 6d59f8d154..3a8d5e1778 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -321,8 +321,8 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, } @Override - public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, String value, GetNamespacedKVOptions options) { - return delegate.getNamespacedKV(partitions, namespace, key, value, options); + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, GetNamespacedKVOptions options) { + return delegate.getNamespacedKV(partitions, namespace, key, options); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7e343673fa..9c6d1c1b66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -169,6 +169,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.PutKVsRequestData; +import org.apache.kafka.common.message.PutKVsResponseData; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.UnregisterBrokerRequestData; @@ -4877,7 +4878,7 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, } @Override - public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, String value, GetNamespacedKVOptions options) { + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, GetNamespacedKVOptions options) { Set targetPartitions = partitions.orElseThrow(() -> new IllegalArgumentException("Partitions cannot be empty") ); @@ -4921,7 +4922,7 @@ public PutNamespacedKVResult putNamespacedKV(Optional> parti NamespacedKVRecordsToPut recordsToPut = recordsToPutBuilder.build(); PutNamespacedKVHandler handler = new PutNamespacedKVHandler(logContext, recordsToPut); - SimpleAdminApiFuture future = PutNamespacedKVHandler.newFuture(targetPartitions); + SimpleAdminApiFuture future = PutNamespacedKVHandler.newFuture(targetPartitions); invokeDriver(handler, future, options.timeoutMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java index dc1f05fd8e..bb2a7b207b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java @@ -2,33 +2,22 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse; import java.util.Map; public class PutNamespacedKVResult { - private final Map> futures; + private final Map> futures; - /** - * Return a future which succeeds only if all the records deletions succeed. - */ - public PutNamespacedKVResult(Map> futures) { + public PutNamespacedKVResult(Map> futures) { this.futures = futures; } - /** - * Return a future which succeeds only if all the records deletions succeed. - */ - public KafkaFuture>> all() { + + public KafkaFuture>> all() { return KafkaFuture.completedFuture(futures); } -// /** -// * Return a future which succeeds if the put operation is successful. -// */ -// public Map> all() { -// return futures; -// } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java index ee151df011..18375b04ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java @@ -60,8 +60,8 @@ public String apiName() { public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { GetKVsResponseData data = ((GetKVsResponse) response).data(); - Map completed = new LinkedHashMap<>(); - Map failed = new HashMap<>(); + final Map completed = new LinkedHashMap<>(); + final Map failed = new HashMap<>(); List responses = data.getKVResponses(); int responseIndex = 0; for (TopicPartition tp : orderedPartitions) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java index 260a4454d6..5e56420dcc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java @@ -5,6 +5,7 @@ import org.apache.kafka.common.message.PutKVsRequestData; import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest; import org.apache.kafka.common.message.PutKVsResponseData; +import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -15,38 +16,40 @@ import org.slf4j.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; -public class PutNamespacedKVHandler extends AdminApiHandler.Batched { +public class PutNamespacedKVHandler extends AdminApiHandler.Batched { private final Logger logger; private final NamespacedKVRecordsToPut recordsToPut; private final AdminApiLookupStrategy lookupStrategy; + private final List orderedPartitions; public PutNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToPut recordsToPut) { this.logger = logContext.logger(PutNamespacedKVHandler.class); this.recordsToPut = recordsToPut; this.lookupStrategy = new PartitionLeaderStrategy(logContext); + this.orderedPartitions = new ArrayList<>(recordsToPut.recordsByPartition().keySet()); } @Override protected AbstractRequest.Builder buildBatchedRequest(int brokerId, Set partitions) { - Map> filteredRecords = new HashMap<>(); - for (TopicPartition partition : partitions) { - if (recordsToPut.recordsByPartition().containsKey(partition)) { - filteredRecords.put(partition, recordsToPut.recordsByPartition().get(partition)); - } - } - PutKVsRequestData requestData = new PutKVsRequestData(); - List allRequests = new ArrayList<>(); - filteredRecords.values().forEach(allRequests::addAll); - requestData.setPutKVRequests(allRequests); + List allPutRequests = orderedPartitions.stream() + .filter(partitions::contains) + .map(tp -> recordsToPut.recordsByPartition().get(tp)) + .filter(Objects::nonNull) + .flatMap(Collection::stream).collect(Collectors.toList()); + requestData.setPutKVRequests(allPutRequests); return new PutKVsRequest.Builder(requestData); } @@ -56,21 +59,27 @@ public String apiName() { } @Override - public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { - PutKVsResponse putResponse = (PutKVsResponse) response; - PutKVsResponseData responseData = putResponse.data(); - final Map completed = new HashMap<>(); + public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { + PutKVsResponseData responseData = ((PutKVsResponse) response).data(); + List responses = responseData.putKVResponses(); + final Map completed = new LinkedHashMap<>(); final Map failed = new HashMap<>(); - - partitions.forEach(partition -> { - Errors error = Errors.forCode(responseData.errorCode()); - if (error != Errors.NONE) { - failed.put(partition, error.exception()); + int responseIndex = 0; + for (TopicPartition tp : orderedPartitions) { + if (!partitions.contains(tp)) { + continue; + } + if (responseIndex >= responses.size()) { + failed.put(tp, new IllegalStateException("Missing response for partition")); + continue; + } + PutKVResponse resp = responses.get(responseIndex++); + if (resp.errorCode() == Errors.NONE.code()) { + completed.put(tp, resp); } else { - completed.put(partition, null); + failed.put(tp, Errors.forCode(resp.errorCode()).exception()); } - }); - + } return new ApiResult<>(completed, failed, Collections.emptyList()); } @@ -79,7 +88,7 @@ public AdminApiLookupStrategy lookupStrategy() { return this.lookupStrategy; } - public static AdminApiFuture.SimpleAdminApiFuture newFuture( + public static AdminApiFuture.SimpleAdminApiFuture newFuture( Set partitions ) { return AdminApiFuture.forKeys(new HashSet<>(partitions)); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index b0cdab67af..07b02b6747 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1451,7 +1451,7 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, } @Override - public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, String value, GetNamespacedKVOptions options) { + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, GetNamespacedKVOptions options) { throw new UnsupportedOperationException(); } From 546cf76696051bb40d925b0a1f50f0af4f678135 Mon Sep 17 00:00:00 2001 From: 1sonofqiu Date: Fri, 6 Jun 2025 10:31:16 +0800 Subject: [PATCH 8/8] chore(kv): remove testPutKV_NewKey method from KafkaAdminClientTest --- .../clients/admin/KafkaAdminClientTest.java | 30 ++----------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b95b26c5be..4dae251b1f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -116,6 +116,7 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.GetKVsResponseData; import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; @@ -141,6 +142,7 @@ import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; +import org.apache.kafka.common.message.PutKVsResponseData; import org.apache.kafka.common.message.UnregisterBrokerResponseData; import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -7668,34 +7670,6 @@ public void testClientInstanceIdNoTelemetryReporterRegistered() { admin.close(); } - @Test - public void testPutKV_NewKey() { - - Properties props = new Properties(); - props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - - KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props); - - PutNamespacedKVOptions options = new PutNamespacedKVOptions(); - options.overwrite(true); - options.ifMatchEpoch(0L); - Set multiPartitions = new HashSet<>(Arrays.asList( - new TopicPartition("test-topic", 0), - new TopicPartition("test-topic", 1))); - - PutNamespacedKVResult putNamespacedKVResult = admin.putNamespacedKV(Optional.of(multiPartitions), "namespace1", "key1", "value1", options); - Map> map; - try { - map = putNamespacedKVResult.all().get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - - assertNotNull(map); - } - private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) { return new UnregisterBrokerResponse(new UnregisterBrokerResponseData() .setErrorCode(error.code())