Skip to content

Commit 5de859d

Browse files
authored
Merge pull request #5 from G8XSU/postgres-impl-get-put
Add initial PostgreSQL implementation for get & put operations
2 parents 06a1a81 + 403b0c2 commit 5de859d

File tree

7 files changed

+487
-19
lines changed

7 files changed

+487
-19
lines changed

app/build.gradle

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,101 @@
11
buildscript {
2-
ext.gradleVersion = '7.5.1'
3-
ext.protobufPlugInVersion = '0.8.12'
4-
ext.protobufVersion = '3.21.7'
5-
ext.jerseyVersion = '3.1.0'
6-
ext.junitVersion = '5.9.0'
2+
ext.gradleVersion = '7.5.1'
3+
ext.protobufPlugInVersion = '0.8.12'
4+
ext.protobufVersion = '3.21.7'
5+
ext.jerseyVersion = '3.1.0'
6+
ext.junitVersion = '5.9.0'
7+
ext.postgresVersion = '42.5.1'
8+
ext.jooqVersion = '3.17.7'
9+
ext.guiceVersion = '5.1.0'
710
}
811

912
plugins {
10-
id 'java'
11-
id 'com.google.protobuf' version "${protobufPlugInVersion}"
12-
id 'war'
13-
id 'idea'
13+
id 'java'
14+
id 'com.google.protobuf' version "${protobufPlugInVersion}"
15+
id 'war'
16+
id 'idea'
17+
id 'nu.studer.jooq' version '8.0'
1418
}
1519

1620
repositories {
17-
mavenCentral()
21+
mavenCentral()
1822
}
1923

2024
idea {
21-
module {
22-
generatedSourceDirs.add(file("build/generated/proto/main"))
23-
}
25+
module {
26+
generatedSourceDirs.add(file("build/generated/proto/main"))
27+
}
2428
}
2529

2630
group 'org.vss'
2731
version '1.0'
2832

29-
3033
dependencies {
31-
implementation "com.google.protobuf:protobuf-java:$protobufVersion"
34+
implementation "com.google.protobuf:protobuf-java:$protobufVersion"
35+
36+
//jOOQ & Postgres impl deps
37+
implementation "org.jooq:jooq:$jooqVersion"
38+
implementation "org.jooq:jooq-meta:$jooqVersion"
39+
implementation "org.jooq:jooq-codegen:$jooqVersion"
40+
runtimeOnly "org.postgresql:postgresql:$postgresVersion"
41+
jooqGenerator "org.postgresql:postgresql:$postgresVersion"
3242

33-
testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
34-
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
43+
implementation "com.google.inject:guice:$guiceVersion"
44+
45+
testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
46+
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
47+
testImplementation "org.hamcrest:hamcrest-library:2.2"
48+
testImplementation "org.testcontainers:junit-jupiter:1.17.6"
49+
testImplementation "org.testcontainers:postgresql:1.17.6"
3550
}
3651

3752
test {
38-
useJUnitPlatform()
39-
}
53+
useJUnitPlatform()
54+
}
55+
56+
protobuf {
57+
protoc {
58+
artifact = "com.google.protobuf:protoc:$protobufVersion"
59+
}
60+
}
61+
62+
jooq {
63+
configurations {
64+
main {
65+
generateSchemaSourceOnCompilation = true
66+
67+
generationTool {
68+
jdbc {
69+
driver = 'org.postgresql.Driver'
70+
url = 'jdbc:postgresql://localhost:5432/postgres'
71+
user = 'postgres'
72+
password = ''
73+
properties {
74+
property {
75+
key = 'ssl'
76+
value = 'false'
77+
}
78+
}
79+
}
80+
generator {
81+
name = 'org.jooq.codegen.DefaultGenerator'
82+
database {
83+
name = 'org.jooq.meta.postgres.PostgresDatabase'
84+
inputSchema = 'public'
85+
}
86+
generate {
87+
deprecated = false
88+
records = true
89+
immutablePojos = true
90+
fluentSetters = true
91+
}
92+
target {
93+
packageName = 'org.vss.postgres'
94+
directory = 'build/generated-src/jooq/main'
95+
}
96+
strategy.name = 'org.jooq.codegen.DefaultGeneratorStrategy'
97+
}
98+
}
99+
}
100+
}
101+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.vss;
2+
3+
public interface KVStore {
4+
5+
String GLOBAL_VERSION_KEY = "vss_global_version";
6+
7+
GetObjectResponse get(GetObjectRequest request);
8+
9+
PutObjectResponse put(PutObjectRequest request);
10+
11+
ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request);
12+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.vss.exception;
2+
3+
public class ConflictException extends RuntimeException {
4+
public ConflictException(String message) {
5+
super(message);
6+
}
7+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package org.vss.impl.postgres;
2+
3+
import com.google.inject.Inject;
4+
import com.google.protobuf.ByteString;
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.Map;
8+
import javax.inject.Singleton;
9+
import org.jooq.DSLContext;
10+
import org.jooq.Insert;
11+
import org.jooq.Query;
12+
import org.jooq.Update;
13+
import org.vss.GetObjectRequest;
14+
import org.vss.GetObjectResponse;
15+
import org.vss.KVStore;
16+
import org.vss.KeyValue;
17+
import org.vss.ListKeyVersionsRequest;
18+
import org.vss.ListKeyVersionsResponse;
19+
import org.vss.PutObjectRequest;
20+
import org.vss.PutObjectResponse;
21+
import org.vss.exception.ConflictException;
22+
import org.vss.postgres.tables.records.VssDbRecord;
23+
24+
import static org.jooq.impl.DSL.val;
25+
import static org.vss.postgres.tables.VssDb.VSS_DB;
26+
27+
@Singleton
28+
public class PostgresBackendImpl implements KVStore {
29+
30+
private final DSLContext context;
31+
32+
@Inject
33+
public PostgresBackendImpl(DSLContext context) {
34+
this.context = context;
35+
}
36+
37+
@Override
38+
public GetObjectResponse get(GetObjectRequest request) {
39+
40+
VssDbRecord vssDbRecord = context.selectFrom(VSS_DB)
41+
.where(VSS_DB.STORE_ID.eq(request.getStoreId())
42+
.and(VSS_DB.KEY.eq(request.getKey())))
43+
.fetchOne();
44+
45+
final KeyValue keyValue;
46+
47+
if (vssDbRecord != null) {
48+
keyValue = KeyValue.newBuilder()
49+
.setKey(vssDbRecord.getKey())
50+
.setValue(ByteString.copyFrom(vssDbRecord.getValue()))
51+
.setVersion(vssDbRecord.getVersion())
52+
.build();
53+
} else {
54+
keyValue = KeyValue.newBuilder()
55+
.setKey(request.getKey()).build();
56+
}
57+
58+
return GetObjectResponse.newBuilder()
59+
.setValue(keyValue)
60+
.build();
61+
}
62+
63+
@Override
64+
public PutObjectResponse put(PutObjectRequest request) {
65+
66+
String storeId = request.getStoreId();
67+
68+
List<VssDbRecord> vssRecords = new ArrayList<>(request.getTransactionItemsList().stream()
69+
.map(kv -> buildVssRecord(storeId, kv)).toList());
70+
71+
if (request.hasGlobalVersion()) {
72+
VssDbRecord globalVersionRecord = buildVssRecord(storeId,
73+
KeyValue.newBuilder()
74+
.setKey(GLOBAL_VERSION_KEY)
75+
.setVersion(request.getGlobalVersion())
76+
.build());
77+
78+
vssRecords.add(globalVersionRecord);
79+
}
80+
81+
context.transaction((ctx) -> {
82+
DSLContext dsl = ctx.dsl();
83+
List<Query> batchQueries = vssRecords.stream()
84+
.map(vssRecord -> buildPutObjectQuery(dsl, vssRecord)).toList();
85+
86+
int[] batchResult = dsl.batch(batchQueries).execute();
87+
88+
for (int numOfRowsUpdated : batchResult) {
89+
if (numOfRowsUpdated == 0) {
90+
throw new ConflictException(
91+
"Transaction could not be completed due to a possible conflict");
92+
}
93+
}
94+
});
95+
96+
return PutObjectResponse.newBuilder().build();
97+
}
98+
99+
private Query buildPutObjectQuery(DSLContext dsl, VssDbRecord vssRecord) {
100+
return vssRecord.getVersion() == 0 ? buildInsertRecordQuery(dsl, vssRecord)
101+
: buildUpdateRecordQuery(dsl, vssRecord);
102+
}
103+
104+
private Insert<VssDbRecord> buildInsertRecordQuery(DSLContext dsl, VssDbRecord vssRecord) {
105+
return dsl.insertInto(VSS_DB)
106+
.values(vssRecord.getStoreId(), vssRecord.getKey(),
107+
vssRecord.getValue(), 1)
108+
.onDuplicateKeyIgnore();
109+
}
110+
111+
private Update<VssDbRecord> buildUpdateRecordQuery(DSLContext dsl, VssDbRecord vssRecord) {
112+
return dsl.update(VSS_DB)
113+
.set(Map.of(VSS_DB.VALUE, vssRecord.getValue(),
114+
VSS_DB.VERSION, vssRecord.getVersion() + 1))
115+
.where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
116+
.and(VSS_DB.KEY.eq(vssRecord.getKey()))
117+
.and(VSS_DB.VERSION.eq(vssRecord.getVersion())));
118+
}
119+
120+
private VssDbRecord buildVssRecord(String storeId, KeyValue kv) {
121+
return new VssDbRecord()
122+
.setStoreId(storeId)
123+
.setKey(kv.getKey())
124+
.setValue(kv.getValue().toByteArray())
125+
.setVersion(kv.getVersion());
126+
}
127+
128+
@Override
129+
public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) {
130+
throw new UnsupportedOperationException("Operation not implemented");
131+
}
132+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE TABLE vss_db (
2+
store_id character varying(120) NOT NULL CHECK (store_id <> ''),
3+
key character varying(120) NOT NULL,
4+
value bytea NULL,
5+
version bigint NOT NULL,
6+
PRIMARY KEY (store_id, key)
7+
);

0 commit comments

Comments
 (0)