Skip to content

Commit fe7d61a

Browse files
committed
Align Search APIs and behaviour with Redis 8.0+
- Introduce executeKeylessCommand - Implement round-robin execution in ClusterCommandExecutor - Update search commands according to the latest request/response policies - Remove JedisBroadcastAndRoundRobinConfig - Introduce AggregateIterator for proper Cluster support - Deprecate APIs that rely on legacy RediSeach behaviour
1 parent a40d400 commit fe7d61a

14 files changed

+1287
-161
lines changed

src/main/java/redis/clients/jedis/CommandObjects.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ protected RedisProtocol getProtocol() {
4848
}
4949

5050
protected volatile CommandKeyArgumentPreProcessor keyPreProcessor = null;
51-
private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;
52-
private Lock mapperLock = new ReentrantLock(true);
51+
private Lock mapperLock = new ReentrantLock(true);
5352
private volatile JsonObjectMapper jsonObjectMapper;
5453
private final AtomicInteger searchDialect = new AtomicInteger(SearchProtocol.DEFAULT_DIALECT);
5554

@@ -58,10 +57,6 @@ public void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProc
5857
this.keyPreProcessor = keyPreProcessor;
5958
}
6059

61-
void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig config) {
62-
this.broadcastAndRoundRobinConfig = config;
63-
}
64-
6560
protected CommandArguments commandArguments(ProtocolCommand command) {
6661
CommandArguments comArgs = new CommandArguments(command);
6762
if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor);
@@ -2977,7 +2972,7 @@ public final CommandObject<List<Object>> xreadGroup(byte[] groupName, byte[] con
29772972
}
29782973

29792974
public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xreadGroupBinary(
2980-
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2975+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
29812976
Map.Entry<byte[], StreamEntryID>... streams) {
29822977
CommandArguments args = commandArguments(XREADGROUP)
29832978
.add(GROUP).add(groupName).add(consumer)
@@ -2992,7 +2987,7 @@ public final CommandObject<List<Map.Entry<byte[], List<StreamEntryBinary>>>> xre
29922987
}
29932988

29942989
public final CommandObject<Map<byte[], List<StreamEntryBinary>>> xreadGroupBinaryAsMap(
2995-
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
2990+
byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams,
29962991
Map.Entry<byte[], StreamEntryID>... streams) {
29972992
CommandArguments args = commandArguments(XREADGROUP)
29982993
.add(GROUP).add(groupName).add(consumer)
@@ -3481,18 +3476,16 @@ public final CommandObject<Long> hsetObject(String key, Map<String, Object> hash
34813476
return new CommandObject<>(addFlatMapArgs(commandArguments(HSET).key(key), hash), BuilderFactory.LONG);
34823477
}
34833478

3484-
private boolean isRoundRobinSearchCommand() {
3485-
if (broadcastAndRoundRobinConfig == null) {
3486-
return true;
3487-
} else if (broadcastAndRoundRobinConfig.getRediSearchModeInCluster() == JedisBroadcastAndRoundRobinConfig.RediSearchMode.LIGHT) {
3488-
return false;
3489-
}
3490-
return true;
3479+
private boolean isRoundRobinSearchCommand(SearchCommand sc) {
3480+
3481+
return !(sc.equals(SearchCommand.SUGGET) || sc.equals(SearchCommand.SUGADD) || sc.equals(
3482+
SearchCommand.SUGLEN) || sc.equals(SearchCommand.SUGDEL) || sc.equals(
3483+
SearchCommand.CURSOR));
34913484
}
34923485

34933486
private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, String idx) {
34943487
CommandArguments ca = commandArguments(sc);
3495-
if (isRoundRobinSearchCommand()) {
3488+
if (isRoundRobinSearchCommand(sc)) {
34963489
ca.add(idx);
34973490
} else {
34983491
ca.key(idx);
@@ -3502,16 +3495,22 @@ private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, Strin
35023495

35033496
private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, String idx1, String idx2) {
35043497
CommandArguments ca = commandArguments(sc);
3505-
if (isRoundRobinSearchCommand()) {
3498+
if (isRoundRobinSearchCommand(sc)) {
35063499
ca.add(idx1).add(idx2);
35073500
} else {
35083501
ca.key(idx1).key(idx2);
35093502
}
35103503
return ca;
35113504
}
35123505

3513-
private CommandArguments checkAndRoundRobinSearchCommand(CommandArguments commandArguments, byte[] indexName) {
3514-
return isRoundRobinSearchCommand() ? commandArguments.add(indexName) : commandArguments.key(indexName);
3506+
private CommandArguments checkAndRoundRobinSearchCommand(SearchCommand sc, byte[] indexName) {
3507+
CommandArguments ca = commandArguments(sc);
3508+
if (isRoundRobinSearchCommand(sc)) {
3509+
ca.add(indexName);
3510+
} else {
3511+
ca.key(indexName);
3512+
}
3513+
return ca;
35153514
}
35163515

35173516
private <T> CommandObject<T> directSearchCommand(CommandObject<T> object, String indexName) {
@@ -3592,7 +3591,7 @@ public final CommandObject<SearchResult> ftSearch(byte[] indexName, Query query)
35923591
if (protocol == RedisProtocol.RESP3) {
35933592
throw new UnsupportedOperationException("binary ft.search is not implemented with resp3.");
35943593
}
3595-
return new CommandObject<>(checkAndRoundRobinSearchCommand(commandArguments(SearchCommand.SEARCH), indexName)
3594+
return new CommandObject<>(checkAndRoundRobinSearchCommand(SearchCommand.SEARCH, indexName)
35963595
.addParams(query.dialectOptional(searchDialect.get())), getSearchResultBuilder(null,
35973596
() -> new SearchResultBuilder(!query.getNoContent(), query.getWithScores(), false)));
35983597
}

src/main/java/redis/clients/jedis/JedisBroadcastAndRoundRobinConfig.java

Lines changed: 0 additions & 10 deletions
This file was deleted.

0 commit comments

Comments
 (0)