Skip to content

Commit c8135e2

Browse files
a-TODO-rovggivo
andauthored
Add support for CLAIM arg in XREADGROUP (#4344)
* Add support for CLAIM arg in XREADGROUP * New values type - integer. Rename new values. * Revamp tests. * Fix builder * Fix tests * Refactor tests --------- Co-authored-by: Ivo Gaydazhiev <[email protected]>
1 parent 0582c3b commit c8135e2

File tree

11 files changed

+740
-30
lines changed

11 files changed

+740
-30
lines changed

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,17 +1408,26 @@ public List<StreamEntry> build(Object data) {
14081408
String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
14091409
StreamEntryID entryID = new StreamEntryID(entryIdString);
14101410
List<byte[]> hash = (List<byte[]>) res.get(1);
1411-
if (hash == null) {
1412-
responses.add(new StreamEntry(entryID, null));
1413-
continue;
1411+
1412+
Map<String, String> fieldsMap = null;
1413+
1414+
if (hash != null) {
1415+
Iterator<byte[]> hashIterator = hash.iterator();
1416+
fieldsMap = new HashMap<>(hash.size() / 2, 1f);
1417+
1418+
while (hashIterator.hasNext()) {
1419+
fieldsMap.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
1420+
}
14141421
}
14151422

1416-
Iterator<byte[]> hashIterator = hash.iterator();
1417-
Map<String, String> map = new HashMap<>(hash.size() / 2, 1f);
1418-
while (hashIterator.hasNext()) {
1419-
map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
1423+
if (res.size() >= 4) {
1424+
Long millisElapsedFromDelivery = LONG.build(res.get(2));
1425+
Long deliveredCount = LONG.build(res.get(3));
1426+
responses.add(new StreamEntry(entryID, fieldsMap, millisElapsedFromDelivery, deliveredCount));
1427+
continue;
14201428
}
1421-
responses.add(new StreamEntry(entryID, map));
1429+
1430+
responses.add(new StreamEntry(entryID, fieldsMap));
14221431
}
14231432

14241433
return responses;
@@ -1959,16 +1968,25 @@ public List<StreamEntryBinary> build(Object data) {
19591968
String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
19601969
StreamEntryID entryID = new StreamEntryID(entryIdString);
19611970
List<byte[]> hash = (List<byte[]>) res.get(1);
1962-
if (hash == null) {
1963-
responses.add(new StreamEntryBinary(entryID, null));
1964-
continue;
1971+
1972+
Map<byte[], byte[]> map = null;
1973+
1974+
if (hash != null) {
1975+
Iterator<byte[]> hashIterator = hash.iterator();
1976+
map = new JedisByteHashMap();
1977+
1978+
while (hashIterator.hasNext()) {
1979+
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
1980+
}
19651981
}
19661982

1967-
Iterator<byte[]> hashIterator = hash.iterator();
1968-
Map<byte[], byte[]> map = new JedisByteHashMap();
1969-
while (hashIterator.hasNext()) {
1970-
map.put(BINARY.build(hashIterator.next()), BINARY.build(hashIterator.next()));
1983+
if (res.size() >= 4) {
1984+
Long millisElapsedFromDelivery = LONG.build(res.get(2));
1985+
Long deliveredCount = LONG.build(res.get(3));
1986+
responses.add(new StreamEntryBinary(entryID, map, millisElapsedFromDelivery, deliveredCount));
1987+
continue;
19711988
}
1989+
19721990
responses.add(new StreamEntryBinary(entryID, map));
19731991
}
19741992

src/main/java/redis/clients/jedis/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public static enum Keyword implements Rawable {
331331
AGGREGATE, ALPHA, BY, GET, LIMIT, NO, NOSORT, ONE, SET, STORE, WEIGHTS, WITHSCORE, WITHSCORES,
332332
RESETSTAT, REWRITE, RESET, FLUSH, EXISTS, LOAD, LEN, HELP, SCHEDULE, MATCH, COUNT, TYPE, KEYS,
333333
REFCOUNT, ENCODING, IDLETIME, FREQ, REPLACE, GETNAME, SETNAME, SETINFO, LIST, ID, KILL, PERSIST,
334-
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK,
334+
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK, CLAIM,
335335
RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER,
336336
SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2,
337337
NX, XX, EX, PX, EXAT, PXAT, ABSTTL, KEEPTTL, INCR, LT, GT, CH, INFO, PAUSE, UNPAUSE, UNBLOCK,

src/main/java/redis/clients/jedis/params/XReadGroupParams.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class XReadGroupParams implements IParams {
1010
private Integer count = null;
1111
private Integer block = null;
1212
private boolean noack = false;
13+
private Long claim = null;
1314

1415
public static XReadGroupParams xReadGroupParams() {
1516
return new XReadGroupParams();
@@ -30,6 +31,11 @@ public XReadGroupParams noAck() {
3031
return this;
3132
}
3233

34+
public XReadGroupParams claim(long minIdleMillis) {
35+
this.claim = minIdleMillis;
36+
return this;
37+
}
38+
3339
@Override
3440
public void addParams(CommandArguments args) {
3541
if (count != null) {
@@ -41,18 +47,22 @@ public void addParams(CommandArguments args) {
4147
if (noack) {
4248
args.add(Keyword.NOACK);
4349
}
50+
if (claim != null) {
51+
args.add(Keyword.CLAIM).add(claim);
52+
}
4453
}
4554

4655
@Override
4756
public boolean equals(Object o) {
4857
if (this == o) return true;
4958
if (o == null || getClass() != o.getClass()) return false;
5059
XReadGroupParams that = (XReadGroupParams) o;
51-
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block);
60+
return noack == that.noack && Objects.equals(count, that.count) && Objects.equals(block, that.block)
61+
&& Objects.equals(claim, that.claim);
5262
}
5363

5464
@Override
5565
public int hashCode() {
56-
return Objects.hash(count, block, noack);
66+
return Objects.hash(count, block, noack, claim);
5767
}
5868
}

src/main/java/redis/clients/jedis/resps/StreamEntry.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,51 @@ public class StreamEntry implements Serializable {
1111

1212
private StreamEntryID id;
1313
private Map<String, String> fields;
14+
private Long millisElapsedFromDelivery;
15+
private Long deliveredCount;
1416

1517
public StreamEntry(StreamEntryID id, Map<String, String> fields) {
1618
this.id = id;
1719
this.fields = fields;
1820
}
1921

22+
public StreamEntry(StreamEntryID id, Map<String, String> fields, Long millisElapsedFromDelivery, Long deliveredCount) {
23+
this.id = id;
24+
this.fields = fields;
25+
this.millisElapsedFromDelivery = millisElapsedFromDelivery;
26+
this.deliveredCount = deliveredCount;
27+
}
28+
29+
/**
30+
* @return the milliseconds since the last delivery of this message when CLAIM was used.
31+
* <ul>
32+
* <li>{@code null} when not applicable</li>
33+
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
34+
* <li>{@code > 0} means claimed from the PEL</li>
35+
* </ul>
36+
* @since 7.1
37+
*/
38+
public Long getMillisElapsedFromDelivery() {
39+
return millisElapsedFromDelivery;
40+
}
41+
42+
/**
43+
* @return the number of prior deliveries of this message when CLAIM was used:
44+
* <ul>
45+
* <li>{@code null} when not applicable</li>
46+
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
47+
* <li>{@code > 0} means claimed from the PEL</li>
48+
* </ul>
49+
* @since 7.1
50+
*/
51+
public Long getDeliveredCount() {
52+
return deliveredCount;
53+
}
54+
55+
public boolean isClaimed() {
56+
return this.deliveredCount != null && this.deliveredCount > 0;
57+
}
58+
2059
public StreamEntryID getID() {
2160
return id;
2261
}

src/main/java/redis/clients/jedis/resps/StreamEntryBinary.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,51 @@ public class StreamEntryBinary implements Serializable {
1111

1212
private StreamEntryID id;
1313
private Map<byte[], byte[]> fields;
14+
private Long millisElapsedFromDelivery;
15+
private Long deliveredCount;
1416

1517
public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields) {
1618
this.id = id;
1719
this.fields = fields;
1820
}
1921

22+
public StreamEntryBinary(StreamEntryID id, Map<byte[], byte[]> fields, Long millisElapsedFromDelivery, Long deliveredCount) {
23+
this.id = id;
24+
this.fields = fields;
25+
this.millisElapsedFromDelivery = millisElapsedFromDelivery;
26+
this.deliveredCount = deliveredCount;
27+
}
28+
29+
/**
30+
* @return the milliseconds since the last delivery of this message when CLAIM was used.
31+
* <ul>
32+
* <li>{@code null} when not applicable</li>
33+
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
34+
* <li>{@code > 0} means claimed from the PEL</li>
35+
* </ul>
36+
* @since 7.1
37+
*/
38+
public Long getMillisElapsedFromDelivery() {
39+
return millisElapsedFromDelivery;
40+
}
41+
42+
/**
43+
* @return the number of prior deliveries of this message when CLAIM was used:
44+
* <ul>
45+
* <li>{@code null} when not applicable</li>
46+
* <li>{@code 0} means not claimed from the pending entries list (PEL)</li>
47+
* <li>{@code > 0} means claimed from the PEL</li>
48+
* </ul>
49+
* @since 7.1
50+
*/
51+
public Long getDeliveredCount() {
52+
return deliveredCount;
53+
}
54+
55+
public boolean isClaimed() {
56+
return this.deliveredCount != null && this.deliveredCount > 0;
57+
}
58+
2059
public StreamEntryID getID() {
2160
return id;
2261
}
@@ -39,4 +78,4 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
3978
this.id = (StreamEntryID) in.readUnshared();
4079
this.fields = (Map<byte[], byte[]>) in.readUnshared();
4180
}
42-
}
81+
}

src/test/java/io/redis/test/utils/RedisVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ public class RedisVersion implements Comparable<RedisVersion> {
77
public static final RedisVersion V7_4 = RedisVersion.of("7.4");
88
public static final RedisVersion V8_0_0_PRE = RedisVersion.of("7.9.0");
99
public static final RedisVersion V8_0_0 = RedisVersion.of("8.0.0");
10+
public static final String V8_4_0_STRING= "8.4.0";
1011
public static final RedisVersion V8_4_0 = RedisVersion.of("8.4.0");
1112

1213
private final int major;
@@ -91,4 +92,4 @@ public boolean isGreaterThan(RedisVersion other) {
9192
public static int compare(RedisVersion v1, RedisVersion v2) {
9293
return v1.compareTo(v2);
9394
}
94-
}
95+
}

0 commit comments

Comments
 (0)