diff --git a/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java b/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java index e213db5cfd..d4f7d4f60b 100644 --- a/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java +++ b/automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java @@ -39,6 +39,7 @@ import com.automq.shell.metrics.S3MetricsExporter; import com.automq.stream.api.KeyValue; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,32 @@ public KeyValue.Value getKV(String key) throws IOException { throw code.exception(); } + public ValueAndEpoch getKV(String key, String namespace) throws IOException { + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ClientKVClient]: Get KV: {} Namespace: {}", key, namespace); + } + + GetKVsRequestData data = new GetKVsRequestData() + .setGetKeyRequests(List.of(new GetKVsRequestData.GetKVRequest().setKey(key).setNamespace(namespace))); + + long now = Time.SYSTEM.milliseconds(); + ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()), + new GetKVsRequest.Builder(data), now, true, 3000, null); + + ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM); + GetKVsResponseData responseData = (GetKVsResponseData) response.responseBody().data(); + + Errors code = Errors.forCode(responseData.errorCode()); + if (Objects.requireNonNull(code) == Errors.NONE) { + return ValueAndEpoch.of( + responseData.getKVResponses().get(0).value(), + responseData.getKVResponses().get(0).epoch()); + } + + throw code.exception(); + } + public KeyValue.Value putKV(String key, byte[] value) throws IOException { long now = Time.SYSTEM.milliseconds(); @@ -119,6 +146,32 @@ public KeyValue.Value putKV(String key, byte[] value) throws IOException { throw code.exception(); } + public ValueAndEpoch putKV(String key, byte[] value, String 
namespace, long epoch) throws IOException { + long now = Time.SYSTEM.milliseconds(); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ClientKVClient]: put KV: {}", key); + } + + PutKVsRequestData data = new PutKVsRequestData() + .setPutKVRequests(List.of(new PutKVsRequestData.PutKVRequest().setKey(key).setValue(value).setNamespace(namespace).setEpoch(epoch))); + + ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()), + new PutKVsRequest.Builder(data), now, true, 3000, null); + + ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM); + PutKVsResponseData responseData = (PutKVsResponseData) response.responseBody().data(); + + Errors code = Errors.forCode(responseData.errorCode()); + if (Objects.requireNonNull(code) == Errors.NONE) { + return ValueAndEpoch.of( + responseData.putKVResponses().get(0).value(), + responseData.putKVResponses().get(0).epoch()); + } + + throw code.exception(); + } + public KeyValue.Value deleteKV(String key) throws IOException { long now = Time.SYSTEM.milliseconds(); @@ -142,4 +195,30 @@ public KeyValue.Value deleteKV(String key) throws IOException { throw code.exception(); } + + public ValueAndEpoch deleteKV(String key, String namespace, long epoch) throws IOException { + long now = Time.SYSTEM.milliseconds(); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ClientKVClient]: Delete KV: {}", key); + } + + DeleteKVsRequestData data = new DeleteKVsRequestData() + .setDeleteKVRequests(List.of(new DeleteKVsRequestData.DeleteKVRequest().setKey(key).setNamespace(namespace).setEpoch(epoch))); + + ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()), + new DeleteKVsRequest.Builder(data), now, true, 3000, null); + + ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM); + DeleteKVsResponseData responseData = (DeleteKVsResponseData) 
response.responseBody().data(); + + Errors code = Errors.forCode(responseData.errorCode()); + if (Objects.requireNonNull(code) == Errors.NONE) { + return ValueAndEpoch.of( + responseData.deleteKVResponses().get(0).value(), + responseData.deleteKVResponses().get(0).epoch()); + } + + throw code.exception(); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index a5e4e910b3..befd1dd892 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1739,6 +1739,48 @@ default ListClientMetricsResourcesResult listClientMetricsResources() { * @return {@link UpdateGroupResult} */ UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options); + + GetNamespacedKVResult getNamespacedKV( + Optional> partitions, + String namespace, + String key, + GetNamespacedKVOptions options + ); + + /** + * Put a key-value pair in the namespaced KV store. + * @param partitions + * @param namespace + * @param key + * @param value + * @param options + * @return + */ + PutNamespacedKVResult putNamespacedKV( + Optional> partitions, + String namespace, + String key, + String value, + PutNamespacedKVOptions options + ); + + /** + * Delete a key-value pair in the namespaced KV store. 
+ * @param partitions + * @param namespace + * @param key + * @param options + * @return + */ + DeleteNamespacedKVResult deleteNamespacedKV( + Optional> partitions, + String namespace, + String key, + DeleteNamespacedKVOptions options + ); + + + // AutoMQ inject end /** diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java new file mode 100644 index 0000000000..b15bbca77e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java @@ -0,0 +1,15 @@ +package org.apache.kafka.clients.admin; + +public class DeleteNamespacedKVOptions extends AbstractOptions { + + private long ifMatchEpoch = 0L; + + public DeleteNamespacedKVOptions ifMatchEpoch(long epoch) { + this.ifMatchEpoch = epoch; + return this; + } + + public long ifMatchEpoch() { + return ifMatchEpoch; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java new file mode 100644 index 0000000000..0438a051e4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java @@ -0,0 +1,19 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + +/** Result of Admin#deleteNamespacedKV, one future per target partition. A result holder, so (unlike an *Options class) it must not extend AbstractOptions. */ +public class DeleteNamespacedKVResult { + + private final Map> futures; + + public DeleteNamespacedKVResult(Map> futures) { + this.futures = futures; + } + + public KafkaFuture>> all() { + return KafkaFuture.completedFuture(futures); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 8822553259..3a8d5e1778 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -320,5 +320,20 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, return delegate.updateGroup(groupId, groupSpec, options); } + @Override + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, GetNamespacedKVOptions options) { + return delegate.getNamespacedKV(partitions, namespace, key, options); + } + + @Override + public PutNamespacedKVResult putNamespacedKV(Optional> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) { + return delegate.putNamespacedKV(partitions, namespace, key, value, options); + } + + @Override + public DeleteNamespacedKVResult deleteNamespacedKV(Optional> partitions, String namespace, String key, DeleteNamespacedKVOptions options) { + return delegate.deleteNamespacedKV(partitions, namespace, key, options); + } + // AutoMQ inject end } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java new file mode 100644 index 0000000000..d21d9e3755 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java @@ -0,0 +1,4 @@ +package org.apache.kafka.clients.admin; + +public class GetNamespacedKVOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java new file mode 100644 index 0000000000..05dd67a138 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java @@ -0,0 +1,21 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import 
org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse; + +import java.util.Map; +import java.util.concurrent.ExecutionException; + +public class GetNamespacedKVResult { + + private final Map> futures; + + public GetNamespacedKVResult(Map> futures) { + this.futures = futures; + } + + public KafkaFuture>> all() throws ExecutionException, InterruptedException { + return KafkaFuture.completedFuture(futures); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 12202776ac..9c6d1c1b66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -47,14 +47,19 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler; +import org.apache.kafka.clients.admin.internals.DeleteNamespacedKVHandler; import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler; import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeProducersHandler; import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler; import org.apache.kafka.clients.admin.internals.FenceProducersHandler; +import org.apache.kafka.clients.admin.internals.GetNamespacedKVHandler; import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.ListOffsetsHandler; import org.apache.kafka.clients.admin.internals.ListTransactionsHandler; +import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToGet; +import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToPut; +import org.apache.kafka.clients.admin.internals.PutNamespacedKVHandler; 
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler; import org.apache.kafka.clients.admin.internals.UpdateGroupHandler; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -132,6 +137,7 @@ import org.apache.kafka.common.message.DeleteAclsResponseData; import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult; import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl; +import org.apache.kafka.common.message.DeleteKVsRequestData; import org.apache.kafka.common.message.DeleteTopicsRequestData; import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; @@ -152,6 +158,8 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName; import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.GetKVsRequestData; +import org.apache.kafka.common.message.GetKVsResponseData; import org.apache.kafka.common.message.GetNextNodeIdRequestData; import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; @@ -160,6 +168,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.PutKVsRequestData; +import org.apache.kafka.common.message.PutKVsResponseData; import org.apache.kafka.common.message.RemoveRaftVoterRequestData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.UnregisterBrokerRequestData; @@ -266,6 +276,7 @@ import org.slf4j.Logger; +import java.nio.charset.StandardCharsets; 
import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.time.Duration; @@ -4866,6 +4877,86 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, return new UpdateGroupResult(future.get(CoordinatorKey.byGroupId(groupId))); } + @Override + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, GetNamespacedKVOptions options) { + Set targetPartitions = partitions.orElseThrow(() -> + new IllegalArgumentException("Partitions cannot be empty") + ); + + NamespacedKVRecordsToGet.Builder recordsToGetBuilder = NamespacedKVRecordsToGet.newBuilder(); + for (TopicPartition tp : targetPartitions) { + GetKVsRequestData.GetKVRequest kvRequest = new GetKVsRequestData.GetKVRequest() + .setKey(key) + .setNamespace(namespace); + + recordsToGetBuilder.addRecord(tp, kvRequest); + } + + NamespacedKVRecordsToGet recordsToGet = recordsToGetBuilder.build(); + GetNamespacedKVHandler handler = new GetNamespacedKVHandler(logContext, recordsToGet); + SimpleAdminApiFuture future = GetNamespacedKVHandler.newFuture(targetPartitions); + + invokeDriver(handler, future, options.timeoutMs); + + return new GetNamespacedKVResult(future.all()); + } + + @Override + public PutNamespacedKVResult putNamespacedKV(Optional> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) { + Set targetPartitions = partitions.orElseThrow(() -> + new IllegalArgumentException("Partitions cannot be empty") + ); + + NamespacedKVRecordsToPut.Builder recordsToPutBuilder = NamespacedKVRecordsToPut.newBuilder(); + for (TopicPartition tp : targetPartitions) { + PutKVsRequestData.PutKVRequest kvRequest = new PutKVsRequestData.PutKVRequest() + .setKey(key) + .setValue(value.getBytes(StandardCharsets.UTF_8)) + .setNamespace(namespace) + .setOverwrite(options.overwrite()) + .setEpoch(options.ifMatchEpoch()); + + recordsToPutBuilder.addRecord(tp, kvRequest); + } + + 
NamespacedKVRecordsToPut recordsToPut = recordsToPutBuilder.build(); + + PutNamespacedKVHandler handler = new PutNamespacedKVHandler(logContext, recordsToPut); + SimpleAdminApiFuture future = PutNamespacedKVHandler.newFuture(targetPartitions); + + invokeDriver(handler, future, options.timeoutMs); + + return new PutNamespacedKVResult(future.all()); + } + + + @Override + public DeleteNamespacedKVResult deleteNamespacedKV(Optional> partitions, String namespace, String key, DeleteNamespacedKVOptions options) { + + Set targetPartitions = partitions.orElseThrow(() -> + new IllegalArgumentException("Partitions cannot be empty") + ); + + NamespacedKVRecordsToDelete.Builder recordsToDeleteBuilder = NamespacedKVRecordsToDelete.newBuilder(); + for (TopicPartition tp : targetPartitions) { + DeleteKVsRequestData.DeleteKVRequest kvRequest = new DeleteKVsRequestData.DeleteKVRequest() + .setKey(key) + .setNamespace(namespace) + .setEpoch(options.ifMatchEpoch()); + + recordsToDeleteBuilder.addRecord(tp, kvRequest); + } + + NamespacedKVRecordsToDelete recordsToDelete = recordsToDeleteBuilder.build(); + + DeleteNamespacedKVHandler handler = new DeleteNamespacedKVHandler(logContext, recordsToDelete); + SimpleAdminApiFuture future = DeleteNamespacedKVHandler.newFuture(targetPartitions); + + invokeDriver(handler, future, options.timeoutMs); + + return new DeleteNamespacedKVResult(future.all()); + } + private void invokeDriver( AdminApiHandler handler, AdminApiFuture future, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NamespacedKVRecordsToDelete.java b/clients/src/main/java/org/apache/kafka/clients/admin/NamespacedKVRecordsToDelete.java new file mode 100644 index 0000000000..0db8e27990 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NamespacedKVRecordsToDelete.java @@ -0,0 +1,40 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import 
org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NamespacedKVRecordsToDelete { + + private final Map> recordsByPartition; + + public NamespacedKVRecordsToDelete(Map> recordsByPartition) { + this.recordsByPartition = recordsByPartition; + } + + public static NamespacedKVRecordsToDelete.Builder newBuilder() { + return new NamespacedKVRecordsToDelete.Builder(); + } + + public Map> recordsByPartition() { + return recordsByPartition; + } + + public static class Builder { + private final Map> records = new HashMap<>(); + + public NamespacedKVRecordsToDelete.Builder addRecord(TopicPartition partition, DeleteKVRequest request) { + records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request); + return this; + } + + public NamespacedKVRecordsToDelete build() { + return new NamespacedKVRecordsToDelete(records); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVOptions.java new file mode 100644 index 0000000000..06d20eefe3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVOptions.java @@ -0,0 +1,24 @@ +package org.apache.kafka.clients.admin; + +public class PutNamespacedKVOptions extends AbstractOptions { + + private boolean overwrite = false; + private long ifMatchEpoch = 0L; + + public PutNamespacedKVOptions overwrite(boolean overwrite) { + this.overwrite = overwrite; + return this; + } + + public PutNamespacedKVOptions ifMatchEpoch(long epoch) { + this.ifMatchEpoch = epoch; + return this; + } + + public boolean overwrite() { + return overwrite; + } + public long ifMatchEpoch() { + return ifMatchEpoch; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java new file mode 100644 index 0000000000..bb2a7b207b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/PutNamespacedKVResult.java @@ -0,0 +1,23 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse; + +import java.util.Map; + + +public class PutNamespacedKVResult { + + private final Map> futures; + + public PutNamespacedKVResult(Map> futures) { + this.futures = futures; + } + + + public KafkaFuture>> all() { + return KafkaFuture.completedFuture(futures); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteNamespacedKVHandler.java new file mode 100644 index 0000000000..27e2a43fc3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteNamespacedKVHandler.java @@ -0,0 +1,89 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.NamespacedKVRecordsToDelete; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DeleteKVsRequestData; +import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest; +import org.apache.kafka.common.message.DeleteKVsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.s3.DeleteKVsRequest; +import org.apache.kafka.common.requests.s3.DeleteKVsResponse; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DeleteNamespacedKVHandler extends AdminApiHandler.Batched { + + private final Logger logger; + private final NamespacedKVRecordsToDelete recordsToDelete; + private final AdminApiLookupStrategy lookupStrategy; + + public DeleteNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToDelete recordsToDelete) { + // Log under this handler's own class, not PutNamespacedKVHandler (copy-paste fix). + this.logger = logContext.logger(DeleteNamespacedKVHandler.class); + this.recordsToDelete = recordsToDelete; + this.lookupStrategy = new PartitionLeaderStrategy(logContext); + } + + @Override + AbstractRequest.Builder buildBatchedRequest(int brokerId, Set partitions) { + Map> filteredRecords = new HashMap<>(); + for (TopicPartition partition : partitions) { + if (recordsToDelete.recordsByPartition().containsKey(partition)) { + filteredRecords.put(partition, recordsToDelete.recordsByPartition().get(partition)); + } + } + + DeleteKVsRequestData requestData = new DeleteKVsRequestData(); + List allRequests = new ArrayList<>(); + filteredRecords.values().forEach(allRequests::addAll); + requestData.setDeleteKVRequests(allRequests); + + return new DeleteKVsRequest.Builder(requestData); + } + + @Override + public String apiName() { + return "DeleteKVs"; + } + + @Override + public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { + DeleteKVsResponse deleteResponse = (DeleteKVsResponse) response; + DeleteKVsResponseData responseData = deleteResponse.data(); + final Map completed = new HashMap<>(); + final Map failed = new HashMap<>(); + + // Top-level error code is identical for every partition in this response; compute it once outside the loop. + Errors error = Errors.forCode(responseData.errorCode()); + partitions.forEach(partition -> { + if (error != Errors.NONE) { + failed.put(partition, error.exception()); + } else { + completed.put(partition, null); + } + }); + + return new ApiResult<>(completed, failed, Collections.emptyList()); + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return this.lookupStrategy; + } + + 
public static AdminApiFuture.SimpleAdminApiFuture newFuture( + Set partitions + ) { + return AdminApiFuture.forKeys(new HashSet<>(partitions)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java new file mode 100644 index 0000000000..18375b04ee --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/GetNamespacedKVHandler.java @@ -0,0 +1,95 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.GetKVsRequestData; +import org.apache.kafka.common.message.GetKVsResponseData; +import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.s3.GetKVsRequest; +import org.apache.kafka.common.requests.s3.GetKVsResponse; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class GetNamespacedKVHandler extends AdminApiHandler.Batched { + private final Logger logger; + private final NamespacedKVRecordsToGet recordsToGet; + private final AdminApiLookupStrategy lookupStrategy; + private final List orderedPartitions; + + public GetNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToGet recordsToGet) { + // Log under this handler's own class, not PutNamespacedKVHandler (copy-paste fix). + this.logger = logContext.logger(GetNamespacedKVHandler.class); + this.recordsToGet = recordsToGet; + this.lookupStrategy = new PartitionLeaderStrategy(logContext); + this.orderedPartitions = new 
ArrayList<>(recordsToGet.recordsByPartition().keySet()); + } + + @Override + AbstractRequest.Builder buildBatchedRequest(int brokerId, Set partitions) { + + GetKVsRequestData requestData = new GetKVsRequestData(); + for (TopicPartition tp : orderedPartitions) { + if (partitions.contains(tp)) { + requestData.getKeyRequests().addAll( + recordsToGet.recordsByPartition().get(tp) + ); + } + } + + return new GetKVsRequest.Builder(requestData); + } + + @Override + public String apiName() { + return "GetKVs"; + } + + @Override + public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { + + GetKVsResponseData data = ((GetKVsResponse) response).data(); + final Map completed = new LinkedHashMap<>(); + final Map failed = new HashMap<>(); + List responses = data.getKVResponses(); + int responseIndex = 0; + for (TopicPartition tp : orderedPartitions) { + if (!partitions.contains(tp)) { + continue; + } + if (responseIndex >= responses.size()) { + failed.put(tp, new IllegalStateException("Missing response for partition")); + continue; + } + GetKVResponse resp = responses.get(responseIndex++); + if (resp.errorCode() == Errors.NONE.code()) { + completed.put(tp, resp); + } else { + failed.put(tp, Errors.forCode(resp.errorCode()).exception()); + } + } + return new ApiResult<>(completed, failed, Collections.emptyList()); + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return this.lookupStrategy; + } + + public static AdminApiFuture.SimpleAdminApiFuture newFuture( + Set partitions + ) { + return AdminApiFuture.forKeys(new HashSet<>(partitions)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToGet.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToGet.java new file mode 100644 index 0000000000..39ec6aa04e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToGet.java @@ -0,0 +1,38 @@ 
+package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.GetKVsRequestData.GetKVRequest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NamespacedKVRecordsToGet { + + private final Map> recordsByPartition; + + public NamespacedKVRecordsToGet(Map> recordsByPartition) { + this.recordsByPartition = recordsByPartition; + } + + public static NamespacedKVRecordsToGet.Builder newBuilder() { + return new NamespacedKVRecordsToGet.Builder(); + } + + public static class Builder { + private final Map> records = new HashMap<>(); + public Builder addRecord(TopicPartition tp, GetKVRequest req) { + records.computeIfAbsent(tp, k -> new ArrayList<>()).add(req); + return this; + } + + public NamespacedKVRecordsToGet build() { + return new NamespacedKVRecordsToGet(records); + } + } + + public Map> recordsByPartition() { + return recordsByPartition; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToPut.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToPut.java new file mode 100644 index 0000000000..28b34266bd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/NamespacedKVRecordsToPut.java @@ -0,0 +1,39 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NamespacedKVRecordsToPut { + + private final Map> recordsByPartition; + + private NamespacedKVRecordsToPut(Map> recordsByPartition) { + this.recordsByPartition = recordsByPartition; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public Map> recordsByPartition() { + return 
recordsByPartition; + } + + public static class Builder { + private final Map> records = new HashMap<>(); + + public Builder addRecord(TopicPartition partition, PutKVRequest request) { + records.computeIfAbsent(partition, k -> new ArrayList<>()).add(request); + return this; + } + + public NamespacedKVRecordsToPut build() { + return new NamespacedKVRecordsToPut(records); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java new file mode 100644 index 0000000000..5e56420dcc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/PutNamespacedKVHandler.java @@ -0,0 +1,96 @@ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.PutKVsRequestData; +import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest; +import org.apache.kafka.common.message.PutKVsResponseData; +import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.s3.PutKVsRequest; +import org.apache.kafka.common.requests.s3.PutKVsResponse; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class PutNamespacedKVHandler extends AdminApiHandler.Batched { + private final Logger logger; + private final NamespacedKVRecordsToPut recordsToPut; + 
private final AdminApiLookupStrategy lookupStrategy; + private final List orderedPartitions; + + public PutNamespacedKVHandler(LogContext logContext, NamespacedKVRecordsToPut recordsToPut) { + this.logger = logContext.logger(PutNamespacedKVHandler.class); + this.recordsToPut = recordsToPut; + this.lookupStrategy = new PartitionLeaderStrategy(logContext); + this.orderedPartitions = new ArrayList<>(recordsToPut.recordsByPartition().keySet()); + } + + @Override + protected AbstractRequest.Builder buildBatchedRequest(int brokerId, Set partitions) { + PutKVsRequestData requestData = new PutKVsRequestData(); + List allPutRequests = orderedPartitions.stream() + .filter(partitions::contains) + .map(tp -> recordsToPut.recordsByPartition().get(tp)) + .filter(Objects::nonNull) + .flatMap(Collection::stream).collect(Collectors.toList()); + + requestData.setPutKVRequests(allPutRequests); + return new PutKVsRequest.Builder(requestData); + } + + @Override + public String apiName() { + return "PutKVs"; + } + + @Override + public ApiResult handleResponse(Node broker, Set partitions, AbstractResponse response) { + PutKVsResponseData responseData = ((PutKVsResponse) response).data(); + List responses = responseData.putKVResponses(); + final Map completed = new LinkedHashMap<>(); + final Map failed = new HashMap<>(); + int responseIndex = 0; + for (TopicPartition tp : orderedPartitions) { + if (!partitions.contains(tp)) { + continue; + } + // Consume one response per request sent for this partition: buildBatchedRequest + // flattens every PutKVRequest of the partition, so a single response per partition + // would desynchronize the positional mapping when a partition has multiple records. + int requestCount = recordsToPut.recordsByPartition().getOrDefault(tp, Collections.emptyList()).size(); + PutKVResponse lastResp = null; + Throwable firstError = null; + for (int i = 0; i < requestCount; i++) { + if (responseIndex >= responses.size()) { + firstError = new IllegalStateException("Missing response for partition"); + break; + } + PutKVResponse resp = responses.get(responseIndex++); + if (resp.errorCode() == Errors.NONE.code()) { + lastResp = resp; + } else if (firstError == null) { + firstError = Errors.forCode(resp.errorCode()).exception(); + } + } + if (firstError != null) { + failed.put(tp, firstError); + } else if (lastResp != null) { + completed.put(tp, lastResp); + } else { + failed.put(tp, new IllegalStateException("Missing response for partition")); + } + } + return new ApiResult<>(completed, failed, Collections.emptyList()); + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return this.lookupStrategy; + } + + public static
AdminApiFuture.SimpleAdminApiFuture newFuture( + Set partitions + ) { + return AdminApiFuture.forKeys(new HashSet<>(partitions)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidKVRecordEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidKVRecordEpochException.java new file mode 100644 index 0000000000..4b0f190b8d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidKVRecordEpochException.java @@ -0,0 +1,8 @@ +package org.apache.kafka.common.errors; + +public class InvalidKVRecordEpochException extends ApiException { + + public InvalidKVRecordEpochException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index b886a24a1f..c33ade3cf8 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.InvalidFetchSessionEpochException; import org.apache.kafka.common.errors.InvalidFetchSizeException; import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.InvalidKVRecordEpochException; import org.apache.kafka.common.errors.InvalidPartitionsException; import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.InvalidPrincipalTypeException; @@ -440,6 +441,7 @@ public enum Errors { NODE_LOCKED(515, "The node is locked", NodeLockedException::new), OBJECT_NOT_COMMITED(516, "The object is not commited.", ObjectNotCommittedException::new), STREAM_INNER_ERROR(599, "The stream inner error.", StreamInnerErrorException::new), + INVALID_KV_RECORD_EPOCH(600, "The KV record epoch is invalid.", InvalidKVRecordEpochException::new), // AutoMQ inject end INVALID_RECORD_STATE(121, "The record state is invalid. 
The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new), diff --git a/clients/src/main/resources/common/message/DeleteKVsRequest.json b/clients/src/main/resources/common/message/DeleteKVsRequest.json index 2cf9190ce6..0c054e20ba 100644 --- a/clients/src/main/resources/common/message/DeleteKVsRequest.json +++ b/clients/src/main/resources/common/message/DeleteKVsRequest.json @@ -21,7 +21,7 @@ "broker" ], "name": "DeleteKVsRequest", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -35,8 +35,20 @@ "type": "string", "versions": "0+", "about": "Key is the key to delete" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/DeleteKVsResponse.json b/clients/src/main/resources/common/message/DeleteKVsResponse.json index eae0e2fbb5..d5493591bb 100644 --- a/clients/src/main/resources/common/message/DeleteKVsResponse.json +++ b/clients/src/main/resources/common/message/DeleteKVsResponse.json @@ -17,7 +17,7 @@ "apiKey": 511, "type": "response", "name": "DeleteKVsResponse", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -50,8 +50,15 @@ "versions": "0+", "nullableVersions": "0+", "about": "Value" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } + ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/GetKVsRequest.json b/clients/src/main/resources/common/message/GetKVsRequest.json index e04d138bd9..192df1b6ef 100644 --- a/clients/src/main/resources/common/message/GetKVsRequest.json +++ b/clients/src/main/resources/common/message/GetKVsRequest.json @@ -21,7 +21,7 @@ "broker" ], "name": "GetKVsRequest", - "validVersions": "0", + "validVersions": "0-1", 
"flexibleVersions": "0+", "fields": [ { @@ -35,8 +35,14 @@ "type": "string", "versions": "0+", "about": "Key is the key to get" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/GetKVsResponse.json b/clients/src/main/resources/common/message/GetKVsResponse.json index d602c08c46..7b32f42b67 100644 --- a/clients/src/main/resources/common/message/GetKVsResponse.json +++ b/clients/src/main/resources/common/message/GetKVsResponse.json @@ -17,7 +17,7 @@ "apiKey": 509, "type": "response", "name": "GetKVsResponse", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -50,8 +50,20 @@ "versions": "0+", "nullableVersions": "0+", "about": "Value" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/PutKVsRequest.json b/clients/src/main/resources/common/message/PutKVsRequest.json index 041c25dc84..2976c5e900 100644 --- a/clients/src/main/resources/common/message/PutKVsRequest.json +++ b/clients/src/main/resources/common/message/PutKVsRequest.json @@ -21,7 +21,7 @@ "broker" ], "name": "PutKVsRequest", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -47,8 +47,20 @@ "type": "bool", "versions": "0+", "about": "overwrite put kv" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/main/resources/common/message/PutKVsResponse.json b/clients/src/main/resources/common/message/PutKVsResponse.json index 067a867018..90187c434c 100644 --- 
a/clients/src/main/resources/common/message/PutKVsResponse.json +++ b/clients/src/main/resources/common/message/PutKVsResponse.json @@ -17,7 +17,7 @@ "apiKey": 510, "type": "response", "name": "PutKVsResponse", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -49,8 +49,14 @@ "type": "bytes", "versions": "0+", "about": "Value" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 34c2722d46..4dae251b1f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -116,6 +116,7 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.GetKVsResponseData; import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; @@ -141,6 +142,7 @@ import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; +import org.apache.kafka.common.message.PutKVsResponseData; import org.apache.kafka.common.message.UnregisterBrokerResponseData; import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -6948,7 +6950,7 @@ public void 
testDescribeLogDirsPartialFailure() throws Exception { assertNotNull(result.descriptions().get(1).get()); } } - + @Test public void testDescribeReplicaLogDirsWithNonExistReplica() throws Exception { int brokerId = 0; @@ -6967,7 +6969,7 @@ public void testDescribeReplicaLogDirsWithNonExistReplica() throws Exception { DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(tpr1, tpr2)); Map> values = result.values(); - + assertEquals(logDir, values.get(tpr1).get().getCurrentReplicaLogDir()); assertNull(values.get(tpr1).get().getFutureReplicaLogDir()); assertEquals(offsetLag, values.get(tpr1).get().getCurrentReplicaOffsetLag()); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index aea08dcd34..07b02b6747 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1450,6 +1450,21 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, throw new UnsupportedOperationException(); } + @Override + public GetNamespacedKVResult getNamespacedKV(Optional> partitions, String namespace, String key, GetNamespacedKVOptions options) { + throw new UnsupportedOperationException(); + } + + @Override + public PutNamespacedKVResult putNamespacedKV(Optional> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) { + throw new UnsupportedOperationException(); + } + + @Override + public DeleteNamespacedKVResult deleteNamespacedKV(Optional> partitions, String namespace, String key, DeleteNamespacedKVOptions options) { + throw new UnsupportedOperationException(); + } + // AutoMQ inject end } diff --git a/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java b/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java index 0bf3416765..afff759d07 100644 --- 
a/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java +++ b/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java @@ -44,7 +44,9 @@ import com.automq.stream.api.KVClient; import com.automq.stream.api.KeyValue; import com.automq.stream.api.KeyValue.Key; +import com.automq.stream.api.KeyValue.KeyAndNamespace; import com.automq.stream.api.KeyValue.Value; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,4 +249,196 @@ public Builder toRequestBuilder() { this.requestSender.send(task); return future; } + + @Override + public CompletableFuture putNamespacedKVIfAbsent(KeyValue keyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}", keyValue); + } + PutKVRequest request = new PutKVRequest() + .setKey(keyValue.key().get()) + .setValue(keyValue.value().get().array()) + .setNamespace(keyValue.namespace()) + .setEpoch(keyValue.epoch()); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.PUT_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new PutKVsRequest.Builder( + new PutKVsRequestData() + ).addSubRequest(request); + } + }; + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV if absent: {}, result: {}", keyValue, response); + } + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + case KEY_EXIST: + LOGGER.warn("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, key already 
exist", keyValue, code); + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + default: + LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV if absent: {}, code: {}, retry later", keyValue, code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } + + @Override + public CompletableFuture putNamespacedKV(KeyValue keyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}", keyValue); + } + PutKVRequest request = new PutKVRequest() + .setKey(keyValue.key().get()) + .setValue(keyValue.value().get().array()) + .setNamespace(keyValue.namespace()) + .setEpoch(keyValue.epoch()) + .setOverwrite(true); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.PUT_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new PutKVsRequest.Builder( + new PutKVsRequestData() + ).addSubRequest(request); + } + }; + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Put Namespaced KV: {}, result: {}", keyValue, response); + } + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + default: + LOGGER.error("[ControllerKVClient]: Failed to Put Namespaced KV: {}, code: {}, retry later", keyValue, code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } + + @Override + public CompletableFuture getNamespacedKV(KeyAndNamespace keyAndNamespace) { + if (LOGGER.isTraceEnabled()) { + 
LOGGER.trace("[ControllerKVClient]: Get KV: {}, Namespace: {}", keyAndNamespace.key(), keyAndNamespace.namespace()); + } + GetKVRequest request = new GetKVRequest() + .setKey(keyAndNamespace.key().get()) + .setNamespace(keyAndNamespace.namespace()); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + GetKVsRequest.Builder realBuilder = (GetKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.GET_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new GetKVsRequest.Builder( + new GetKVsRequestData() + ).addSubRequest(request); + } + }; + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + ValueAndEpoch val = ValueAndEpoch.of(response.value(), response.epoch()); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Get Namespaced KV: {}, result: {}", keyAndNamespace.key(), response); + } + return ResponseHandleResult.withSuccess(val); + default: + LOGGER.error("[ControllerKVClient]: Failed to Get Namespaced KV: {}, code: {}, retry later", keyAndNamespace.key(), code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } + + @Override + public CompletableFuture delNamespacedKV(KeyValue keyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}", keyValue.key()); + } + DeleteKVRequest request = new DeleteKVRequest() + .setKey(keyValue.key().get()) + .setNamespace(keyValue.namespace()) + .setEpoch(keyValue.epoch()); + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + DeleteKVsRequest.Builder realBuilder = (DeleteKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.DELETE_KVS; + }
+ + @Override + public Builder toRequestBuilder() { + return new DeleteKVsRequest.Builder( + new DeleteKVsRequestData() + ).addSubRequest(request); + } + }; + + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[ControllerKVClient]: Delete Namespaced KV: {}, result: {}", keyValue.key(), response); + } + return ResponseHandleResult.withSuccess(ValueAndEpoch.of(response.value(), response.epoch())); + case KEY_NOT_EXIST: + LOGGER.info("[ControllerKVClient]: Delete Namespaced KV: {}, result: KEY_NOT_EXIST", keyValue.key()); + return ResponseHandleResult.withSuccess(ValueAndEpoch.of((ByteBuffer) null, 0L)); + default: + LOGGER.error("[ControllerKVClient]: Failed to Delete Namespaced KV: {}, code: {}, retry later", keyValue.key(), code); + return ResponseHandleResult.withRetry(); + } + }); + this.requestSender.send(task); + return future; + } } diff --git a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java index 75074f5b77..f417bee422 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java +++ b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java @@ -19,6 +19,7 @@ package kafka.log.streamaspect; + import com.automq.stream.DefaultRecordBatch; import com.automq.stream.RecordBatchWithContextWrapper; import com.automq.stream.api.AppendResult; @@ -28,7 +29,9 @@ import com.automq.stream.api.KVClient; import com.automq.stream.api.KeyValue; import com.automq.stream.api.KeyValue.Key; +import com.automq.stream.api.KeyValue.KeyAndNamespace; import com.automq.stream.api.KeyValue.Value; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import com.automq.stream.api.OpenStreamOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; @@ 
-51,6 +54,9 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey; +import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH; + public class MemoryClient implements Client { private final StreamClient streamClient = new StreamClientImpl(); private final KVClient kvClient = new KVClientImpl(); @@ -180,6 +186,7 @@ public void shutdown() { public static class KVClientImpl implements KVClient { private final Map store = new ConcurrentHashMap<>(); + private final Map keyMetadataMap = new ConcurrentHashMap<>(); @Override public CompletableFuture putKV(KeyValue keyValue) { @@ -202,5 +209,77 @@ public CompletableFuture getKV(Key key) { public CompletableFuture delKV(Key key) { return CompletableFuture.completedFuture(Value.of(store.remove(key.get()))); } + + @Override + public CompletableFuture putNamespacedKVIfAbsent(KeyValue keyValue) { + String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0; + if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) { + return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception()); + } + long newEpoch = System.currentTimeMillis(); + + ByteBuffer value = store.putIfAbsent(key, keyValue.value().get().duplicate()); + if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) { + keyMetadataMap.putIfAbsent(key, new KeyMetadata(keyValue.namespace(), newEpoch)); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch)); + } + + @Override + public CompletableFuture putNamespacedKV(KeyValue keyValue) { + String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ?
keyMetadata.getEpoch() : 0; + if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) { + return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception()); + } + long newEpoch = System.currentTimeMillis(); + + ByteBuffer value = store.put(key, keyValue.value().get().duplicate()); + if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) { + keyMetadataMap.put(key, new KeyMetadata(keyValue.namespace(), newEpoch)); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(value, newEpoch)); + } + + @Override + public CompletableFuture getNamespacedKV(KeyAndNamespace keyAndNamespace) { + String key = buildCompositeKey(keyAndNamespace.namespace(), keyAndNamespace.key().get()); + KeyMetadata keyMetadata = null; + if (keyAndNamespace.namespace() != null && !keyAndNamespace.namespace().isEmpty()) { + keyMetadata = keyMetadataMap.get(key); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(store.get(key), keyMetadata != null ? keyMetadata.getEpoch() : 0L)); + } + + @Override + public CompletableFuture delNamespacedKV(KeyValue keyValue) { + String key = buildCompositeKey(keyValue.namespace(), keyValue.key().get()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ?
keyMetadata.getEpoch() : 0; + if (keyValue.epoch() > 0 && keyValue.epoch() != currentEpoch) { + return CompletableFuture.failedFuture(INVALID_KV_RECORD_EPOCH.exception()); + } + return CompletableFuture.completedFuture(ValueAndEpoch.of(store.remove(key), currentEpoch)); + } + + private static class KeyMetadata { + private final long epoch; + private final String namespace; + public KeyMetadata(String namespace, long epoch) { + this.namespace = namespace; + this.epoch = epoch; + } + + public long getEpoch() { + return epoch; + } + + public String getNamespace() { + return namespace; + } + } } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 28795589ef..3ffdbb27fe 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2051,9 +2051,6 @@ private QuorumController( this.time = time; this.controllerMetrics = controllerMetrics; this.snapshotRegistry = new SnapshotRegistry(logContext); - // AutoMQ for Kafka inject start - this.kvControlManager = new KVControlManager(snapshotRegistry, logContext); - // AutoMQ for Kafka inject end this.deferredEventQueue = new DeferredEventQueue(logContext); this.deferredUnstableEventQueue = new DeferredEventQueue(logContext); this.offsetControl = new OffsetControlManager.Builder(). @@ -2090,6 +2087,9 @@ private QuorumController( setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION). setClusterFeatureSupportDescriber(clusterSupportDescriber). build(); + // AutoMQ for Kafka inject start + this.kvControlManager = new KVControlManager(snapshotRegistry, logContext, featureControl); + // AutoMQ for Kafka inject end this.clusterControl = new ClusterControlManager.Builder(). setLogContext(logContext). setClusterId(clusterId). 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java index fd6ef712e4..a3e006e5b3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/KVControlManager.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.RemoveKVRecord; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.ControllerResult; +import org.apache.kafka.controller.FeatureControlManager; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; @@ -39,6 +40,8 @@ import java.util.List; import java.util.Map; +import static com.automq.stream.utils.KVRecordUtils.buildCompositeKey; +import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH; import static org.apache.kafka.common.protocol.Errors.KEY_EXIST; import static org.apache.kafka.common.protocol.Errors.KEY_NOT_EXIST; @@ -47,30 +50,54 @@ public class KVControlManager { private final SnapshotRegistry registry; private final Logger log; private final TimelineHashMap kv; + private final TimelineHashMap keyMetadataMap; + private final FeatureControlManager featureControl; - public KVControlManager(SnapshotRegistry registry, LogContext logContext) { + public KVControlManager(SnapshotRegistry registry, LogContext logContext, FeatureControlManager featureControl) { this.registry = registry; this.log = logContext.logger(KVControlManager.class); this.kv = new TimelineHashMap<>(registry, 0); + this.keyMetadataMap = new TimelineHashMap<>(registry, 0); + this.featureControl = featureControl; } public GetKVResponse getKV(GetKVRequest request) { - String key = request.key(); + String key = buildCompositeKey(request.namespace(), request.key()); byte[] value = kv.containsKey(key) ? 
kv.get(key).array() : null; + KeyMetadata keyMetadata = null; + if (request.namespace() != null && !request.namespace().isEmpty()) { + keyMetadata = keyMetadataMap.get(key); + } + return new GetKVResponse() - .setValue(value); + .setValue(value) + .setNamespace(request.namespace()) + .setEpoch(keyMetadata != null ? keyMetadata.getEpoch() : 0L); } public ControllerResult putKV(PutKVRequest request) { - String key = request.key(); + String key = buildCompositeKey(request.namespace(), request.key()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? keyMetadata.getEpoch() : 0; + if (request.epoch() > 0 && request.epoch() != currentEpoch) { + return ControllerResult.of(Collections.emptyList(), + new PutKVResponse() + .setErrorCode(INVALID_KV_RECORD_EPOCH.code()) + .setEpoch(currentEpoch)); + } + + long newEpoch = System.currentTimeMillis(); ByteBuffer value = kv.get(key); if (value == null || request.overwrite()) { // generate kv record ApiMessageAndVersion record = new ApiMessageAndVersion(new KVRecord() .setKeyValues(Collections.singletonList(new KeyValue() .setKey(key) - .setValue(request.value()))), (short) 0); - return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value())); + .setValue(request.value()) + .setNamespace(request.namespace()) + .setEpoch(newEpoch))), + featureControl.autoMQVersion().namespacedKVRecordVersion()); + return ControllerResult.of(Collections.singletonList(record), new PutKVResponse().setValue(request.value()).setEpoch(newEpoch)); } // exist and not allow overwriting return ControllerResult.of(Collections.emptyList(), new PutKVResponse() @@ -79,14 +106,25 @@ public ControllerResult putKV(PutKVRequest request) { } public ControllerResult deleteKV(DeleteKVRequest request) { + String key = buildCompositeKey(request.namespace(), request.key()); + KeyMetadata keyMetadata = keyMetadataMap.get(key); + long currentEpoch = keyMetadata != null ? 
keyMetadata.getEpoch() : 0; + if (request.epoch() > 0 && request.epoch() != currentEpoch) { + return ControllerResult.of(Collections.emptyList(), + new DeleteKVResponse() + .setErrorCode(INVALID_KV_RECORD_EPOCH.code()) + .setEpoch(currentEpoch)); + } log.trace("DeleteKVRequestData: {}", request); DeleteKVResponse resp = new DeleteKVResponse(); - ByteBuffer value = kv.get(request.key()); + ByteBuffer value = kv.get(key); if (value != null) { // generate remove-kv record ApiMessageAndVersion record = new ApiMessageAndVersion(new RemoveKVRecord() - .setKeys(Collections.singletonList(request.key())), (short) 0); - return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array())); + .setKeys(Collections.singletonList(key)) + .setNamespace(request.namespace()), + featureControl.autoMQVersion().namespacedKVRecordVersion()); + return ControllerResult.of(Collections.singletonList(record), resp.setValue(value.array()).setEpoch(currentEpoch)); } return ControllerResult.of(Collections.emptyList(), resp.setErrorCode(KEY_NOT_EXIST.code())); } @@ -95,6 +133,9 @@ public void replay(KVRecord record) { List keyValues = record.keyValues(); for (KeyValue keyValue : keyValues) { kv.put(keyValue.key(), ByteBuffer.wrap(keyValue.value())); + if (keyValue.namespace() != null && !keyValue.namespace().isEmpty()) { + keyMetadataMap.put(keyValue.key(), new KeyMetadata(keyValue.namespace(), keyValue.epoch())); + } } } @@ -102,10 +143,30 @@ public void replay(RemoveKVRecord record) { List keys = record.keys(); for (String key : keys) { kv.remove(key); + if (record.namespace() != null && !record.namespace().isEmpty()) { + keyMetadataMap.remove(key); + } } } public Map kv() { return kv; } + + private static class KeyMetadata { + private final long epoch; + private final String namespace; + public KeyMetadata(String namespace, long epoch) { + this.namespace = namespace; + this.epoch = epoch; + } + + public long getEpoch() { + return epoch; + } + + public String 
getNamespace() { + return namespace; + } + } } diff --git a/metadata/src/main/resources/common/metadata/KVRecord.json b/metadata/src/main/resources/common/metadata/KVRecord.json index 62b70c2ceb..0e2166d79a 100644 --- a/metadata/src/main/resources/common/metadata/KVRecord.json +++ b/metadata/src/main/resources/common/metadata/KVRecord.json @@ -17,7 +17,7 @@ "apiKey": 516, "type": "metadata", "name": "KVRecord", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -37,8 +37,20 @@ "type": "bytes", "versions": "0+", "about": "Value" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "1+", + "about": "Epoch" } ] } ] -} \ No newline at end of file +} diff --git a/metadata/src/main/resources/common/metadata/RemoveKVRecord.json b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json index 049d939be2..dc0253e5cf 100644 --- a/metadata/src/main/resources/common/metadata/RemoveKVRecord.json +++ b/metadata/src/main/resources/common/metadata/RemoveKVRecord.json @@ -17,7 +17,7 @@ "apiKey": 517, "type": "metadata", "name": "RemoveKVRecord", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { @@ -25,6 +25,12 @@ "type": "[]string", "versions": "0+", "about": "Keys" + }, + { + "name": "Namespace", + "type": "string", + "versions": "1+", + "about": "Namespace" } ] -} \ No newline at end of file +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 4b9564a1f1..7572871c19 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -74,6 +74,7 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; +import static 
org.apache.kafka.controller.FeatureControlManagerTest.features; import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -728,7 +729,12 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) { public void testReusableNodeIds() { MockTime time = new MockTime(0, 0, 0); SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); - KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext()); + FeatureControlManager featureControlManager = new FeatureControlManager.Builder(). + setQuorumFeatures(features("foo", 1, 2)). + setSnapshotRegistry(snapshotRegistry). + setMetadataVersion(MetadataVersion.IBP_3_3_IV0). + build(); + KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext(), featureControlManager); FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). 
setQuorumFeatures(new QuorumFeatures(0, diff --git a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java index 7f21670a1f..ac34141b38 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/KVControlManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.KVControlManager; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; @@ -41,8 +42,11 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.kafka.common.protocol.Errors.INVALID_KV_RECORD_EPOCH; +import static org.apache.kafka.controller.FeatureControlManagerTest.features; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(40) @Tag("S3Unit") @@ -54,7 +58,12 @@ public class KVControlManagerTest { public void setUp() { LogContext logContext = new LogContext(); SnapshotRegistry registry = new SnapshotRegistry(logContext); - this.manager = new KVControlManager(registry, logContext); + FeatureControlManager featureControlManager = new FeatureControlManager.Builder(). + setQuorumFeatures(features("foo", 1, 2)). + setSnapshotRegistry(registry). + setMetadataVersion(MetadataVersion.IBP_3_3_IV0). 
+ build(); + this.manager = new KVControlManager(registry, logContext, featureControlManager); } @Test @@ -113,6 +122,149 @@ public void testBasicReadWrite() { assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode()); } + @Test + public void testNamespacedReadWrite() { + ControllerResult result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1".getBytes()) + .setNamespace("__automq_test") + ); + assertEquals(1, result.records().size()); + assertEquals(Errors.NONE.code(), result.response().errorCode()); + assertEquals("value1", new String(result.response().value())); + replay(manager, result.records()); + + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1-1".getBytes()) + .setNamespace("__automq_test") + ); + assertEquals(0, result.records().size()); + assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode()); + assertEquals("value1", new String(result.response().value())); + + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1-2".getBytes()) + .setNamespace("__automq_test") + .setOverwrite(true)); + assertEquals(1, result.records().size()); + assertEquals(Errors.NONE.code(), result.response().errorCode()); + assertEquals("value1-2", new String(result.response().value())); + replay(manager, result.records()); + + GetKVResponse result2 = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__automq_test")); + assertEquals("value1-2", new String(result2.value())); + + result2 = manager.getKV(new GetKVRequest() + .setKey("key2") + .setNamespace("__automq_test")); + assertNull(result2.value()); + + ControllerResult result3 = manager.deleteKV(new DeleteKVRequest() + .setKey("key2") + .setNamespace("__automq_test")); + assertEquals(0, result3.records().size()); + assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode()); + + result3 = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__automq_test") + ); + 
assertEquals(1, result3.records().size()); + assertEquals(Errors.NONE.code(), result3.response().errorCode()); + assertEquals("value1-2", new String(result3.response().value())); + replay(manager, result3.records()); + // key1 is deleted + result2 = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__automq_test")); + assertNull(result2.value()); + + result3 = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__automq_test")); + assertEquals(0, result3.records().size()); + assertEquals(Errors.KEY_NOT_EXIST.code(), result3.response().errorCode()); + } + + @Test + public void testPutWithEpochValidation() { + ControllerResult result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(0)); + assertEquals(1, result.records().size()); + assertEquals(Errors.NONE.code(), result.response().errorCode()); + long initialEpoch = result.response().epoch(); + assertTrue(initialEpoch > 0); + replay(manager, result.records()); + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value2".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(initialEpoch - 1) + .setOverwrite(true)); + assertEquals(0, result.records().size()); + assertEquals(INVALID_KV_RECORD_EPOCH.code(), result.response().errorCode()); + assertEquals(initialEpoch, result.response().epoch()); + // without overwrite, should fail + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value2".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(initialEpoch)); + assertEquals(0, result.records().size()); + assertEquals(Errors.KEY_EXIST.code(), result.response().errorCode()); + // with overwrite, should succeed + result = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value2".getBytes()) + .setNamespace("__epoch_test") + .setEpoch(initialEpoch) + .setOverwrite(true)); + assertEquals(1, result.records().size()); + 
assertEquals(Errors.NONE.code(), result.response().errorCode()); + long newEpoch = result.response().epoch(); + assertTrue(newEpoch > initialEpoch); + replay(manager, result.records()); + GetKVResponse readResp = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__epoch_test")); + assertEquals(newEpoch, readResp.epoch()); + } + + @Test + public void testDeleteWithEpochValidation() { + ControllerResult putResult = manager.putKV(new PutKVRequest() + .setKey("key1") + .setValue("value1".getBytes()) + .setNamespace("__epoch_test")); + long initialEpoch = putResult.response().epoch(); + replay(manager, putResult.records()); + ControllerResult delResult = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__epoch_test") + .setEpoch(initialEpoch - 1)); + assertEquals(0, delResult.records().size()); + assertEquals(INVALID_KV_RECORD_EPOCH.code(), delResult.response().errorCode()); + assertEquals(initialEpoch, delResult.response().epoch()); + delResult = manager.deleteKV(new DeleteKVRequest() + .setKey("key1") + .setNamespace("__epoch_test") + .setEpoch(initialEpoch)); + assertEquals(1, delResult.records().size()); + assertEquals(Errors.NONE.code(), delResult.response().errorCode()); + replay(manager, delResult.records()); + GetKVResponse readResp = manager.getKV(new GetKVRequest() + .setKey("key1") + .setNamespace("__epoch_test")); + assertNull(readResp.value()); + } + private void replay(KVControlManager manager, List records) { List messages = records.stream().map(x -> x.message()) .collect(Collectors.toList()); diff --git a/s3stream/src/main/java/com/automq/stream/api/KVClient.java b/s3stream/src/main/java/com/automq/stream/api/KVClient.java index a3e2152ebc..31786ca115 100644 --- a/s3stream/src/main/java/com/automq/stream/api/KVClient.java +++ b/s3stream/src/main/java/com/automq/stream/api/KVClient.java @@ -20,7 +20,9 @@ package com.automq.stream.api; import com.automq.stream.api.KeyValue.Key; +import 
com.automq.stream.api.KeyValue.KeyAndNamespace; import com.automq.stream.api.KeyValue.Value; +import com.automq.stream.api.KeyValue.ValueAndEpoch; import java.util.concurrent.CompletableFuture; @@ -36,6 +38,14 @@ public interface KVClient { */ CompletableFuture putKVIfAbsent(KeyValue keyValue); + /** + * Put namespaced key value if the key does not exist, and return the current value and epoch after putting. + * + * @param keyValue {@link KeyValue} k-v pair with namespace and epoch + * @return async put result. {@link ValueAndEpoch} current value and epoch after putting. + */ + CompletableFuture putNamespacedKVIfAbsent(KeyValue keyValue); + /** * Put key value, overwrite if key exist, return current key value after putting. * @@ -44,6 +54,14 @@ public interface KVClient { */ CompletableFuture putKV(KeyValue keyValue); + /** + * Put namespaced key value, overwrite if the key exists, and return the current value and epoch after putting. + * + * @param keyValue {@link KeyValue} k-v pair with namespace and epoch + * @return async put result. {@link ValueAndEpoch} current value and epoch after putting. + */ + CompletableFuture putNamespacedKV(KeyValue keyValue); + /** * Get value by key. * @@ -52,6 +70,14 @@ public interface KVClient { */ CompletableFuture getKV(Key key); + /** + * Get value and epoch by key and namespace. + * + * @param keyAndNamespace key and namespace. + * @return async get result. {@link ValueAndEpoch} value and epoch, null if the key does not exist. + */ + CompletableFuture getNamespacedKV(KeyAndNamespace keyAndNamespace); + /** * Delete key value by key. If key not exist, return null. * @@ -59,4 +85,12 @@ public interface KVClient { * @return async delete result. {@link Value} deleted value, null if key not exist. */ CompletableFuture delKV(Key key); + + /** + * Delete key value by key and namespace. If the key does not exist, return null. + * + * @param keyValue k-v pair with namespace and epoch + * @return async delete result. {@link ValueAndEpoch} deleted value and epoch, null if the key does not exist.
+ */ + CompletableFuture delNamespacedKV(KeyValue keyValue); } diff --git a/s3stream/src/main/java/com/automq/stream/api/KeyValue.java b/s3stream/src/main/java/com/automq/stream/api/KeyValue.java index 7722120c48..0d17bf131f 100644 --- a/s3stream/src/main/java/com/automq/stream/api/KeyValue.java +++ b/s3stream/src/main/java/com/automq/stream/api/KeyValue.java @@ -25,16 +25,31 @@ public class KeyValue { private final Key key; private final Value value; + private final String namespace; + private final long epoch; private KeyValue(Key key, Value value) { this.key = key; this.value = value; + this.namespace = null; + this.epoch = 0L; + } + + public KeyValue(Key key, Value value, String namespace, long epoch) { + this.key = key; + this.value = value; + this.namespace = namespace; + this.epoch = epoch; } public static KeyValue of(String key, ByteBuffer value) { return new KeyValue(Key.of(key), Value.of(value)); } + public static KeyValue of(String key, ByteBuffer value, String namespace, long epoch) { + return new KeyValue(Key.of(key), Value.of(value), namespace, epoch); + } + public Key key() { return key; } @@ -43,6 +58,14 @@ public Value value() { return value; } + public String namespace() { + return namespace; + } + + public long epoch() { + return epoch; + } + @Override public boolean equals(Object o) { if (this == o) @@ -154,4 +177,52 @@ public String toString() { '}'; } } + + public static class KeyAndNamespace { + private final Key key; + private final String namespace; + + public KeyAndNamespace(Key key, String namespace) { + this.key = key; + this.namespace = namespace; + } + + public Key key() { + return key; + } + + public String namespace() { + return namespace; + } + + public static KeyAndNamespace of(String key, String namespace) { + return new KeyAndNamespace(Key.of(key), namespace); + } + } + + public static class ValueAndEpoch { + private final Value value; + private final long epoch; + + public ValueAndEpoch(Value value, long epoch) { + this.value = 
value; + this.epoch = epoch; + } + + public Value value() { + return value; + } + + public long epoch() { + return epoch; + } + + public static ValueAndEpoch of(byte[] value, long epoch) { + return new ValueAndEpoch(Value.of(value), epoch); + } + + public static ValueAndEpoch of(ByteBuffer value, long epoch) { + return new ValueAndEpoch(Value.of(value), epoch); + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/KVRecordUtils.java b/s3stream/src/main/java/com/automq/stream/utils/KVRecordUtils.java new file mode 100644 index 0000000000..0e8a136085 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/utils/KVRecordUtils.java @@ -0,0 +1,11 @@ +package com.automq.stream.utils; + +public class KVRecordUtils { + + public static String buildCompositeKey(String namespace, String key) { + if (namespace == null || namespace.isEmpty()) { + return key; + } + return namespace + "/" + key; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java index 082a96e0a2..4f9f9979de 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java @@ -31,10 +31,12 @@ public enum AutoMQVersion { // Support object bucket index // Support huge cluster // Support node registration - V2((short) 3); + V2((short) 3), + // Support kv record namespace + V3((short) 4); public static final String FEATURE_NAME = "automq.version"; - public static final AutoMQVersion LATEST = V2; + public static final AutoMQVersion LATEST = V3; private final short level; private final Version s3streamVersion; @@ -125,6 +127,14 @@ public short streamObjectRecordVersion() { } } + public short namespacedKVRecordVersion() { + if (isAtLeast(V3)) { + return 1; + } else { + return 0; + } + } + public Version s3streamVersion() { return 
s3streamVersion; } @@ -139,6 +149,7 @@ private Version mapS3StreamVersion(short automqVersion) { case 2: return Version.V0; case 3: + case 4: return Version.V1; default: throw new IllegalArgumentException("Unknown AutoMQVersion level: " + automqVersion);