diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 1566578e26..7c50c25958 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -37,6 +37,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.KeyStreamingChannel; import io.lettuce.core.output.KeyValueStreamingChannel; @@ -2708,6 +2709,17 @@ public RedisFuture xack(K key, K group, String... messageIds) { return dispatch(commandBuilder.xack(key, group, messageIds)); } + @Override + public RedisFuture> xackdel(K key, K group, String... messageIds) { + return dispatch(commandBuilder.xackdel(key, group, messageIds)); + } + + @Override + public RedisFuture> xackdel(K key, K group, StreamDeletionPolicy policy, + String... messageIds) { + return dispatch(commandBuilder.xackdel(key, group, policy, messageIds)); + } + @Override public RedisFuture xadd(K key, Map body) { return dispatch(commandBuilder.xadd(key, null, body)); @@ -2748,6 +2760,16 @@ public RedisFuture xdel(K key, String... messageIds) { return dispatch(commandBuilder.xdel(key, messageIds)); } + @Override + public RedisFuture> xdelex(K key, String... messageIds) { + return dispatch(commandBuilder.xdelex(key, messageIds)); + } + + @Override + public RedisFuture> xdelex(K key, StreamDeletionPolicy policy, String... messageIds) { + return dispatch(commandBuilder.xdelex(key, policy, messageIds)); + } + @Override public RedisFuture xgroupCreate(XReadArgs.StreamOffset offset, K group) { return dispatch(commandBuilder.xgroupCreate(offset, group, null)); diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index 6e7fe9bb6d..f9dc4b0ca1 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -37,6 +37,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.KeyStreamingChannel; import io.lettuce.core.output.KeyValueStreamingChannel; @@ -2790,6 +2791,16 @@ public Mono xack(K key, K group, String... messageIds) { return createMono(() -> commandBuilder.xack(key, group, messageIds)); } + @Override + public Flux xackdel(K key, K group, String... messageIds) { + return createDissolvingFlux(() -> commandBuilder.xackdel(key, group, messageIds)); + } + + @Override + public Flux xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds) { + return createDissolvingFlux(() -> commandBuilder.xackdel(key, group, policy, messageIds)); + } + @Override public Mono xadd(K key, Map body) { return createMono(() -> commandBuilder.xadd(key, null, body)); @@ -2831,6 +2842,16 @@ public Mono xdel(K key, String... messageIds) { return createMono(() -> commandBuilder.xdel(key, messageIds)); } + @Override + public Flux xdelex(K key, String... messageIds) { + return createDissolvingFlux(() -> commandBuilder.xdelex(key, messageIds)); + } + + @Override + public Flux xdelex(K key, StreamDeletionPolicy policy, String... messageIds) { + return createDissolvingFlux(() -> commandBuilder.xdelex(key, policy, messageIds)); + } + @Override public Mono xgroupCreate(XReadArgs.StreamOffset streamOffset, K group) { return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, null)); diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index 9c8da12c09..104db8f911 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -27,7 +27,9 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; import io.lettuce.core.output.*; +import io.lettuce.core.output.StreamEntryDeletionResultListOutput; import io.lettuce.core.protocol.BaseRedisCommandBuilder; import io.lettuce.core.protocol.Command; import io.lettuce.core.protocol.CommandArgs; @@ -3159,6 +3161,32 @@ public Command xack(K key, K group, String[] messageIds) { return createCommand(XACK, new IntegerOutput<>(codec), args); } + public Command> xackdel(K key, K group, String[] messageIds) { + return xackdel(key, group, null, messageIds); + } + + public Command> xackdel(K key, K group, StreamDeletionPolicy policy, + String[] messageIds) { + notNullKey(key); + LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL); + LettuceAssert.notEmpty(messageIds, "MessageIds " + MUST_NOT_BE_EMPTY); + LettuceAssert.noNullElements(messageIds, "MessageIds " + MUST_NOT_CONTAIN_NULL_ELEMENTS); + + CommandArgs args = new CommandArgs<>(codec).addKey(key).addKey(group); + + if (policy != null) { + args.add(policy); + } + + args.add(CommandKeyword.IDS).add(messageIds.length); + + for (String messageId : messageIds) { + args.add(messageId); + } + + return createCommand(XACKDEL, new StreamEntryDeletionResultListOutput<>(codec), args); + } + public Command> xautoclaim(K key, XAutoClaimArgs xAutoClaimArgs) { notNullKey(key); LettuceAssert.notNull(xAutoClaimArgs, "XAutoClaimArgs " + MUST_NOT_BE_NULL); @@ -3243,6 +3271,30 @@ public Command xdel(K key, String[] messageIds) { return createCommand(XDEL, new IntegerOutput<>(codec), args); } + public Command> xdelex(K key, String[] messageIds) { + return xdelex(key, null, messageIds); + } + + public Command> xdelex(K key, StreamDeletionPolicy policy, String[] messageIds) { + notNullKey(key); + LettuceAssert.notEmpty(messageIds, "MessageIds " + MUST_NOT_BE_EMPTY); + LettuceAssert.noNullElements(messageIds, "MessageIds " + MUST_NOT_CONTAIN_NULL_ELEMENTS); + + CommandArgs args = new CommandArgs<>(codec).addKey(key); + + if (policy != null) { + args.add(policy); + } + + args.add(CommandKeyword.IDS).add(messageIds.length); + + for (String messageId : messageIds) { + args.add(messageId); + } + + return createCommand(XDELEX, new StreamEntryDeletionResultListOutput<>(codec), args); + } + public Command xgroupCreate(StreamOffset offset, K group, XGroupCreateArgs commandArgs) { LettuceAssert.notNull(offset, "StreamOffset " + MUST_NOT_BE_NULL); LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL); diff --git a/src/main/java/io/lettuce/core/StreamDeletionPolicy.java b/src/main/java/io/lettuce/core/StreamDeletionPolicy.java new file mode 100644 index 0000000000..85177f6367 --- /dev/null +++ b/src/main/java/io/lettuce/core/StreamDeletionPolicy.java @@ -0,0 +1,46 @@ +/* + * Copyright 2025-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + */ +package io.lettuce.core; + +import io.lettuce.core.protocol.ProtocolKeyword; + +import java.nio.charset.StandardCharsets; + +/** + * Deletion policy for stream commands that handle consumer group references. Used with XDELEX, XACKDEL, and enhanced XADD/XTRIM + * commands. + */ +public enum StreamDeletionPolicy implements ProtocolKeyword { + + /** + * Preserves existing references to entries in all consumer groups' PEL. This is the default behavior similar to XDEL. + */ + KEEP_REFERENCES("KEEPREF"), + + /** + * Removes all references to entries from all consumer groups' pending entry lists, effectively cleaning up all traces of + * the messages. + */ + DELETE_REFERENCES("DELREF"), + + /** + * Only operates on entries that were read and acknowledged by all consumer groups. + */ + ACKNOWLEDGED("ACKED"); + + public final byte[] bytes; + + StreamDeletionPolicy(String redisParamName) { + bytes = redisParamName.getBytes(StandardCharsets.US_ASCII); + } + + @Override + public byte[] getBytes() { + return bytes; + } + +} diff --git a/src/main/java/io/lettuce/core/XAddArgs.java b/src/main/java/io/lettuce/core/XAddArgs.java index d2e3d06823..3326d07a7b 100644 --- a/src/main/java/io/lettuce/core/XAddArgs.java +++ b/src/main/java/io/lettuce/core/XAddArgs.java @@ -49,6 +49,8 @@ public class XAddArgs implements CompositeArgument { private Long limit; + private StreamDeletionPolicy trimmingMode; + /** * Builder entry points for {@link XAddArgs}. */ @@ -155,6 +157,18 @@ public XAddArgs limit(long limit) { return this; } + /** + * When trimming, defines desired behaviour for handling consumer group references. See {@link StreamDeletionPolicy} for + * details. + * + * @param trimmingMode the deletion policy to apply during trimming. + * @return {@code this} + */ + public XAddArgs trimmingMode(StreamDeletionPolicy trimmingMode) { + this.trimmingMode = trimmingMode; + return this; + } + /** * Apply efficient trimming for capped streams using the {@code ~} flag. * @@ -253,6 +267,10 @@ public void build(CommandArgs args) { args.add(CommandKeyword.LIMIT).add(limit); } + if (trimmingMode != null) { + args.add(trimmingMode); + } + if (nomkstream) { args.add(CommandKeyword.NOMKSTREAM); } diff --git a/src/main/java/io/lettuce/core/XTrimArgs.java b/src/main/java/io/lettuce/core/XTrimArgs.java index 0e302ad221..cc76af60da 100644 --- a/src/main/java/io/lettuce/core/XTrimArgs.java +++ b/src/main/java/io/lettuce/core/XTrimArgs.java @@ -44,6 +44,8 @@ public class XTrimArgs implements CompositeArgument { private Long limit; + private StreamDeletionPolicy trimmingMode; + /** * Builder entry points for {@link XTrimArgs}. */ @@ -164,6 +166,18 @@ public XTrimArgs exactTrimming(boolean exactTrimming) { return this; } + /** + * Defines desired behaviour for handling consumer group references during trimming. See {@link StreamDeletionPolicy} for + * details. + * + * @param trimmingMode the deletion policy to apply during trimming. + * @return {@code this} + */ + public XTrimArgs trimmingMode(StreamDeletionPolicy trimmingMode) { + this.trimmingMode = trimmingMode; + return this; + } + @Override public void build(CommandArgs args) { @@ -193,6 +207,10 @@ public void build(CommandArgs args) { if (limit != null && approximateTrimming) { args.add(CommandKeyword.LIMIT).add(limit); } + + if (trimmingMode != null) { + args.add(trimmingMode); + } } } diff --git a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java index 317f02a195..3db8ab2ca6 100644 --- a/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java @@ -27,6 +27,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; /** * Asynchronous executed commands for Streams. @@ -49,6 +50,32 @@ public interface RedisStreamAsyncCommands { */ RedisFuture xack(K key, K group, String... messageIds); + /** + * Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message + * ID indicating whether it was deleted, not found, or not deleted due to pending references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + RedisFuture> xackdel(K key, K group, String... messageIds); + + /** + * Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending + * references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param policy the deletion policy to apply. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + RedisFuture> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds); + /** * Append a message to the stream {@code key}. * @@ -132,6 +159,30 @@ public interface RedisStreamAsyncCommands { */ RedisFuture xdel(K key, String... messageIds); + /** + * Extended delete operation that removes the specified entries from the stream and returns detailed results for each + * message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status. + * + * @param key the stream key. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + RedisFuture> xdelex(K key, String... messageIds); + + /** + * Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment + * status. + * + * @param key the stream key. + * @param policy the deletion policy to apply. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + RedisFuture> xdelex(K key, StreamDeletionPolicy policy, String... messageIds); + /** * Create a consumer group. * diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java index e0e5534996..4041c96768 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java @@ -28,6 +28,7 @@ import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; /** * Reactive executed commands for Streams. @@ -50,6 +51,32 @@ public interface RedisStreamReactiveCommands { */ Mono xack(K key, K group, String... messageIds); + /** + * Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message + * ID indicating whether it was deleted, not found, or not deleted due to pending references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Flux xackdel(K key, K group, String... messageIds); + + /** + * Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending + * references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param policy the deletion policy to apply. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Flux xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds); + /** * Append a message to the stream {@code key}. * @@ -133,6 +160,30 @@ public interface RedisStreamReactiveCommands { */ Mono xdel(K key, String... messageIds); + /** + * Extended delete operation that removes the specified entries from the stream and returns detailed results for each + * message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status. + * + * @param key the stream key. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Flux xdelex(K key, String... messageIds); + + /** + * Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment + * status. + * + * @param key the stream key. + * @param policy the deletion policy to apply. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Flux xdelex(K key, StreamDeletionPolicy policy, String... messageIds); + /** * Create a consumer group. * diff --git a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java index 68ed5ab897..1cc5634ea8 100644 --- a/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java @@ -27,6 +27,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; /** * Synchronous executed commands for Streams. @@ -49,6 +50,32 @@ public interface RedisStreamCommands { */ Long xack(K key, K group, String... messageIds); + /** + * Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message + * ID indicating whether it was deleted, not found, or not deleted due to pending references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xackdel(K key, K group, String... messageIds); + + /** + * Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending + * references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param policy the deletion policy to apply. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds); + /** * Append a message to the stream {@code key}. * @@ -132,6 +159,30 @@ public interface RedisStreamCommands { */ Long xdel(K key, String... messageIds); + /** + * Extended delete operation that removes the specified entries from the stream and returns detailed results for each + * message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status. + * + * @param key the stream key. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xdelex(K key, String... messageIds); + + /** + * Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment + * status. + * + * @param key the stream key. + * @param policy the deletion policy to apply. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xdelex(K key, StreamDeletionPolicy policy, String... messageIds); + /** * Create a consumer group. * diff --git a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java index e6469db3a6..a72d1419a2 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java @@ -27,6 +27,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; /** * Asynchronous executed commands on a node selection for Streams. @@ -49,6 +50,32 @@ public interface NodeSelectionStreamAsyncCommands { */ AsyncExecutions xack(K key, K group, String... messageIds); + /** + * Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message + * ID indicating whether it was deleted, not found, or not deleted due to pending references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + AsyncExecutions> xackdel(K key, K group, String... messageIds); + + /** + * Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending + * references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param policy the deletion policy to apply. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + AsyncExecutions> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds); + /** * Append a message to the stream {@code key}. * @@ -132,6 +159,30 @@ public interface NodeSelectionStreamAsyncCommands { */ AsyncExecutions xdel(K key, String... messageIds); + /** + * Extended delete operation that removes the specified entries from the stream and returns detailed results for each + * message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status. + * + * @param key the stream key. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + AsyncExecutions> xdelex(K key, String... messageIds); + + /** + * Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment + * status. + * + * @param key the stream key. + * @param policy the deletion policy to apply. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + AsyncExecutions> xdelex(K key, StreamDeletionPolicy policy, String... messageIds); + /** * Create a consumer group. * diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java index 342aa2cce3..36a5c76997 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java @@ -27,6 +27,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; /** * Synchronous executed commands on a node selection for Streams. @@ -49,6 +50,32 @@ public interface NodeSelectionStreamCommands { */ Executions xack(K key, K group, String... messageIds); + /** + * Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message + * ID indicating whether it was deleted, not found, or not deleted due to pending references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Executions> xackdel(K key, K group, String... messageIds); + + /** + * Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending + * references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param policy the deletion policy to apply. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Executions> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds); + /** * Append a message to the stream {@code key}. * @@ -132,6 +159,30 @@ public interface NodeSelectionStreamCommands { */ Executions xdel(K key, String... messageIds); + /** + * Extended delete operation that removes the specified entries from the stream and returns detailed results for each + * message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status. + * + * @param key the stream key. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Executions> xdelex(K key, String... messageIds); + + /** + * Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment + * status. + * + * @param key the stream key. + * @param policy the deletion policy to apply. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + Executions> xdelex(K key, StreamDeletionPolicy policy, String... messageIds); + /** * Create a consumer group. * diff --git a/src/main/java/io/lettuce/core/models/stream/StreamEntryDeletionResult.java b/src/main/java/io/lettuce/core/models/stream/StreamEntryDeletionResult.java new file mode 100644 index 0000000000..47a7b088bc --- /dev/null +++ b/src/main/java/io/lettuce/core/models/stream/StreamEntryDeletionResult.java @@ -0,0 +1,98 @@ +/* + * Copyright 2025-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + */ +package io.lettuce.core.models.stream; + +/** + * Result of stream entry deletion operations for XDELEX and XACKDEL commands. + *

+ * Represents the outcome of attempting to delete a specific stream entry: + *

    + *
  • NOT_FOUND (-1): ID doesn't exist in stream
  • + *
  • DELETED (1): Entry was deleted/acknowledged and deleted
  • + *
  • NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED (2): Entry wasn't deleted.
  • + *
+ * + * @since 6.8 + */ +public enum StreamEntryDeletionResult { + + UNKNOWN(-2), + + /** + * The stream entry ID was not found in the stream. + */ + NOT_FOUND(-1), + + /** + * The entry was successfully deleted from the stream. + */ + DELETED(1), + + /** + * The entry was not deleted due to one of the following reasons: + *
    + *
  • For XDELEX: The entry was not acknowledged by any consumer group
  • + *
  • For XACKDEL: The entry still has pending references in other consumer groups
  • + *
+ */ + NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2); + + private final int code; + + StreamEntryDeletionResult(int code) { + this.code = code; + } + + /** + * Returns the numeric code associated with this result. + * + * @return the numeric code. + */ + public int getCode() { + return code; + } + + /** + * Create a {@link StreamEntryDeletionResult} from its numeric code. + * + * @param code the numeric code. + * @return the {@link StreamEntryDeletionResult}. + * @throws IllegalArgumentException if the code is unknown. + */ + public static StreamEntryDeletionResult fromCode(int code) { + switch (code) { + case -1: + return NOT_FOUND; + case 1: + return DELETED; + case 2: + return NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED; + default: + return UNKNOWN; + } + } + + /** + * Create a {@link StreamEntryDeletionResult} from a {@link Long} value. + * + * @param value the Long value, may be {@code null}. + * @return the {@link StreamEntryDeletionResult}, or {@code null} if the input is {@code null}. + * @throws IllegalArgumentException if the code is unknown. + */ + public static StreamEntryDeletionResult fromLong(Long value) { + if (value == null) { + return null; + } + return fromCode(value.intValue()); + } + + @Override + public String toString() { + return name() + "(" + code + ")"; + } + +} diff --git a/src/main/java/io/lettuce/core/output/StreamEntryDeletionResultListOutput.java b/src/main/java/io/lettuce/core/output/StreamEntryDeletionResultListOutput.java new file mode 100644 index 0000000000..d8362024f1 --- /dev/null +++ b/src/main/java/io/lettuce/core/output/StreamEntryDeletionResultListOutput.java @@ -0,0 +1,63 @@ +/* + * Copyright 2025-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + */ +package io.lettuce.core.output; + +import java.util.Collections; +import java.util.List; + +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; + +/** + * {@link List} of {@link StreamEntryDeletionResult} output for XDELEX and XACKDEL commands. + * + * @param Key type. + * @param Value type. + */ +public class StreamEntryDeletionResultListOutput extends CommandOutput> + implements StreamingOutput { + + private boolean initialized; + + private Subscriber subscriber; + + public StreamEntryDeletionResultListOutput(RedisCodec codec) { + super(codec, Collections.emptyList()); + setSubscriber(ListSubscriber.instance()); + } + + @Override + public void set(long integer) { + if (!initialized) { + multi(1); + } + + StreamEntryDeletionResult result = StreamEntryDeletionResult.fromCode((int) integer); + subscriber.onNext(output, result); + } + + @Override + public void multi(int count) { + if (!initialized) { + output = OutputFactory.newList(count); + initialized = true; + } + } + + @Override + public void setSubscriber(Subscriber subscriber) { + LettuceAssert.notNull(subscriber, "Subscriber must not be null"); + this.subscriber = subscriber; + } + + @Override + public Subscriber getSubscriber() { + return subscriber; + } + +} diff --git a/src/main/java/io/lettuce/core/protocol/CommandKeyword.java b/src/main/java/io/lettuce/core/protocol/CommandKeyword.java index 628365ca4e..c162dfb331 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandKeyword.java +++ b/src/main/java/io/lettuce/core/protocol/CommandKeyword.java @@ -37,7 +37,7 @@ public enum CommandKeyword implements ProtocolKeyword { BY, BYLEX, BYSCORE, CACHING, CAT, CH, CHANNELS, COPY, COUNT, COUNTKEYSINSLOT, CONSUMERS, CREATE, DB, DELSLOTS, DELSLOTSRANGE, DELUSER, DESC, DIFF, DIFF1, DRYRUN, SOFT, HARD, ENCODING, - FAILOVER, FORGET, FIELDS, FLAGS, FLUSH, FORCE, FREQ, FLUSHSLOTS, GENPASS, GETNAME, GETUSER, GETKEYSINSLOT, GETREDIR, GROUP, GROUPS, HTSTATS, ID, IDLE, IDX, INFO, + FAILOVER, FORGET, FIELDS, FLAGS, FLUSH, FORCE, FREQ, FLUSHSLOTS, GENPASS, GETNAME, GETUSER, GETKEYSINSLOT, GETREDIR, GROUP, GROUPS, HTSTATS, ID, IDS, IDLE, IDX, INFO, IDLETIME, JUSTID, KILL, KEYSLOT, LEFT, LEN, LIMIT, LINKS, LIST, LOAD, LOG, MATCH, diff --git a/src/main/java/io/lettuce/core/protocol/CommandType.java b/src/main/java/io/lettuce/core/protocol/CommandType.java index aed6e358b8..3127ad06d9 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandType.java +++ b/src/main/java/io/lettuce/core/protocol/CommandType.java @@ -101,7 +101,7 @@ public enum CommandType implements ProtocolKeyword { // Stream - XACK, XADD, XAUTOCLAIM, XCLAIM, XDEL, XGROUP, XINFO, XLEN, XPENDING, XRANGE, XREVRANGE, XREAD, XREADGROUP, XTRIM, + XACK, XACKDEL, XADD, XAUTOCLAIM, XCLAIM, XDEL, XDELEX, XGROUP, XINFO, XLEN, XPENDING, XRANGE, XREVRANGE, XREAD, XREADGROUP, XTRIM, // JSON diff --git a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java index 954216a544..6e34067c14 100644 --- a/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java @@ -27,6 +27,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; /** * ${intent} for Streams. @@ -48,6 +49,32 @@ public interface RedisStreamCommands { */ Long xack(K key, K group, String... messageIds); + /** + * Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message + * ID indicating whether it was deleted, not found, or not deleted due to pending references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xackdel(K key, K group, String... messageIds); + + /** + * Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending + * references. + * + * @param key the stream key. + * @param group name of the consumer group. + * @param policy the deletion policy to apply. + * @param messageIds message Id's to acknowledge and delete. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds); + /** * Append a message to the stream {@code key}. * @@ -131,6 +158,30 @@ public interface RedisStreamCommands { */ Long xdel(K key, String... messageIds); + /** + * Extended delete operation that removes the specified entries from the stream and returns detailed results for each + * message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status. + * + * @param key the stream key. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xdelex(K key, String... messageIds); + + /** + * Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns + * detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment + * status. + * + * @param key the stream key. + * @param policy the deletion policy to apply. + * @param messageIds stream message Id's. + * @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID. + * @since 6.8 + */ + List xdelex(K key, StreamDeletionPolicy policy, String... messageIds); + /** * Create a consumer group. * diff --git a/src/test/java/io/lettuce/core/RedisCommandBuilderUnitTests.java b/src/test/java/io/lettuce/core/RedisCommandBuilderUnitTests.java index b6df2bd109..6c77dfed1f 100644 --- a/src/test/java/io/lettuce/core/RedisCommandBuilderUnitTests.java +++ b/src/test/java/io/lettuce/core/RedisCommandBuilderUnitTests.java @@ -1,6 +1,7 @@ package io.lettuce.core; import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; import io.lettuce.core.protocol.Command; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -15,6 +16,7 @@ import static io.lettuce.TestTags.UNIT_TEST; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Unit tests for {@link RedisCommandBuilder}. @@ -32,6 +34,16 @@ class RedisCommandBuilderUnitTests { public static final String MY_FIELD3 = "hField3"; + public static final String STREAM_KEY = "test-stream"; + + public static final String GROUP_NAME = "test-group"; + + public static final String MESSAGE_ID1 = "1234567890-0"; + + public static final String MESSAGE_ID2 = "1234567891-0"; + + public static final String MESSAGE_ID3 = "1234567892-0"; + RedisCommandBuilder sut = new RedisCommandBuilder<>(StringCodec.UTF8); @Test() @@ -297,4 +309,177 @@ void shouldCorrectlyConstructBitopOne() { .isEqualTo("*6\r\n$5\r\nBITOP\r\n$3\r\nONE\r\n$4\r\ndest\r\n$4\r\nkey1\r\n$4\r\nkey2\r\n$4\r\nkey3\r\n"); } + @Test + void shouldCorrectlyConstructXackdel() { + Command> command = sut.xackdel(STREAM_KEY, GROUP_NAME, + new String[] { MESSAGE_ID1, MESSAGE_ID2 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo( + "*7\r\n" + "$7\r\n" + "XACKDEL\r\n" + "$11\r\n" + "test-stream\r\n" + "$10\r\n" + "test-group\r\n" + "$3\r\n" + + "IDS\r\n" + "$1\r\n" + "2\r\n" + "$12\r\n" + "1234567890-0\r\n" + "$12\r\n" + "1234567891-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXackdelWithPolicy() { + Command> command = sut.xackdel(STREAM_KEY, GROUP_NAME, + StreamDeletionPolicy.KEEP_REFERENCES, new String[] { MESSAGE_ID1, MESSAGE_ID2, MESSAGE_ID3 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo("*9\r\n" + "$7\r\n" + "XACKDEL\r\n" + "$11\r\n" + + "test-stream\r\n" + "$10\r\n" + "test-group\r\n" + "$7\r\n" + "KEEPREF\r\n" + "$3\r\n" + "IDS\r\n" + "$1\r\n" + + "3\r\n" + "$12\r\n" + "1234567890-0\r\n" + "$12\r\n" + "1234567891-0\r\n" + "$12\r\n" + "1234567892-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXackdelWithDeleteReferencesPolicy() { + Command> command = sut.xackdel(STREAM_KEY, GROUP_NAME, + StreamDeletionPolicy.DELETE_REFERENCES, new String[] { MESSAGE_ID1 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)) + .isEqualTo("*7\r\n" + "$7\r\n" + "XACKDEL\r\n" + "$11\r\n" + "test-stream\r\n" + "$10\r\n" + "test-group\r\n" + + "$6\r\n" + "DELREF\r\n" + "$3\r\n" + "IDS\r\n" + "$1\r\n" + "1\r\n" + "$12\r\n" + "1234567890-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXackdelWithAcknowledgedPolicy() { + Command> command = sut.xackdel(STREAM_KEY, GROUP_NAME, + StreamDeletionPolicy.ACKNOWLEDGED, new String[] { MESSAGE_ID1, MESSAGE_ID2 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo("*8\r\n" + "$7\r\n" + "XACKDEL\r\n" + "$11\r\n" + + "test-stream\r\n" + "$10\r\n" + "test-group\r\n" + "$5\r\n" + "ACKED\r\n" + "$3\r\n" + "IDS\r\n" + "$1\r\n" + + "2\r\n" + "$12\r\n" + "1234567890-0\r\n" + "$12\r\n" + "1234567891-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXdelex() { + Command> command = sut.xdelex(STREAM_KEY, + new String[] { MESSAGE_ID1, MESSAGE_ID2 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)) + .isEqualTo("*6\r\n" + "$6\r\n" + "XDELEX\r\n" + "$11\r\n" + "test-stream\r\n" + "$3\r\n" + "IDS\r\n" + "$1\r\n" + + "2\r\n" + "$12\r\n" + "1234567890-0\r\n" + "$12\r\n" + "1234567891-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXdelexWithSingleMessageId() { + Command> command = sut.xdelex(STREAM_KEY, new String[] { MESSAGE_ID1 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo("*5\r\n" + "$6\r\n" + "XDELEX\r\n" + "$11\r\n" + + "test-stream\r\n" + "$3\r\n" + "IDS\r\n" + "$1\r\n" + "1\r\n" + "$12\r\n" + "1234567890-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXdelexWithPolicy() { + Command> command = sut.xdelex(STREAM_KEY, + StreamDeletionPolicy.KEEP_REFERENCES, new String[] { MESSAGE_ID1, MESSAGE_ID2, MESSAGE_ID3 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo("*8\r\n" + "$6\r\n" + "XDELEX\r\n" + "$11\r\n" + + "test-stream\r\n" + "$7\r\n" + "KEEPREF\r\n" + "$3\r\n" + "IDS\r\n" + "$1\r\n" + "3\r\n" + "$12\r\n" + + "1234567890-0\r\n" + "$12\r\n" + "1234567891-0\r\n" + "$12\r\n" + "1234567892-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXdelexWithDeleteReferencesPolicy() { + Command> command = sut.xdelex(STREAM_KEY, + StreamDeletionPolicy.DELETE_REFERENCES, new String[] { MESSAGE_ID1 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)) + .isEqualTo("*6\r\n" + "$6\r\n" + "XDELEX\r\n" + "$11\r\n" + "test-stream\r\n" + "$6\r\n" + "DELREF\r\n" + + "$3\r\n" + "IDS\r\n" + "$1\r\n" + "1\r\n" + "$12\r\n" + "1234567890-0\r\n"); + } + + @Test + void shouldCorrectlyConstructXdelexWithAcknowledgedPolicy() { + Command> command = sut.xdelex(STREAM_KEY, + StreamDeletionPolicy.ACKNOWLEDGED, new String[] { MESSAGE_ID1, MESSAGE_ID2 }); + ByteBuf buf = Unpooled.directBuffer(); + command.encode(buf); + + assertThat(buf.toString(StandardCharsets.UTF_8)).isEqualTo( + "*7\r\n" + "$6\r\n" + "XDELEX\r\n" + "$11\r\n" + "test-stream\r\n" + "$5\r\n" + "ACKED\r\n" + "$3\r\n" + + "IDS\r\n" + "$1\r\n" + "2\r\n" + "$12\r\n" + "1234567890-0\r\n" + "$12\r\n" + "1234567891-0\r\n"); + } + + @Test + void xackdelShouldRejectNullKey() { + assertThatThrownBy(() -> sut.xackdel(null, GROUP_NAME, new String[] { MESSAGE_ID1 })) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Key must not be null"); + } + + @Test + void xackdelShouldRejectNullGroup() { + assertThatThrownBy(() -> sut.xackdel(STREAM_KEY, null, new String[] { MESSAGE_ID1 })) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Group must not be null"); + } + + @Test + void xackdelShouldRejectEmptyMessageIds() { + assertThatThrownBy(() -> sut.xackdel(STREAM_KEY, GROUP_NAME, new String[] {})) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("MessageIds must not be empty"); + } + + @Test + void xackdelShouldRejectNullMessageIds() { + assertThatThrownBy(() -> sut.xackdel(STREAM_KEY, GROUP_NAME, (String[]) null)) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("MessageIds must not be empty"); + } + + @Test + void xackdelShouldRejectNullElementsInMessageIds() { + assertThatThrownBy(() -> sut.xackdel(STREAM_KEY, GROUP_NAME, new String[] { MESSAGE_ID1, null, MESSAGE_ID2 })) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("MessageIds must not contain null elements"); + } + + @Test + void xackdelWithPolicyShouldRejectNullKey() { + assertThatThrownBy( + () -> sut.xackdel(null, GROUP_NAME, StreamDeletionPolicy.KEEP_REFERENCES, new String[] { MESSAGE_ID1 })) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Key must not be null"); + } + + @Test + void xdelexShouldRejectNullKey() { + assertThatThrownBy(() -> sut.xdelex(null, new String[] { MESSAGE_ID1 })).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Key must not be null"); + } + + @Test + void xdelexShouldRejectEmptyMessageIds() { + assertThatThrownBy(() -> sut.xdelex(STREAM_KEY, new String[] {})).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("MessageIds must not be empty"); + } + + @Test + void xdelexShouldRejectNullMessageIds() { + assertThatThrownBy(() -> sut.xdelex(STREAM_KEY, (String[]) null)).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("MessageIds must not be empty"); + } + + @Test + void xdelexShouldRejectNullElementsInMessageIds() { + assertThatThrownBy(() -> sut.xdelex(STREAM_KEY, new String[] { MESSAGE_ID1, null, MESSAGE_ID2 })) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("MessageIds must not contain null elements"); + } + + @Test + void xdelexWithPolicyShouldRejectNullKey() { + assertThatThrownBy(() -> sut.xdelex(null, StreamDeletionPolicy.KEEP_REFERENCES, new String[] { MESSAGE_ID1 })) + .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Key must not be null"); + } + } diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index 57d613f823..5308bb132a 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -26,6 +26,7 @@ import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; import io.lettuce.core.output.NestedMultiOutput; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.test.LettuceExtension; @@ -760,4 +761,308 @@ void xgroupSetid() { assertThat(redis.xgroupSetid(StreamOffset.latest(key), "group")).isEqualTo("OK"); } + // Redis 8.2 Stream Commands Tests + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xdelex() { + // Add some entries to the stream + String id1 = redis.xadd(key, Collections.singletonMap("field1", "value1")); + String id2 = redis.xadd(key, Collections.singletonMap("field2", "value2")); + String nonExistentId = "999999-0"; + + // Verify initial state + assertThat(redis.xlen(key)).isEqualTo(2L); + + // Test XDELEX + List results = redis.xdelex(key, id1, id2, nonExistentId); + + assertThat(results).hasSize(3); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(results.get(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(results.get(2)).isEqualTo(StreamEntryDeletionResult.NOT_FOUND); + + // Verify entries were deleted + assertThat(redis.xlen(key)).isEqualTo(0L); + } + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xdelexWithPolicy() { + // Add some entries to the stream + String id1 = redis.xadd(key, Collections.singletonMap("field1", "value1")); + String id2 = redis.xadd(key, Collections.singletonMap("field2", "value2")); + + // Verify initial state + assertThat(redis.xlen(key)).isEqualTo(2L); + + // Test XDELEX with KEEP_REFERENCES policy + List results = redis.xdelex(key, StreamDeletionPolicy.KEEP_REFERENCES, id1, id2); + + assertThat(results).hasSize(2); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(results.get(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + + // Verify entries were deleted + assertThat(redis.xlen(key)).isEqualTo(0L); + } + + @Test + @EnabledOnCommand("XACKDEL") // Redis 8.2 + void xackdel() { + // Set up stream with consumer group + String groupName = "test-group"; + String consumerName = "test-consumer"; + + // Add entries to the stream + String id1 = redis.xadd(key, Collections.singletonMap("field1", "value1")); + String id2 = redis.xadd(key, Collections.singletonMap("field2", "value2")); + + // Verify initial state + assertThat(redis.xlen(key)).isEqualTo(2L); + + // Create consumer group + redis.xgroupCreate(StreamOffset.from(key, "0-0"), groupName, XGroupCreateArgs.Builder.mkstream()); + + // Read messages to create pending entries + List> messages = redis.xreadgroup(Consumer.from(groupName, consumerName), + StreamOffset.lastConsumed(key)); + + assertThat(messages).hasSize(2); + + // Test XACKDEL + List results = redis.xackdel(key, groupName, id1, id2); + + assertThat(results).hasSize(2); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(results.get(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + + // Verify no pending messages remain + List pending = redis.xpending(key, groupName, Range.unbounded(), io.lettuce.core.Limit.from(10)); + assertThat(pending).isEmpty(); + } + + @Test + @EnabledOnCommand("XACKDEL") // Redis 8.2 + void xackdelWithPolicy() { + // Set up stream with consumer group + String groupName = "test-group"; + String consumerName = "test-consumer"; + + // Add entries to the stream + String id1 = redis.xadd(key, Collections.singletonMap("field1", "value1")); + + // Verify initial state + assertThat(redis.xlen(key)).isEqualTo(1L); + + // Create consumer group + redis.xgroupCreate(StreamOffset.from(key, "0-0"), groupName, XGroupCreateArgs.Builder.mkstream()); + + // Read message to create pending entry + redis.xreadgroup(Consumer.from(groupName, consumerName), StreamOffset.lastConsumed(key)); + + // Test XACKDEL with DELETE_REFERENCES policy + List results = redis.xackdel(key, groupName, StreamDeletionPolicy.DELETE_REFERENCES, id1); + + assertThat(results).hasSize(1); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + } + + @Test + @EnabledOnCommand("XACKDEL") // Redis 8.2 + void xackdelNotFound() { + String groupName = "test-group"; + String nonExistentId = "999999-0"; + + // Create consumer group on empty stream + redis.xgroupCreate(StreamOffset.from(key, "0-0"), groupName, XGroupCreateArgs.Builder.mkstream()); + + // Test XACKDEL with non-existent ID + List results = redis.xackdel(key, groupName, nonExistentId); + + assertThat(results).hasSize(1); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.NOT_FOUND); + } + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xdelexEmptyStream() { + String nonExistentId = "999999-0"; + + // Test XDELEX on empty stream + List results = redis.xdelex(key, nonExistentId); + + assertThat(results).hasSize(1); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.NOT_FOUND); + } + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xdelexWithDelrefPolicy() { + // Add entries to the stream + String id1 = redis.xadd(key, Collections.singletonMap("field1", "value1")); + String id2 = redis.xadd(key, Collections.singletonMap("field2", "value2")); + + // Verify initial state + assertThat(redis.xlen(key)).isEqualTo(2L); + + // Test XDELEX with DELETE_REFERENCES policy + List results = redis.xdelex(key, StreamDeletionPolicy.DELETE_REFERENCES, id1, id2); + + assertThat(results).hasSize(2); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(results.get(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + + // Verify entries were deleted + assertThat(redis.xlen(key)).isEqualTo(0L); + } + + @Test + @EnabledOnCommand("XACKDEL") // Redis 8.2 + void xackdelWithAckedPolicy() { + // Set up stream with consumer group + String groupName = "test-group"; + String consumerName = "test-consumer"; + + // Add entries to the stream + String id1 = redis.xadd(key, Collections.singletonMap("field1", "value1")); + + // Verify initial state + assertThat(redis.xlen(key)).isEqualTo(1L); + + // Create consumer group + redis.xgroupCreate(StreamOffset.from(key, "0-0"), groupName, XGroupCreateArgs.Builder.mkstream()); + + // Read message to create pending entry + redis.xreadgroup(Consumer.from(groupName, consumerName), StreamOffset.lastConsumed(key)); + + // Test XACKDEL with ACKNOWLEDGED policy on pending entry + // The ACKNOWLEDGED policy behavior: it deletes the entry from the stream and acknowledges it + List results = redis.xackdel(key, groupName, StreamDeletionPolicy.ACKNOWLEDGED, id1); + + assertThat(results).hasSize(1); + assertThat(results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + } + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xaddWithTrimmingMode() { + // Add initial entries to the stream + redis.xadd(key, Collections.singletonMap("field1", "value1")); + redis.xadd(key, Collections.singletonMap("field2", "value2")); + redis.xadd(key, Collections.singletonMap("field3", "value3")); + redis.xadd(key, Collections.singletonMap("field4", "value4")); + redis.xadd(key, Collections.singletonMap("field5", "value5")); + + // Verify initial state + assertThat(redis.xlen(key)).isEqualTo(5L); + + // Create consumer group and read messages to create PEL entries + redis.xgroupCreate(StreamOffset.from(key, "0-0"), "test-group", XGroupCreateArgs.Builder.mkstream()); + List> messages = redis.xreadgroup(Consumer.from("test-group", "test-consumer"), + XReadArgs.Builder.count(3), StreamOffset.lastConsumed(key)); + + assertThat(messages).hasSize(3); + + // Add new entry with maxLen=3 and KEEP_REFERENCES mode - should preserve PEL references + String newId = redis.xadd(key, XAddArgs.Builder.maxlen(3).trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES), + Collections.singletonMap("field6", "value6")); + assertThat(newId).isNotNull(); + + // Stream should be trimmed to 3 entries + assertThat(redis.xlen(key)).isEqualTo(3L); + + // PEL should still contain references to read messages + PendingMessages pending = redis.xpending(key, "test-group"); + assertThat(pending.getCount()).isEqualTo(3L); + } + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xaddWithTrimmingModeDelref() { + // Add initial entries to the stream + redis.xadd(key, Collections.singletonMap("field1", "value1")); + redis.xadd(key, Collections.singletonMap("field2", "value2")); + redis.xadd(key, Collections.singletonMap("field3", "value3")); + redis.xadd(key, Collections.singletonMap("field4", "value4")); + redis.xadd(key, Collections.singletonMap("field5", "value5")); + + // Create consumer group and read messages + redis.xgroupCreate(StreamOffset.from(key, "0-0"), "test-group", XGroupCreateArgs.Builder.mkstream()); + List> messages = redis.xreadgroup(Consumer.from("test-group", "test-consumer"), + XReadArgs.Builder.count(3), StreamOffset.lastConsumed(key)); + + assertThat(messages).hasSize(3); + + // Add new entry with maxLen=3 and DELETE_REFERENCES mode - should remove PEL references + String newId = redis.xadd(key, XAddArgs.Builder.maxlen(3).trimmingMode(StreamDeletionPolicy.DELETE_REFERENCES), + Collections.singletonMap("field6", "value6")); + assertThat(newId).isNotNull(); + + // Stream should be trimmed to 3 entries + assertThat(redis.xlen(key)).isEqualTo(3L); + + // PEL should have fewer references due to DELREF policy + PendingMessages pending = redis.xpending(key, "test-group"); + assertThat(pending.getCount()).isLessThan(3L); + } + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xtrimWithTrimmingMode() { + // Add initial entries to the stream + redis.xadd(key, Collections.singletonMap("field1", "value1")); + redis.xadd(key, Collections.singletonMap("field2", "value2")); + redis.xadd(key, Collections.singletonMap("field3", "value3")); + redis.xadd(key, Collections.singletonMap("field4", "value4")); + redis.xadd(key, Collections.singletonMap("field5", "value5")); + + // Create consumer group and read messages + redis.xgroupCreate(StreamOffset.from(key, "0-0"), "test-group", XGroupCreateArgs.Builder.mkstream()); + List> messages = redis.xreadgroup(Consumer.from("test-group", "test-consumer"), + XReadArgs.Builder.count(3), StreamOffset.lastConsumed(key)); + + assertThat(messages).hasSize(3); + + // Trim with KEEP_REFERENCES mode + Long trimmed = redis.xtrim(key, XTrimArgs.Builder.maxlen(3).trimmingMode(StreamDeletionPolicy.KEEP_REFERENCES)); + assertThat(trimmed).isEqualTo(2L); + + // Stream should be trimmed to 3 entries + assertThat(redis.xlen(key)).isEqualTo(3L); + + // PEL should still contain references + PendingMessages pending = redis.xpending(key, "test-group"); + assertThat(pending.getCount()).isEqualTo(3L); + } + + @Test + @EnabledOnCommand("XDELEX") // Redis 8.2 + void xtrimWithTrimmingModeDelref() { + // Add initial entries to the stream + redis.xadd(key, Collections.singletonMap("field1", "value1")); + redis.xadd(key, Collections.singletonMap("field2", "value2")); + redis.xadd(key, Collections.singletonMap("field3", "value3")); + redis.xadd(key, Collections.singletonMap("field4", "value4")); + redis.xadd(key, Collections.singletonMap("field5", "value5")); + + // Create consumer group and read messages + redis.xgroupCreate(StreamOffset.from(key, "0-0"), "test-group", XGroupCreateArgs.Builder.mkstream()); + List> messages = redis.xreadgroup(Consumer.from("test-group", "test-consumer"), + XReadArgs.Builder.count(3), StreamOffset.lastConsumed(key)); + + assertThat(messages).hasSize(3); + + // Trim with DELETE_REFERENCES mode + Long trimmed = redis.xtrim(key, XTrimArgs.Builder.maxlen(3).trimmingMode(StreamDeletionPolicy.DELETE_REFERENCES)); + assertThat(trimmed).isEqualTo(2L); + + // Stream should be trimmed to 3 entries + assertThat(redis.xlen(key)).isEqualTo(3L); + + // PEL should have fewer references due to DELREF policy + PendingMessages pending = redis.xpending(key, "test-group"); + assertThat(pending.getCount()).isLessThan(3L); + } + } diff --git a/src/test/java/io/lettuce/core/models/stream/StreamEntryDeletionResultUnitTests.java b/src/test/java/io/lettuce/core/models/stream/StreamEntryDeletionResultUnitTests.java new file mode 100644 index 0000000000..167ab5c72d --- /dev/null +++ b/src/test/java/io/lettuce/core/models/stream/StreamEntryDeletionResultUnitTests.java @@ -0,0 +1,60 @@ +/* + * Copyright 2025-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + */ +package io.lettuce.core.models.stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link StreamEntryDeletionResult}. + */ +class StreamEntryDeletionResultUnitTests { + + @Test + void testFromCode() { + assertThat(StreamEntryDeletionResult.fromCode(-1)).isEqualTo(StreamEntryDeletionResult.NOT_FOUND); + assertThat(StreamEntryDeletionResult.fromCode(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(StreamEntryDeletionResult.fromCode(2)) + .isEqualTo(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED); + } + + @Test + void testFromCodeInvalid() { + assertThat(StreamEntryDeletionResult.fromCode(99)).isEqualTo(StreamEntryDeletionResult.UNKNOWN); + } + + @Test + void testFromLong() { + assertThat(StreamEntryDeletionResult.fromLong(-1L)).isEqualTo(StreamEntryDeletionResult.NOT_FOUND); + assertThat(StreamEntryDeletionResult.fromLong(1L)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(StreamEntryDeletionResult.fromLong(2L)) + .isEqualTo(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED); + } + + @Test + void testFromLongNull() { + assertThat(StreamEntryDeletionResult.fromLong(null)).isNull(); + } + + @Test + void testGetCode() { + assertThat(StreamEntryDeletionResult.NOT_FOUND.getCode()).isEqualTo(-1); + assertThat(StreamEntryDeletionResult.DELETED.getCode()).isEqualTo(1); + assertThat(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED.getCode()).isEqualTo(2); + } + + @Test + void testToString() { + assertThat(StreamEntryDeletionResult.NOT_FOUND.toString()).isEqualTo("NOT_FOUND(-1)"); + assertThat(StreamEntryDeletionResult.DELETED.toString()).isEqualTo("DELETED(1)"); + assertThat(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED.toString()) + .isEqualTo("NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2)"); + } + +} diff --git a/src/test/java/io/lettuce/core/output/StreamEntryDeletionResultListOutputUnitTests.java b/src/test/java/io/lettuce/core/output/StreamEntryDeletionResultListOutputUnitTests.java new file mode 100644 index 0000000000..e3ccf734d7 --- /dev/null +++ b/src/test/java/io/lettuce/core/output/StreamEntryDeletionResultListOutputUnitTests.java @@ -0,0 +1,90 @@ +/* + * Copyright 2025-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + */ +package io.lettuce.core.output; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collection; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.models.stream.StreamEntryDeletionResult; + +/** + * Unit tests for {@link StreamEntryDeletionResultListOutput}. + */ +class StreamEntryDeletionResultListOutputUnitTests { + + private StreamEntryDeletionResultListOutput output = new StreamEntryDeletionResultListOutput<>( + StringCodec.UTF8); + + @Test + void shouldParseEmptyList() { + output.multi(0); + + List result = output.get(); + assertThat(result).isEmpty(); + } + + @Test + void shouldParseSingleResult() { + output.multi(1); + output.set(1L); // DELETED + + List result = output.get(); + assertThat(result).hasSize(1); + assertThat(result.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + } + + @Test + void shouldParseMultipleResults() { + output.multi(3); + output.set(-1L); // NOT_FOUND + output.set(1L); // DELETED + output.set(2L); // NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED + + List result = output.get(); + assertThat(result).hasSize(3); + assertThat(result.get(0)).isEqualTo(StreamEntryDeletionResult.NOT_FOUND); + assertThat(result.get(1)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(result.get(2)).isEqualTo(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED); + } + + @Test + void shouldHandleStreamingOutput() { + TestSubscriber subscriber = new TestSubscriber(); + output.setSubscriber(subscriber); + + output.multi(2); + output.set(1L); + output.set(-1L); + + assertThat(subscriber.results).hasSize(2); + assertThat(subscriber.results.get(0)).isEqualTo(StreamEntryDeletionResult.DELETED); + assertThat(subscriber.results.get(1)).isEqualTo(StreamEntryDeletionResult.NOT_FOUND); + } + + private static class TestSubscriber extends StreamingOutput.Subscriber { + + private final List results = new java.util.ArrayList<>(); + + @Override + public void onNext(StreamEntryDeletionResult item) { + results.add(item); + } + + @Override + public void onNext(Collection outputTarget, StreamEntryDeletionResult item) { + results.add(item); + outputTarget.add(item); + } + + } + +}