Skip to content

Commit ff4b5fc

Browse files
authored
Merge pull request #8 from G8XSU/keys-summary-impl
Add Implementation for ListKeyVersions Api and AbstractKVStore tests for the same
2 parents 5de859d + 69fb119 commit ff4b5fc

File tree

2 files changed

+259
-5
lines changed

2 files changed

+259
-5
lines changed

app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.ArrayList;
66
import java.util.List;
77
import java.util.Map;
8+
import java.util.Objects;
89
import javax.inject.Singleton;
910
import org.jooq.DSLContext;
1011
import org.jooq.Insert;
@@ -21,12 +22,12 @@
2122
import org.vss.exception.ConflictException;
2223
import org.vss.postgres.tables.records.VssDbRecord;
2324

24-
import static org.jooq.impl.DSL.val;
2525
import static org.vss.postgres.tables.VssDb.VSS_DB;
2626

2727
@Singleton
2828
public class PostgresBackendImpl implements KVStore {
2929

30+
private static final int LIST_KEY_VERSIONS_MAX_PAGE_SIZE = 100;
3031
private final DSLContext context;
3132

3233
@Inject
@@ -127,6 +128,54 @@ private VssDbRecord buildVssRecord(String storeId, KeyValue kv) {
127128

128129
@Override
129130
public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) {
130-
throw new UnsupportedOperationException("Operation not implemented");
131+
String storeId = request.getStoreId();
132+
String keyPrefix = request.getKeyPrefix();
133+
String pageToken = request.getPageToken();
134+
int pageSize = request.hasPageSize() ? request.getPageSize() : Integer.MAX_VALUE;
135+
136+
// Only fetch global_version for first page.
137+
// Fetch global_version before fetching any key_versions to ensure that,
138+
// all current key_versions were stored at global_version or later.
139+
Long globalVersion = null;
140+
if (!request.hasPageToken()) {
141+
GetObjectRequest getGlobalVersionRequest = GetObjectRequest.newBuilder()
142+
.setStoreId(storeId)
143+
.setKey(GLOBAL_VERSION_KEY)
144+
.build();
145+
globalVersion = get(getGlobalVersionRequest).getValue().getVersion();
146+
}
147+
148+
List<VssDbRecord> vssDbRecords = context.select(VSS_DB.KEY, VSS_DB.VERSION).from(VSS_DB)
149+
.where(VSS_DB.STORE_ID.eq(storeId)
150+
.and(VSS_DB.KEY.startsWith(keyPrefix)))
151+
.orderBy(VSS_DB.KEY)
152+
.seek(pageToken)
153+
.limit(Math.min(pageSize, LIST_KEY_VERSIONS_MAX_PAGE_SIZE))
154+
.stream()
155+
.map(record -> record.into(VssDbRecord.class))
156+
.toList();
157+
158+
List<KeyValue> keyVersions = vssDbRecords.stream()
159+
.filter(kv -> !GLOBAL_VERSION_KEY.equals(kv.getKey()))
160+
.map(kv -> KeyValue.newBuilder()
161+
.setKey(kv.getKey())
162+
.setVersion(kv.getVersion())
163+
.build())
164+
.toList();
165+
166+
String nextPageToken = "";
167+
if (!keyVersions.isEmpty()) {
168+
nextPageToken = keyVersions.get(keyVersions.size() - 1).getKey();
169+
}
170+
171+
ListKeyVersionsResponse.Builder responseBuilder = ListKeyVersionsResponse.newBuilder()
172+
.addAllKeyVersions(keyVersions)
173+
.setNextPageToken(nextPageToken);
174+
175+
if (Objects.nonNull(globalVersion)) {
176+
responseBuilder.setGlobalVersion(globalVersion);
177+
}
178+
179+
return responseBuilder.build();
131180
}
132181
}

app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java

Lines changed: 208 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,22 @@
22

33
import com.google.protobuf.ByteString;
44
import java.nio.charset.StandardCharsets;
5+
import java.util.ArrayList;
56
import java.util.List;
67
import java.util.Objects;
8+
import java.util.Set;
9+
import java.util.stream.Collectors;
10+
import javax.annotation.Nullable;
711
import org.junit.jupiter.api.Test;
12+
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
813
import org.vss.exception.ConflictException;
914

1015
import static org.hamcrest.MatcherAssert.assertThat;
1116
import static org.hamcrest.Matchers.is;
17+
import static org.hamcrest.Matchers.lessThan;
18+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
1219
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
1321
import static org.junit.jupiter.api.Assertions.assertThrows;
1422
import static org.junit.jupiter.api.Assertions.assertTrue;
1523

@@ -28,6 +36,8 @@ void putShouldSucceedWhenSingleObjectPutOperation() {
2836
assertThat(response.getKey(), is("k1"));
2937
assertThat(response.getVersion(), is(2L));
3038
assertThat(response.getValue().toStringUtf8(), is("k1v2"));
39+
40+
assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(2L));
3141
}
3242

3343
@Test
@@ -50,6 +60,8 @@ void putShouldSucceedWhenMultiObjectPutOperation() {
5060
assertThat(response.getKey(), is("k2"));
5161
assertThat(response.getVersion(), is(2L));
5262
assertThat(response.getValue().toStringUtf8(), is("k2v2"));
63+
64+
assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(2L));
5365
}
5466

5567
@Test
@@ -59,11 +71,13 @@ void putShouldFailWhenKeyVersionMismatched() {
5971
// global_version correctly changed but key-version conflict.
6072
assertThrows(ConflictException.class, () -> putObjects(1L, List.of(kv("k1", "k1v2", 0))));
6173

62-
//Verify that values didn't change
74+
// Verify that values didn't change
6375
KeyValue response = getObject("k1");
6476
assertThat(response.getKey(), is("k1"));
6577
assertThat(response.getVersion(), is(1L));
6678
assertThat(response.getValue().toStringUtf8(), is("k1v1"));
79+
80+
assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(1L));
6781
}
6882

6983
@Test
@@ -78,7 +92,7 @@ void putMultiObjectShouldFailWhenSingleKeyVersionMismatched() {
7892

7993
assertThrows(ConflictException.class, () -> putObjects(null, second_request));
8094

81-
//Verify that values didn't change
95+
// Verify that values didn't change
8296
KeyValue response = getObject("k1");
8397
assertThat(response.getKey(), is("k1"));
8498
assertThat(response.getVersion(), is(1L));
@@ -113,6 +127,8 @@ void putShouldSucceedWhenNoGlobalVersionIsGiven() {
113127
assertThat(response.getKey(), is("k1"));
114128
assertThat(response.getVersion(), is(2L));
115129
assertThat(response.getValue().toStringUtf8(), is("k1v2"));
130+
131+
assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(0L));
116132
}
117133

118134
@Test
@@ -163,6 +179,177 @@ void getShouldReturnCorrectValueWhenKeyExists() {
163179
assertThat(response.getValue().toStringUtf8(), is("k3v1"));
164180
}
165181

182+
@Test
183+
void listShouldReturnPaginatedResponse() {
184+
185+
int totalKvObjects = 1000;
186+
for (int i = 0; i < totalKvObjects; i++) {
187+
putObjects((long) i, List.of(kv("k" + i, "k1v1", 0)));
188+
}
189+
// Overwrite k1 once and k2 twice.
190+
putObjects(1000L, List.of(kv("k1", "k1v2", 1)));
191+
putObjects(1001L, List.of(kv("k2", "k2v2", 1)));
192+
putObjects(1002L, List.of(kv("k2", "k2v3", 2)));
193+
194+
ListKeyVersionsResponse previousPage = null;
195+
List<KeyValue> allKeyVersions = new ArrayList<>();
196+
197+
while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) {
198+
ListKeyVersionsResponse currentPage;
199+
200+
if (previousPage == null) {
201+
currentPage = list(null, null, null);
202+
203+
// Ensure first page contains correct global version
204+
assertThat(currentPage.getGlobalVersion(), is(1003L));
205+
} else {
206+
String nextPageToken = previousPage.getNextPageToken();
207+
currentPage = list(nextPageToken, null, null);
208+
209+
// Ensure pages after first page dont contain global version.
210+
assertThat(currentPage.hasGlobalVersion(), is(false));
211+
}
212+
213+
allKeyVersions.addAll(currentPage.getKeyVersionsList());
214+
previousPage = currentPage;
215+
}
216+
217+
// Ensure page results don't intersect/duplicate and return complete view.
218+
Set<String> uniqueKeys = allKeyVersions.stream().map(KeyValue::getKey).distinct()
219+
.collect(Collectors.toSet());
220+
assertThat(uniqueKeys.size(), is(totalKvObjects));
221+
222+
// Ensure that we don't return "vss_global_version" as part of keys.
223+
assertFalse(uniqueKeys.contains(KVStore.GLOBAL_VERSION_KEY));
224+
225+
// Ensure correct key version for k1
226+
KeyValue k1_response =
227+
allKeyVersions.stream().filter(kv -> "k1".equals(kv.getKey())).findFirst().get();
228+
assertThat(k1_response.getKey(), is("k1"));
229+
assertThat(k1_response.getVersion(), is(2L));
230+
assertThat(k1_response.getValue().toStringUtf8(), is(""));
231+
232+
// Ensure correct key version for k2
233+
KeyValue k2_response =
234+
allKeyVersions.stream().filter(kv -> "k2".equals(kv.getKey())).findFirst().get();
235+
assertThat(k2_response.getKey(), is("k2"));
236+
assertThat(k2_response.getVersion(), is(3L));
237+
assertThat(k2_response.getValue().toStringUtf8(), is(""));
238+
}
239+
240+
@Test
241+
void listShouldHonourPageSizeAndKeyPrefixIfProvided() {
242+
int totalKvObjects = 20;
243+
int pageSize = 5;
244+
for (int i = 0; i < totalKvObjects; i++) {
245+
putObjects((long) i, List.of(kv(i + "k", "k1v1", 0)));
246+
}
247+
248+
ListKeyVersionsResponse previousPage = null;
249+
List<KeyValue> allKeyVersions = new ArrayList<>();
250+
String keyPrefix = "1";
251+
252+
while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) {
253+
ListKeyVersionsResponse currentPage;
254+
255+
if (previousPage == null) {
256+
currentPage = list(null, pageSize, keyPrefix);
257+
} else {
258+
String nextPageToken = previousPage.getNextPageToken();
259+
currentPage = list(nextPageToken, pageSize, keyPrefix);
260+
}
261+
262+
allKeyVersions.addAll(currentPage.getKeyVersionsList());
263+
264+
// Each page.size() is less than or equal to pageSize in request.
265+
assertThat(currentPage.getKeyVersionsList().size(), lessThanOrEqualTo(pageSize));
266+
previousPage = currentPage;
267+
}
268+
269+
Set<String> uniqueKeys =
270+
allKeyVersions.stream().map(KeyValue::getKey).collect(Collectors.toSet());
271+
272+
// Returns keys only with provided keyPrefix
273+
assertThat(uniqueKeys.size(), is(11));
274+
assertThat(uniqueKeys,
275+
is(Set.of("1k", "10k", "11k", "12k", "13k", "14k", "15k", "16k", "17k", "18k", "19k")));
276+
}
277+
278+
@Test
279+
void listShouldReturnZeroGlobalVersionWhenGlobalVersioningNotEnabled() {
280+
int totalKvObjects = 1000;
281+
for (int i = 0; i < totalKvObjects; i++) {
282+
putObjects(null, List.of(kv("k" + i, "k1v1", 0)));
283+
}
284+
285+
ListKeyVersionsResponse previousPage = null;
286+
List<KeyValue> allKeyVersions = new ArrayList<>();
287+
288+
while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) {
289+
ListKeyVersionsResponse currentPage;
290+
291+
if (previousPage == null) {
292+
currentPage = list(null, null, null);
293+
294+
// Ensure first page returns global version as ZERO
295+
assertThat(currentPage.getGlobalVersion(), is(0L));
296+
} else {
297+
String nextPageToken = previousPage.getNextPageToken();
298+
currentPage = list(nextPageToken, null, null);
299+
300+
// Ensure pages after first page do not contain global version.
301+
assertThat(currentPage.hasGlobalVersion(), is(false));
302+
}
303+
304+
allKeyVersions.addAll(currentPage.getKeyVersionsList());
305+
previousPage = currentPage;
306+
}
307+
// Returns complete view.
308+
Set<String> uniqueKeys = allKeyVersions.stream().map(KeyValue::getKey).distinct()
309+
.collect(Collectors.toSet());
310+
assertThat(uniqueKeys.size(), is(totalKvObjects));
311+
312+
// Ensure that we don't return "vss_global_version" as part of keys.
313+
assertFalse(uniqueKeys.contains(KVStore.GLOBAL_VERSION_KEY));
314+
}
315+
316+
@Test
317+
void listShouldLimitMaxPageSize() {
318+
319+
int totalKvObjects = 10000;
320+
321+
// Each implementation is free to choose its own max_page_size but there should be a reasonable max
322+
// keeping scalability and performance in mind.
323+
// Revisit this test case if some implementation wants to support higher page size.
324+
int vssArbitraryPageSizeMax = 3000;
325+
326+
for (int i = 0; i < totalKvObjects; i++) {
327+
putObjects((long) i, List.of(kv("k" + i, "k1v1", 0)));
328+
}
329+
330+
ListKeyVersionsResponse previousPage = null;
331+
List<KeyValue> allKeyVersions = new ArrayList<>();
332+
333+
while (previousPage == null || !previousPage.getKeyVersionsList().isEmpty()) {
334+
ListKeyVersionsResponse currentPage;
335+
336+
if (previousPage == null) {
337+
currentPage = list(null, null, null);
338+
} else {
339+
String nextPageToken = previousPage.getNextPageToken();
340+
currentPage = list(nextPageToken, null, null);
341+
}
342+
343+
allKeyVersions.addAll(currentPage.getKeyVersionsList());
344+
345+
// Each page.size() is less than MAX_PAGE_SIZE
346+
assertThat(currentPage.getKeyVersionsList().size(), lessThan(vssArbitraryPageSizeMax));
347+
previousPage = currentPage;
348+
}
349+
350+
assertThat(allKeyVersions.size(), is(totalKvObjects));
351+
}
352+
166353
private KeyValue getObject(String key) {
167354
GetObjectRequest getRequest = GetObjectRequest.newBuilder()
168355
.setStoreId(STORE_ID)
@@ -171,7 +358,7 @@ private KeyValue getObject(String key) {
171358
return this.kvStore.get(getRequest).getValue();
172359
}
173360

174-
private void putObjects(Long globalVersion, List<KeyValue> keyValues) {
361+
private void putObjects(@Nullable Long globalVersion, List<KeyValue> keyValues) {
175362
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.newBuilder()
176363
.setStoreId(STORE_ID)
177364
.addAllTransactionItems(keyValues);
@@ -183,6 +370,24 @@ private void putObjects(Long globalVersion, List<KeyValue> keyValues) {
183370
this.kvStore.put(putObjectRequestBuilder.build());
184371
}
185372

373+
private ListKeyVersionsResponse list(@Nullable String nextPageToken, @Nullable Integer pageSize,
374+
@Nullable String keyPrefix) {
375+
ListKeyVersionsRequest.Builder listRequestBuilder = ListKeyVersionsRequest.newBuilder()
376+
.setStoreId(STORE_ID);
377+
378+
if (StringUtils.isNotBlank(nextPageToken)) {
379+
listRequestBuilder.setPageToken(nextPageToken);
380+
}
381+
if (pageSize != null) {
382+
listRequestBuilder.setPageSize(pageSize);
383+
}
384+
if (StringUtils.isNotBlank(keyPrefix)) {
385+
listRequestBuilder.setKeyPrefix(keyPrefix);
386+
}
387+
388+
return this.kvStore.listKeyVersions(listRequestBuilder.build());
389+
}
390+
186391
private KeyValue kv(String key, String value, int version) {
187392
return KeyValue.newBuilder().setKey(key).setVersion(version).setValue(
188393
ByteString.copyFrom(value.getBytes(

0 commit comments

Comments
 (0)