Skip to content

Commit cd85e10

Browse files
committed
Introduce AggregateIterator to replace outdated APIs
New AggregateIterator-based APIs replace legacy ftCursorRead, ftCursorDel, and other APIs that don't properly support OSS Cluster
1 parent f4ef9d1 commit cd85e10

File tree

7 files changed

+644
-1
lines changed

7 files changed

+644
-1
lines changed

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import redis.clients.jedis.providers.*;
3737
import redis.clients.jedis.resps.*;
3838
import redis.clients.jedis.search.*;
39+
import redis.clients.jedis.search.aggr.AggregateIterator;
3940
import redis.clients.jedis.search.aggr.AggregationBuilder;
4041
import redis.clients.jedis.search.aggr.AggregationResult;
4142
import redis.clients.jedis.search.aggr.FtAggregateIteration;
@@ -3991,6 +3992,7 @@ public SearchResult ftSearch(String indexName, String query, FTSearchParams para
39913992
* @param params limit will be ignored
39923993
* @return search iteration
39933994
*/
3995+
@Deprecated
39943996
public FtSearchIteration ftSearchIteration(int batchSize, String indexName, String query, FTSearchParams params) {
39953997
return new FtSearchIteration(provider, commandObjects.getProtocol(), batchSize, indexName, query, params);
39963998
}
@@ -4007,6 +4009,7 @@ public SearchResult ftSearch(String indexName, Query query) {
40074009
* @param query limit will be ignored
40084010
* @return search iteration
40094011
*/
4012+
@Deprecated
40104013
public FtSearchIteration ftSearchIteration(int batchSize, String indexName, Query query) {
40114014
return new FtSearchIteration(provider, commandObjects.getProtocol(), batchSize, indexName, query);
40124015
}
@@ -4029,7 +4032,7 @@ public List<String> ftExplainCLI(String indexName, Query query) {
40294032

40304033
@Override
40314034
public AggregationResult ftAggregate(String indexName, AggregationBuilder aggr) {
4032-
return executeKeylessCommand(commandObjects.ftAggregate(indexName, aggr));
4035+
return executeCommand(commandObjects.ftAggregate(indexName, aggr));
40334036
}
40344037

40354038
@Override
@@ -4048,10 +4051,40 @@ public String ftCursorDel(String indexName, long cursorId) {
40484051
* @param aggr cursor must be set
40494052
* @return aggregate iteration
40504053
*/
4054+
@Deprecated
40514055
public FtAggregateIteration ftAggregateIteration(String indexName, AggregationBuilder aggr) {
40524056
return new FtAggregateIteration(provider, indexName, aggr);
40534057
}
40544058

4059+
/**
4060+
* Creates an iterator for aggregation results with cursor support.
4061+
* This method provides a clean, connection-aware iterator that ensures cursor operations
4062+
* are executed on the same Redis node.
4063+
*
4064+
* <p>Usage example:
4065+
* <pre>{@code
4066+
* AggregationBuilder aggr = new AggregationBuilder()
4067+
* .groupBy("@category", Reducers.sum("@price").as("total"))
4068+
* .cursor(50, 30000);
4069+
*
4070+
* try (AggregateIterator iterator = jedis.ftAggregateIterator("products", aggr)) {
4071+
* while (iterator.hasNext()) {
4072+
* AggregationResult batch = iterator.next();
4073+
* // Process batch - access rows via batch.getRows()
4074+
* }
4075+
* }
4076+
* }</pre>
4077+
*
4078+
* @param indexName the search index name
4079+
* @param aggr aggregation builder with cursor configuration
4080+
* @return aggregate iterator for cursor-based pagination
4081+
* @throws IllegalArgumentException if aggregation doesn't have cursor configured
4082+
* @since 6.1.0
4083+
*/
4084+
public AggregateIterator ftAggregateIterator(String indexName, AggregationBuilder aggr) {
4085+
return new AggregateIterator(provider, indexName, aggr);
4086+
}
4087+
40554088
@Override
40564089
public Map.Entry<AggregationResult, ProfilingInfo> ftProfileAggregate(String indexName,
40574090
FTProfileParams profileParams, AggregationBuilder aggr) {

src/main/java/redis/clients/jedis/search/FtSearchIteration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@
99
import redis.clients.jedis.search.SearchResult.SearchResultBuilder;
1010
import redis.clients.jedis.util.JedisCommandIterationBase;
1111

12+
/**
13+
* Iterator for FT.SEARCH results across cluster nodes.
14+
*
15+
* @deprecated Since Redis 8.0, FT.SEARCH automatically retrieves results from all cluster nodes,
16+
* eliminating the need for manual iteration across nodes. Use the standard
17+
* {@code ftSearch} methods directly instead.
18+
*/
19+
@Deprecated
1220
public class FtSearchIteration extends JedisCommandIterationBase<SearchResult, Document> {
1321

1422
private int batchStart;

src/main/java/redis/clients/jedis/search/RediSearchCommands.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import redis.clients.jedis.commands.ConfigCommands;
99
import redis.clients.jedis.resps.Tuple;
10+
import redis.clients.jedis.search.aggr.AggregateIterator;
1011
import redis.clients.jedis.search.aggr.AggregationBuilder;
1112
import redis.clients.jedis.search.aggr.AggregationResult;
1213
import redis.clients.jedis.search.schemafields.SchemaField;
@@ -68,10 +69,47 @@ default SearchResult ftSearch(String indexName) {
6869

6970
List<String> ftExplainCLI(String indexName, Query query);
7071

72+
/**
73+
* Execute an aggregation query and return all results at once.
74+
* Use this method when you don't need cursor-based pagination and want to retrieve
75+
* all results in a single operation.
76+
* @param indexName the index name
77+
* @param aggr the aggregation builder containing the query
78+
* @return the complete aggregation result
79+
*/
7180
AggregationResult ftAggregate(String indexName, AggregationBuilder aggr);
7281

82+
/**
83+
* Execute an aggregation query with cursor-based iteration support.
84+
* Use this method when you need to paginate through large result sets or want
85+
* to process results incrementally using cursor-based iteration.
86+
* @param indexName the index name
87+
* @param aggr the aggregation builder containing the query (should include cursor configuration)
88+
* @return an iterator for cursor-based result pagination
89+
*/
90+
AggregateIterator ftAggregateIterator(String indexName, AggregationBuilder aggr);
91+
92+
/**
93+
* Read from an aggregation cursor.
94+
* @param indexName the index name
95+
* @param cursorId the cursor ID
96+
* @param count the number of results to read
97+
* @return the aggregation result
98+
* @deprecated Use {@link #ftAggregate(String, AggregationBuilder)} for operations without cursors,
99+
* or {@link #ftAggregateIterator(String, AggregationBuilder)} for cursor-based iteration
100+
*/
101+
@Deprecated
73102
AggregationResult ftCursorRead(String indexName, long cursorId, int count);
74103

104+
/**
105+
* Delete an aggregation cursor.
106+
* @param indexName the index name
107+
* @param cursorId the cursor ID
108+
* @return the result of the cursor deletion
109+
* @deprecated Use {@link #ftAggregate(String, AggregationBuilder)} for operations without cursors,
110+
* or {@link #ftAggregateIterator(String, AggregationBuilder)} for cursor-based iteration
111+
*/
112+
@Deprecated
75113
String ftCursorDel(String indexName, long cursorId);
76114

77115
Map.Entry<AggregationResult, ProfilingInfo> ftProfileAggregate(String indexName,
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package redis.clients.jedis.search.aggr;
2+
3+
import java.io.Closeable;
4+
import java.util.Iterator;
5+
import java.util.NoSuchElementException;
6+
7+
import redis.clients.jedis.Connection;
8+
import redis.clients.jedis.exceptions.JedisException;
9+
import redis.clients.jedis.providers.ConnectionProvider;
10+
import redis.clients.jedis.search.SearchProtocol;
11+
import redis.clients.jedis.util.IOUtils;
12+
13+
/**
14+
* Iterator for Redis search aggregation results with cursor support.
15+
* This class manages the connection to a specific Redis node and handles
16+
* cursor-based pagination for large aggregation results.
17+
*
18+
* <p>The iterator supports the {@link #remove()} method which deletes the cursor
19+
* on the server and terminates the iteration, freeing server resources immediately.
20+
*
21+
* <p>Usage example:
22+
* <pre>{@code
23+
* AggregationBuilder aggr = new AggregationBuilder()
24+
* .groupBy("@field")
25+
* .cursor(100, 60000); // 100 results per batch, 60 second TTL for the cursor
26+
*
27+
* try (AggregateIterator iterator = new AggregateIterator(provider, "myindex", aggr)) {
28+
* while (iterator.hasNext()) {
29+
* AggregationResult batch = iterator.next();
30+
* // Process batch - access rows via batch.getRows()
31+
*
32+
* // Optionally terminate early and free server resources
33+
* if (someCondition) {
34+
* iterator.remove(); // Deletes cursor and stops iteration
35+
* break;
36+
* }
37+
* }
38+
* }
39+
* }</pre>
40+
*/
41+
public class AggregateIterator implements Iterator<AggregationResult>, Closeable {
42+
43+
private final ConnectionProvider connectionProvider;
44+
private final String indexName;
45+
private final Integer batchSize;
46+
47+
private Connection connection;
48+
private Long cursorId = -1L;
49+
private AggregationResult aggrCommandResult;
50+
51+
/**
52+
* Creates a new AggregateIterator.
53+
*
54+
* @param connectionProvider the connection provider for cluster/standalone Redis
55+
* @param indexName the search index name
56+
* @param aggregationBuilder the aggregation query with cursor configuration
57+
* @throws IllegalArgumentException if aggregation doesn't have cursor configured
58+
*/
59+
public AggregateIterator(ConnectionProvider connectionProvider, String indexName,
60+
AggregationBuilder aggregationBuilder) {
61+
if (!aggregationBuilder.isWithCursor()) {
62+
throw new IllegalArgumentException("AggregationBuilder must have cursor configured");
63+
}
64+
65+
this.connectionProvider = connectionProvider;
66+
this.indexName = indexName;
67+
this.batchSize = aggregationBuilder.getCursorCount();
68+
69+
// Get a dedicated connection for this cursor session
70+
this.connection = acquireConnection(aggregationBuilder);
71+
}
72+
73+
@Override
74+
public boolean hasNext() {
75+
// If aggrCommandResult is not null, we have initial results from FT.AGGREGATE that haven't been returned yet
76+
// For subsequent batches, check if there are more cursor results available
77+
// If cursor has been removed (cursorId <= 0), no more results are available
78+
return aggrCommandResult != null || cursorId > 0;
79+
}
80+
81+
@Override
82+
public AggregationResult next() {
83+
if (!hasNext()) {
84+
throw new NoSuchElementException("No more aggregation results available");
85+
}
86+
87+
try {
88+
if (aggrCommandResult != null) {
89+
try {
90+
return aggrCommandResult;
91+
} finally {
92+
aggrCommandResult = null;
93+
}
94+
} else {
95+
return doFetch();
96+
}
97+
98+
} catch (Exception e) {
99+
throw new JedisException("Failed to fetch next aggregation batch", e);
100+
}
101+
}
102+
103+
/**
104+
* Returns the current cursor ID.
105+
*
106+
* @return cursor ID, or null if not initialized
107+
*/
108+
public Long getCursorId() {
109+
return cursorId;
110+
}
111+
112+
@Override
113+
public void remove() {
114+
if (cursorId == null || cursorId <= 0) {
115+
// Cursor is already closed or not initialized, nothing to do
116+
return;
117+
}
118+
119+
deleteCursor();
120+
// Mark cursor as deleted to prevent further operations
121+
cursorId = -1L;
122+
}
123+
124+
@Override
125+
public void close() {
126+
deleteCursor();
127+
// Mark cursor as closed to prevent further operations
128+
cursorId = -1L;
129+
IOUtils.closeQuietly(connection);
130+
}
131+
132+
/**
133+
* Deletes the cursor on the server to free resources.
134+
* This method is idempotent and safe to call multiple times.
135+
*/
136+
private void deleteCursor() {
137+
if (cursorId != null && cursorId > 0) {
138+
try {
139+
// Delete the cursor to free server resources
140+
connection.executeCommand(
141+
new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.CURSOR)
142+
.add(SearchProtocol.SearchKeyword.DEL)
143+
.add(indexName)
144+
.add(cursorId)
145+
);
146+
} catch (Exception e) {
147+
// Log but don't throw - cursor will expire naturally
148+
System.err.println("Warning: Failed to delete cursor " + cursorId + ": " + e.getMessage());
149+
}
150+
}
151+
}
152+
153+
private AggregationResult doFetch() {
154+
if (cursorId == null || cursorId <= 0) {
155+
return null;
156+
}
157+
158+
redis.clients.jedis.CommandArguments args = new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.CURSOR)
159+
.add(SearchProtocol.SearchKeyword.READ)
160+
.add(indexName)
161+
.add(cursorId);
162+
163+
// Only add COUNT argument if a batch size was explicitly specified
164+
if (batchSize != null) {
165+
args.add(SearchProtocol.SearchKeyword.COUNT).add(batchSize);
166+
}
167+
168+
Object rawReply = connection.executeCommand(args);
169+
AggregationResult result = AggregationResult.SEARCH_AGGREGATION_RESULT_WITH_CURSOR.build(rawReply);
170+
171+
cursorId = result.getCursorId();
172+
return result;
173+
}
174+
175+
private Connection acquireConnection(AggregationBuilder aggregationBuilder) {
176+
// Create the initial FT.AGGREGATE command
177+
redis.clients.jedis.CommandArguments args = new redis.clients.jedis.CommandArguments(SearchProtocol.SearchCommand.AGGREGATE)
178+
.add(indexName)
179+
.addParams(aggregationBuilder);
180+
181+
Connection conn = null;
182+
try {
183+
// Get connection and execute initial command
184+
conn = connectionProvider.getConnection(args);
185+
Object rawReply = conn.executeCommand(args);
186+
aggrCommandResult = AggregationResult.SEARCH_AGGREGATION_RESULT_WITH_CURSOR.build(rawReply);
187+
188+
cursorId = aggrCommandResult.getCursorId();
189+
190+
return conn;
191+
192+
} catch (Exception e) {
193+
IOUtils.closeQuietly(conn);
194+
throw new JedisException("Failed to initialize aggregation cursor", e);
195+
}
196+
}
197+
198+
199+
}

src/main/java/redis/clients/jedis/search/aggr/AggregationBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class AggregationBuilder implements IParams {
2222
private final List<Object> aggrArgs = new ArrayList<>();
2323
private Integer dialect;
2424
private boolean isWithCursor = false;
25+
private Integer cursorCount;
2526

2627
public AggregationBuilder(String query) {
2728
aggrArgs.add(query);
@@ -143,6 +144,7 @@ public AggregationBuilder filter(String expression) {
143144

144145
public AggregationBuilder cursor(int count) {
145146
isWithCursor = true;
147+
this.cursorCount = count;
146148
aggrArgs.add(SearchKeyword.WITHCURSOR);
147149
aggrArgs.add(SearchKeyword.COUNT);
148150
aggrArgs.add(count);
@@ -151,6 +153,7 @@ public AggregationBuilder cursor(int count) {
151153

152154
public AggregationBuilder cursor(int count, long maxIdle) {
153155
isWithCursor = true;
156+
this.cursorCount = count;
154157
aggrArgs.add(SearchKeyword.WITHCURSOR);
155158
aggrArgs.add(SearchKeyword.COUNT);
156159
aggrArgs.add(count);
@@ -206,6 +209,15 @@ public boolean isWithCursor() {
206209
return isWithCursor;
207210
}
208211

212+
/**
213+
* Returns the cursor count (batch size) if cursor is configured.
214+
*
215+
* @return cursor count, or null if cursor is not configured
216+
*/
217+
public Integer getCursorCount() {
218+
return cursorCount;
219+
}
220+
209221
@Override
210222
public void addParams(CommandArguments commArgs) {
211223
commArgs.addObjects(aggrArgs);

src/main/java/redis/clients/jedis/search/aggr/FtAggregateIteration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
import redis.clients.jedis.search.SearchProtocol;
88
import redis.clients.jedis.util.JedisCommandIterationBase;
99

10+
/**
11+
* @deprecated Since Redis 8.0, FT.AGGREGATE automatically retrieves results from all cluster nodes,
12+
* eliminating the need for manual iteration across nodes. Use {@link AggregateIterator}
13+
* instead, which provides better cursor management and connection handling.
14+
*/
15+
@Deprecated
1016
public class FtAggregateIteration extends JedisCommandIterationBase<AggregationResult, Row> {
1117

1218
private final String indexName;

0 commit comments

Comments
 (0)