Skip to content

Commit befa0ff

Browse files
committed
Introduce support for CassandraScrollPosition.
We now represent the PagingState as ScrollPosition to provide scrolling queries that are closer to Cassandra's internal pagination. In combination with Limit, queries are now as flexible as using the Statement API directly. Closes #1408
1 parent 78da030 commit befa0ff

16 files changed

+488
-72
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/query/CassandraPageRequest.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.cassandra.core.query;
1717

1818
import java.nio.ByteBuffer;
19+
import java.util.Objects;
1920

2021
import org.springframework.data.domain.PageRequest;
2122
import org.springframework.data.domain.Pageable;
@@ -39,15 +40,18 @@
3940
*/
4041
public class CassandraPageRequest extends PageRequest {
4142

42-
private final @Nullable ByteBuffer pagingState;
43+
private final CassandraScrollPosition scrollPosition;
4344

4445
private final boolean nextAllowed;
4546

46-
private CassandraPageRequest(int page, int size, Sort sort, @Nullable ByteBuffer pagingState, boolean nextAllowed) {
47+
private CassandraPageRequest(int page, int size, Sort sort, CassandraScrollPosition scrollPosition,
48+
boolean nextAllowed) {
4749

4850
super(page, size, sort);
4951

50-
this.pagingState = pagingState;
52+
Assert.notNull(scrollPosition, "ScrollPosition must not be null");
53+
54+
this.scrollPosition = scrollPosition;
5155
this.nextAllowed = nextAllowed;
5256
}
5357

@@ -79,7 +83,7 @@ public static CassandraPageRequest of(int page, int size, Sort sort) {
7983
Assert.isTrue(page == 0,
8084
"Cannot create a Cassandra page request for an indexed page other than the first page (0)");
8185

82-
return new CassandraPageRequest(page, size, sort, null, false);
86+
return new CassandraPageRequest(page, size, sort, CassandraScrollPosition.initial(), false);
8387
}
8488

8589
/**
@@ -100,14 +104,15 @@ public static CassandraPageRequest of(int page, int size, Direction direction, S
100104
}
101105

102106
/**
103-
* Creates a a {@link PageRequest} with sort direction and properties applied.
107+
* Creates a {@link PageRequest} with sort direction and properties applied.
104108
*
105109
* @param current the current {@link Pageable}, must not be {@literal null}.
106110
* @param pagingState the paging state associated with the current {@link Pageable}. Can be {@literal null} if there
107111
* is no paging state associated.
108112
*/
109113
public static CassandraPageRequest of(Pageable current, @Nullable ByteBuffer pagingState) {
110-
return new CassandraPageRequest(current.getPageNumber(), current.getPageSize(), current.getSort(), pagingState,
114+
return new CassandraPageRequest(current.getPageNumber(), current.getPageSize(), current.getSort(),
115+
pagingState != null ? CassandraScrollPosition.of(pagingState) : CassandraScrollPosition.initial(),
111116
pagingState != null);
112117
}
113118

@@ -127,7 +132,7 @@ public static CassandraPageRequest first(int size) {
127132
* @param sort must not be {@literal null}.
128133
*/
129134
public static CassandraPageRequest first(int size, Sort sort) {
130-
return new CassandraPageRequest(0, size, sort, null, false);
135+
return new CassandraPageRequest(0, size, sort, CassandraScrollPosition.initial(), false);
131136
}
132137

133138
/**
@@ -178,11 +183,11 @@ public static void validatePageable(Pageable pageable) {
178183
@Nullable
179184
public ByteBuffer getPagingState() {
180185

181-
if (this.pagingState == null) {
186+
if (this.scrollPosition.isInitial()) {
182187
return null;
183188
}
184189

185-
return this.pagingState.asReadOnlyBuffer();
190+
return this.scrollPosition.getPagingState();
186191
}
187192

188193
/**
@@ -200,7 +205,7 @@ public CassandraPageRequest next() {
200205

201206
Assert.state(hasNext(), "Cannot create a next page request without a PagingState");
202207

203-
return new CassandraPageRequest(getPageNumber() + 1, getPageSize(), getSort(), getPagingState(), false);
208+
return new CassandraPageRequest(getPageNumber() + 1, getPageSize(), getSort(), this.scrollPosition, false);
204209
}
205210

206211
/**
@@ -214,7 +219,8 @@ public CassandraPageRequest withSort(Sort sort) {
214219

215220
Assert.notNull(sort, "Sort must not be null");
216221

217-
return new CassandraPageRequest(this.getPageNumber(), this.getPageSize(), sort, getPagingState(), this.nextAllowed);
222+
return new CassandraPageRequest(this.getPageNumber(), this.getPageSize(), sort, this.scrollPosition,
223+
this.nextAllowed);
218224
}
219225

220226
@Override
@@ -225,6 +231,16 @@ public PageRequest previous() {
225231
return super.previous();
226232
}
227233

234+
/**
235+
* Returns the underlying {@link CassandraScrollPosition}.
236+
*
237+
* @return the underlying {@link CassandraScrollPosition}.
238+
* @since 4.2
239+
*/
240+
public CassandraScrollPosition getScrollPosition() {
241+
return this.scrollPosition;
242+
}
243+
228244
@Override
229245
public boolean equals(@Nullable Object obj) {
230246

@@ -244,15 +260,15 @@ public boolean equals(@Nullable Object obj) {
244260
return false;
245261
}
246262

247-
return (pagingState != null ? pagingState.equals(that.pagingState) : that.pagingState == null);
263+
return Objects.equals(this.scrollPosition, that.scrollPosition);
248264
}
249265

250266
@Override
251267
public int hashCode() {
252268

253269
int result = super.hashCode();
254270

255-
result = 31 * result + (pagingState != null ? pagingState.hashCode() : 0);
271+
result = 31 * result + (scrollPosition != null ? scrollPosition.hashCode() : 0);
256272
result = 31 * result + (nextAllowed ? 1 : 0);
257273

258274
return result;
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.cassandra.core.query;
17+
18+
import java.nio.ByteBuffer;
19+
20+
import org.springframework.data.cassandra.core.query.CassandraScrollPosition.Initial;
21+
import org.springframework.data.cassandra.core.query.CassandraScrollPosition.PagingState;
22+
import org.springframework.data.domain.ScrollPosition;
23+
import org.springframework.util.Assert;
24+
import org.springframework.util.ObjectUtils;
25+
26+
/**
27+
* Cassandra-specific implementation of {@link ScrollPosition} using
28+
* {@link com.datastax.oss.driver.api.core.cql.PagingState}.
29+
*
30+
* @author Mark Paluch
31+
* @since 4.2
32+
*/
33+
public abstract sealed class CassandraScrollPosition implements ScrollPosition permits Initial,PagingState {
34+
35+
/**
36+
* Returns an initial {@link CassandraScrollPosition}.
37+
*
38+
* @return an initial {@link CassandraScrollPosition}.
39+
*/
40+
public static CassandraScrollPosition initial() {
41+
return Initial.INSTANCE;
42+
}
43+
44+
/**
45+
* Creates a continuation {@link CassandraScrollPosition} given {@code pagingState}.
46+
*
47+
* @return a continuation {@link CassandraScrollPosition} given {@code pagingState}.
48+
*/
49+
public static CassandraScrollPosition of(ByteBuffer pagingState) {
50+
51+
Assert.notNull(pagingState, "PagingState must not be null");
52+
53+
return new PagingState(pagingState);
54+
}
55+
56+
/**
57+
* Returns the underlying binary representation of the paging state as read-only buffer if the scroll position is not
58+
* {@link #initial()}.
59+
*
60+
* @return the underlying binary representation of the paging state.
61+
* @throws IllegalStateException if the scroll position {@link #initial()} (i.e. the scroll position isn't associated
62+
* with a continuation).
63+
*/
64+
public abstract ByteBuffer getPagingState();
65+
66+
static final class Initial extends CassandraScrollPosition {
67+
68+
private final static Initial INSTANCE = new Initial();
69+
70+
@Override
71+
public boolean isInitial() {
72+
return true;
73+
}
74+
75+
@Override
76+
public ByteBuffer getPagingState() {
77+
throw new IllegalStateException("Initial scroll position does not provide a PagingState");
78+
}
79+
}
80+
81+
static final class PagingState extends CassandraScrollPosition {
82+
83+
private final ByteBuffer pagingState;
84+
85+
PagingState(ByteBuffer pagingState) {
86+
this.pagingState = pagingState;
87+
}
88+
89+
@Override
90+
public boolean isInitial() {
91+
return false;
92+
}
93+
94+
@Override
95+
public ByteBuffer getPagingState() {
96+
return pagingState.asReadOnlyBuffer();
97+
}
98+
99+
@Override
100+
public boolean equals(Object o) {
101+
if (this == o)
102+
return true;
103+
if (o == null || getClass() != o.getClass())
104+
return false;
105+
106+
PagingState that = (PagingState) o;
107+
108+
return ObjectUtils.nullSafeEquals(pagingState, that.pagingState);
109+
}
110+
111+
@Override
112+
public int hashCode() {
113+
return ObjectUtils.nullSafeHashCode(pagingState);
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)