Skip to content

Commit 0606b49

Browse files
authored
JAVA-2949: Provide mapper support for CompletionStage<Stream<T>> (#1563)
1 parent 4ee475e commit 0606b49

File tree

10 files changed

+72
-5
lines changed

10 files changed

+72
-5
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.11.3 (in progress)
66

7+
- [bug] JAVA-2949: Provide mapper support for CompletionStage<Stream<T>>
78
- [bug] JAVA-2950: Remove reference to Reflection class from DependencyCheck
89

910
### 4.11.2

integration-tests/src/test/java/com/datastax/oss/driver/mapper/QueryReturnTypesIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.Optional;
4848
import java.util.concurrent.CompletableFuture;
4949
import java.util.concurrent.CompletionStage;
50+
import java.util.concurrent.ExecutionException;
5051
import java.util.stream.Stream;
5152
import org.junit.Before;
5253
import org.junit.BeforeClass;
@@ -239,6 +240,13 @@ public void should_execute_async_query_and_map_to_iterable() {
239240
assertThat(iterable.hasMorePages()).isFalse();
240241
}
241242

243+
@Test
244+
public void should_execute_query_and_map_to_stream_async()
245+
throws ExecutionException, InterruptedException {
246+
CompletableFuture<Stream<TestEntity>> stream = dao.findByIdAsStreamAsync(1);
247+
assertThat(stream.get()).hasSize(10);
248+
}
249+
242250
@Dao
243251
@DefaultNullSavingStrategy(NullSavingStrategy.SET_TO_NULL)
244252
public interface TestDao {
@@ -300,6 +308,9 @@ public interface TestDao {
300308

301309
@Query("SELECT * FROM ${qualifiedTableId} WHERE id = :id")
302310
CompletableFuture<MappedAsyncPagingIterable<TestEntity>> findByIdAsync(int id);
311+
312+
@Query("SELECT * FROM ${qualifiedTableId} WHERE id = :id")
313+
CompletableFuture<Stream<TestEntity>> findByIdAsStreamAsync(int id);
303314
}
304315

305316
@Entity

integration-tests/src/test/java/com/datastax/oss/driver/mapper/SelectIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919

2020
import com.datastax.oss.driver.api.core.CqlSession;
21+
import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
2122
import com.datastax.oss.driver.api.core.PagingIterable;
2223
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
2324
import com.datastax.oss.driver.api.mapper.annotations.Dao;
@@ -100,11 +101,21 @@ public void should_select_all() {
100101
assertThat(dao.all().all()).hasSize(2);
101102
}
102103

104+
@Test
105+
public void should_select_all_async() {
106+
assertThat(CompletableFutures.getUninterruptibly(dao.allAsync()).currentPage()).hasSize(2);
107+
}
108+
103109
@Test
104110
public void should_select_all_stream() {
105111
assertThat(dao.stream()).hasSize(2);
106112
}
107113

114+
@Test
115+
public void should_select_all_stream_async() {
116+
assertThat(CompletableFutures.getUninterruptibly(dao.streamAsync())).hasSize(2);
117+
}
118+
108119
@Test
109120
public void should_select_by_primary_key_asynchronously() {
110121
assertThat(CompletableFutures.getUninterruptibly(dao.findByIdAsync(FLAMETHROWER.getId())))
@@ -211,9 +222,15 @@ public interface ProductDao {
211222
@Select
212223
PagingIterable<Product> all();
213224

225+
@Select
226+
CompletionStage<MappedAsyncPagingIterable<Product>> allAsync();
227+
214228
@Select
215229
Stream<Product> stream();
216230

231+
@Select
232+
CompletionStage<Stream<Product>> streamAsync();
233+
217234
@Select
218235
Optional<Product> findOptionalById(UUID productId);
219236

manual/mapper/daos/select/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ In all cases, the method can return:
135135
@Select(customWhereClause = "description LIKE :searchString")
136136
CompletionStage<MappedAsyncPagingIterable<Product>> findByDescriptionAsync(String searchString);
137137
```
138+
139+
For streams, even if the initial query is executed asynchronously, traversing the returned
140+
stream may block the traversing thread. Blocking calls can indeed be required as more results
141+
are fetched from the server in the background. For this reason, _the usage of
142+
`CompletionStage<Stream<T>>` cannot be considered as a fully asynchronous execution method_.
138143
139144
* a [MappedReactiveResultSet] of the entity class.
140145

mapper-processor/src/main/java/com/datastax/oss/driver/internal/mapper/processor/dao/DaoSelectMethodGenerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.datastax.oss.driver.internal.mapper.processor.dao.DefaultDaoReturnTypeKind.FUTURE_OF_ASYNC_PAGING_ITERABLE;
2121
import static com.datastax.oss.driver.internal.mapper.processor.dao.DefaultDaoReturnTypeKind.FUTURE_OF_ENTITY;
2222
import static com.datastax.oss.driver.internal.mapper.processor.dao.DefaultDaoReturnTypeKind.FUTURE_OF_OPTIONAL_ENTITY;
23+
import static com.datastax.oss.driver.internal.mapper.processor.dao.DefaultDaoReturnTypeKind.FUTURE_OF_STREAM;
2324
import static com.datastax.oss.driver.internal.mapper.processor.dao.DefaultDaoReturnTypeKind.MAPPED_REACTIVE_RESULT_SET;
2425
import static com.datastax.oss.driver.internal.mapper.processor.dao.DefaultDaoReturnTypeKind.OPTIONAL_ENTITY;
2526
import static com.datastax.oss.driver.internal.mapper.processor.dao.DefaultDaoReturnTypeKind.PAGING_ITERABLE;
@@ -71,6 +72,7 @@ protected Set<DaoReturnTypeKind> getSupportedReturnTypes() {
7172
PAGING_ITERABLE,
7273
STREAM,
7374
FUTURE_OF_ASYNC_PAGING_ITERABLE,
75+
FUTURE_OF_STREAM,
7476
MAPPED_REACTIVE_RESULT_SET,
7577
CUSTOM);
7678
}

mapper-processor/src/main/java/com/datastax/oss/driver/internal/mapper/processor/dao/DefaultDaoReturnTypeKind.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,26 @@ public CodeBlock wrapWithErrorHandling(
475475
return innerBlock;
476476
}
477477
},
478+
479+
FUTURE_OF_STREAM {
480+
@Override
481+
public void addExecuteStatement(
482+
CodeBlock.Builder methodBuilder,
483+
String helperFieldName,
484+
ExecutableElement methodElement,
485+
Map<Name, TypeElement> typeParameters) {
486+
methodBuilder.addStatement(
487+
"return executeAsyncAndMapToEntityStream(boundStatement, $L)", helperFieldName);
488+
}
489+
490+
@Override
491+
public CodeBlock wrapWithErrorHandling(
492+
CodeBlock innerBlock,
493+
ExecutableElement methodElement,
494+
Map<Name, TypeElement> typeParameters) {
495+
return wrapWithErrorHandling(innerBlock, FAILED_FUTURE);
496+
}
497+
},
478498
;
479499

480500
@Override

mapper-processor/src/main/java/com/datastax/oss/driver/internal/mapper/processor/dao/DefaultDaoReturnTypeParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public class DefaultDaoReturnTypeParser implements DaoReturnTypeParser {
101101
.put(
102102
MappedAsyncPagingIterable.class,
103103
DefaultDaoReturnTypeKind.FUTURE_OF_ASYNC_PAGING_ITERABLE)
104+
.put(Stream.class, DefaultDaoReturnTypeKind.FUTURE_OF_STREAM)
104105
.build();
105106

106107
protected final ProcessorContext context;

mapper-processor/src/test/java/com/datastax/oss/driver/internal/mapper/processor/dao/DaoQueryMethodGeneratorTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ public static Object[][] invalidSignatures() {
4343
{
4444
"Invalid return type: Query methods must return one of [VOID, BOOLEAN, LONG, ROW, "
4545
+ "ENTITY, OPTIONAL_ENTITY, RESULT_SET, BOUND_STATEMENT, PAGING_ITERABLE, FUTURE_OF_VOID, "
46-
+ "FUTURE_OF_BOOLEAN, FUTURE_OF_LONG, FUTURE_OF_ROW, FUTURE_OF_ENTITY, "
47-
+ "FUTURE_OF_OPTIONAL_ENTITY, FUTURE_OF_ASYNC_RESULT_SET, "
48-
+ "FUTURE_OF_ASYNC_PAGING_ITERABLE, REACTIVE_RESULT_SET, MAPPED_REACTIVE_RESULT_SET, STREAM]",
46+
+ "FUTURE_OF_BOOLEAN, FUTURE_OF_LONG, FUTURE_OF_ROW, "
47+
+ "FUTURE_OF_ENTITY, FUTURE_OF_OPTIONAL_ENTITY, "
48+
+ "FUTURE_OF_ASYNC_RESULT_SET, FUTURE_OF_ASYNC_PAGING_ITERABLE, "
49+
+ "REACTIVE_RESULT_SET, MAPPED_REACTIVE_RESULT_SET, "
50+
+ "STREAM, FUTURE_OF_STREAM]",
4951
MethodSpec.methodBuilder("select")
5052
.addAnnotation(
5153
AnnotationSpec.builder(Query.class)

mapper-processor/src/test/java/com/datastax/oss/driver/internal/mapper/processor/dao/DaoSelectMethodGeneratorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public static Object[][] invalidSignatures() {
4242
{
4343
"Invalid return type: Select methods must return one of [ENTITY, OPTIONAL_ENTITY, "
4444
+ "FUTURE_OF_ENTITY, FUTURE_OF_OPTIONAL_ENTITY, PAGING_ITERABLE, STREAM, "
45-
+ "FUTURE_OF_ASYNC_PAGING_ITERABLE, MAPPED_REACTIVE_RESULT_SET]",
45+
+ "FUTURE_OF_ASYNC_PAGING_ITERABLE, FUTURE_OF_STREAM, MAPPED_REACTIVE_RESULT_SET]",
4646
MethodSpec.methodBuilder("select")
4747
.addAnnotation(Select.class)
4848
.addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
@@ -52,7 +52,7 @@ public static Object[][] invalidSignatures() {
5252
{
5353
"Invalid return type: Select methods must return one of [ENTITY, OPTIONAL_ENTITY, "
5454
+ "FUTURE_OF_ENTITY, FUTURE_OF_OPTIONAL_ENTITY, PAGING_ITERABLE, STREAM, "
55-
+ "FUTURE_OF_ASYNC_PAGING_ITERABLE, MAPPED_REACTIVE_RESULT_SET]",
55+
+ "FUTURE_OF_ASYNC_PAGING_ITERABLE, FUTURE_OF_STREAM, MAPPED_REACTIVE_RESULT_SET]",
5656
MethodSpec.methodBuilder("select")
5757
.addAnnotation(Select.class)
5858
.addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)

mapper-runtime/src/main/java/com/datastax/oss/driver/internal/mapper/DaoBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.datastax.oss.driver.api.mapper.entity.saving.NullSavingStrategy;
3636
import com.datastax.oss.driver.internal.core.ConsistencyLevelRegistry;
3737
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
38+
import com.datastax.oss.driver.internal.core.cql.ResultSets;
3839
import com.datastax.oss.protocol.internal.ProtocolConstants;
3940
import java.time.Duration;
4041
import java.util.Optional;
@@ -290,6 +291,13 @@ CompletableFuture<MappedAsyncPagingIterable<EntityT>> executeAsyncAndMapToEntity
290291
return executeAsync(statement).thenApply(rs -> rs.map(entityHelper::get));
291292
}
292293

294+
protected <EntityT> CompletableFuture<Stream<EntityT>> executeAsyncAndMapToEntityStream(
295+
Statement<?> statement, EntityHelper<EntityT> entityHelper) {
296+
return executeAsync(statement)
297+
.thenApply(ResultSets::newInstance)
298+
.thenApply(rs -> StreamSupport.stream(rs.map(entityHelper::get).spliterator(), false));
299+
}
300+
293301
protected static void throwIfProtocolVersionV3(MapperContext context) {
294302
if (isProtocolVersionV3(context)) {
295303
throw new MapperException(

0 commit comments

Comments
 (0)