Skip to content

Commit 79e408f

Browse files
committed
Add ListKeyVersions Api Implementation
1 parent 5de859d commit 79e408f

File tree

1 file changed

+51
-2
lines changed

1 file changed

+51
-2
lines changed

app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.ArrayList;
66
import java.util.List;
77
import java.util.Map;
8+
import java.util.Objects;
89
import javax.inject.Singleton;
910
import org.jooq.DSLContext;
1011
import org.jooq.Insert;
@@ -21,12 +22,12 @@
2122
import org.vss.exception.ConflictException;
2223
import org.vss.postgres.tables.records.VssDbRecord;
2324

24-
import static org.jooq.impl.DSL.val;
2525
import static org.vss.postgres.tables.VssDb.VSS_DB;
2626

2727
@Singleton
2828
public class PostgresBackendImpl implements KVStore {
2929

30+
private static final int LIST_KEY_VERSIONS_MAX_PAGE_SIZE = 100;
3031
private final DSLContext context;
3132

3233
@Inject
@@ -127,6 +128,54 @@ private VssDbRecord buildVssRecord(String storeId, KeyValue kv) {
127128

128129
@Override
129130
public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) {
130-
throw new UnsupportedOperationException("Operation not implemented");
131+
String storeId = request.getStoreId();
132+
String keyPrefix = request.getKeyPrefix();
133+
String pageToken = request.getPageToken();
134+
int pageSize = request.hasPageSize() ? request.getPageSize() : Integer.MAX_VALUE;
135+
136+
// Only fetch global_version for first page.
137+
// Fetch global_version before fetching any key_versions to ensure that,
138+
// all current key_versions were stored at global_version or later.
139+
Long globalVersion = null;
140+
if (!request.hasPageToken()) {
141+
GetObjectRequest getGlobalVersionRequest = GetObjectRequest.newBuilder()
142+
.setStoreId(storeId)
143+
.setKey(GLOBAL_VERSION_KEY)
144+
.build();
145+
globalVersion = get(getGlobalVersionRequest).getValue().getVersion();
146+
}
147+
148+
List<VssDbRecord> vssDbRecords = context.select(VSS_DB.KEY, VSS_DB.VERSION).from(VSS_DB)
149+
.where(VSS_DB.STORE_ID.eq(storeId)
150+
.and(VSS_DB.KEY.startsWith(keyPrefix)))
151+
.orderBy(VSS_DB.KEY)
152+
.seek(pageToken)
153+
.limit(Math.min(pageSize, LIST_KEY_VERSIONS_MAX_PAGE_SIZE))
154+
.stream()
155+
.map(record -> record.into(VssDbRecord.class))
156+
.toList();
157+
158+
List<KeyValue> keyVersions = vssDbRecords.stream()
159+
.filter(kv -> !GLOBAL_VERSION_KEY.equals(kv.getKey()))
160+
.map(kv -> KeyValue.newBuilder()
161+
.setKey(kv.getKey())
162+
.setVersion(kv.getVersion())
163+
.build())
164+
.toList();
165+
166+
String nextPageToken = "";
167+
if (!keyVersions.isEmpty()) {
168+
nextPageToken = keyVersions.get(keyVersions.size() - 1).getKey();
169+
}
170+
171+
ListKeyVersionsResponse.Builder responseBuilder = ListKeyVersionsResponse.newBuilder()
172+
.addAllKeyVersions(keyVersions)
173+
.setNextPageToken(nextPageToken);
174+
175+
if (Objects.nonNull(globalVersion)) {
176+
responseBuilder.setGlobalVersion(globalVersion);
177+
}
178+
179+
return responseBuilder.build();
131180
}
132181
}

0 commit comments

Comments
 (0)