Skip to content

Commit db8d397

Browse files
uglideggivo
andauthored
Add support for new stream commands (#4211)
* Add support for new stream commands - Add support for xackdel and xdelex - Extend xadd and xtrim - Add relevant tests * Clean up tests * Rename StreamTrimMode and StreamTrimResult StreamTrimMode -> StreamDeletionPolicy StreamTrimResult -> StreamEntryDeletionResult * Fix formatting in new files * fix doc comments formatting --------- Co-authored-by: ggivo <[email protected]>
1 parent 3a2c704 commit db8d397

24 files changed

+2274
-6
lines changed

pom.xml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,18 @@
335335
<configuration>
336336
<configFile>${project.basedir}/hbase-formatter.xml</configFile>
337337
<directories>
338-
<directory>${project.basedir}/src/main/java/redis/clients/jedis/annots</directory>
338+
<directory>${project.basedir}</directory>
339339
</directories>
340+
<includes>
341+
<!-- Specific files -->
342+
<include>src/main/java/redis/clients/jedis/annots/*.java</include>
343+
<include>src/main/java/redis/clients/jedis/resps/StreamEntryDeletionResult.java</include>
344+
<include>src/main/java/redis/clients/jedisargs/StreamDeletionPolicy.java</include>
345+
<include>src/test/java/redis/clients/jedis/commands/StreamsCommandsTestBase.java</include>
346+
<include>src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java</include>
347+
<include>src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java</include>
348+
<include>src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java</include>
349+
</includes>
340350
</configuration>
341351
<executions>
342352
<execution>

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,42 @@ public List<StreamEntryID> build(Object data) {
12611261
}
12621262
};
12631263

1264+
public static final Builder<StreamEntryDeletionResult> STREAM_ENTRY_DELETION_RESULT = new Builder<StreamEntryDeletionResult>() {
1265+
@Override
1266+
public StreamEntryDeletionResult build(Object data) {
1267+
if (data == null) {
1268+
return null;
1269+
}
1270+
return StreamEntryDeletionResult.fromLong((Long) data);
1271+
}
1272+
1273+
@Override
1274+
public String toString() {
1275+
return "StreamEntryDeletionResult";
1276+
}
1277+
};
1278+
1279+
public static final Builder<List<StreamEntryDeletionResult>> STREAM_ENTRY_DELETION_RESULT_LIST = new Builder<List<StreamEntryDeletionResult>>() {
1280+
@Override
1281+
@SuppressWarnings("unchecked")
1282+
public List<StreamEntryDeletionResult> build(Object data) {
1283+
if (data == null) {
1284+
return null;
1285+
}
1286+
List<Object> objectList = (List<Object>) data;
1287+
List<StreamEntryDeletionResult> responses = new ArrayList<>(objectList.size());
1288+
for (Object object : objectList) {
1289+
responses.add(STREAM_ENTRY_DELETION_RESULT.build(object));
1290+
}
1291+
return responses;
1292+
}
1293+
1294+
@Override
1295+
public String toString() {
1296+
return "List<StreamEntryDeletionResult>";
1297+
}
1298+
};
1299+
12641300
public static final Builder<StreamEntry> STREAM_ENTRY = new Builder<StreamEntry>() {
12651301
@Override
12661302
@SuppressWarnings("unchecked")

src/main/java/redis/clients/jedis/CommandObjects.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2626,10 +2626,26 @@ public final CommandObject<Long> xack(String key, String group, StreamEntryID...
26262626
return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG);
26272627
}
26282628

2629+
public final CommandObject<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamEntryID... ids) {
2630+
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2631+
}
2632+
2633+
public final CommandObject<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
2634+
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2635+
}
2636+
26292637
public final CommandObject<Long> xack(byte[] key, byte[] group, byte[]... ids) {
26302638
return new CommandObject<>(commandArguments(XACK).key(key).add(group).addObjects((Object[]) ids), BuilderFactory.LONG);
26312639
}
26322640

2641+
public final CommandObject<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, byte[]... ids) {
2642+
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2643+
}
2644+
2645+
public final CommandObject<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
2646+
return new CommandObject<>(commandArguments(XACKDEL).key(key).add(group).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2647+
}
2648+
26332649
public final CommandObject<String> xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
26342650
CommandArguments args = commandArguments(XGROUP).add(CREATE).key(key)
26352651
.add(groupName).add(id == null ? "0-0" : id);
@@ -2687,6 +2703,14 @@ public final CommandObject<Long> xdel(String key, StreamEntryID... ids) {
26872703
return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG);
26882704
}
26892705

2706+
public final CommandObject<List<StreamEntryDeletionResult>> xdelex(String key, StreamEntryID... ids) {
2707+
return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2708+
}
2709+
2710+
public final CommandObject<List<StreamEntryDeletionResult>> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
2711+
return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2712+
}
2713+
26902714
public final CommandObject<Long> xtrim(String key, long maxLen, boolean approximate) {
26912715
CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN);
26922716
if (approximate) args.add(Protocol.BYTES_TILDE);
@@ -2702,6 +2726,14 @@ public final CommandObject<Long> xdel(byte[] key, byte[]... ids) {
27022726
return new CommandObject<>(commandArguments(XDEL).key(key).addObjects((Object[]) ids), BuilderFactory.LONG);
27032727
}
27042728

2729+
public final CommandObject<List<StreamEntryDeletionResult>> xdelex(byte[] key, byte[]... ids) {
2730+
return new CommandObject<>(commandArguments(XDELEX).key(key).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2731+
}
2732+
2733+
public final CommandObject<List<StreamEntryDeletionResult>> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
2734+
return new CommandObject<>(commandArguments(XDELEX).key(key).add(trimMode).add("IDS").add(ids.length).addObjects((Object[]) ids), BuilderFactory.STREAM_ENTRY_DELETION_RESULT_LIST);
2735+
}
2736+
27052737
public final CommandObject<Long> xtrim(byte[] key, long maxLen, boolean approximateLength) {
27062738
CommandArguments args = commandArguments(XTRIM).key(key).add(MAXLEN);
27072739
if (approximateLength) args.add(Protocol.BYTES_TILDE);

src/main/java/redis/clients/jedis/Jedis.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4866,6 +4866,18 @@ public long xack(byte[] key, byte[] group, byte[]... ids) {
48664866
return connection.executeCommand(commandObjects.xack(key, group, ids));
48674867
}
48684868

4869+
@Override
4870+
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, byte[]... ids) {
4871+
checkIsInMultiOrPipeline();
4872+
return connection.executeCommand(commandObjects.xackdel(key, group, ids));
4873+
}
4874+
4875+
@Override
4876+
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
4877+
checkIsInMultiOrPipeline();
4878+
return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
4879+
}
4880+
48694881
@Override
48704882
public String xgroupCreate(byte[] key, byte[] consumer, byte[] id, boolean makeStream) {
48714883
checkIsInMultiOrPipeline();
@@ -4902,6 +4914,18 @@ public long xdel(byte[] key, byte[]... ids) {
49024914
return connection.executeCommand(commandObjects.xdel(key, ids));
49034915
}
49044916

4917+
@Override
4918+
public List<StreamEntryDeletionResult> xdelex(byte[] key, byte[]... ids) {
4919+
checkIsInMultiOrPipeline();
4920+
return connection.executeCommand(commandObjects.xdelex(key, ids));
4921+
}
4922+
4923+
@Override
4924+
public List<StreamEntryDeletionResult> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
4925+
checkIsInMultiOrPipeline();
4926+
return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids));
4927+
}
4928+
49054929
@Override
49064930
public long xtrim(byte[] key, long maxLen, boolean approximateLength) {
49074931
checkIsInMultiOrPipeline();
@@ -9677,6 +9701,18 @@ public long xack(final String key, final String group, final StreamEntryID... id
96779701
return connection.executeCommand(commandObjects.xack(key, group, ids));
96789702
}
96799703

9704+
@Override
9705+
public List<StreamEntryDeletionResult> xackdel(final String key, final String group, final StreamEntryID... ids) {
9706+
checkIsInMultiOrPipeline();
9707+
return connection.executeCommand(commandObjects.xackdel(key, group, ids));
9708+
}
9709+
9710+
@Override
9711+
public List<StreamEntryDeletionResult> xackdel(final String key, final String group, final StreamDeletionPolicy trimMode, final StreamEntryID... ids) {
9712+
checkIsInMultiOrPipeline();
9713+
return connection.executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
9714+
}
9715+
96809716
@Override
96819717
public String xgroupCreate(final String key, final String groupName, final StreamEntryID id,
96829718
final boolean makeStream) {
@@ -9714,6 +9750,18 @@ public long xdel(final String key, final StreamEntryID... ids) {
97149750
return connection.executeCommand(commandObjects.xdel(key, ids));
97159751
}
97169752

9753+
@Override
9754+
public List<StreamEntryDeletionResult> xdelex(final String key, final StreamEntryID... ids) {
9755+
checkIsInMultiOrPipeline();
9756+
return connection.executeCommand(commandObjects.xdelex(key, ids));
9757+
}
9758+
9759+
@Override
9760+
public List<StreamEntryDeletionResult> xdelex(final String key, final StreamDeletionPolicy trimMode, final StreamEntryID... ids) {
9761+
checkIsInMultiOrPipeline();
9762+
return connection.executeCommand(commandObjects.xdelex(key, trimMode, ids));
9763+
}
9764+
97179765
@Override
97189766
public long xtrim(final String key, final long maxLen, final boolean approximateLength) {
97199767
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/PipeliningBase.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1552,6 +1552,16 @@ public Response<Long> xack(String key, String group, StreamEntryID... ids) {
15521552
return appendCommand(commandObjects.xack(key, group, ids));
15531553
}
15541554

1555+
@Override
1556+
public Response<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamEntryID... ids) {
1557+
return appendCommand(commandObjects.xackdel(key, group, ids));
1558+
}
1559+
1560+
@Override
1561+
public Response<List<StreamEntryDeletionResult>> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
1562+
return appendCommand(commandObjects.xackdel(key, group, trimMode, ids));
1563+
}
1564+
15551565
@Override
15561566
public Response<String> xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
15571567
return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
@@ -1592,6 +1602,16 @@ public Response<Long> xdel(String key, StreamEntryID... ids) {
15921602
return appendCommand(commandObjects.xdel(key, ids));
15931603
}
15941604

1605+
@Override
1606+
public Response<List<StreamEntryDeletionResult>> xdelex(String key, StreamEntryID... ids) {
1607+
return appendCommand(commandObjects.xdelex(key, ids));
1608+
}
1609+
1610+
@Override
1611+
public Response<List<StreamEntryDeletionResult>> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
1612+
return appendCommand(commandObjects.xdelex(key, trimMode, ids));
1613+
}
1614+
15951615
@Override
15961616
public Response<Long> xtrim(String key, long maxLen, boolean approximate) {
15971617
return appendCommand(commandObjects.xtrim(key, maxLen, approximate));
@@ -3264,6 +3284,16 @@ public Response<Long> xack(byte[] key, byte[] group, byte[]... ids) {
32643284
return appendCommand(commandObjects.xack(key, group, ids));
32653285
}
32663286

3287+
@Override
3288+
public Response<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, byte[]... ids) {
3289+
return appendCommand(commandObjects.xackdel(key, group, ids));
3290+
}
3291+
3292+
@Override
3293+
public Response<List<StreamEntryDeletionResult>> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
3294+
return appendCommand(commandObjects.xackdel(key, group, trimMode, ids));
3295+
}
3296+
32673297
@Override
32683298
public Response<String> xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) {
32693299
return appendCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
@@ -3294,6 +3324,16 @@ public Response<Long> xdel(byte[] key, byte[]... ids) {
32943324
return appendCommand(commandObjects.xdel(key, ids));
32953325
}
32963326

3327+
@Override
3328+
public Response<List<StreamEntryDeletionResult>> xdelex(byte[] key, byte[]... ids) {
3329+
return appendCommand(commandObjects.xdelex(key, ids));
3330+
}
3331+
3332+
@Override
3333+
public Response<List<StreamEntryDeletionResult>> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
3334+
return appendCommand(commandObjects.xdelex(key, trimMode, ids));
3335+
}
3336+
32973337
@Override
32983338
public Response<Long> xtrim(byte[] key, long maxLen, boolean approximateLength) {
32993339
return appendCommand(commandObjects.xtrim(key, maxLen, approximateLength));

src/main/java/redis/clients/jedis/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ public static enum Command implements ProtocolCommand {
305305
GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, // <-- geo
306306
PFADD, PFCOUNT, PFMERGE, // <-- hyper log log
307307
XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM,
308-
XAUTOCLAIM, XINFO, // <-- stream
308+
XAUTOCLAIM, XINFO, XDELEX, XACKDEL, // <-- stream
309309
EVAL, EVALSHA, SCRIPT, EVAL_RO, EVALSHA_RO, FUNCTION, FCALL, FCALL_RO, // <-- program
310310
SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB,
311311
SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH, // <-- pub sub

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3201,6 +3201,16 @@ public long xack(String key, String group, StreamEntryID... ids) {
32013201
return executeCommand(commandObjects.xack(key, group, ids));
32023202
}
32033203

3204+
@Override
3205+
public List<StreamEntryDeletionResult> xackdel(String key, String group, StreamEntryID... ids) {
3206+
return executeCommand(commandObjects.xackdel(key, group, ids));
3207+
}
3208+
3209+
@Override
3210+
public List<StreamEntryDeletionResult> xackdel(String key, String group, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
3211+
return executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
3212+
}
3213+
32043214
@Override
32053215
public String xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
32063216
return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
@@ -3241,6 +3251,16 @@ public long xdel(String key, StreamEntryID... ids) {
32413251
return executeCommand(commandObjects.xdel(key, ids));
32423252
}
32433253

3254+
@Override
3255+
public List<StreamEntryDeletionResult> xdelex(String key, StreamEntryID... ids) {
3256+
return executeCommand(commandObjects.xdelex(key, ids));
3257+
}
3258+
3259+
@Override
3260+
public List<StreamEntryDeletionResult> xdelex(String key, StreamDeletionPolicy trimMode, StreamEntryID... ids) {
3261+
return executeCommand(commandObjects.xdelex(key, trimMode, ids));
3262+
}
3263+
32443264
@Override
32453265
public long xtrim(String key, long maxLen, boolean approximate) {
32463266
return executeCommand(commandObjects.xtrim(key, maxLen, approximate));
@@ -3356,6 +3376,16 @@ public long xack(byte[] key, byte[] group, byte[]... ids) {
33563376
return executeCommand(commandObjects.xack(key, group, ids));
33573377
}
33583378

3379+
@Override
3380+
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, byte[]... ids) {
3381+
return executeCommand(commandObjects.xackdel(key, group, ids));
3382+
}
3383+
3384+
@Override
3385+
public List<StreamEntryDeletionResult> xackdel(byte[] key, byte[] group, StreamDeletionPolicy trimMode, byte[]... ids) {
3386+
return executeCommand(commandObjects.xackdel(key, group, trimMode, ids));
3387+
}
3388+
33593389
@Override
33603390
public String xgroupCreate(byte[] key, byte[] groupName, byte[] id, boolean makeStream) {
33613391
return executeCommand(commandObjects.xgroupCreate(key, groupName, id, makeStream));
@@ -3386,6 +3416,16 @@ public long xdel(byte[] key, byte[]... ids) {
33863416
return executeCommand(commandObjects.xdel(key, ids));
33873417
}
33883418

3419+
@Override
3420+
public List<StreamEntryDeletionResult> xdelex(byte[] key, byte[]... ids) {
3421+
return executeCommand(commandObjects.xdelex(key, ids));
3422+
}
3423+
3424+
@Override
3425+
public List<StreamEntryDeletionResult> xdelex(byte[] key, StreamDeletionPolicy trimMode, byte[]... ids) {
3426+
return executeCommand(commandObjects.xdelex(key, trimMode, ids));
3427+
}
3428+
33893429
@Override
33903430
public long xtrim(byte[] key, long maxLen, boolean approximateLength) {
33913431
return executeCommand(commandObjects.xtrim(key, maxLen, approximateLength));
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package redis.clients.jedis.args;
2+
3+
import redis.clients.jedis.util.SafeEncoder;
4+
5+
/**
6+
* Deletion policy for stream commands that handle consumer group references. Used with XDELEX,
7+
* XACKDEL, and enhanced XADD/XTRIM commands.
8+
*/
9+
public enum StreamDeletionPolicy implements Rawable {
10+
11+
/**
12+
* Preserves existing references to entries in all consumer groups' PEL. This is the default
13+
* behavior similar to XDEL.
14+
*/
15+
KEEP_REFERENCES("KEEPREF"),
16+
17+
/**
18+
* Removes all references to entries from all consumer groups' pending entry lists, effectively
19+
* cleaning up all traces of the messages.
20+
*/
21+
DELETE_REFERENCES("DELREF"),
22+
23+
/**
24+
* Only operates on entries that were read and acknowledged by all consumer groups.
25+
*/
26+
ACKNOWLEDGED("ACKED");
27+
28+
private final byte[] raw;
29+
30+
StreamDeletionPolicy(String redisParamName) {
31+
raw = SafeEncoder.encode(redisParamName);
32+
}
33+
34+
@Override
35+
public byte[] getRaw() {
36+
return raw;
37+
}
38+
}

0 commit comments

Comments
 (0)