diff --git a/java/integTest/src/test/java/compatibility/jedis/JedisTest.java b/java/integTest/src/test/java/compatibility/jedis/JedisTest.java index 61557d0d408..d5fd2bd74f9 100644 --- a/java/integTest/src/test/java/compatibility/jedis/JedisTest.java +++ b/java/integTest/src/test/java/compatibility/jedis/JedisTest.java @@ -23,6 +23,7 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisClientConfig; import redis.clients.jedis.Protocol; +import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.args.BitOP; import redis.clients.jedis.args.ExpiryOption; import redis.clients.jedis.args.ListDirection; @@ -38,6 +39,12 @@ import redis.clients.jedis.resps.AccessControlLogEntry; import redis.clients.jedis.resps.AccessControlUser; import redis.clients.jedis.resps.ScanResult; +import redis.clients.jedis.resps.StreamConsumerInfo; +import redis.clients.jedis.resps.StreamEntry; +import redis.clients.jedis.resps.StreamGroupInfo; +import redis.clients.jedis.resps.StreamInfo; +import redis.clients.jedis.resps.StreamPendingEntry; +import redis.clients.jedis.resps.StreamPendingSummary; import redis.clients.jedis.util.KeyValue; /** @@ -3376,6 +3383,312 @@ void list_edge_cases() { 0, jedis.rpushx("non_existent_key", "value"), "RPUSHX on non-existent key should return 0"); } + // ========== STREAM COMMANDS ========== + + @Test + void stream_xadd_xlen_xdel() { + String key = "stream:" + UUID.randomUUID(); + Map hash = new HashMap<>(); + hash.put("f1", "v1"); + hash.put("f2", "v2"); + + StreamEntryID id = jedis.xadd(key, hash); + assertNotNull(id, "XADD should return entry ID"); + assertTrue(id.getTime() > 0 || id.getSequence() >= 0, "XADD should return valid ID"); + + long len = jedis.xlen(key); + assertEquals(1, len, "XLEN should be 1 after one XADD"); + + long del = jedis.xdel(key, id.toString()); + assertEquals(1, del, "XDEL should return 1"); + assertEquals(0, jedis.xlen(key), "XLEN should be 0 after XDEL"); + } + + @Test + void stream_xrange_xrevrange() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("a", "1")); + jedis.xadd(key, Map.of("b", "2")); + jedis.xadd(key, Map.of("c", "3")); + + List range = jedis.xrange(key, "-", "+"); + assertNotNull(range); + assertTrue(range.size() >= 3, "XRANGE should return at least 3 entries"); + + List rev = jedis.xrevrange(key, "+", "-"); + assertNotNull(rev); + assertTrue(rev.size() >= 3, "XREVRANGE should return at least 3 entries"); + + List limited = jedis.xrange(key, "-", "+", 2L); + assertNotNull(limited); + assertEquals(2, limited.size(), "XRANGE with COUNT 2 should return 2 entries"); + } + + @Test + void stream_xread() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("x", "1")); + + Map keysAndIds = new HashMap<>(); + keysAndIds.put(key, "0-0"); + + Map> result = jedis.xread(keysAndIds); + assertNotNull(result); + assertTrue(result.containsKey(key)); + assertFalse(result.get(key).isEmpty(), "XREAD should return entries from start"); + } + + @Test + void stream_xtrim() { + String key = "stream:" + UUID.randomUUID(); + for (int i = 0; i < 10; i++) { + jedis.xadd(key, Map.of("i", String.valueOf(i))); + } + long lenBefore = jedis.xlen(key); + assertTrue(lenBefore >= 10, "Stream should have at least 10 entries"); + + long trimmed = jedis.xtrim(key, 5L); + assertTrue(trimmed >= 0, "XTRIM should return non-negative count"); + long lenAfter = jedis.xlen(key); + assertTrue(lenAfter <= 5, "Stream length after XTRIM MAXLEN 5 should be <= 5"); + } + + @Test + void stream_xgroup_create_destroy() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("init", "1")); + + String group = "g1"; + String createResult = jedis.xgroupCreate(key, group, "0", true); + assertNotNull(createResult, "XGROUP CREATE should succeed"); + + boolean destroyed = jedis.xgroupDestroy(key, group); + assertTrue(destroyed, "XGROUP DESTROY should return true"); + } + + @Test + void stream_xgroup_setid_createconsumer_delconsumer() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("a", "1")); + String group = "g2"; + jedis.xgroupCreate(key, group, "0", true); + + String setidResult = jedis.xgroupSetId(key, group, "0"); + assertNotNull(setidResult); + + boolean created = jedis.xgroupCreateConsumer(key, group, "c1"); + assertTrue(created, "XGROUP CREATECONSUMER should succeed"); + + long del = jedis.xgroupDelConsumer(key, group, "c1"); + assertTrue(del >= 0, "XGROUP DELCONSUMER should return non-negative"); + } + + @Test + void stream_xreadgroup_xack() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("m1", "v1")); + jedis.xadd(key, Map.of("m2", "v2")); + String group = "g3"; + jedis.xgroupCreate(key, group, "0", true); + + Map keysAndIds = new HashMap<>(); + keysAndIds.put(key, ">"); + + Map> read = jedis.xreadgroup(group, "consumer1", keysAndIds); + assertNotNull(read); + assertTrue(read.containsKey(key)); + List entries = read.get(key); + assertNotNull(entries); + assertFalse(entries.isEmpty(), "XREADGROUP should return entries"); + + String[] ids = entries.stream().map(e -> e.getID().toString()).toArray(String[]::new); + long ack = jedis.xack(key, group, ids); + assertEquals(entries.size(), ack, "XACK should acknowledge all read entries"); + } + + @Test + void stream_xpending() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("p", "1")); + String group = "g4"; + jedis.xgroupCreate(key, group, "0", true); + + StreamPendingSummary summary = jedis.xpending(key, group); + assertNotNull(summary); + assertTrue(summary.getTotal() >= 0, "XPENDING summary total should be non-negative"); + + List pending = jedis.xpending(key, group, "-", "+", 10L); + assertNotNull(pending); + } + + @Test + void stream_xinfo_stream_and_groups() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("i", "1")); + jedis.xgroupCreate(key, "ginfo", "0", true); + + Map raw = jedis.xinfoStream(key); + assertNotNull(raw); + assertTrue(raw.containsKey("length") || raw.containsKey("last-generated-id")); + + StreamInfo info = jedis.xinfoStreamAsInfo(key); + assertNotNull(info); + assertTrue(info.getLength() >= 1, "Stream should have at least 1 entry"); + + List groups = jedis.xinfoGroups(key); + assertNotNull(groups); + assertFalse(groups.isEmpty(), "XINFO GROUPS should return at least one group"); + } + + @Test + void stream_xinfo_consumers() { + String key = "stream:" + UUID.randomUUID(); + jedis.xadd(key, Map.of("c", "1")); + String group = "gcons"; + jedis.xgroupCreate(key, group, "0", true); + jedis.xgroupCreateConsumer(key, group, "consumer1"); + + List consumers = jedis.xinfoConsumers(key, group); + assertNotNull(consumers); + assertFalse(consumers.isEmpty(), "XINFO CONSUMERS should return at least one consumer"); + } + + @Test + void stream_binary_xlen_xdel() { + byte[] key = ("stream:" + UUID.randomUUID()).getBytes(); + Map hash = new HashMap<>(); + hash.put("f1".getBytes(), "v1".getBytes()); + hash.put("f2".getBytes(), "v2".getBytes()); + + // Use XAddParams to add entry + redis.clients.jedis.params.XAddParams params = + redis.clients.jedis.params.XAddParams.xAddParams(); + byte[] id = jedis.xadd(key, params, hash); + assertNotNull(id, "Binary XADD should return entry ID"); + + long len = jedis.xlen(key); + assertEquals(1, len, "Binary XLEN should be 1 after one XADD"); + + long del = jedis.xdel(key, id); + assertEquals(1, del, "Binary XDEL should return 1"); + assertEquals(0, jedis.xlen(key), "Binary XLEN should be 0 after XDEL"); + } + + @Test + void stream_binary_xrange_xrevrange() { + byte[] key = ("stream:" + UUID.randomUUID()).getBytes(); + + // Add entries using XAddParams + redis.clients.jedis.params.XAddParams params = + redis.clients.jedis.params.XAddParams.xAddParams(); + Map hash1 = Map.of("a".getBytes(), "1".getBytes()); + Map hash2 = Map.of("b".getBytes(), "2".getBytes()); + Map hash3 = Map.of("c".getBytes(), "3".getBytes()); + + jedis.xadd(key, params, hash1); + jedis.xadd(key, params, hash2); + jedis.xadd(key, params, hash3); + + List range = jedis.xrange(key, "-".getBytes(), "+".getBytes()); + assertNotNull(range); + assertTrue(range.size() >= 3, "Binary XRANGE should return at least 3 entries"); + + List rev = jedis.xrevrange(key, "+".getBytes(), "-".getBytes()); + assertNotNull(rev); + assertTrue(rev.size() >= 3, "Binary XREVRANGE should return at least 3 entries"); + + List limited = jedis.xrange(key, "-".getBytes(), "+".getBytes(), 2); + assertNotNull(limited); + assertEquals(2, limited.size(), "Binary XRANGE with COUNT 2 should return 2 entries"); + } + + @Test + void stream_binary_xtrim() { + byte[] key = ("stream:" + UUID.randomUUID()).getBytes(); + redis.clients.jedis.params.XAddParams addParams = + redis.clients.jedis.params.XAddParams.xAddParams(); + + for (int i = 0; i < 10; i++) { + Map hash = Map.of("i".getBytes(), String.valueOf(i).getBytes()); + jedis.xadd(key, addParams, hash); + } + long lenBefore = jedis.xlen(key); + assertTrue(lenBefore >= 10, "Stream should have at least 10 entries"); + + long trimmed = jedis.xtrim(key, 5L); + assertTrue(trimmed >= 0, "Binary XTRIM should return non-negative count"); + long lenAfter = jedis.xlen(key); + assertTrue(lenAfter <= 5, "Stream length after binary XTRIM MAXLEN 5 should be <= 5"); + } + + @Test + void stream_xadd_with_xaddparams() { + String key = "stream:" + UUID.randomUUID(); + Map hash = Map.of("field", "value"); + + // Test with custom ID + redis.clients.jedis.params.XAddParams params = + redis.clients.jedis.params.XAddParams.xAddParams().id("1000-0"); + StreamEntryID id = jedis.xadd(key, params, hash); + assertNotNull(id); + assertEquals("1000-0", id.toString()); + + // Test with NOMKSTREAM + String nonExistentKey = "stream:" + UUID.randomUUID(); + params = redis.clients.jedis.params.XAddParams.xAddParams().noMkStream(); + StreamEntryID result = jedis.xadd(nonExistentKey, params, hash); + assertNull(result, "XADD with NOMKSTREAM should return null for non-existent stream"); + + // Test with MAXLEN trimming (exact) + String trimKey = "stream:" + UUID.randomUUID(); + for (int i = 0; i < 5; i++) { + jedis.xadd(trimKey, Map.of("i", String.valueOf(i))); + } + params = redis.clients.jedis.params.XAddParams.xAddParams().maxLenExact(3); + jedis.xadd(trimKey, params, Map.of("new", "entry")); + long len = jedis.xlen(trimKey); + assertTrue(len <= 3, "Stream should be trimmed to exactly 3 entries"); + } + + @Test + void stream_xtrim_with_xtrimparams() { + String key = "stream:" + UUID.randomUUID(); + for (int i = 0; i < 10; i++) { + jedis.xadd(key, Map.of("i", String.valueOf(i))); + } + + // Test MAXLEN with XTrimParams (exact trimming) + redis.clients.jedis.params.XTrimParams params = + redis.clients.jedis.params.XTrimParams.xTrimParams().maxLenExact(5); + long trimmed = jedis.xtrim(key, params); + assertTrue(trimmed >= 0, "XTRIM with XTrimParams should return non-negative count"); + long len = jedis.xlen(key); + assertTrue(len <= 5, "Stream should be trimmed to exactly 5 entries"); + + // Test MAXLEN with approximate trimming + String key3 = "stream:" + UUID.randomUUID(); + for (int i = 0; i < 10; i++) { + jedis.xadd(key3, Map.of("i", String.valueOf(i))); + } + params = redis.clients.jedis.params.XTrimParams.xTrimParams().maxLen(5); + jedis.xtrim(key3, params); + len = jedis.xlen(key3); + assertTrue( + len <= 10 && len >= 5, + "Stream with approximate trim should be reduced but may not be exact"); + + // Test MINID with XTrimParams (exact trimming) + String key2 = "stream:" + UUID.randomUUID(); + StreamEntryID firstId = jedis.xadd(key2, Map.of("a", "1")); + jedis.xadd(key2, Map.of("b", "2")); + StreamEntryID thirdId = jedis.xadd(key2, Map.of("c", "3")); + + params = redis.clients.jedis.params.XTrimParams.xTrimParams().minIdExact(thirdId); + jedis.xtrim(key2, params); + len = jedis.xlen(key2); + assertTrue(len == 1, "Stream should be trimmed to exactly entries >= minId"); + } + // --- ACL command integration tests --- @Test diff --git a/java/jedis-compatibility/compatibility-layer-migration-guide.md b/java/jedis-compatibility/compatibility-layer-migration-guide.md index 215731ed852..034e77ff5ea 100644 --- a/java/jedis-compatibility/compatibility-layer-migration-guide.md +++ b/java/jedis-compatibility/compatibility-layer-migration-guide.md @@ -68,6 +68,8 @@ blockingSocketTimeoutMillis - ✅ List operations (LPUSH, RPUSH, LPOP, RPOP) - ✅ Set operations (SADD, SREM, SMEMBERS, SCARD, SISMEMBER, SMISMEMBER, SPOP, SRANDMEMBER, SMOVE, SINTER, SINTERCARD, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SSCAN) via type-safe methods - ⚠️ Sorted set operations (ZADD, ZREM, ZRANGE) - **Available via `sendCommand()` only** +- ✅ Stream operations (XADD, XLEN, XDEL, XRANGE, XREVRANGE, XREAD, XTRIM, XGROUP CREATE/DESTROY/SETID, XREADGROUP, XACK, XPENDING, XCLAIM, XAUTOCLAIM, XINFO STREAM/GROUPS/CONSUMERS) + - **Note:** `XSETID` is not supported (only `XGROUP SETID` is available) - ✅ Key operations (DEL, EXISTS, EXPIRE, TTL) - ✅ Connection commands (PING, SELECT) - ✅ ACL commands (ACL LIST, ACL GETUSER, ACL SETUSER, ACL DELUSER, ACL CAT, ACL GENPASS, ACL LOG, ACL LOG RESET, ACL WHOAMI, ACL USERS, ACL SAVE, ACL LOAD, ACL DRYRUN) diff --git a/java/jedis-compatibility/src/main/java/redis/clients/jedis/Jedis.java b/java/jedis-compatibility/src/main/java/redis/clients/jedis/Jedis.java index 99f234026da..bcd59ace9ba 100644 --- a/java/jedis-compatibility/src/main/java/redis/clients/jedis/Jedis.java +++ b/java/jedis-compatibility/src/main/java/redis/clients/jedis/Jedis.java @@ -30,6 +30,14 @@ import glide.api.models.commands.scan.SScanOptions; import glide.api.models.commands.scan.SScanOptionsBinary; import glide.api.models.commands.scan.ScanOptions; +import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamClaimOptions; +import glide.api.models.commands.stream.StreamGroupOptions; +import glide.api.models.commands.stream.StreamPendingOptions; +import glide.api.models.commands.stream.StreamRange; +import glide.api.models.commands.stream.StreamReadGroupOptions; +import glide.api.models.commands.stream.StreamReadOptions; +import glide.api.models.commands.stream.StreamTrimOptions; import glide.api.models.configuration.GlideClientConfiguration; import java.io.Closeable; import java.nio.charset.Charset; @@ -70,6 +78,12 @@ import redis.clients.jedis.resps.FunctionStats; import redis.clients.jedis.resps.LibraryInfo; import redis.clients.jedis.resps.ScanResult; +import redis.clients.jedis.resps.StreamConsumerInfo; +import redis.clients.jedis.resps.StreamEntry; +import redis.clients.jedis.resps.StreamGroupInfo; +import redis.clients.jedis.resps.StreamInfo; +import redis.clients.jedis.resps.StreamPendingEntry; +import redis.clients.jedis.resps.StreamPendingSummary; import redis.clients.jedis.util.KeyValue; import redis.clients.jedis.util.Pool; @@ -6106,6 +6120,786 @@ private static HScanOptionsBinary convertScanParamsToHScanOptionsBinary(ScanPara return builder.build(); } + // ========== STREAM COMMANDS (GLIDE type-safe API) ========== + + /** Convert GLIDE stream entry map (id -> field-value pairs) to List of StreamEntry. */ + private static List toStreamEntryList(Map idToFields) { + if (idToFields == null) { + return Collections.emptyList(); + } + List out = new ArrayList<>(); + for (Map.Entry e : idToFields.entrySet()) { + Map fields = new HashMap<>(); + String[][] pairs = e.getValue(); + if (pairs != null) { + for (String[] kv : pairs) { + if (kv != null && kv.length >= 2) { + fields.put(kv[0], kv[1]); + } + } + } + out.add(new StreamEntry(new StreamEntryID(e.getKey()), fields)); + } + return out; + } + + /** Convert GLIDE xread response to Map of stream key to List of StreamEntry. */ + private static Map> toStreamReadResponse( + Map> response) { + if (response == null) { + return Collections.emptyMap(); + } + Map> out = new HashMap<>(); + for (Map.Entry> e : response.entrySet()) { + out.put(e.getKey(), toStreamEntryList(e.getValue())); + } + return out; + } + + /** + * Adds an entry to the stream at key. Uses GLIDE type-safe xadd. + * + * @param key stream key + * @param hash field-value map + * @return generated entry ID + */ + public StreamEntryID xadd(String key, Map hash) { + return executeCommandWithGlide( + "XADD", + () -> { + String id = glideClient.xadd(key, hash).get(); + return id == null ? null : new StreamEntryID(id); + }); + } + + /** + * Adds an entry to the stream at key with optional entry id. Uses GLIDE type-safe xadd. + * + * @param key stream key + * @param id entry id (use "*" for auto-generate) + * @param hash field-value map + * @return generated or specified entry ID + */ + public StreamEntryID xadd(String key, StreamEntryID id, Map hash) { + return executeCommandWithGlide( + "XADD", + () -> { + StreamAddOptions opts = + StreamAddOptions.builder().id(id != null ? id.toString() : "*").build(); + String result = glideClient.xadd(key, hash, opts).get(); + return result == null ? null : new StreamEntryID(result); + }); + } + + /** + * Adds an entry to the stream at key with options. Uses GLIDE type-safe xadd. + * + * @param key stream key + * @param hash field-value map + * @param makeStream if false, NOMKSTREAM is sent + * @return generated entry ID, or null if stream did not exist and makeStream was false + */ + public StreamEntryID xadd(String key, Map hash, boolean makeStream) { + return executeCommandWithGlide( + "XADD", + () -> { + StreamAddOptions opts = StreamAddOptions.builder().makeStream(makeStream).build(); + String result = glideClient.xadd(key, hash, opts).get(); + return result == null ? null : new StreamEntryID(result); + }); + } + + /** + * Adds an entry to the stream at key with XAddParams. Uses GLIDE type-safe xadd. + * + * @param key stream key + * @param params add parameters + * @param hash field-value map + * @return generated entry ID, or null if stream did not exist and makeStream was false + */ + public StreamEntryID xadd( + String key, redis.clients.jedis.params.XAddParams params, Map hash) { + return executeCommandWithGlide( + "XADD", + () -> { + StreamAddOptions.StreamAddOptionsBuilder builder = StreamAddOptions.builder(); + if (params.getId() != null) { + builder.id(params.getId()); + } + if (params.getMakeStream() != null) { + builder.makeStream(params.getMakeStream()); + } + if (params.getMaxLen() != null) { + boolean exact = params.getExactTrimming() != null && params.getExactTrimming(); + StreamTrimOptions trimOpts; + if (params.getLimit() != null) { + trimOpts = new StreamTrimOptions.MaxLen(params.getMaxLen(), params.getLimit()); + } else { + trimOpts = new StreamTrimOptions.MaxLen(exact, params.getMaxLen()); + } + builder.trim(trimOpts); + } else if (params.getMinId() != null) { + boolean exact = params.getExactTrimming() != null && params.getExactTrimming(); + StreamTrimOptions trimOpts; + if (params.getLimit() != null) { + trimOpts = new StreamTrimOptions.MinId(params.getMinId(), params.getLimit()); + } else { + trimOpts = new StreamTrimOptions.MinId(exact, params.getMinId()); + } + builder.trim(trimOpts); + } + String result = glideClient.xadd(key, hash, builder.build()).get(); + return result == null ? null : new StreamEntryID(result); + }); + } + + /** + * Adds an entry to the stream at key with XAddParams - binary version. Uses GLIDE type-safe xadd. + * + * @param key stream key + * @param params add parameters + * @param hash field-value map + * @return generated entry ID as byte[], or null if stream did not exist and makeStream was false + */ + public byte[] xadd( + byte[] key, redis.clients.jedis.params.XAddParams params, Map hash) { + return executeCommandWithGlide( + "XADD", + () -> { + // Convert byte[] map to String map + Map stringHash = new HashMap<>(); + for (Map.Entry entry : hash.entrySet()) { + stringHash.put(new String(entry.getKey()), new String(entry.getValue())); + } + + StreamAddOptions.StreamAddOptionsBuilder builder = StreamAddOptions.builder(); + if (params.getId() != null) { + builder.id(params.getId()); + } + if (params.getMakeStream() != null) { + builder.makeStream(params.getMakeStream()); + } + if (params.getMaxLen() != null) { + boolean exact = params.getExactTrimming() != null && params.getExactTrimming(); + StreamTrimOptions trimOpts; + if (params.getLimit() != null) { + trimOpts = new StreamTrimOptions.MaxLen(params.getMaxLen(), params.getLimit()); + } else { + trimOpts = new StreamTrimOptions.MaxLen(exact, params.getMaxLen()); + } + builder.trim(trimOpts); + } else if (params.getMinId() != null) { + boolean exact = params.getExactTrimming() != null && params.getExactTrimming(); + StreamTrimOptions trimOpts; + if (params.getLimit() != null) { + trimOpts = new StreamTrimOptions.MinId(params.getMinId(), params.getLimit()); + } else { + trimOpts = new StreamTrimOptions.MinId(exact, params.getMinId()); + } + builder.trim(trimOpts); + } + String result = glideClient.xadd(new String(key), stringHash, builder.build()).get(); + return result == null ? null : result.getBytes(); + }); + } + + /** Returns the number of entries in the stream. Uses GLIDE xlen. */ + public long xlen(String key) { + return executeCommandWithGlide("XLEN", () -> glideClient.xlen(key).get()); + } + + /** + * Returns the number of entries in the stream - binary version. Uses GLIDE xlen. + * + * @param key stream key + * @return number of entries in the stream + */ + public long xlen(byte[] key) { + return executeCommandWithGlide("XLEN", () -> glideClient.xlen(new String(key)).get()); + } + + /** Removes entries by id from the stream. Uses GLIDE xdel. */ + public long xdel(String key, String... ids) { + return executeCommandWithGlide("XDEL", () -> glideClient.xdel(key, ids).get()); + } + + /** Removes entries by id from the stream. Uses GLIDE xdel. */ + public long xdel(String key, StreamEntryID... ids) { + String[] idStrs = new String[ids.length]; + for (int i = 0; i < ids.length; i++) { + idStrs[i] = ids[i].toString(); + } + return executeCommandWithGlide("XDEL", () -> glideClient.xdel(key, idStrs).get()); + } + + /** + * Removes entries by id from the stream - binary version. Uses GLIDE xdel. + * + * @param key stream key + * @param ids entry IDs to delete + * @return number of entries deleted + */ + public long xdel(byte[] key, byte[]... ids) { + String[] idStrs = new String[ids.length]; + for (int i = 0; i < ids.length; i++) { + idStrs[i] = new String(ids[i]); + } + return executeCommandWithGlide("XDEL", () -> glideClient.xdel(new String(key), idStrs).get()); + } + + /** + * Returns entries in the stream in range [start, end]. Uses GLIDE xrange. + * + * @param key stream key + * @param start start id ("-" for minimum) + * @param end end id ("+" for maximum) + */ + public List xrange(String key, String start, String end) { + return executeCommandWithGlide( + "XRANGE", + () -> { + StreamRange s = + "-".equals(start) ? StreamRange.InfRangeBound.MIN : StreamRange.IdBound.of(start); + StreamRange e = + "+".equals(end) ? StreamRange.InfRangeBound.MAX : StreamRange.IdBound.of(end); + Map raw = glideClient.xrange(key, s, e).get(); + return toStreamEntryList(raw); + }); + } + + /** Returns up to count entries in the stream in range [start, end]. Uses GLIDE xrange. */ + public List xrange(String key, String start, String end, long count) { + return executeCommandWithGlide( + "XRANGE", + () -> { + StreamRange s = + "-".equals(start) ? StreamRange.InfRangeBound.MIN : StreamRange.IdBound.of(start); + StreamRange e = + "+".equals(end) ? StreamRange.InfRangeBound.MAX : StreamRange.IdBound.of(end); + Map raw = glideClient.xrange(key, s, e, count).get(); + return raw == null ? Collections.emptyList() : toStreamEntryList(raw); + }); + } + + /** Returns entries in the stream in reverse order [end, start]. Uses GLIDE xrevrange. */ + public List xrevrange(String key, String end, String start) { + return executeCommandWithGlide( + "XREVRANGE", + () -> { + StreamRange e = + "+".equals(end) ? StreamRange.InfRangeBound.MAX : StreamRange.IdBound.of(end); + StreamRange s = + "-".equals(start) ? StreamRange.InfRangeBound.MIN : StreamRange.IdBound.of(start); + Map raw = glideClient.xrevrange(key, e, s).get(); + return toStreamEntryList(raw); + }); + } + + /** Returns up to count entries in reverse order. Uses GLIDE xrevrange. */ + public List xrevrange(String key, String end, String start, long count) { + return executeCommandWithGlide( + "XREVRANGE", + () -> { + StreamRange e = + "+".equals(end) ? StreamRange.InfRangeBound.MAX : StreamRange.IdBound.of(end); + StreamRange s = + "-".equals(start) ? StreamRange.InfRangeBound.MIN : StreamRange.IdBound.of(start); + Map raw = glideClient.xrevrange(key, e, s, count).get(); + return raw == null ? Collections.emptyList() : toStreamEntryList(raw); + }); + } + + /** + * Returns entries in the stream in range [start, end] - binary version. Uses GLIDE xrange. + * + * @param key stream key + * @param start start id + * @param end end id + * @return list of stream entries + */ + public List xrange(byte[] key, byte[] start, byte[] end) { + return xrange(new String(key), new String(start), new String(end)); + } + + /** + * Returns up to count entries in the stream in range [start, end] - binary version. Uses GLIDE + * xrange. + * + * @param key stream key + * @param start start id + * @param end end id + * @param count maximum number of entries + * @return list of stream entries + */ + public List xrange(byte[] key, byte[] start, byte[] end, int count) { + return xrange(new String(key), new String(start), new String(end), count); + } + + /** + * Returns entries in the stream in reverse order [end, start] - binary version. Uses GLIDE + * xrevrange. + * + * @param key stream key + * @param end end id + * @param start start id + * @return list of stream entries + */ + public List xrevrange(byte[] key, byte[] end, byte[] start) { + return xrevrange(new String(key), new String(end), new String(start)); + } + + /** + * Returns up to count entries in reverse order - binary version. Uses GLIDE xrevrange. + * + * @param key stream key + * @param end end id + * @param start start id + * @param count maximum number of entries + * @return list of stream entries + */ + public List xrevrange(byte[] key, byte[] end, byte[] start, int count) { + return xrevrange(new String(key), new String(end), new String(start), count); + } + + /** + * Reads from multiple streams. Uses GLIDE xread. + * + * @param keysAndIds map of stream key to start id ("0" or "0-0" for beginning) + * @return map of stream key to list of entries read + */ + public Map> xread(Map keysAndIds) { + return executeCommandWithGlide( + "XREAD", + () -> { + Map> raw = + glideClient.xread(keysAndIds, StreamReadOptions.builder().build()).get(); + return toStreamReadResponse(raw); + }); + } + + /** + * Reads from multiple streams with count and/or block. Uses GLIDE xread. + * + * @param count max entries per stream (null to omit) + * @param block block milliseconds (null to omit) + * @param keysAndIds map of stream key to start id + */ + public Map> xread( + Long count, Long block, Map keysAndIds) { + return executeCommandWithGlide( + "XREAD", + () -> { + StreamReadOptions.StreamReadOptionsBuilder b = StreamReadOptions.builder(); + if (count != null) b.count(count); + if (block != null) b.block(block); + Map> raw = glideClient.xread(keysAndIds, b.build()).get(); + return toStreamReadResponse(raw); + }); + } + + /** Trims the stream by max length. Uses GLIDE xtrim. */ + public long xtrim(String key, long maxLen) { + return executeCommandWithGlide( + "XTRIM", () -> glideClient.xtrim(key, new StreamTrimOptions.MaxLen(maxLen)).get()); + } + + /** Trims the stream by max length (exact or approximate). Uses GLIDE xtrim. */ + public long xtrim(String key, long maxLen, boolean exact) { + return executeCommandWithGlide( + "XTRIM", () -> glideClient.xtrim(key, new StreamTrimOptions.MaxLen(exact, maxLen)).get()); + } + + /** Trims the stream by minimum id. Uses GLIDE xtrim. */ + public long xtrim(String key, String minId) { + return executeCommandWithGlide( + "XTRIM", () -> glideClient.xtrim(key, new StreamTrimOptions.MinId(minId)).get()); + } + + /** + * Trims the stream using XTrimParams. Uses GLIDE xtrim. + * + * @param key stream key + * @param params trim parameters + * @return number of entries deleted + */ + public long xtrim(String key, redis.clients.jedis.params.XTrimParams params) { + return executeCommandWithGlide( + "XTRIM", + () -> { + StreamTrimOptions trimOpts; + boolean exact = params.getExactTrimming() != null && params.getExactTrimming(); + if (params.getMaxLen() != null) { + if (params.getLimit() != null) { + trimOpts = new StreamTrimOptions.MaxLen(params.getMaxLen(), params.getLimit()); + } else { + trimOpts = new StreamTrimOptions.MaxLen(exact, params.getMaxLen()); + } + } else if (params.getMinId() != null) { + if (params.getLimit() != null) { + trimOpts = new StreamTrimOptions.MinId(params.getMinId(), params.getLimit()); + } else { + trimOpts = new StreamTrimOptions.MinId(exact, params.getMinId()); + } + } else { + throw new IllegalArgumentException("XTrimParams must specify either maxLen or minId"); + } + return glideClient.xtrim(key, trimOpts).get(); + }); + } + + /** + * Trims the stream by max length - binary version. Uses GLIDE xtrim. + * + * @param key stream key + * @param maxLen maximum length + * @return number of entries deleted + */ + public long xtrim(byte[] key, long maxLen) { + return executeCommandWithGlide( + "XTRIM", + () -> glideClient.xtrim(new String(key), new StreamTrimOptions.MaxLen(maxLen)).get()); + } + + /** + * Trims the stream by max length (exact or approximate) - binary version. Uses GLIDE xtrim. + * + * @param key stream key + * @param maxLen maximum length + * @param exact if true, trim exactly; if false, trim approximately + * @return number of entries deleted + */ + public long xtrim(byte[] key, long maxLen, boolean exact) { + return executeCommandWithGlide( + "XTRIM", + () -> + glideClient.xtrim(new String(key), new StreamTrimOptions.MaxLen(exact, maxLen)).get()); + } + + /** + * Trims the stream using XTrimParams - binary version. Uses GLIDE xtrim. + * + * @param key stream key + * @param params trim parameters + * @return number of entries deleted + */ + public long xtrim(byte[] key, redis.clients.jedis.params.XTrimParams params) { + return xtrim(new String(key), params); + } + + /** Creates a consumer group. Uses GLIDE xgroupCreate. */ + public String xgroupCreate(String key, String groupName, String id) { + return executeCommandWithGlide( + "XGROUP CREATE", () -> glideClient.xgroupCreate(key, groupName, id).get()); + } + + /** Creates a consumer group, optionally creating the stream. Uses GLIDE xgroupCreate. */ + public String xgroupCreate(String key, String groupName, String id, boolean makeStream) { + return executeCommandWithGlide( + "XGROUP CREATE", + () -> + glideClient + .xgroupCreate( + key, groupName, id, StreamGroupOptions.builder().mkStream(makeStream).build()) + .get()); + } + + /** Destroys a consumer group. Uses GLIDE xgroupDestroy. */ + public boolean xgroupDestroy(String key, String groupName) { + return executeCommandWithGlide( + "XGROUP DESTROY", () -> glideClient.xgroupDestroy(key, groupName).get()); + } + + /** Sets the last delivered id of a group. Uses GLIDE xgroupSetId. */ + public String xgroupSetId(String key, String groupName, String id) { + return executeCommandWithGlide( + "XGROUP SETID", () -> glideClient.xgroupSetId(key, groupName, id).get()); + } + + /** Creates a consumer in the group. Uses GLIDE xgroupCreateConsumer. */ + public boolean xgroupCreateConsumer(String key, String group, String consumer) { + return executeCommandWithGlide( + "XGROUP CREATECONSUMER", + () -> glideClient.xgroupCreateConsumer(key, group, consumer).get()); + } + + /** Deletes a consumer from the group. Uses GLIDE xgroupDelConsumer. */ + public long xgroupDelConsumer(String key, String group, String consumer) { + return executeCommandWithGlide( + "XGROUP DELCONSUMER", () -> glideClient.xgroupDelConsumer(key, group, consumer).get()); + } + + /** + * Reads from streams as a consumer in a group. Uses GLIDE xreadgroup. + * + * @param keysAndIds map of stream key to id (typically ">" for new entries) + */ + public Map> xreadgroup( + String group, String consumer, Map keysAndIds) { + return executeCommandWithGlide( + "XREADGROUP", + () -> { + Map> raw = + glideClient + .xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build()) + .get(); + return toStreamReadResponse(raw); + }); + } + + /** Reads from streams as a consumer with options. Uses GLIDE xreadgroup. */ + public Map> xreadgroup( + String group, + String consumer, + Map keysAndIds, + Long count, + Long block, + boolean noack) { + return executeCommandWithGlide( + "XREADGROUP", + () -> { + StreamReadGroupOptions.StreamReadGroupOptionsBuilder b = StreamReadGroupOptions.builder(); + if (count != null) b.count(count); + if (block != null) b.block(block); + if (noack) b.noack(); + Map> raw = + glideClient.xreadgroup(keysAndIds, group, consumer, b.build()).get(); + return toStreamReadResponse(raw); + }); + } + + /** Acknowledges messages. Uses GLIDE xack. */ + public long xack(String key, String group, String... ids) { + return executeCommandWithGlide("XACK", () -> glideClient.xack(key, group, ids).get()); + } + + /** Acknowledges messages. Uses GLIDE xack. */ + public long xack(String key, String group, StreamEntryID... ids) { + String[] idStrs = new String[ids.length]; + for (int i = 0; i < ids.length; i++) { + idStrs[i] = ids[i].toString(); + } + return executeCommandWithGlide("XACK", () -> glideClient.xack(key, group, idStrs).get()); + } + + /** + * Returns pending summary for the group. Uses GLIDE xpending. Converts response to + * StreamPendingSummary (total, minId, maxId, consumerMessageCount). + */ + public StreamPendingSummary xpending(String key, String group) { + return executeCommandWithGlide( + "XPENDING", + () -> { + Object[] arr = glideClient.xpending(key, group).get(); + if (arr == null || arr.length < 4) { + return new StreamPendingSummary(0L, null, null, Collections.emptyMap()); + } + long total = + arr[0] instanceof Long ? (Long) arr[0] : Long.parseLong(String.valueOf(arr[0])); + String minIdStr = String.valueOf(arr[1]); + String maxIdStr = String.valueOf(arr[2]); + StreamEntryID minId = null; + StreamEntryID maxId = null; + if (minIdStr != null && !minIdStr.isEmpty() && !"null".equals(minIdStr)) { + minId = new StreamEntryID(minIdStr); + } + if (maxIdStr != null && !maxIdStr.isEmpty() && !"null".equals(maxIdStr)) { + maxId = new StreamEntryID(maxIdStr); + } + Map consumerCounts = new HashMap<>(); + if (arr.length > 3 && arr[3] instanceof Object[]) { + for (Object o : (Object[]) arr[3]) { + if (o instanceof Object[] && ((Object[]) o).length >= 2) { + Object[] pair = (Object[]) o; + String name = String.valueOf(pair[0]); + long count = + pair[1] instanceof Long + ? (Long) pair[1] + : Long.parseLong(String.valueOf(pair[1])); + consumerCounts.put(name, count); + } + } + } + return new StreamPendingSummary(total, minId, maxId, consumerCounts); + }); + } + + /** Returns pending entries in range. Uses GLIDE xpending. */ + public List xpending( + String key, String group, StreamRange start, StreamRange end, long count) { + return executeCommandWithGlide( + "XPENDING", + () -> { + Object[][] raw = + glideClient + .xpending(key, group, start, end, count, StreamPendingOptions.builder().build()) + .get(); + if (raw == null) return Collections.emptyList(); + List list = new ArrayList<>(); + for (Object[] row : raw) { + if (row != null && row.length >= 4) { + StreamEntryID id = new StreamEntryID(String.valueOf(row[0])); + String consumer = String.valueOf(row[1]); + long idle = + row[2] instanceof Long ? (Long) row[2] : Long.parseLong(String.valueOf(row[2])); + long delivered = + row[3] instanceof Long ? (Long) row[3] : Long.parseLong(String.valueOf(row[3])); + list.add(new StreamPendingEntry(id, consumer, idle, delivered)); + } + } + return list; + }); + } + + /** Returns pending entries in id range. Uses GLIDE xpending. */ + public List xpending( + String key, String group, String start, String end, long count) { + StreamRange s = + "-".equals(start) ? StreamRange.InfRangeBound.MIN : StreamRange.IdBound.of(start); + StreamRange e = "+".equals(end) ? StreamRange.InfRangeBound.MAX : StreamRange.IdBound.of(end); + return xpending(key, group, s, e, count); + } + + /** Claims pending messages. Uses GLIDE xclaim. */ + public List xclaim( + String key, String group, String consumer, long minIdleTime, String... ids) { + return executeCommandWithGlide( + "XCLAIM", + () -> { + Map raw = + glideClient.xclaim(key, group, consumer, minIdleTime, ids).get(); + return toStreamEntryList(raw); + }); + } + + /** Claims pending messages with options. Uses GLIDE xclaim. */ + public List xclaim( + String key, + String group, + String consumer, + long minIdleTime, + StreamClaimOptions options, + String... ids) { + return executeCommandWithGlide( + "XCLAIM", + () -> { + Map raw = + glideClient.xclaim(key, group, consumer, minIdleTime, ids, options).get(); + return toStreamEntryList(raw); + }); + } + + /** + * Auto-claims pending messages. Uses GLIDE xautoclaim. Returns Object[]: [String nextStartId, + * List of StreamEntry claimed]. + */ + public Object[] xautoclaim( + String key, String group, String consumer, long minIdleTime, String start) { + return executeCommandWithGlide( + "XAUTOCLAIM", () -> glideClient.xautoclaim(key, group, consumer, minIdleTime, start).get()); + } + + /** Auto-claims pending messages with count. Uses GLIDE xautoclaim. */ + public Object[] xautoclaim( + String key, String group, String consumer, long minIdleTime, String start, long count) { + return executeCommandWithGlide( + "XAUTOCLAIM", + () -> glideClient.xautoclaim(key, group, consumer, minIdleTime, start, count).get()); + } + + /** + * Returns stream info. Uses GLIDE xinfoStream. Returns raw Map; for StreamInfo use {@link + * #xinfoStreamAsInfo(String)}. + */ + public Map xinfoStream(String key) { + return executeCommandWithGlide("XINFO STREAM", () -> glideClient.xinfoStream(key).get()); + } + + /** Returns stream info as StreamInfo. Uses GLIDE xinfoStream and converts response. */ + public StreamInfo xinfoStreamAsInfo(String key) { + return executeCommandWithGlide( + "XINFO STREAM", + () -> { + Map raw = glideClient.xinfoStream(key).get(); + if (raw == null) return null; + Map converted = new HashMap<>(raw); + Object lastId = raw.get(StreamInfo.LAST_GENERATED_ID); + if (lastId instanceof String) { + converted.put(StreamInfo.LAST_GENERATED_ID, new StreamEntryID((String) lastId)); + } + Object firstEntry = raw.get(StreamInfo.FIRST_ENTRY); + if (firstEntry != null) { + converted.put(StreamInfo.FIRST_ENTRY, parseStreamEntryFromInfo(firstEntry)); + } + Object lastEntry = raw.get(StreamInfo.LAST_ENTRY); + if (lastEntry != null) { + converted.put(StreamInfo.LAST_ENTRY, parseStreamEntryFromInfo(lastEntry)); + } + return new StreamInfo(converted); + }); + } + + private static StreamEntry parseStreamEntryFromInfo(Object entry) { + if (entry == null) return null; + if (entry instanceof StreamEntry) return (StreamEntry) entry; + if (entry instanceof Object[]) { + Object[] arr = (Object[]) entry; + if (arr.length >= 2) { + String id = String.valueOf(arr[0]); + Map fields = new HashMap<>(); + Object second = arr[1]; + if (second instanceof Object[]) { + Object[] pairs = (Object[]) second; + for (int i = 0; i + 1 < pairs.length; i += 2) { + fields.put(String.valueOf(pairs[i]), String.valueOf(pairs[i + 1])); + } + } else if (second instanceof String[]) { + String[] pairs = (String[]) second; + for (int i = 0; i + 1 < pairs.length; i += 2) { + fields.put(pairs[i], pairs[i + 1]); + } + } + return new StreamEntry(new StreamEntryID(id), fields); + } + } + return null; + } + + /** Returns consumer groups info. Uses GLIDE xinfoGroups. */ + public List xinfoGroups(String key) { + return executeCommandWithGlide( + "XINFO GROUPS", + () -> { + Map[] raw = glideClient.xinfoGroups(key).get(); + if (raw == null) return Collections.emptyList(); + List list = new ArrayList<>(); + for (Map m : raw) { + Map converted = new HashMap<>(m); + Object lastDelivered = m.get(StreamGroupInfo.LAST_DELIVERED); + if (lastDelivered instanceof String) { + converted.put( + StreamGroupInfo.LAST_DELIVERED, new StreamEntryID((String) lastDelivered)); + } + list.add(new StreamGroupInfo(converted)); + } + return list; + }); + } + + /** Returns consumers info for a group. Uses GLIDE xinfoConsumers. */ + public List xinfoConsumers(String key, String groupName) { + return executeCommandWithGlide( + "XINFO CONSUMERS", + () -> { + Map[] raw = glideClient.xinfoConsumers(key, groupName).get(); + if (raw == null) return Collections.emptyList(); + List list = new ArrayList<>(); + for (Map m : raw) { + list.add(new StreamConsumerInfo(m)); + } + return list; + }); + } + // ===== MISSING METHODS FOR Valkey JDBC DRIVER COMPATIBILITY ===== /** diff --git a/java/jedis-compatibility/src/main/java/redis/clients/jedis/params/XAddParams.java b/java/jedis-compatibility/src/main/java/redis/clients/jedis/params/XAddParams.java new file mode 100644 index 00000000000..54854938fc9 --- /dev/null +++ b/java/jedis-compatibility/src/main/java/redis/clients/jedis/params/XAddParams.java @@ -0,0 +1,162 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package redis.clients.jedis.params; + +import redis.clients.jedis.StreamEntryID; + +/** + * Parameters for XADD command in Jedis compatibility layer. Provides a fluent API for setting + * stream entry ID, stream creation behavior, and trimming options. + */ +public class XAddParams { + + private String id; + private Boolean makeStream; + private Long maxLen; + private String minId; + private Boolean exactTrimming; + private Long limit; + + public static XAddParams xAddParams() { + return new XAddParams(); + } + + /** + * Set the entry ID explicitly. Use "*" for auto-generation. + * + * @param id the entry ID + * @return this + */ + public XAddParams id(String id) { + this.id = id; + return this; + } + + /** + * Set the entry ID explicitly using StreamEntryID. Use "*" for auto-generation. + * + * @param id the entry ID + * @return this + */ + public XAddParams id(StreamEntryID id) { + this.id = id != null ? id.toString() : "*"; + return this; + } + + /** + * If set to false, the stream won't be created if it doesn't exist. Equivalent to NOMKSTREAM. + * + * @return this + */ + public XAddParams noMkStream() { + this.makeStream = false; + return this; + } + + /** + * Trim the stream to approximately the specified maximum length using MAXLEN ~ threshold. + * + * @param maxLen maximum length + * @return this + */ + public XAddParams maxLen(long maxLen) { + this.maxLen = maxLen; + this.exactTrimming = false; + return this; + } + + /** + * Trim the stream to exactly the specified maximum length using MAXLEN = threshold. + * + * @param maxLen maximum length + * @return this + */ + public XAddParams maxLenExact(long maxLen) { + this.maxLen = maxLen; + this.exactTrimming = true; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID ~ threshold. + * + * @param minId minimum ID threshold + * @return this + */ + public XAddParams minId(String minId) { + this.minId = minId; + this.exactTrimming = false; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID ~ threshold. + * + * @param minId minimum ID threshold + * @return this + */ + public XAddParams minId(StreamEntryID minId) { + this.minId = minId != null ? minId.toString() : null; + this.exactTrimming = false; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID = threshold (exact). + * + * @param minId minimum ID threshold + * @return this + */ + public XAddParams minIdExact(String minId) { + this.minId = minId; + this.exactTrimming = true; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID = threshold (exact). + * + * @param minId minimum ID threshold + * @return this + */ + public XAddParams minIdExact(StreamEntryID minId) { + this.minId = minId != null ? minId.toString() : null; + this.exactTrimming = true; + return this; + } + + /** + * Set the LIMIT count for trimming. + * + * @param limit maximum number of entries to trim + * @return this + */ + public XAddParams limit(long limit) { + this.limit = limit; + return this; + } + + // Getters for internal use + public String getId() { + return id; + } + + public Boolean getMakeStream() { + return makeStream; + } + + public Long getMaxLen() { + return maxLen; + } + + public String getMinId() { + return minId; + } + + public Boolean getExactTrimming() { + return exactTrimming; + } + + public Long getLimit() { + return limit; + } +} diff --git a/java/jedis-compatibility/src/main/java/redis/clients/jedis/params/XTrimParams.java b/java/jedis-compatibility/src/main/java/redis/clients/jedis/params/XTrimParams.java new file mode 100644 index 00000000000..72e01813915 --- /dev/null +++ b/java/jedis-compatibility/src/main/java/redis/clients/jedis/params/XTrimParams.java @@ -0,0 +1,120 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package redis.clients.jedis.params; + +import redis.clients.jedis.StreamEntryID; + +/** + * Parameters for XTRIM command in Jedis compatibility layer. Provides a fluent API for trimming + * streams by maximum length or minimum ID. + */ +public class XTrimParams { + + private Long maxLen; + private String minId; + private Boolean exactTrimming; + private Long limit; + + public static XTrimParams xTrimParams() { + return new XTrimParams(); + } + + /** + * Trim the stream to approximately the specified maximum length using MAXLEN ~ threshold. + * + * @param maxLen maximum length + * @return this + */ + public XTrimParams maxLen(long maxLen) { + this.maxLen = maxLen; + this.exactTrimming = false; + return this; + } + + /** + * Trim the stream to exactly the specified maximum length using MAXLEN = threshold. + * + * @param maxLen maximum length + * @return this + */ + public XTrimParams maxLenExact(long maxLen) { + this.maxLen = maxLen; + this.exactTrimming = true; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID ~ threshold. + * + * @param minId minimum ID threshold + * @return this + */ + public XTrimParams minId(String minId) { + this.minId = minId; + this.exactTrimming = false; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID ~ threshold. + * + * @param minId minimum ID threshold + * @return this + */ + public XTrimParams minId(StreamEntryID minId) { + this.minId = minId != null ? minId.toString() : null; + this.exactTrimming = false; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID = threshold (exact). + * + * @param minId minimum ID threshold + * @return this + */ + public XTrimParams minIdExact(String minId) { + this.minId = minId; + this.exactTrimming = true; + return this; + } + + /** + * Trim entries with IDs lower than minId using MINID = threshold (exact). + * + * @param minId minimum ID threshold + * @return this + */ + public XTrimParams minIdExact(StreamEntryID minId) { + this.minId = minId != null ? minId.toString() : null; + this.exactTrimming = true; + return this; + } + + /** + * Set the LIMIT count for trimming. + * + * @param limit maximum number of entries to trim + * @return this + */ + public XTrimParams limit(long limit) { + this.limit = limit; + return this; + } + + // Getters for internal use + public Long getMaxLen() { + return maxLen; + } + + public String getMinId() { + return minId; + } + + public Boolean getExactTrimming() { + return exactTrimming; + } + + public Long getLimit() { + return limit; + } +} diff --git a/java/jedis-compatibility/src/test/java/redis/clients/jedis/JedisMethodsTest.java b/java/jedis-compatibility/src/test/java/redis/clients/jedis/JedisMethodsTest.java index dcd01ecf7aa..2f0d2d8a277 100644 --- a/java/jedis-compatibility/src/test/java/redis/clients/jedis/JedisMethodsTest.java +++ b/java/jedis-compatibility/src/test/java/redis/clients/jedis/JedisMethodsTest.java @@ -5,10 +5,13 @@ import java.lang.reflect.Method; import java.util.List; +import java.util.Map; import java.util.Set; import org.junit.jupiter.api.Test; import redis.clients.jedis.resps.AccessControlUser; import redis.clients.jedis.resps.FunctionStats; +import redis.clients.jedis.resps.StreamInfo; +import redis.clients.jedis.resps.StreamPendingSummary; /** * Unit tests for Jedis method signatures and API contracts. Tests that required methods exist with @@ -345,4 +348,261 @@ public void testAclMethodSignatures() throws NoSuchMethodException { jedisClass.getMethod("aclDryRun", String.class, String.class, String[].class); assertEquals(String.class, aclDryRun.getReturnType()); } + + @Test + public void testStreamMethodSignaturesExist() throws NoSuchMethodException { + Class jedisClass = Jedis.class; + + // XADD + assertNotNull(jedisClass.getMethod("xadd", String.class, Map.class)); + assertNotNull(jedisClass.getMethod("xadd", String.class, StreamEntryID.class, Map.class)); + assertNotNull(jedisClass.getMethod("xadd", String.class, Map.class, boolean.class)); + + // XLEN, XDEL + assertNotNull(jedisClass.getMethod("xlen", String.class)); + assertNotNull(jedisClass.getMethod("xdel", String.class, String[].class)); + assertNotNull(jedisClass.getMethod("xdel", String.class, StreamEntryID[].class)); + + // XRANGE, XREVRANGE + assertNotNull(jedisClass.getMethod("xrange", String.class, String.class, String.class)); + assertNotNull( + jedisClass.getMethod("xrange", String.class, String.class, String.class, long.class)); + assertNotNull(jedisClass.getMethod("xrevrange", String.class, String.class, String.class)); + assertNotNull( + jedisClass.getMethod("xrevrange", String.class, String.class, String.class, long.class)); + + // XREAD + assertNotNull(jedisClass.getMethod("xread", Map.class)); + assertNotNull(jedisClass.getMethod("xread", Long.class, Long.class, Map.class)); + + // XTRIM + assertNotNull(jedisClass.getMethod("xtrim", String.class, long.class)); + assertNotNull(jedisClass.getMethod("xtrim", String.class, long.class, boolean.class)); + assertNotNull(jedisClass.getMethod("xtrim", String.class, String.class)); + + // XGROUP + assertNotNull(jedisClass.getMethod("xgroupCreate", String.class, String.class, String.class)); + assertNotNull( + jedisClass.getMethod( + "xgroupCreate", String.class, String.class, String.class, boolean.class)); + assertNotNull(jedisClass.getMethod("xgroupDestroy", String.class, String.class)); + assertNotNull(jedisClass.getMethod("xgroupSetId", String.class, String.class, String.class)); + assertNotNull( + jedisClass.getMethod("xgroupCreateConsumer", String.class, String.class, String.class)); + assertNotNull( + jedisClass.getMethod("xgroupDelConsumer", String.class, String.class, String.class)); + + // XREADGROUP, XACK + assertNotNull(jedisClass.getMethod("xreadgroup", String.class, String.class, Map.class)); + assertNotNull(jedisClass.getMethod("xack", String.class, String.class, String[].class)); + assertNotNull(jedisClass.getMethod("xack", String.class, String.class, StreamEntryID[].class)); + + // XPENDING + assertNotNull(jedisClass.getMethod("xpending", String.class, String.class)); + assertNotNull( + jedisClass.getMethod( + "xpending", + String.class, + String.class, + glide.api.models.commands.stream.StreamRange.class, + glide.api.models.commands.stream.StreamRange.class, + long.class)); + assertNotNull( + jedisClass.getMethod( + "xpending", String.class, String.class, String.class, String.class, long.class)); + + // XCLAIM, XAUTOCLAIM + assertNotNull( + jedisClass.getMethod( + "xclaim", String.class, String.class, String.class, long.class, String[].class)); + assertNotNull( + jedisClass.getMethod( + "xautoclaim", String.class, String.class, String.class, long.class, String.class)); + assertNotNull( + jedisClass.getMethod( + "xautoclaim", + String.class, + String.class, + String.class, + long.class, + String.class, + long.class)); + + // XINFO + assertNotNull(jedisClass.getMethod("xinfoStream", String.class)); + assertNotNull(jedisClass.getMethod("xinfoStreamAsInfo", String.class)); + assertNotNull(jedisClass.getMethod("xinfoGroups", String.class)); + assertNotNull(jedisClass.getMethod("xinfoConsumers", String.class, String.class)); + } + + @Test + public void testStreamMethodReturnTypes() throws NoSuchMethodException { + Class jedisClass = Jedis.class; + + Method xadd = jedisClass.getMethod("xadd", String.class, Map.class); + assertEquals(StreamEntryID.class, xadd.getReturnType()); + + Method xlen = jedisClass.getMethod("xlen", String.class); + assertEquals(long.class, xlen.getReturnType()); + + Method xrange = jedisClass.getMethod("xrange", String.class, String.class, String.class); + assertEquals(List.class, xrange.getReturnType()); + + Method xread = jedisClass.getMethod("xread", Map.class); + assertEquals(Map.class, xread.getReturnType()); + + Method xpendingSummary = jedisClass.getMethod("xpending", String.class, String.class); + assertEquals(StreamPendingSummary.class, xpendingSummary.getReturnType()); + + Method xinfoStream = jedisClass.getMethod("xinfoStream", String.class); + assertEquals(Map.class, xinfoStream.getReturnType()); + + Method xinfoStreamAsInfo = jedisClass.getMethod("xinfoStreamAsInfo", String.class); + assertEquals(StreamInfo.class, xinfoStreamAsInfo.getReturnType()); + + Method xinfoGroups = jedisClass.getMethod("xinfoGroups", String.class); + assertEquals(List.class, xinfoGroups.getReturnType()); + + Method xinfoConsumers = jedisClass.getMethod("xinfoConsumers", String.class, String.class); + assertEquals(List.class, xinfoConsumers.getReturnType()); + } + + @Test + public void testStreamBinaryMethodSignaturesExist() throws NoSuchMethodException { + Class jedisClass = Jedis.class; + + // XADD binary with XAddParams + assertNotNull( + jedisClass.getMethod( + "xadd", byte[].class, redis.clients.jedis.params.XAddParams.class, Map.class)); + + // XLEN binary + assertNotNull(jedisClass.getMethod("xlen", byte[].class)); + + // XDEL binary + assertNotNull(jedisClass.getMethod("xdel", byte[].class, byte[][].class)); + + // XRANGE binary + assertNotNull(jedisClass.getMethod("xrange", byte[].class, byte[].class, byte[].class)); + assertNotNull( + jedisClass.getMethod("xrange", byte[].class, byte[].class, byte[].class, int.class)); + + // XREVRANGE binary + assertNotNull(jedisClass.getMethod("xrevrange", byte[].class, byte[].class, byte[].class)); + assertNotNull( + jedisClass.getMethod("xrevrange", byte[].class, byte[].class, byte[].class, int.class)); + + // XTRIM binary + assertNotNull(jedisClass.getMethod("xtrim", byte[].class, long.class)); + assertNotNull(jedisClass.getMethod("xtrim", byte[].class, long.class, boolean.class)); + assertNotNull( + jedisClass.getMethod("xtrim", byte[].class, redis.clients.jedis.params.XTrimParams.class)); + } + + @Test + public void testStreamBinaryMethodReturnTypes() throws NoSuchMethodException { + Class jedisClass = Jedis.class; + + // XADD binary returns byte[] + Method xaddBinary = + jedisClass.getMethod( + "xadd", byte[].class, redis.clients.jedis.params.XAddParams.class, Map.class); + assertEquals(byte[].class, xaddBinary.getReturnType()); + + // XLEN binary returns long + Method xlenBinary = jedisClass.getMethod("xlen", byte[].class); + assertEquals(long.class, xlenBinary.getReturnType()); + + // XDEL binary returns long + Method xdelBinary = jedisClass.getMethod("xdel", byte[].class, byte[][].class); + assertEquals(long.class, xdelBinary.getReturnType()); + + // XRANGE binary returns List + Method xrangeBinary = jedisClass.getMethod("xrange", byte[].class, byte[].class, byte[].class); + assertEquals(List.class, xrangeBinary.getReturnType()); + + // XREVRANGE binary returns List + Method xrevrangeBinary = + jedisClass.getMethod("xrevrange", byte[].class, byte[].class, byte[].class); + assertEquals(List.class, xrevrangeBinary.getReturnType()); + + // XTRIM binary returns long + Method xtrimBinary = jedisClass.getMethod("xtrim", byte[].class, long.class); + assertEquals(long.class, xtrimBinary.getReturnType()); + } + + @Test + public void testXAddParamsMethodSignatures() throws NoSuchMethodException { + Class jedisClass = Jedis.class; + + // XADD with XAddParams for String keys + assertNotNull( + jedisClass.getMethod( + "xadd", String.class, redis.clients.jedis.params.XAddParams.class, Map.class)); + + Method xaddWithParams = + jedisClass.getMethod( + "xadd", String.class, redis.clients.jedis.params.XAddParams.class, Map.class); + assertEquals(StreamEntryID.class, xaddWithParams.getReturnType()); + } + + @Test + public void testXTrimParamsMethodSignatures() throws NoSuchMethodException { + Class jedisClass = Jedis.class; + + // XTRIM with XTrimParams for String keys + assertNotNull( + jedisClass.getMethod("xtrim", String.class, redis.clients.jedis.params.XTrimParams.class)); + + Method xtrimWithParams = + jedisClass.getMethod("xtrim", String.class, redis.clients.jedis.params.XTrimParams.class); + assertEquals(long.class, xtrimWithParams.getReturnType()); + } + + @Test + public void testXAddParamsClassExists() { + // Test that XAddParams class exists and has expected methods + assertDoesNotThrow( + () -> { + Class xAddParamsClass = redis.clients.jedis.params.XAddParams.class; + assertNotNull(xAddParamsClass); + + // Check factory method exists + assertNotNull(xAddParamsClass.getMethod("xAddParams")); + + // Check builder methods exist + assertNotNull(xAddParamsClass.getMethod("id", String.class)); + assertNotNull(xAddParamsClass.getMethod("id", StreamEntryID.class)); + assertNotNull(xAddParamsClass.getMethod("noMkStream")); + assertNotNull(xAddParamsClass.getMethod("maxLen", long.class)); + assertNotNull(xAddParamsClass.getMethod("maxLenExact", long.class)); + assertNotNull(xAddParamsClass.getMethod("minId", String.class)); + assertNotNull(xAddParamsClass.getMethod("minId", StreamEntryID.class)); + assertNotNull(xAddParamsClass.getMethod("minIdExact", String.class)); + assertNotNull(xAddParamsClass.getMethod("minIdExact", StreamEntryID.class)); + assertNotNull(xAddParamsClass.getMethod("limit", long.class)); + }); + } + + @Test + public void testXTrimParamsClassExists() { + // Test that XTrimParams class exists and has expected methods + assertDoesNotThrow( + () -> { + Class xTrimParamsClass = redis.clients.jedis.params.XTrimParams.class; + assertNotNull(xTrimParamsClass); + + // Check factory method exists + assertNotNull(xTrimParamsClass.getMethod("xTrimParams")); + + // Check builder methods exist + assertNotNull(xTrimParamsClass.getMethod("maxLen", long.class)); + assertNotNull(xTrimParamsClass.getMethod("maxLenExact", long.class)); + assertNotNull(xTrimParamsClass.getMethod("minId", String.class)); + assertNotNull(xTrimParamsClass.getMethod("minId", StreamEntryID.class)); + assertNotNull(xTrimParamsClass.getMethod("minIdExact", String.class)); + assertNotNull(xTrimParamsClass.getMethod("minIdExact", StreamEntryID.class)); + assertNotNull(xTrimParamsClass.getMethod("limit", long.class)); + }); + } }