Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.lettuce.core.models.stream.ClaimedMessages;
import io.lettuce.core.models.stream.PendingMessage;
import io.lettuce.core.models.stream.PendingMessages;
import io.lettuce.core.models.stream.StreamEntryDeletionResult;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.KeyStreamingChannel;
import io.lettuce.core.output.KeyValueStreamingChannel;
Expand Down Expand Up @@ -2708,6 +2709,17 @@ public RedisFuture<Long> xack(K key, K group, String... messageIds) {
return dispatch(commandBuilder.xack(key, group, messageIds));
}

@Override
public RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, String... messageIds) {
return dispatch(commandBuilder.xackdel(key, group, messageIds));
}

@Override
public RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy,
String... messageIds) {
return dispatch(commandBuilder.xackdel(key, group, policy, messageIds));
}

@Override
public RedisFuture<String> xadd(K key, Map<K, V> body) {
return dispatch(commandBuilder.xadd(key, null, body));
Expand Down Expand Up @@ -2748,6 +2760,16 @@ public RedisFuture<Long> xdel(K key, String... messageIds) {
return dispatch(commandBuilder.xdel(key, messageIds));
}

@Override
public RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, String... messageIds) {
return dispatch(commandBuilder.xdelex(key, messageIds));
}

@Override
public RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, StreamDeletionPolicy policy, String... messageIds) {
return dispatch(commandBuilder.xdelex(key, policy, messageIds));
}

@Override
public RedisFuture<String> xgroupCreate(XReadArgs.StreamOffset<K> offset, K group) {
return dispatch(commandBuilder.xgroupCreate(offset, group, null));
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.lettuce.core.models.stream.ClaimedMessages;
import io.lettuce.core.models.stream.PendingMessage;
import io.lettuce.core.models.stream.PendingMessages;
import io.lettuce.core.models.stream.StreamEntryDeletionResult;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.KeyStreamingChannel;
import io.lettuce.core.output.KeyValueStreamingChannel;
Expand Down Expand Up @@ -2790,6 +2791,16 @@ public Mono<Long> xack(K key, K group, String... messageIds) {
return createMono(() -> commandBuilder.xack(key, group, messageIds));
}

@Override
public Flux<StreamEntryDeletionResult> xackdel(K key, K group, String... messageIds) {
return createDissolvingFlux(() -> commandBuilder.xackdel(key, group, messageIds));
}

@Override
public Flux<StreamEntryDeletionResult> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds) {
return createDissolvingFlux(() -> commandBuilder.xackdel(key, group, policy, messageIds));
}

@Override
public Mono<String> xadd(K key, Map<K, V> body) {
return createMono(() -> commandBuilder.xadd(key, null, body));
Expand Down Expand Up @@ -2831,6 +2842,16 @@ public Mono<Long> xdel(K key, String... messageIds) {
return createMono(() -> commandBuilder.xdel(key, messageIds));
}

@Override
public Flux<StreamEntryDeletionResult> xdelex(K key, String... messageIds) {
return createDissolvingFlux(() -> commandBuilder.xdelex(key, messageIds));
}

@Override
public Flux<StreamEntryDeletionResult> xdelex(K key, StreamDeletionPolicy policy, String... messageIds) {
return createDissolvingFlux(() -> commandBuilder.xdelex(key, policy, messageIds));
}

@Override
public Mono<String> xgroupCreate(XReadArgs.StreamOffset<K> streamOffset, K group) {
return createMono(() -> commandBuilder.xgroupCreate(streamOffset, group, null));
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.lettuce.core.models.stream.ClaimedMessages;
import io.lettuce.core.models.stream.PendingMessage;
import io.lettuce.core.models.stream.PendingMessages;
import io.lettuce.core.models.stream.StreamEntryDeletionResult;
import io.lettuce.core.output.*;
import io.lettuce.core.output.StreamEntryDeletionResultListOutput;
import io.lettuce.core.protocol.BaseRedisCommandBuilder;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;
Expand Down Expand Up @@ -3159,6 +3161,32 @@ public Command<K, V, Long> xack(K key, K group, String[] messageIds) {
return createCommand(XACK, new IntegerOutput<>(codec), args);
}

public Command<K, V, List<StreamEntryDeletionResult>> xackdel(K key, K group, String[] messageIds) {
return xackdel(key, group, null, messageIds);
}

public Command<K, V, List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy,
String[] messageIds) {
notNullKey(key);
LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL);
LettuceAssert.notEmpty(messageIds, "MessageIds " + MUST_NOT_BE_EMPTY);
LettuceAssert.noNullElements(messageIds, "MessageIds " + MUST_NOT_CONTAIN_NULL_ELEMENTS);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).addKey(group);

if (policy != null) {
args.add(policy);
}

args.add(CommandKeyword.IDS).add(messageIds.length);

for (String messageId : messageIds) {
args.add(messageId);
}

return createCommand(XACKDEL, new StreamEntryDeletionResultListOutput<>(codec), args);
}

public Command<K, V, ClaimedMessages<K, V>> xautoclaim(K key, XAutoClaimArgs<K> xAutoClaimArgs) {
notNullKey(key);
LettuceAssert.notNull(xAutoClaimArgs, "XAutoClaimArgs " + MUST_NOT_BE_NULL);
Expand Down Expand Up @@ -3243,6 +3271,30 @@ public Command<K, V, Long> xdel(K key, String[] messageIds) {
return createCommand(XDEL, new IntegerOutput<>(codec), args);
}

public Command<K, V, List<StreamEntryDeletionResult>> xdelex(K key, String[] messageIds) {
return xdelex(key, null, messageIds);
}

public Command<K, V, List<StreamEntryDeletionResult>> xdelex(K key, StreamDeletionPolicy policy, String[] messageIds) {
notNullKey(key);
LettuceAssert.notEmpty(messageIds, "MessageIds " + MUST_NOT_BE_EMPTY);
LettuceAssert.noNullElements(messageIds, "MessageIds " + MUST_NOT_CONTAIN_NULL_ELEMENTS);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key);

if (policy != null) {
args.add(policy);
}

args.add(CommandKeyword.IDS).add(messageIds.length);

for (String messageId : messageIds) {
args.add(messageId);
}

return createCommand(XDELEX, new StreamEntryDeletionResultListOutput<>(codec), args);
}

public Command<K, V, String> xgroupCreate(StreamOffset<K> offset, K group, XGroupCreateArgs commandArgs) {
LettuceAssert.notNull(offset, "StreamOffset " + MUST_NOT_BE_NULL);
LettuceAssert.notNull(group, "Group " + MUST_NOT_BE_NULL);
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/io/lettuce/core/StreamDeletionPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2025-Present, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*/
package io.lettuce.core;

import io.lettuce.core.protocol.ProtocolKeyword;

import java.nio.charset.StandardCharsets;

/**
* Deletion policy for stream commands that handle consumer group references. Used with XDELEX, XACKDEL, and enhanced XADD/XTRIM
* commands.
*/
public enum StreamDeletionPolicy implements ProtocolKeyword {

/**
* Preserves existing references to entries in all consumer groups' PEL. This is the default behavior similar to XDEL.
*/
KEEP_REFERENCES("KEEPREF"),

/**
* Removes all references to entries from all consumer groups' pending entry lists, effectively cleaning up all traces of
* the messages.
*/
DELETE_REFERENCES("DELREF"),

/**
* Only operates on entries that were read and acknowledged by all consumer groups.
*/
ACKNOWLEDGED("ACKED");

public final byte[] bytes;

StreamDeletionPolicy(String redisParamName) {
bytes = redisParamName.getBytes(StandardCharsets.US_ASCII);
}

@Override
public byte[] getBytes() {
return bytes;
}

}
18 changes: 18 additions & 0 deletions src/main/java/io/lettuce/core/XAddArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class XAddArgs implements CompositeArgument {

private Long limit;

private StreamDeletionPolicy trimmingMode;

/**
* Builder entry points for {@link XAddArgs}.
*/
Expand Down Expand Up @@ -155,6 +157,18 @@ public XAddArgs limit(long limit) {
return this;
}

/**
* When trimming, defines desired behaviour for handling consumer group references. See {@link StreamDeletionPolicy} for
* details.
*
* @param trimmingMode the deletion policy to apply during trimming.
* @return {@code this}
*/
public XAddArgs trimmingMode(StreamDeletionPolicy trimmingMode) {
this.trimmingMode = trimmingMode;
return this;
}

/**
* Apply efficient trimming for capped streams using the {@code ~} flag.
*
Expand Down Expand Up @@ -253,6 +267,10 @@ public <K, V> void build(CommandArgs<K, V> args) {
args.add(CommandKeyword.LIMIT).add(limit);
}

if (trimmingMode != null) {
args.add(trimmingMode);
}

if (nomkstream) {
args.add(CommandKeyword.NOMKSTREAM);
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/lettuce/core/XTrimArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class XTrimArgs implements CompositeArgument {

private Long limit;

private StreamDeletionPolicy trimmingMode;

/**
* Builder entry points for {@link XTrimArgs}.
*/
Expand Down Expand Up @@ -164,6 +166,18 @@ public XTrimArgs exactTrimming(boolean exactTrimming) {
return this;
}

/**
* Defines desired behaviour for handling consumer group references during trimming. See {@link StreamDeletionPolicy} for
* details.
*
* @param trimmingMode the deletion policy to apply during trimming.
* @return {@code this}
*/
public XTrimArgs trimmingMode(StreamDeletionPolicy trimmingMode) {
this.trimmingMode = trimmingMode;
return this;
}

@Override
public <K, V> void build(CommandArgs<K, V> args) {

Expand Down Expand Up @@ -193,6 +207,10 @@ public <K, V> void build(CommandArgs<K, V> args) {
if (limit != null && approximateTrimming) {
args.add(CommandKeyword.LIMIT).add(limit);
}

if (trimmingMode != null) {
args.add(trimmingMode);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.lettuce.core.models.stream.ClaimedMessages;
import io.lettuce.core.models.stream.PendingMessage;
import io.lettuce.core.models.stream.PendingMessages;
import io.lettuce.core.models.stream.StreamEntryDeletionResult;

/**
* Asynchronous executed commands for Streams.
Expand All @@ -49,6 +50,32 @@ public interface RedisStreamAsyncCommands<K, V> {
*/
RedisFuture<Long> xack(K key, K group, String... messageIds);

/**
* Acknowledge and delete one or more messages from the stream and consumer group. Returns detailed results for each message
* ID indicating whether it was deleted, not found, or not deleted due to pending references.
*
* @param key the stream key.
* @param group name of the consumer group.
* @param messageIds message Id's to acknowledge and delete.
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
* @since 6.8
*/
RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, String... messageIds);

/**
* Acknowledge and delete one or more messages from the stream and consumer group with a specific deletion policy. Returns
* detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to pending
* references.
*
* @param key the stream key.
* @param group name of the consumer group.
* @param policy the deletion policy to apply.
* @param messageIds message Id's to acknowledge and delete.
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
* @since 6.8
*/
RedisFuture<List<StreamEntryDeletionResult>> xackdel(K key, K group, StreamDeletionPolicy policy, String... messageIds);

/**
* Append a message to the stream {@code key}.
*
Expand Down Expand Up @@ -132,6 +159,30 @@ public interface RedisStreamAsyncCommands<K, V> {
*/
RedisFuture<Long> xdel(K key, String... messageIds);

/**
* Extended delete operation that removes the specified entries from the stream and returns detailed results for each
* message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment status.
*
* @param key the stream key.
* @param messageIds stream message Id's.
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
* @since 6.8
*/
RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, String... messageIds);

/**
* Extended delete operation that removes the specified entries from the stream with a specific deletion policy and returns
* detailed results for each message ID indicating whether it was deleted, not found, or not deleted due to acknowledgment
* status.
*
* @param key the stream key.
* @param policy the deletion policy to apply.
* @param messageIds stream message Id's.
* @return List of {@link StreamEntryDeletionResult} indicating the result for each message ID.
* @since 6.8
*/
RedisFuture<List<StreamEntryDeletionResult>> xdelex(K key, StreamDeletionPolicy policy, String... messageIds);

/**
* Create a consumer group.
*
Expand Down
Loading