Skip to content

Commit 2c36ccf

Browse files
committed
update serializer for vector
1 parent 6b6dd8b commit 2c36ccf

20 files changed

+709
-130
lines changed

astra-db-java/src/main/java/com/datastax/astra/client/collections/Collection.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,37 @@
2020
* #L%
2121
*/
2222

23+
import com.datastax.astra.client.collections.documents.Document;
24+
import com.datastax.astra.client.collections.documents.ReturnDocument;
25+
import com.datastax.astra.client.collections.documents.Update;
26+
import com.datastax.astra.client.collections.exceptions.TooManyDocumentsToCountException;
2327
import com.datastax.astra.client.collections.options.CollectionDeleteManyOptions;
2428
import com.datastax.astra.client.collections.options.CollectionDeleteOneOptions;
25-
import com.datastax.astra.client.collections.options.CollectionInsertManyOptions;
26-
import com.datastax.astra.client.collections.results.CollectionInsertManyResult;
27-
import com.datastax.astra.client.collections.options.CountDocumentsOptions;
28-
import com.datastax.astra.client.collections.results.CollectionDeleteResult;
29-
import com.datastax.astra.client.core.commands.CommandType;
30-
import com.datastax.astra.client.core.paging.CollectionDistinctIterable;
31-
import com.datastax.astra.client.collections.options.EstimatedCountDocumentsOptions;
32-
import com.datastax.astra.client.core.paging.FindIterable;
3329
import com.datastax.astra.client.collections.options.CollectionFindOneAndDeleteOptions;
3430
import com.datastax.astra.client.collections.options.CollectionFindOneAndReplaceOptions;
35-
import com.datastax.astra.client.collections.results.FindOneAndReplaceResult;
3631
import com.datastax.astra.client.collections.options.CollectionFindOneAndUpdateOptions;
3732
import com.datastax.astra.client.collections.options.CollectionFindOneOptions;
3833
import com.datastax.astra.client.collections.options.CollectionFindOptions;
34+
import com.datastax.astra.client.collections.options.CollectionInsertManyOptions;
3935
import com.datastax.astra.client.collections.options.CollectionInsertOneOptions;
40-
import com.datastax.astra.client.collections.results.CollectionInsertOneResult;
4136
import com.datastax.astra.client.collections.options.CollectionReplaceOneOptions;
4237
import com.datastax.astra.client.collections.options.CollectionUpdateManyOptions;
38+
import com.datastax.astra.client.collections.options.CountDocumentsOptions;
39+
import com.datastax.astra.client.collections.options.EstimatedCountDocumentsOptions;
4340
import com.datastax.astra.client.collections.options.UpdateOneOptions;
41+
import com.datastax.astra.client.collections.results.CollectionDeleteResult;
42+
import com.datastax.astra.client.collections.results.CollectionInsertManyResult;
43+
import com.datastax.astra.client.collections.results.CollectionInsertOneResult;
4444
import com.datastax.astra.client.collections.results.CollectionUpdateResult;
45-
import com.datastax.astra.client.collections.documents.Document;
46-
import com.datastax.astra.client.collections.documents.ReturnDocument;
47-
import com.datastax.astra.client.collections.documents.Update;
48-
import com.datastax.astra.client.collections.exceptions.TooManyDocumentsToCountException;
45+
import com.datastax.astra.client.collections.results.FindOneAndReplaceResult;
4946
import com.datastax.astra.client.core.commands.Command;
5047
import com.datastax.astra.client.core.commands.CommandOptions;
5148
import com.datastax.astra.client.core.options.DataAPIOptions;
49+
import com.datastax.astra.client.core.paging.CollectionCursor;
50+
import com.datastax.astra.client.core.paging.CollectionDistinctIterable;
51+
import com.datastax.astra.client.core.paging.FindIterable;
5252
import com.datastax.astra.client.core.paging.Page;
53+
import com.datastax.astra.client.core.paging.TableCursor;
5354
import com.datastax.astra.client.core.query.Filter;
5455
import com.datastax.astra.client.core.query.Filters;
5556
import com.datastax.astra.client.core.types.DataAPIKeywords;
@@ -1044,6 +1045,10 @@ public FindIterable<T> findAll() {
10441045
return find(null, new CollectionFindOptions());
10451046
}
10461047

1048+
public CollectionCursor<T> findAllWithCursor() {
1049+
return new CollectionCursor<T>(this, null, new CollectionFindOptions());
1050+
}
1051+
10471052
/**
10481053
* Retrieves a document by its identifier from the collection.
10491054
* <p>

astra-db-java/src/main/java/com/datastax/astra/client/core/options/DataAPIOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ public static void disableEncodeDataApiVectorsAsBase64() {
175175
encodeDataApiVectorsAsBase64 = false;
176176
}
177177

178+
/**
179+
* Disabling the encoding of the vectors as base64.
180+
*/
181+
public static void disableEncodeDurationAsISO8601() {
182+
encodeDurationAsISO8601 = false;
183+
}
184+
178185
@Override
179186
public String toString() {
180187
return new DatabaseSerializer().marshall(this);
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
package com.datastax.astra.client.core.paging;
2+
3+
/*-
4+
* #%L
5+
* Data API Java Client
6+
* --
7+
* Copyright (C) 2024 DataStax
8+
* --
9+
* Licensed under the Apache License, Version 2.0
10+
* You may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.datastax.astra.client.collections.Collection;
24+
import com.datastax.astra.client.collections.options.CollectionFindOptions;
25+
import com.datastax.astra.client.core.query.Filter;
26+
import com.datastax.astra.client.core.query.Projection;
27+
import com.datastax.astra.client.core.query.Sort;
28+
import com.datastax.astra.client.exception.CursorException;
29+
import lombok.Getter;
30+
31+
import java.io.Closeable;
32+
import java.util.ArrayList;
33+
import java.util.Collections;
34+
import java.util.Iterator;
35+
import java.util.List;
36+
import java.util.NoSuchElementException;
37+
38+
/**
39+
* Implementation of a cursor across the find items
40+
*
41+
* @param <T>
42+
* type of the table
43+
*/
44+
public class CollectionCursor<T> implements Iterable<T>, Closeable, Cloneable {
45+
46+
/**
47+
* Input table reference
48+
*/
49+
@Getter
50+
private final Collection<T> myCollection;
51+
52+
/**
53+
* Input Filter provided.
54+
* Immutable as not setter is provided.
55+
*/
56+
private Filter filter;
57+
58+
/**
59+
* Input Find options. Where will change the different options.
60+
* Immutable as not setter is provided.
61+
*/
62+
private CollectionFindOptions findOptions;
63+
64+
/**
65+
* Cursor state.
66+
*/
67+
private CursorState state;
68+
69+
/**
70+
* Records to process
71+
*/
72+
private List<T> buffer;
73+
74+
/**
75+
* Current page
76+
*/
77+
private Page<T> currentPage;
78+
79+
/**
80+
* How many consumed in the current buffer.
81+
*/
82+
@Getter
83+
private int consumedCount;
84+
85+
/**
86+
* Cursor to iterate on the result of a query.
87+
*
88+
* @param col
89+
* source collection
90+
* @param filter
91+
* current filter
92+
* @param options
93+
* options of the find operation
94+
*/
95+
public CollectionCursor(Collection<T> col, Filter filter, CollectionFindOptions options) {
96+
this.myCollection = col;
97+
this.filter = filter;
98+
this.findOptions = options;
99+
this.state = CursorState.IDLE;
100+
this.buffer = new ArrayList<>();
101+
this.consumedCount = 0;
102+
}
103+
104+
/**
105+
* Constructor by copy. Once cloning the cursor is set back at the beginning.
106+
*
107+
* @param colCursor
108+
* previous cursor
109+
*/
110+
private CollectionCursor(CollectionCursor<T> colCursor) {
111+
this.state = CursorState.IDLE;
112+
this.myCollection = colCursor.myCollection;
113+
this.findOptions = colCursor.findOptions;
114+
this.filter = colCursor.filter;
115+
this.buffer = new ArrayList<>();
116+
this.consumedCount = 0;
117+
}
118+
119+
/** {@inheritDoc} */
120+
@Override
121+
public CollectionCursor<T> clone() {
122+
return new CollectionCursor<>(this);
123+
}
124+
125+
/**
126+
* Immutable methods that return a new Cursor instance.
127+
*
128+
* @param newFilter
129+
* a new filter
130+
*/
131+
public CollectionCursor<T> filter(Filter newFilter) {
132+
checkIdleState();
133+
CollectionCursor<T> newTableCursor = this.clone();
134+
newTableCursor.filter = newFilter;
135+
return newTableCursor;
136+
}
137+
138+
/**
139+
* Immutable methods that return a new Cursor instance.
140+
*
141+
* @param newProjection
142+
* a new projection
143+
*/
144+
public CollectionCursor<T> project(Projection... newProjection) {
145+
checkIdleState();
146+
CollectionCursor<T> newTableCursor = this.clone();
147+
newTableCursor.findOptions.projection(newProjection);
148+
return newTableCursor;
149+
}
150+
151+
public CollectionCursor<T> sort(Sort... sort) {
152+
checkIdleState();
153+
CollectionCursor<T> newTableCursor = this.clone();
154+
newTableCursor.findOptions.sort(sort);
155+
return newTableCursor;
156+
}
157+
158+
public CollectionCursor<T> limit(int newLimit) {
159+
checkIdleState();
160+
CollectionCursor<T> newTableCursor = this.clone();
161+
newTableCursor.limit(newLimit);
162+
return newTableCursor;
163+
}
164+
165+
public CollectionCursor<T> skip(int newSkip) {
166+
checkIdleState();
167+
CollectionCursor<T> newTableCursor = this.clone();
168+
newTableCursor.skip(newSkip);
169+
return newTableCursor;
170+
}
171+
172+
public CollectionCursor<T> includeSimilarity() {
173+
checkIdleState();
174+
CollectionCursor<T> newTableCursor = this.clone();
175+
newTableCursor.includeSimilarity();
176+
return newTableCursor;
177+
}
178+
179+
public CollectionCursor<T> includeSortVector() {
180+
checkIdleState();
181+
CollectionCursor<T> newTableCursor = this.clone();
182+
newTableCursor.includeSortVector();
183+
return newTableCursor;
184+
}
185+
186+
/**
187+
* Change the state of the cursor to close.
188+
*/
189+
@Override
190+
public void close() {
191+
this.state = CursorState.CLOSED;
192+
}
193+
194+
/**
195+
* Rewind the cursor to the beginning.
196+
*/
197+
private void rewind() {
198+
this.state = CursorState.IDLE;
199+
this.buffer.clear();
200+
this.consumedCount = 0;
201+
}
202+
203+
// Buffer consumption
204+
public List<T> consumeBuffer(int n) {
205+
if (state == CursorState.CLOSED || state == CursorState.IDLE) {
206+
return Collections.emptyList();
207+
}
208+
List<T> result = new ArrayList<>();
209+
int count = 0;
210+
while (!buffer.isEmpty() && count < n) {
211+
result.add(buffer.remove(0));
212+
count++;
213+
}
214+
return result;
215+
}
216+
217+
/**
218+
* Validate that the cursor is in the IDLE state.
219+
*/
220+
private void checkIdleState() {
221+
if (state != CursorState.IDLE) {
222+
throw new CursorException("Cannot modify cursor after it has been started.", state.toString());
223+
}
224+
}
225+
226+
// Iterator implementation
227+
@Override
228+
public Iterator<T> iterator() {
229+
return new CursorIterator();
230+
}
231+
232+
/**
233+
* Iterator about options
234+
*/
235+
private class CursorIterator implements Iterator<T> {
236+
237+
@Override
238+
public boolean hasNext() {
239+
if (state == CursorState.CLOSED) {
240+
return false;
241+
}
242+
if (state == CursorState.IDLE) {
243+
state = CursorState.STARTED;
244+
}
245+
if (!buffer.isEmpty()) {
246+
return true;
247+
}
248+
// Fetch next batch of documents into buffer (if buffer is empty)
249+
fetchNextBatch();
250+
return !buffer.isEmpty();
251+
}
252+
253+
@Override
254+
public T next() {
255+
if (!hasNext()) {
256+
throw new NoSuchElementException();
257+
}
258+
T rawDoc = buffer.remove(0);
259+
consumedCount++;
260+
return (T) rawDoc;
261+
}
262+
}
263+
264+
// Fetch next batch of documents
265+
private void fetchNextBatch() {
266+
if (currentPage == null) {
267+
currentPage = myCollection.findPage(filter, findOptions);
268+
buffer.addAll(currentPage.getResults());
269+
} else if (currentPage.getPageState().isPresent()) {
270+
findOptions.pageState(currentPage.getPageState().get());
271+
currentPage = myCollection.findPage(filter, findOptions);
272+
buffer.addAll(currentPage.getResults());
273+
} else {
274+
System.out.println("no");
275+
}
276+
}
277+
278+
// Additional methods
279+
public boolean hasNext() {
280+
return iterator().hasNext();
281+
}
282+
283+
public T next() {
284+
return iterator().next();
285+
}
286+
287+
public List<T> toList() {
288+
List<T> result = new ArrayList<>();
289+
try {
290+
forEach(result::add);
291+
} finally {
292+
close();
293+
}
294+
return result;
295+
}
296+
297+
/**
298+
* Access the size of the buffer.
299+
*
300+
* @return
301+
* buffer count
302+
*/
303+
public int getBufferedCount() {
304+
return buffer.size();
305+
}
306+
307+
/**
308+
* Retrieve keyspace name.
309+
*
310+
* @return
311+
* keyspace name
312+
*/
313+
public String getKeyspace() {
314+
return myCollection.getKeyspaceName();
315+
}
316+
317+
}

0 commit comments

Comments
 (0)