Skip to content
Open
313 changes: 313 additions & 0 deletions java/integTest/src/test/java/compatibility/jedis/JedisTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.args.BitOP;
import redis.clients.jedis.args.ExpiryOption;
import redis.clients.jedis.args.ListDirection;
Expand All @@ -38,6 +39,12 @@
import redis.clients.jedis.resps.AccessControlLogEntry;
import redis.clients.jedis.resps.AccessControlUser;
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.resps.StreamConsumerInfo;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamGroupInfo;
import redis.clients.jedis.resps.StreamInfo;
import redis.clients.jedis.resps.StreamPendingEntry;
import redis.clients.jedis.resps.StreamPendingSummary;
import redis.clients.jedis.util.KeyValue;

/**
Expand Down Expand Up @@ -3376,6 +3383,312 @@ void list_edge_cases() {
0, jedis.rpushx("non_existent_key", "value"), "RPUSHX on non-existent key should return 0");
}

// ========== STREAM COMMANDS ==========

@Test
void stream_xadd_xlen_xdel() {
String key = "stream:" + UUID.randomUUID();
Map<String, String> hash = new HashMap<>();
hash.put("f1", "v1");
hash.put("f2", "v2");

StreamEntryID id = jedis.xadd(key, hash);
assertNotNull(id, "XADD should return entry ID");
assertTrue(id.getTime() > 0 || id.getSequence() >= 0, "XADD should return valid ID");

long len = jedis.xlen(key);
assertEquals(1, len, "XLEN should be 1 after one XADD");

long del = jedis.xdel(key, id.toString());
assertEquals(1, del, "XDEL should return 1");
assertEquals(0, jedis.xlen(key), "XLEN should be 0 after XDEL");
}

@Test
void stream_xrange_xrevrange() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("a", "1"));
jedis.xadd(key, Map.of("b", "2"));
jedis.xadd(key, Map.of("c", "3"));

List<StreamEntry> range = jedis.xrange(key, "-", "+");
assertNotNull(range);
assertTrue(range.size() >= 3, "XRANGE should return at least 3 entries");

List<StreamEntry> rev = jedis.xrevrange(key, "+", "-");
assertNotNull(rev);
assertTrue(rev.size() >= 3, "XREVRANGE should return at least 3 entries");

List<StreamEntry> limited = jedis.xrange(key, "-", "+", 2L);
assertNotNull(limited);
assertEquals(2, limited.size(), "XRANGE with COUNT 2 should return 2 entries");
}

@Test
void stream_xread() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("x", "1"));

Map<String, String> keysAndIds = new HashMap<>();
keysAndIds.put(key, "0-0");

Map<String, List<StreamEntry>> result = jedis.xread(keysAndIds);
assertNotNull(result);
assertTrue(result.containsKey(key));
assertFalse(result.get(key).isEmpty(), "XREAD should return entries from start");
}

@Test
void stream_xtrim() {
String key = "stream:" + UUID.randomUUID();
for (int i = 0; i < 10; i++) {
jedis.xadd(key, Map.of("i", String.valueOf(i)));
}
long lenBefore = jedis.xlen(key);
assertTrue(lenBefore >= 10, "Stream should have at least 10 entries");

long trimmed = jedis.xtrim(key, 5L);
assertTrue(trimmed >= 0, "XTRIM should return non-negative count");
long lenAfter = jedis.xlen(key);
assertTrue(lenAfter <= 5, "Stream length after XTRIM MAXLEN 5 should be <= 5");
}

@Test
void stream_xgroup_create_destroy() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("init", "1"));

String group = "g1";
String createResult = jedis.xgroupCreate(key, group, "0", true);
assertNotNull(createResult, "XGROUP CREATE should succeed");

boolean destroyed = jedis.xgroupDestroy(key, group);
assertTrue(destroyed, "XGROUP DESTROY should return true");
}

@Test
void stream_xgroup_setid_createconsumer_delconsumer() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("a", "1"));
String group = "g2";
jedis.xgroupCreate(key, group, "0", true);

String setidResult = jedis.xgroupSetId(key, group, "0");
assertNotNull(setidResult);

boolean created = jedis.xgroupCreateConsumer(key, group, "c1");
assertTrue(created, "XGROUP CREATECONSUMER should succeed");

long del = jedis.xgroupDelConsumer(key, group, "c1");
assertTrue(del >= 0, "XGROUP DELCONSUMER should return non-negative");
}

@Test
void stream_xreadgroup_xack() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("m1", "v1"));
jedis.xadd(key, Map.of("m2", "v2"));
String group = "g3";
jedis.xgroupCreate(key, group, "0", true);

Map<String, String> keysAndIds = new HashMap<>();
keysAndIds.put(key, ">");

Map<String, List<StreamEntry>> read = jedis.xreadgroup(group, "consumer1", keysAndIds);
assertNotNull(read);
assertTrue(read.containsKey(key));
List<StreamEntry> entries = read.get(key);
assertNotNull(entries);
assertFalse(entries.isEmpty(), "XREADGROUP should return entries");

String[] ids = entries.stream().map(e -> e.getID().toString()).toArray(String[]::new);
long ack = jedis.xack(key, group, ids);
assertEquals(entries.size(), ack, "XACK should acknowledge all read entries");
}

@Test
void stream_xpending() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("p", "1"));
String group = "g4";
jedis.xgroupCreate(key, group, "0", true);

StreamPendingSummary summary = jedis.xpending(key, group);
assertNotNull(summary);
assertTrue(summary.getTotal() >= 0, "XPENDING summary total should be non-negative");

List<StreamPendingEntry> pending = jedis.xpending(key, group, "-", "+", 10L);
assertNotNull(pending);
}

@Test
void stream_xinfo_stream_and_groups() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("i", "1"));
jedis.xgroupCreate(key, "ginfo", "0", true);

Map<String, Object> raw = jedis.xinfoStream(key);
assertNotNull(raw);
assertTrue(raw.containsKey("length") || raw.containsKey("last-generated-id"));

StreamInfo info = jedis.xinfoStreamAsInfo(key);
assertNotNull(info);
assertTrue(info.getLength() >= 1, "Stream should have at least 1 entry");

List<StreamGroupInfo> groups = jedis.xinfoGroups(key);
assertNotNull(groups);
assertFalse(groups.isEmpty(), "XINFO GROUPS should return at least one group");
}

@Test
void stream_xinfo_consumers() {
String key = "stream:" + UUID.randomUUID();
jedis.xadd(key, Map.of("c", "1"));
String group = "gcons";
jedis.xgroupCreate(key, group, "0", true);
jedis.xgroupCreateConsumer(key, group, "consumer1");

List<StreamConsumerInfo> consumers = jedis.xinfoConsumers(key, group);
assertNotNull(consumers);
assertFalse(consumers.isEmpty(), "XINFO CONSUMERS should return at least one consumer");
}

@Test
void stream_binary_xlen_xdel() {
byte[] key = ("stream:" + UUID.randomUUID()).getBytes();
Map<byte[], byte[]> hash = new HashMap<>();
hash.put("f1".getBytes(), "v1".getBytes());
hash.put("f2".getBytes(), "v2".getBytes());

// Use XAddParams to add entry
redis.clients.jedis.params.XAddParams params =
redis.clients.jedis.params.XAddParams.xAddParams();
byte[] id = jedis.xadd(key, params, hash);
assertNotNull(id, "Binary XADD should return entry ID");

long len = jedis.xlen(key);
assertEquals(1, len, "Binary XLEN should be 1 after one XADD");

long del = jedis.xdel(key, id);
assertEquals(1, del, "Binary XDEL should return 1");
assertEquals(0, jedis.xlen(key), "Binary XLEN should be 0 after XDEL");
}

@Test
void stream_binary_xrange_xrevrange() {
byte[] key = ("stream:" + UUID.randomUUID()).getBytes();

// Add entries using XAddParams
redis.clients.jedis.params.XAddParams params =
redis.clients.jedis.params.XAddParams.xAddParams();
Map<byte[], byte[]> hash1 = Map.of("a".getBytes(), "1".getBytes());
Map<byte[], byte[]> hash2 = Map.of("b".getBytes(), "2".getBytes());
Map<byte[], byte[]> hash3 = Map.of("c".getBytes(), "3".getBytes());

jedis.xadd(key, params, hash1);
jedis.xadd(key, params, hash2);
jedis.xadd(key, params, hash3);

List<StreamEntry> range = jedis.xrange(key, "-".getBytes(), "+".getBytes());
assertNotNull(range);
assertTrue(range.size() >= 3, "Binary XRANGE should return at least 3 entries");

List<StreamEntry> rev = jedis.xrevrange(key, "+".getBytes(), "-".getBytes());
assertNotNull(rev);
assertTrue(rev.size() >= 3, "Binary XREVRANGE should return at least 3 entries");

List<StreamEntry> limited = jedis.xrange(key, "-".getBytes(), "+".getBytes(), 2);
assertNotNull(limited);
assertEquals(2, limited.size(), "Binary XRANGE with COUNT 2 should return 2 entries");
}

@Test
void stream_binary_xtrim() {
byte[] key = ("stream:" + UUID.randomUUID()).getBytes();
redis.clients.jedis.params.XAddParams addParams =
redis.clients.jedis.params.XAddParams.xAddParams();

for (int i = 0; i < 10; i++) {
Map<byte[], byte[]> hash = Map.of("i".getBytes(), String.valueOf(i).getBytes());
jedis.xadd(key, addParams, hash);
}
long lenBefore = jedis.xlen(key);
assertTrue(lenBefore >= 10, "Stream should have at least 10 entries");

long trimmed = jedis.xtrim(key, 5L);
assertTrue(trimmed >= 0, "Binary XTRIM should return non-negative count");
long lenAfter = jedis.xlen(key);
assertTrue(lenAfter <= 5, "Stream length after binary XTRIM MAXLEN 5 should be <= 5");
}

@Test
void stream_xadd_with_xaddparams() {
String key = "stream:" + UUID.randomUUID();
Map<String, String> hash = Map.of("field", "value");

// Test with custom ID
redis.clients.jedis.params.XAddParams params =
redis.clients.jedis.params.XAddParams.xAddParams().id("1000-0");
StreamEntryID id = jedis.xadd(key, params, hash);
assertNotNull(id);
assertEquals("1000-0", id.toString());

// Test with NOMKSTREAM
String nonExistentKey = "stream:" + UUID.randomUUID();
params = redis.clients.jedis.params.XAddParams.xAddParams().noMkStream();
StreamEntryID result = jedis.xadd(nonExistentKey, params, hash);
assertNull(result, "XADD with NOMKSTREAM should return null for non-existent stream");

// Test with MAXLEN trimming (exact)
String trimKey = "stream:" + UUID.randomUUID();
for (int i = 0; i < 5; i++) {
jedis.xadd(trimKey, Map.of("i", String.valueOf(i)));
}
params = redis.clients.jedis.params.XAddParams.xAddParams().maxLenExact(3);
jedis.xadd(trimKey, params, Map.of("new", "entry"));
long len = jedis.xlen(trimKey);
assertTrue(len <= 3, "Stream should be trimmed to exactly 3 entries");
}

@Test
void stream_xtrim_with_xtrimparams() {
String key = "stream:" + UUID.randomUUID();
for (int i = 0; i < 10; i++) {
jedis.xadd(key, Map.of("i", String.valueOf(i)));
}

// Test MAXLEN with XTrimParams (exact trimming)
redis.clients.jedis.params.XTrimParams params =
redis.clients.jedis.params.XTrimParams.xTrimParams().maxLenExact(5);
long trimmed = jedis.xtrim(key, params);
assertTrue(trimmed >= 0, "XTRIM with XTrimParams should return non-negative count");
long len = jedis.xlen(key);
assertTrue(len <= 5, "Stream should be trimmed to exactly 5 entries");

// Test MAXLEN with approximate trimming
String key3 = "stream:" + UUID.randomUUID();
for (int i = 0; i < 10; i++) {
jedis.xadd(key3, Map.of("i", String.valueOf(i)));
}
params = redis.clients.jedis.params.XTrimParams.xTrimParams().maxLen(5);
jedis.xtrim(key3, params);
len = jedis.xlen(key3);
assertTrue(
len <= 10 && len >= 5,
"Stream with approximate trim should be reduced but may not be exact");

// Test MINID with XTrimParams (exact trimming)
String key2 = "stream:" + UUID.randomUUID();
StreamEntryID firstId = jedis.xadd(key2, Map.of("a", "1"));
jedis.xadd(key2, Map.of("b", "2"));
StreamEntryID thirdId = jedis.xadd(key2, Map.of("c", "3"));

params = redis.clients.jedis.params.XTrimParams.xTrimParams().minIdExact(thirdId);
jedis.xtrim(key2, params);
len = jedis.xlen(key2);
assertTrue(len == 1, "Stream should be trimmed to exactly entries >= minId");
}

// --- ACL command integration tests ---

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ blockingSocketTimeoutMillis
- ✅ List operations (LPUSH, RPUSH, LPOP, RPOP)
- ✅ Set operations (SADD, SREM, SMEMBERS, SCARD, SISMEMBER, SMISMEMBER, SPOP, SRANDMEMBER, SMOVE, SINTER, SINTERCARD, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SSCAN) via type-safe methods
- ⚠️ Sorted set operations (ZADD, ZREM, ZRANGE) - **Available via `sendCommand()` only**
- ✅ Stream operations (XADD, XLEN, XDEL, XRANGE, XREVRANGE, XREAD, XTRIM, XGROUP CREATE/DESTROY/SETID, XREADGROUP, XACK, XPENDING, XCLAIM, XAUTOCLAIM, XINFO STREAM/GROUPS/CONSUMERS)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also mention stream XSETID is not supported (although XGROUP SETID is supported). Perhaps a note after this list of commands.

Also, should we add a Stream Commands section to the README.md, similar to how #5305 added a section for the Scripting Commands?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree to this,

As I've few other PR's for different commands, post merge i've planned to write extensive docs for jedis-compatibility layer which will be in separate PR.

- **Note:** `XSETID` is not supported (only `XGROUP SETID` is available)
- ✅ Key operations (DEL, EXISTS, EXPIRE, TTL)
- ✅ Connection commands (PING, SELECT)
- ✅ ACL commands (ACL LIST, ACL GETUSER, ACL SETUSER, ACL DELUSER, ACL CAT, ACL GENPASS, ACL LOG, ACL LOG RESET, ACL WHOAMI, ACL USERS, ACL SAVE, ACL LOAD, ACL DRYRUN)
Expand Down
Loading
Loading