Skip to content

Commit e1aecee

Browse files
feat(java): add stream commands to Jedis compatibility layer via GLIDE type-safe API
Signed-off-by: prashanna-frsh <prashanna.rajendran@freshworks.com>
1 parent 00f5d6d commit e1aecee

File tree

4 files changed

+916
-0
lines changed

4 files changed

+916
-0
lines changed

java/integTest/src/test/java/compatibility/jedis/JedisTest.java

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import redis.clients.jedis.Jedis;
2323
import redis.clients.jedis.JedisClientConfig;
2424
import redis.clients.jedis.Protocol;
25+
import redis.clients.jedis.StreamEntryID;
2526
import redis.clients.jedis.args.BitOP;
2627
import redis.clients.jedis.args.ExpiryOption;
2728
import redis.clients.jedis.args.ListDirection;
@@ -34,6 +35,12 @@
3435
import redis.clients.jedis.params.ScanParams;
3536
import redis.clients.jedis.params.SetParams;
3637
import redis.clients.jedis.resps.ScanResult;
38+
import redis.clients.jedis.resps.StreamConsumerInfo;
39+
import redis.clients.jedis.resps.StreamEntry;
40+
import redis.clients.jedis.resps.StreamGroupInfo;
41+
import redis.clients.jedis.resps.StreamInfo;
42+
import redis.clients.jedis.resps.StreamPendingEntry;
43+
import redis.clients.jedis.resps.StreamPendingSummary;
3744
import redis.clients.jedis.util.KeyValue;
3845

3946
/**
@@ -3232,4 +3239,177 @@ void list_edge_cases() {
32323239
assertEquals(
32333240
0, jedis.rpushx("non_existent_key", "value"), "RPUSHX on non-existent key should return 0");
32343241
}
3242+
3243+
// ========== STREAM COMMANDS ==========
3244+
3245+
@Test
3246+
void stream_xadd_xlen_xdel() {
3247+
String key = "stream:" + UUID.randomUUID();
3248+
Map<String, String> hash = new HashMap<>();
3249+
hash.put("f1", "v1");
3250+
hash.put("f2", "v2");
3251+
3252+
StreamEntryID id = jedis.xadd(key, hash);
3253+
assertNotNull(id, "XADD should return entry ID");
3254+
assertTrue(id.getTime() > 0 || id.getSequence() >= 0, "XADD should return valid ID");
3255+
3256+
long len = jedis.xlen(key);
3257+
assertEquals(1, len, "XLEN should be 1 after one XADD");
3258+
3259+
long del = jedis.xdel(key, id.toString());
3260+
assertEquals(1, del, "XDEL should return 1");
3261+
assertEquals(0, jedis.xlen(key), "XLEN should be 0 after XDEL");
3262+
}
3263+
3264+
@Test
3265+
void stream_xrange_xrevrange() {
3266+
String key = "stream:" + UUID.randomUUID();
3267+
jedis.xadd(key, Map.of("a", "1"));
3268+
jedis.xadd(key, Map.of("b", "2"));
3269+
jedis.xadd(key, Map.of("c", "3"));
3270+
3271+
List<StreamEntry> range = jedis.xrange(key, "-", "+");
3272+
assertNotNull(range);
3273+
assertTrue(range.size() >= 3, "XRANGE should return at least 3 entries");
3274+
3275+
List<StreamEntry> rev = jedis.xrevrange(key, "+", "-");
3276+
assertNotNull(rev);
3277+
assertTrue(rev.size() >= 3, "XREVRANGE should return at least 3 entries");
3278+
3279+
List<StreamEntry> limited = jedis.xrange(key, "-", "+", 2L);
3280+
assertNotNull(limited);
3281+
assertEquals(2, limited.size(), "XRANGE with COUNT 2 should return 2 entries");
3282+
}
3283+
3284+
@Test
3285+
void stream_xread() {
3286+
String key = "stream:" + UUID.randomUUID();
3287+
jedis.xadd(key, Map.of("x", "1"));
3288+
3289+
Map<String, String> keysAndIds = new HashMap<>();
3290+
keysAndIds.put(key, "0-0");
3291+
3292+
Map<String, List<StreamEntry>> result = jedis.xread(keysAndIds);
3293+
assertNotNull(result);
3294+
assertTrue(result.containsKey(key));
3295+
assertFalse(result.get(key).isEmpty(), "XREAD should return entries from start");
3296+
}
3297+
3298+
@Test
3299+
void stream_xtrim() {
3300+
String key = "stream:" + UUID.randomUUID();
3301+
for (int i = 0; i < 10; i++) {
3302+
jedis.xadd(key, Map.of("i", String.valueOf(i)));
3303+
}
3304+
long lenBefore = jedis.xlen(key);
3305+
assertTrue(lenBefore >= 10, "Stream should have at least 10 entries");
3306+
3307+
long trimmed = jedis.xtrim(key, 5L);
3308+
assertTrue(trimmed >= 0, "XTRIM should return non-negative count");
3309+
long lenAfter = jedis.xlen(key);
3310+
assertTrue(lenAfter <= 5, "Stream length after XTRIM MAXLEN 5 should be <= 5");
3311+
}
3312+
3313+
@Test
3314+
void stream_xgroup_create_destroy() {
3315+
String key = "stream:" + UUID.randomUUID();
3316+
jedis.xadd(key, Map.of("init", "1"));
3317+
3318+
String group = "g1";
3319+
String createResult = jedis.xgroupCreate(key, group, "0", true);
3320+
assertNotNull(createResult, "XGROUP CREATE should succeed");
3321+
3322+
boolean destroyed = jedis.xgroupDestroy(key, group);
3323+
assertTrue(destroyed, "XGROUP DESTROY should return true");
3324+
}
3325+
3326+
@Test
3327+
void stream_xgroup_setid_createconsumer_delconsumer() {
3328+
String key = "stream:" + UUID.randomUUID();
3329+
jedis.xadd(key, Map.of("a", "1"));
3330+
String group = "g2";
3331+
jedis.xgroupCreate(key, group, "0", true);
3332+
3333+
String setidResult = jedis.xgroupSetId(key, group, "0");
3334+
assertNotNull(setidResult);
3335+
3336+
boolean created = jedis.xgroupCreateConsumer(key, group, "c1");
3337+
assertTrue(created, "XGROUP CREATECONSUMER should succeed");
3338+
3339+
long del = jedis.xgroupDelConsumer(key, group, "c1");
3340+
assertTrue(del >= 0, "XGROUP DELCONSUMER should return non-negative");
3341+
}
3342+
3343+
@Test
3344+
void stream_xreadgroup_xack() {
3345+
String key = "stream:" + UUID.randomUUID();
3346+
jedis.xadd(key, Map.of("m1", "v1"));
3347+
jedis.xadd(key, Map.of("m2", "v2"));
3348+
String group = "g3";
3349+
jedis.xgroupCreate(key, group, "0", true);
3350+
3351+
Map<String, String> keysAndIds = new HashMap<>();
3352+
keysAndIds.put(key, ">");
3353+
3354+
Map<String, List<StreamEntry>> read = jedis.xreadgroup(group, "consumer1", keysAndIds);
3355+
assertNotNull(read);
3356+
assertTrue(read.containsKey(key));
3357+
List<StreamEntry> entries = read.get(key);
3358+
assertNotNull(entries);
3359+
assertFalse(entries.isEmpty(), "XREADGROUP should return entries");
3360+
3361+
String[] ids = entries.stream()
3362+
.map(e -> e.getID().toString())
3363+
.toArray(String[]::new);
3364+
long ack = jedis.xack(key, group, ids);
3365+
assertEquals(entries.size(), ack, "XACK should acknowledge all read entries");
3366+
}
3367+
3368+
@Test
3369+
void stream_xpending() {
3370+
String key = "stream:" + UUID.randomUUID();
3371+
jedis.xadd(key, Map.of("p", "1"));
3372+
String group = "g4";
3373+
jedis.xgroupCreate(key, group, "0", true);
3374+
3375+
StreamPendingSummary summary = jedis.xpending(key, group);
3376+
assertNotNull(summary);
3377+
assertTrue(summary.getTotal() >= 0, "XPENDING summary total should be non-negative");
3378+
3379+
List<StreamPendingEntry> pending =
3380+
jedis.xpending(key, group, "-", "+", 10L);
3381+
assertNotNull(pending);
3382+
}
3383+
3384+
@Test
3385+
void stream_xinfo_stream_and_groups() {
3386+
String key = "stream:" + UUID.randomUUID();
3387+
jedis.xadd(key, Map.of("i", "1"));
3388+
jedis.xgroupCreate(key, "ginfo", "0", true);
3389+
3390+
Map<String, Object> raw = jedis.xinfoStream(key);
3391+
assertNotNull(raw);
3392+
assertTrue(raw.containsKey("length") || raw.containsKey("last-generated-id"));
3393+
3394+
StreamInfo info = jedis.xinfoStreamAsInfo(key);
3395+
assertNotNull(info);
3396+
assertTrue(info.getLength() >= 1, "Stream should have at least 1 entry");
3397+
3398+
List<StreamGroupInfo> groups = jedis.xinfoGroups(key);
3399+
assertNotNull(groups);
3400+
assertFalse(groups.isEmpty(), "XINFO GROUPS should return at least one group");
3401+
}
3402+
3403+
@Test
3404+
void stream_xinfo_consumers() {
3405+
String key = "stream:" + UUID.randomUUID();
3406+
jedis.xadd(key, Map.of("c", "1"));
3407+
String group = "gcons";
3408+
jedis.xgroupCreate(key, group, "0", true);
3409+
jedis.xgroupCreateConsumer(key, group, "consumer1");
3410+
3411+
List<StreamConsumerInfo> consumers = jedis.xinfoConsumers(key, group);
3412+
assertNotNull(consumers);
3413+
assertFalse(consumers.isEmpty(), "XINFO CONSUMERS should return at least one consumer");
3414+
}
32353415
}

java/jedis-compatibility/compatibility-layer-migration-guide.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ blockingSocketTimeoutMillis
6868
- ✅ List operations (LPUSH, RPUSH, LPOP, RPOP)
6969
- ⚠️ Set operations (SADD, SREM, SMEMBERS) - **Available via `sendCommand()` only**
7070
- ⚠️ Sorted set operations (ZADD, ZREM, ZRANGE) - **Available via `sendCommand()` only**
71+
- ✅ Stream operations (XADD, XLEN, XDEL, XRANGE, XREVRANGE, XREAD, XTRIM, XGROUP CREATE/DESTROY/SETID, XREADGROUP, XACK, XPENDING, XCLAIM, XAUTOCLAIM, XINFO STREAM/GROUPS/CONSUMERS)
7172
- ✅ Key operations (DEL, EXISTS, EXPIRE, TTL)
7273
- ✅ Connection commands (PING, SELECT)
7374
- ✅ Generic commands via `sendCommand()` (Protocol.Command types only)

0 commit comments

Comments
 (0)