Skip to content

Commit 6f57f2f

Browse files
committed
Add support for QueryResultConverter.
1 parent 845453a commit 6f57f2f

15 files changed

+871
-121
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ <T> CompletableFuture<Void> select(String cql, Consumer<T> entityConsumer, Class
128128

129129
/**
130130
* Execute a {@code SELECT} query with paging and convert the result set to a {@link Slice} of entities. A sliced
131-
* query translates the effective {@link Statement#getFetchSize() fetch size} to the page size.
131+
* query translates the effective {@link Statement#getPageSize() fetch size} to the page size.
132132
*
133133
* @param statement the CQL statement, must not be {@literal null}.
134134
* @param entityClass The entity type must not be {@literal null}.

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraOperations.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import java.util.Iterator;
1919
import java.util.List;
20-
import java.util.function.BiFunction;
2120
import java.util.stream.Stream;
2221

2322
import org.jspecify.annotations.Nullable;
@@ -35,7 +34,6 @@
3534
import com.datastax.oss.driver.api.core.CqlIdentifier;
3635
import com.datastax.oss.driver.api.core.cql.BatchType;
3736
import com.datastax.oss.driver.api.core.cql.ResultSet;
38-
import com.datastax.oss.driver.api.core.cql.Row;
3937
import com.datastax.oss.driver.api.core.cql.Statement;
4038

4139
/**
@@ -163,23 +161,9 @@ default CassandraBatchOperations batchOps() {
163161
*/
164162
<T> List<T> select(Statement<?> statement, Class<T> entityClass) throws DataAccessException;
165163

166-
/**
167-
* Execute a {@code SELECT} query and convert the resulting items to a {@link List} of entities considering the given
168-
* {@link BiFunction mapping function}.
169-
*
170-
* @param statement must not be {@literal null}.
171-
* @param entityClass the entity type must not be {@literal null}.
172-
* @param mapper mapping function invoked after materializing {@code entityClass} must not be {@literal null}.
173-
* @return the converted results
174-
* @throws DataAccessException if there is any problem executing the query.
175-
* @since 5.0
176-
*/
177-
<S, T> List<T> select(Statement<?> statement, Class<S> entityClass, BiFunction<S, Row, T> mapper)
178-
throws DataAccessException;
179-
180164
/**
181165
* Execute a {@code SELECT} query with paging and convert the result set to a {@link Slice} of entities. A sliced
182-
* query translates the effective {@link Statement#getFetchSize() fetch size} to the page size.
166+
* query translates the effective {@link Statement#getPageSize()} to the page size.
183167
*
184168
* @param statement the CQL statement, must not be {@literal null}.
185169
* @param entityClass the entity type must not be {@literal null}.

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java

Lines changed: 91 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package org.springframework.data.cassandra.core;
1717

1818
import java.util.List;
19-
import java.util.function.BiFunction;
2019
import java.util.function.Consumer;
2120
import java.util.function.Function;
2221
import java.util.function.Supplier;
@@ -59,6 +58,7 @@
5958
import org.springframework.data.projection.EntityProjection;
6059
import org.springframework.data.projection.ProjectionFactory;
6160
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
61+
import org.springframework.data.util.Lazy;
6262
import org.springframework.util.Assert;
6363

6464
import com.datastax.oss.driver.api.core.CqlIdentifier;
@@ -350,25 +350,21 @@ public ResultSet execute(Statement<?> statement) {
350350

351351
@Override
352352
public <T> List<T> select(Statement<?> statement, Class<T> entityClass) {
353-
return select(statement, entityClass, (t, row) -> t);
354-
}
355-
356-
@Override
357-
public <S, T> List<T> select(Statement<?> statement, Class<S> entityClass, BiFunction<S, Row, T> mapper)
358-
throws DataAccessException {
359353

360354
Assert.notNull(statement, "Statement must not be null");
361355
Assert.notNull(entityClass, "Entity type must not be null");
362-
Assert.notNull(mapper, "Row Mapper function must not be null");
363356

364-
Function<Row, S> defaultMapper = getMapper(EntityProjection.nonProjecting(entityClass),
365-
EntityQueryUtils.getTableName(statement));
357+
return doSelect(statement, entityClass, getTableName(entityClass), entityClass, QueryResultConverter.entity());
358+
}
366359

367-
return doQuery(statement, (row, rowNum) -> {
360+
<T, R> List<R> doSelect(Statement<?> statement, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType,
361+
QueryResultConverter<T, R> mappingFunction) {
368362

369-
S intermediate = defaultMapper.apply(row);
370-
return mapper.apply(intermediate, row);
371-
});
363+
EntityProjection<T, ?> projection = entityOperations.introspectProjection(returnType, entityClass);
364+
365+
RowMapper<R> rowMapper = getRowMapper(projection, tableName, mappingFunction);
366+
367+
return doQuery(statement, rowMapper);
372368
}
373369

374370
@Override
@@ -384,13 +380,14 @@ public <T> Slice<T> slice(Statement<?> statement, Class<T> entityClass) {
384380
Assert.notNull(statement, "Statement must not be null");
385381
Assert.notNull(entityClass, "Entity type must not be null");
386382

387-
ResultSet resultSet = doQueryForResultSet(statement);
383+
return doSlice(statement,
384+
getRowMapper(entityClass, EntityQueryUtils.getTableName(statement), QueryResultConverter.entity()));
385+
}
388386

389-
Function<Row, T> mapper = getMapper(EntityProjection.nonProjecting(entityClass),
390-
EntityQueryUtils.getTableName(statement));
387+
<T> Slice<T> doSlice(Statement<?> statement, RowMapper<T> mapper) {
391388

392-
return EntityQueryUtils.readSlice(resultSet, (row, rowNum) -> mapper.apply(row), 0,
393-
getEffectivePageSize(statement));
389+
ResultSet resultSet = doQueryForResultSet(statement);
390+
return EntityQueryUtils.readSlice(resultSet, mapper, 0, getEffectivePageSize(statement));
394391
}
395392

396393
@Override
@@ -399,9 +396,17 @@ public <T> Stream<T> stream(Statement<?> statement, Class<T> entityClass) throws
399396
Assert.notNull(statement, "Statement must not be null");
400397
Assert.notNull(entityClass, "Entity type must not be null");
401398

402-
Function<Row, T> mapper = getMapper(EntityProjection.nonProjecting(entityClass),
403-
EntityQueryUtils.getTableName(statement));
404-
return doQueryForStream(statement, (row, rowNum) -> mapper.apply(row));
399+
return doStream(statement, entityClass, EntityQueryUtils.getTableName(statement), entityClass,
400+
QueryResultConverter.entity());
401+
}
402+
403+
<T, R> Stream<R> doStream(Statement<?> statement, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType,
404+
QueryResultConverter<T, R> mappingFunction) {
405+
406+
EntityProjection<T, ?> projection = entityOperations.introspectProjection(returnType, entityClass);
407+
408+
RowMapper<R> rowMapper = getRowMapper(projection, tableName, mappingFunction);
409+
return doQueryForStream(statement, rowMapper);
405410
}
406411

407412
// -------------------------------------------------------------------------
@@ -414,10 +419,11 @@ public <T> List<T> select(Query query, Class<T> entityClass) throws DataAccessEx
414419
Assert.notNull(query, "Query must not be null");
415420
Assert.notNull(entityClass, "Entity type must not be null");
416421

417-
return doSelect(query, entityClass, getTableName(entityClass), entityClass);
422+
return doSelect(query, entityClass, getTableName(entityClass), entityClass, QueryResultConverter.entity());
418423
}
419424

420-
<T> List<T> doSelect(Query query, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType) {
425+
<T, R> List<R> doSelect(Query query, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType,
426+
QueryResultConverter<? super T, ? extends R> mappingFunction) {
421427

422428
CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(entityClass);
423429
EntityProjection<T, ?> projection = entityOperations.introspectProjection(returnType, entityClass);
@@ -427,9 +433,9 @@ <T> List<T> doSelect(Query query, Class<?> entityClass, CqlIdentifier tableName,
427433
Query queryToUse = query.columns(columns);
428434

429435
StatementBuilder<Select> select = getStatementFactory().select(queryToUse, entity, tableName);
430-
Function<Row, T> mapper = getMapper(projection, tableName);
436+
RowMapper<R> rowMapper = getRowMapper(projection, tableName, mappingFunction);
431437

432-
return doQuery(select.build(), (row, rowNum) -> mapper.apply(row));
438+
return doQuery(select.build(), rowMapper);
433439
}
434440

435441
@Override
@@ -446,9 +452,24 @@ public <T> Slice<T> slice(Query query, Class<T> entityClass) throws DataAccessEx
446452
Assert.notNull(query, "Query must not be null");
447453
Assert.notNull(entityClass, "Entity type must not be null");
448454

449-
StatementBuilder<Select> select = getStatementFactory().select(query, getRequiredPersistentEntity(entityClass));
455+
return doSlice(query, entityClass, getRequiredPersistentEntity(entityClass).getTableName(), entityClass,
456+
QueryResultConverter.entity());
457+
}
458+
459+
<T, R> Slice<R> doSlice(Query query, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType,
460+
QueryResultConverter<? super T, ? extends R> mappingFunction) {
461+
462+
CassandraPersistentEntity<?> entity = getRequiredPersistentEntity(entityClass);
463+
EntityProjection<T, ?> projection = entityOperations.introspectProjection(returnType, entityClass);
464+
Columns columns = getStatementFactory().computeColumnsForProjection(projection, query.getColumns(), entity,
465+
returnType);
466+
467+
Query queryToUse = query.columns(columns);
468+
469+
StatementBuilder<Select> select = getStatementFactory().select(queryToUse, entity, tableName);
470+
RowMapper<R> rowMapper = getRowMapper(projection, tableName, mappingFunction);
450471

451-
return slice(select.build(), entityClass);
472+
return doSlice(select.build(), rowMapper);
452473
}
453474

454475
@Override
@@ -457,17 +478,19 @@ public <T> Stream<T> stream(Query query, Class<T> entityClass) throws DataAccess
457478
Assert.notNull(query, "Query must not be null");
458479
Assert.notNull(entityClass, "Entity type must not be null");
459480

460-
return doStream(query, entityClass, getTableName(entityClass), entityClass);
481+
return doStream(query, entityClass, getTableName(entityClass), entityClass, QueryResultConverter.entity());
461482
}
462483

463-
<T> Stream<T> doStream(Query query, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType) {
484+
<T, R> Stream<R> doStream(Query query, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType,
485+
QueryResultConverter<? super T, ? extends R> mappingFunction) {
464486

465487
StatementBuilder<Select> select = getStatementFactory().select(query, getRequiredPersistentEntity(entityClass),
466488
tableName);
467489
EntityProjection<T, ?> projection = entityOperations.introspectProjection(returnType, entityClass);
468490

469-
Function<Row, T> mapper = getMapper(projection, tableName);
470-
return doQueryForStream(select.build(), (row, rowNum) -> mapper.apply(row));
491+
RowMapper<R> rowMapper = getRowMapper(projection, tableName, mappingFunction);
492+
493+
return doQueryForStream(select.build(), rowMapper);
471494
}
472495

473496
@Override
@@ -779,6 +802,16 @@ public <T> ExecutableSelect<T> query(Class<T> domainType) {
779802
return new ExecutableSelectOperationSupport(this).query(domainType);
780803
}
781804

805+
@Override
806+
public UntypedSelect query(String cql) {
807+
return new ExecutableSelectOperationSupport(this).query(cql);
808+
}
809+
810+
@Override
811+
public UntypedSelect query(Statement<?> statement) {
812+
return new ExecutableSelectOperationSupport(this).query(statement);
813+
}
814+
782815
@Override
783816
public <T> ExecutableInsert<T> insert(Class<T> domainType) {
784817
return new ExecutableInsertOperationSupport(this).insert(domainType);
@@ -921,6 +954,32 @@ public String getCql() {
921954
return getCqlOperations().execute(new GetConfiguredPageSize());
922955
}
923956

957+
@SuppressWarnings("unchecked")
958+
<T, R> RowMapper<R> getRowMapper(EntityProjection<T, ?> projection, CqlIdentifier tableName,
959+
QueryResultConverter<? super T, ? extends R> mappingFunction) {
960+
961+
Function<Row, T> mapper = getMapper(projection, tableName);
962+
963+
return mappingFunction == QueryResultConverter.entity() ? (row, rowNum) -> (R) mapper.apply(row)
964+
: (row, rowNum) -> {
965+
Lazy<T> reader = Lazy.of(() -> mapper.apply(row));
966+
return mappingFunction.mapRow(row, reader::get);
967+
};
968+
}
969+
970+
@SuppressWarnings("unchecked")
971+
<T, R> RowMapper<R> getRowMapper(Class<T> domainClass, CqlIdentifier tableName,
972+
QueryResultConverter<? super T, ? extends R> mappingFunction) {
973+
974+
Function<Row, T> mapper = getMapper(EntityProjection.nonProjecting(domainClass), tableName);
975+
976+
return mappingFunction == QueryResultConverter.entity() ? (row, rowNum) -> (R) mapper.apply(row)
977+
: (row, rowNum) -> {
978+
Lazy<T> reader = Lazy.of(() -> mapper.apply(row));
979+
return mappingFunction.mapRow(row, reader::get);
980+
};
981+
}
982+
924983
@SuppressWarnings("unchecked")
925984
private <T> Function<Row, T> getMapper(EntityProjection<T, ?> projection, CqlIdentifier tableName) {
926985

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.cassandra.core;
17+
18+
import com.datastax.oss.driver.api.core.cql.Row;
19+
20+
enum EntityResultConverter implements QueryResultConverter<Object, Object> {
21+
22+
INSTANCE;
23+
24+
@Override
25+
public Object mapRow(Row row, ConversionResultSupplier<Object> reader) {
26+
return reader.get();
27+
}
28+
29+
@Override
30+
public <V> QueryResultConverter<Object, V> andThen(QueryResultConverter<? super Object, ? extends V> after) {
31+
return (QueryResultConverter) after;
32+
}
33+
}

0 commit comments

Comments
 (0)