Skip to content

Commit c33483b

Browse files
authored
Clarify why new stream entries aren't deleted with XDELEX (#4218)
* Improve naming for status 2 in StreamEntryDeletionResult * Clarify why new stream entries aren't deleted with XDELEX * Fix formatting of StreamEntryDeletionResultTest
1 parent 8cdc90f commit c33483b

File tree

5 files changed

+60
-18
lines changed

5 files changed

+60
-18
lines changed

src/main/java/redis/clients/jedis/resps/StreamEntryDeletionResult.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
* <ul>
66
* <li>NOT_FOUND (-1): ID doesn't exist in stream</li>
77
* <li>DELETED (1): Entry was deleted/acknowledged and deleted</li>
8-
* <li>ACKNOWLEDGED_NOT_DELETED (2): Entry was acknowledged but not deleted (still has dangling
9-
* references)</li>
8+
* <li>NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED (2): Entry wasn't deleted.</li>
109
* </ul>
1110
*/
1211
public enum StreamEntryDeletionResult {
@@ -28,10 +27,13 @@ public enum StreamEntryDeletionResult {
2827
DELETED(1),
2928

3029
/**
31-
* The entry was acknowledged but not deleted because it still has dangling references in other
32-
* consumer groups' pending entry lists.
30+
* The entry was not deleted due to one of the following reasons:
31+
* <ul>
32+
* <li>For XDELEX: The entry was not acknowledged by any consumer group</li>
33+
* <li>For XACKDEL: The entry still has pending references in other consumer groups</li>
34+
* </ul>
3335
*/
34-
ACKNOWLEDGED_NOT_DELETED(2);
36+
NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2);
3537

3638
private final int code;
3739

@@ -60,7 +62,7 @@ public static StreamEntryDeletionResult fromCode(int code) {
6062
case 1:
6163
return DELETED;
6264
case 2:
63-
return ACKNOWLEDGED_NOT_DELETED;
65+
return NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED;
6466
default:
6567
throw new IllegalArgumentException("Unknown stream entry deletion result code: " + code);
6668
}

src/test/java/redis/clients/jedis/commands/jedis/StreamsBinaryCommandsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ public void testXdelexWithConsumerGroups() {
457457
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2);
458458
assertThat(results, hasSize(2));
459459
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged
460-
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not acknowledged
460+
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED, results.get(1)); // id2 not acknowledged
461461

462462
// Verify only acknowledged entry was deleted
463463
assertEquals(1L, jedis.xlen(STREAM_KEY_1));

src/test/java/redis/clients/jedis/commands/unified/StreamsBinaryCommandsTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ public void testXdelexWithConsumerGroups() {
474474
List<StreamEntryDeletionResult> results = jedis.xdelex(STREAM_KEY_1, StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2);
475475
assertThat(results, hasSize(2));
476476
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged
477-
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not acknowledged
477+
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED, results.get(1)); // id2 not acknowledged
478478

479479
// Verify only acknowledged entry was deleted
480480
assertEquals(1L, jedis.xlen(STREAM_KEY_1));

src/test/java/redis/clients/jedis/commands/unified/StreamsCommandsTestBase.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
import redis.clients.jedis.params.XPendingParams;
1414
import redis.clients.jedis.params.XReadGroupParams;
1515
import redis.clients.jedis.params.XTrimParams;
16-
import redis.clients.jedis.resps.StreamEntry;
17-
import redis.clients.jedis.resps.StreamPendingEntry;
18-
import redis.clients.jedis.resps.StreamEntryDeletionResult;
16+
import redis.clients.jedis.resps.*;
1917

2018
import java.util.HashMap;
2119
import java.util.List;
@@ -760,8 +758,9 @@ public void xdelexWithConsumerGroups() {
760758
StreamDeletionPolicy.ACKNOWLEDGED, readId1, readId2);
761759
assertThat(results, hasSize(2));
762760
assertEquals(StreamEntryDeletionResult.DELETED, results.get(0)); // id1 was acknowledged
763-
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED, results.get(1)); // id2 not
764-
// acknowledged
761+
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
762+
results.get(1)); // id2 not
763+
// acknowledged
765764

766765
// Verify only acknowledged entry was deleted
767766
assertEquals(1L, jedis.xlen(STREAM_KEY_1));
@@ -778,4 +777,44 @@ public void xdelexEmptyStream() {
778777
assertThat(results, hasSize(1));
779778
assertEquals(StreamEntryDeletionResult.NOT_FOUND, results.get(0));
780779
}
780+
781+
@Test
782+
@SinceRedisVersion("8.1.240")
783+
public void xdelexNotAcknowledged() {
784+
setUpTestStream();
785+
786+
String groupName = "test_group";
787+
788+
// Add initial entries and create consumer group
789+
Map<String, String> entry1 = singletonMap("field1", "value1");
790+
jedis.xadd(STREAM_KEY_1, new StreamEntryID("1-0"), entry1);
791+
jedis.xgroupCreate(STREAM_KEY_1, groupName, new StreamEntryID("0-0"), true);
792+
793+
// Read one message to create PEL entry
794+
String consumerName = "consumer1";
795+
Map<String, StreamEntryID> streamQuery = singletonMap(STREAM_KEY_1,
796+
StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY);
797+
jedis.xreadGroup(groupName, consumerName, XReadGroupParams.xReadGroupParams().count(1),
798+
streamQuery);
799+
800+
// Add a new entry that was never delivered to any consumer
801+
Map<String, String> entry2 = singletonMap("field4", "value4");
802+
StreamEntryID id2 = jedis.xadd(STREAM_KEY_1, new StreamEntryID("2-0"), entry2);
803+
804+
// Verify initial state
805+
StreamPendingSummary pending = jedis.xpending(STREAM_KEY_1, groupName);
806+
assertEquals(1L, pending.getTotal()); // Only id1 is in PEL
807+
808+
StreamInfo info = jedis.xinfoStream(STREAM_KEY_1);
809+
assertEquals(2L, info.getLength()); // Stream has 2 entries
810+
811+
// Test XDELEX with ACKNOWLEDGED policy on entry that was never delivered
812+
// This should return NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED since id2 was never
813+
// delivered to any consumer
814+
List<StreamEntryDeletionResult> result = jedis.xdelex(STREAM_KEY_1,
815+
StreamDeletionPolicy.ACKNOWLEDGED, id2);
816+
assertThat(result, hasSize(1));
817+
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
818+
result.get(0));
819+
}
781820
}

src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public class StreamEntryDeletionResultTest {
1010
public void testFromCode() {
1111
assertEquals(StreamEntryDeletionResult.NOT_FOUND, StreamEntryDeletionResult.fromCode(-1));
1212
assertEquals(StreamEntryDeletionResult.DELETED, StreamEntryDeletionResult.fromCode(1));
13-
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED,
13+
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
1414
StreamEntryDeletionResult.fromCode(2));
1515
}
1616

@@ -25,7 +25,7 @@ public void testFromCodeInvalid() {
2525
public void testFromLong() {
2626
assertEquals(StreamEntryDeletionResult.NOT_FOUND, StreamEntryDeletionResult.fromLong(-1L));
2727
assertEquals(StreamEntryDeletionResult.DELETED, StreamEntryDeletionResult.fromLong(1L));
28-
assertEquals(StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED,
28+
assertEquals(StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED,
2929
StreamEntryDeletionResult.fromLong(2L));
3030
}
3131

@@ -38,14 +38,15 @@ public void testFromLongNull() {
3838
public void testGetCode() {
3939
assertEquals(-1, StreamEntryDeletionResult.NOT_FOUND.getCode());
4040
assertEquals(1, StreamEntryDeletionResult.DELETED.getCode());
41-
assertEquals(2, StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED.getCode());
41+
assertEquals(2,
42+
StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED.getCode());
4243
}
4344

4445
@Test
4546
public void testToString() {
4647
assertEquals("NOT_FOUND(-1)", StreamEntryDeletionResult.NOT_FOUND.toString());
4748
assertEquals("DELETED(1)", StreamEntryDeletionResult.DELETED.toString());
48-
assertEquals("ACKNOWLEDGED_NOT_DELETED(2)",
49-
StreamEntryDeletionResult.ACKNOWLEDGED_NOT_DELETED.toString());
49+
assertEquals("NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED(2)",
50+
StreamEntryDeletionResult.NOT_DELETED_UNACKNOWLEDGED_OR_STILL_REFERENCED.toString());
5051
}
5152
}

0 commit comments

Comments
 (0)