Skip to content

Commit 7395a7e

Browse files
committed
Use Reactor's expand(…) operator for recursive pagination.
We now use Mono.expand(…) to recursively read paginated results. Using Mono.expand(…) avoids deep stack recursion and therefore doesn't lead to StackOverflowError. Closes #1215
1 parent f855b58 commit 7395a7e

File tree

1 file changed

+9
-41
lines changed

1 file changed

+9
-41
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/session/DefaultBridgedReactiveSession.java

Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717

1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
20-
import reactor.core.publisher.MonoProcessor;
2120
import reactor.core.scheduler.Scheduler;
2221

2322
import java.util.Collections;
@@ -33,6 +32,7 @@
3332
import org.springframework.data.cassandra.ReactiveSession;
3433
import org.springframework.util.Assert;
3534

35+
import com.datastax.oss.driver.api.core.AsyncPagingIterable;
3636
import com.datastax.oss.driver.api.core.CqlIdentifier;
3737
import com.datastax.oss.driver.api.core.CqlSession;
3838
import com.datastax.oss.driver.api.core.context.DriverContext;
@@ -263,55 +263,23 @@ static class DefaultReactiveResultSet implements ReactiveResultSet {
263263
*/
264264
@Override
265265
public Flux<Row> rows() {
266-
return getRows(Mono.just(this.resultSet));
266+
267+
return Mono.just(this.resultSet).expand(asyncResultSet -> {
268+
if (asyncResultSet.hasMorePages()) {
269+
return Mono.fromCompletionStage(asyncResultSet.fetchNextPage());
270+
}
271+
return Mono.empty();
272+
}).flatMapIterable(AsyncPagingIterable::currentPage);
267273
}
268274

269275
/* (non-Javadoc)
270276
* @see org.springframework.data.cassandra.ReactiveResultSet#availableRows()
271277
*/
272278
@Override
273279
public Flux<Row> availableRows() {
274-
return toRows(this.resultSet);
275-
}
276-
277-
private Flux<Row> getRows(Mono<AsyncResultSet> nextResults) {
278-
279-
return nextResults.flatMapMany(it -> {
280-
281-
Flux<Row> rows = toRows(it);
282-
283-
if (!it.hasMorePages()) {
284-
return rows;
285-
}
286-
287-
MonoProcessor<AsyncResultSet> processor = MonoProcessor.create();
288-
289-
return rows.doOnComplete(() -> fetchMore(it.fetchNextPage(), processor)).concatWith(getRows(processor));
290-
});
291-
}
292-
293-
static Flux<Row> toRows(AsyncResultSet resultSet) {
294280
return Flux.fromIterable(resultSet.currentPage());
295281
}
296282

297-
static void fetchMore(CompletionStage<AsyncResultSet> future, MonoProcessor<AsyncResultSet> sink) {
298-
299-
try {
300-
301-
future.whenComplete((rs, err) -> {
302-
303-
if (err != null) {
304-
sink.onError(err);
305-
} else {
306-
sink.onNext(rs);
307-
sink.onComplete();
308-
}
309-
});
310-
311-
} catch (Exception cause) {
312-
sink.onError(cause);
313-
}
314-
}
315283

316284
/* (non-Javadoc)
317285
* @see org.springframework.data.cassandra.ReactiveResultSet#getColumnDefinitions()

0 commit comments

Comments
 (0)