Skip to content

Commit 9177cdd

Browse files
authored
Merge pull request #48 from scalableminds/multi-get-put
Add put + get with multiple keys *and* versions
2 parents a423a48 + 75806dc commit 9177cdd

File tree

7 files changed

+165
-13
lines changed

7 files changed

+165
-13
lines changed

Changelog.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Added
44
- New API endpoints `DeleteAllByPrefix` and `PutMultipleVersions`. [#47](https://github.com/scalableminds/fossildb/pull/47)
5+
- New API endpoints `GetMultipleKeysByListWithMultipleVersions` and `PutMultipleKeysWithMultipleVersions` for reading and writing multiple keys/versions in one request. [#48](https://github.com/scalableminds/fossildb/pull/48)
6+
- `ListKeys` now supports optional `prefix` field
57

68
## Breaking Changes
79

build.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ ThisBuild / scalacOptions ++= Seq(
2121

2222
version := getVersionFromGit
2323

24-
scalaVersion := "2.13.12"
24+
scalaVersion := "2.13.15"
2525

2626
libraryDependencies ++= Seq(
2727
"ch.qos.logback" % "logback-classic" % "1.5.6",
@@ -30,7 +30,7 @@ libraryDependencies ++= Seq(
3030
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
3131
"io.grpc" % "grpc-services" % scalapb.compiler.Version.grpcJavaVersion,
3232
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
33-
"org.rocksdb" % "rocksdbjni" % "8.10.0",
33+
"org.rocksdb" % "rocksdbjni" % "9.4.0",
3434
"com.github.scopt" %% "scopt" % "4.1.0"
3535
)
3636

project/scalapb.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.2")
22

3-
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.13"
3+
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.15"

src/main/protobuf/fossildbapi.proto

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,22 @@ syntax = "proto2";
22

33
package com.scalableminds.fossildb.proto;
44

5+
message VersionedKeyValuePairProto {
6+
required string key = 1;
7+
required uint64 version = 2;
8+
required bytes value = 3;
9+
}
10+
11+
message KeyVersionsValuesPairProto {
12+
required string key = 1;
13+
repeated VersionValuePairProto versionValuePairs = 2;
14+
}
15+
16+
message VersionValuePairProto {
17+
required uint64 actualVersion = 1;
18+
required bytes value = 2;
19+
}
20+
521
message HealthRequest {}
622
message HealthReply {
723
required bool success = 1;
@@ -46,6 +62,16 @@ message PutMultipleVersionsReply {
4662
optional string errorMessage = 2;
4763
}
4864

65+
message PutMultipleKeysWithMultipleVersionsRequest {
66+
required string collection = 1;
67+
repeated VersionedKeyValuePairProto versionedKeyValuePairs = 2;
68+
}
69+
70+
message PutMultipleKeysWithMultipleVersionsReply {
71+
required bool success = 1;
72+
optional string errorMessage = 2;
73+
}
74+
4975
message DeleteRequest {
5076
required string collection = 1;
5177
required string key = 2;
@@ -97,6 +123,19 @@ message GetMultipleKeysReply {
97123
repeated uint64 actualVersions = 5;
98124
}
99125

126+
message GetMultipleKeysByListWithMultipleVersionsRequest {
127+
required string collection = 1;
128+
repeated string keys = 2;
129+
optional uint64 newestVersion = 3; // Applied to all requested keys
130+
optional uint64 oldestVersion = 4; // Applied to all requested keys
131+
}
132+
133+
message GetMultipleKeysByListWithMultipleVersionsReply {
134+
required bool success = 1;
135+
optional string errorMessage = 2;
136+
repeated KeyVersionsValuesPairProto keyVersionsValuesPairs = 3;
137+
}
138+
100139
message DeleteMultipleVersionsRequest {
101140
required string collection = 1;
102141
required string key = 2;
@@ -113,6 +152,7 @@ message ListKeysRequest {
113152
required string collection = 1;
114153
optional uint32 limit = 2;
115154
optional string startAfterKey = 3;
155+
optional string prefix = 4;
116156
}
117157

118158
message ListKeysReply {
@@ -175,8 +215,10 @@ service FossilDB {
175215
rpc Get (GetRequest) returns (GetReply) {}
176216
rpc GetMultipleVersions (GetMultipleVersionsRequest) returns (GetMultipleVersionsReply) {}
177217
rpc GetMultipleKeys (GetMultipleKeysRequest) returns (GetMultipleKeysReply) {}
218+
rpc GetMultipleKeysByListWithMultipleVersions (GetMultipleKeysByListWithMultipleVersionsRequest) returns (GetMultipleKeysByListWithMultipleVersionsReply) {}
178219
rpc Put (PutRequest) returns (PutReply) {}
179220
rpc PutMultipleVersions (PutMultipleVersionsRequest) returns (PutMultipleVersionsReply) {}
221+
rpc PutMultipleKeysWithMultipleVersions (PutMultipleKeysWithMultipleVersionsRequest) returns (PutMultipleKeysWithMultipleVersionsReply) {}
180222
rpc Delete (DeleteRequest) returns (DeleteReply) {}
181223
rpc DeleteMultipleVersions (DeleteMultipleVersionsRequest) returns (DeleteMultipleVersionsReply) {}
182224
rpc DeleteAllByPrefix (DeleteAllByPrefixRequest) returns (DeleteAllByPrefixReply) {}

src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,27 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
6464
GetMultipleKeysReply(success = true, None, keys, values.map(ByteString.copyFrom), versions)
6565
} { errorMsg => GetMultipleKeysReply(success = false, errorMsg) }
6666

67+
override def getMultipleKeysByListWithMultipleVersions(req: GetMultipleKeysByListWithMultipleVersionsRequest): Future[GetMultipleKeysByListWithMultipleVersionsReply] = withExceptionHandler(req) {
68+
val store = storeManager.getStore(req.collection)
69+
val keyVersionsValuesPairs = req.keys.map { key =>
70+
val (values, versions) = store.withRawRocksIterator{rocksIt => store.getMultipleVersions(rocksIt, key, req.oldestVersion, req.newestVersion)}
71+
val versionValuePairs = values.zip(versions).map { case (value, version) =>
72+
VersionValuePairProto(version, ByteString.copyFrom(value))
73+
}
74+
KeyVersionsValuesPairProto(key, versionValuePairs)
75+
}
76+
GetMultipleKeysByListWithMultipleVersionsReply(success = true, None, keyVersionsValuesPairs)
77+
} { errorMsg => GetMultipleKeysByListWithMultipleVersionsReply(success = false, errorMsg) }
78+
79+
override def putMultipleKeysWithMultipleVersions(req: PutMultipleKeysWithMultipleVersionsRequest): Future[PutMultipleKeysWithMultipleVersionsReply] = withExceptionHandler(req) {
80+
val store = storeManager.getStore(req.collection)
81+
require(req.versionedKeyValuePairs.forall(_.version >= 0), "Version numbers must be non-negative")
82+
req.versionedKeyValuePairs.foreach { pair =>
83+
store.put(pair.key, pair.version, pair.value.toByteArray)
84+
}
85+
PutMultipleKeysWithMultipleVersionsReply(success = true, None)
86+
} { errorMsg => PutMultipleKeysWithMultipleVersionsReply(success = false, errorMsg) }
87+
6788
override def deleteMultipleVersions(req: DeleteMultipleVersionsRequest): Future[DeleteMultipleVersionsReply] = withExceptionHandler(req) {
6889
val store = storeManager.getStore(req.collection)
6990
store.withRawRocksIterator{rocksIt => store.deleteMultipleVersions(rocksIt, req.key, req.oldestVersion, req.newestVersion)}
@@ -78,7 +99,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
7899

79100
override def listKeys(req: ListKeysRequest): Future[ListKeysReply] = withExceptionHandler(req) {
80101
val store = storeManager.getStore(req.collection)
81-
val keys = store.withRawRocksIterator{rocksIt => store.listKeys(rocksIt, req.limit, req.startAfterKey)}
102+
val keys = store.withRawRocksIterator{rocksIt => store.listKeys(rocksIt, req.limit, req.startAfterKey, req.prefix)}
82103
ListKeysReply(success = true, None, keys)
83104
} { errorMsg => ListKeysReply(success = false, errorMsg) }
84105

src/main/scala/com/scalableminds/fossildb/db/VersionedKeyValueStore.scala

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import scala.util.Try
77

88

99
case class VersionedKey(key: String, version: Long) {
10-
override def toString: String = s"$key${VersionedKey.versionSeparator}${(~version).toHexString.toUpperCase}${VersionedKey.versionSeparator}$version"
10+
override def toString: String = VersionedKey.asString(key, version)
1111
}
1212

1313
object VersionedKey {
1414

15+
def asString(key: String, version: Long) = s"$key${VersionedKey.versionSeparator}${(~version).toHexString.toUpperCase}${VersionedKey.versionSeparator}$version"
16+
1517
val versionSeparator: Char = '@'
1618

1719
def apply(key: String): Option[VersionedKey] = {
@@ -24,6 +26,7 @@ object VersionedKey {
2426
VersionedKey(key, version)
2527
}
2628
}
29+
2730
}
2831

2932
case class VersionedKeyValuePair[T](versionedKey: VersionedKey, value: T) {
@@ -58,7 +61,7 @@ class VersionFilterIterator(it: RocksDBIterator, version: Option[Long]) extends
5861

5962
}
6063

61-
class KeyOnlyIterator[T](rocksIt: RocksIterator, startAfterKey: Option[String]) extends Iterator[String] {
64+
class KeyOnlyIterator[T](rocksIt: RocksIterator, startAfterKey: Option[String], prefix: Option[String]) extends Iterator[String] {
6265

6366
/*
6467
Note that seek in the underlying iterators either hits precisely or goes to the
@@ -70,17 +73,18 @@ class KeyOnlyIterator[T](rocksIt: RocksIterator, startAfterKey: Option[String])
7073

7174
private def compositeKeyFor(keyOpt: Option[String]) = keyOpt match {
7275
case Some(key) => VersionedKey(key, 0).toString
73-
case None => ""
76+
// If the currentKey is not yet set, seek to the very beginning, or, if set, to the prefix.
77+
case None => prefix.getOrElse("")
7478
}
7579

7680
override def hasNext: Boolean = {
77-
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), None)
81+
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), prefix)
7882
if (it.hasNext && currentKey.isDefined && currentKey.contains(VersionedKey(it.peek).get.key)) it.next()
7983
it.hasNext
8084
}
8185

8286
override def next(): String = {
83-
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), None)
87+
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), prefix)
8488
if (it.hasNext && currentKey.isDefined && currentKey.contains(VersionedKey(it.peek).get.key)) it.next()
8589
val nextKey = VersionedKey(it.next()).get.key
8690
currentKey = Some(nextKey)
@@ -186,16 +190,16 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
186190

187191
def put(key: String, version: Long, value: Array[Byte]): Unit = {
188192
requireValidKey(key)
189-
underlying.put(VersionedKey(key, version).toString, value)
193+
underlying.put(VersionedKey.asString(key, version), value)
190194
}
191195

192196
def delete(key: String, version: Long): Unit = {
193197
requireValidKey(key)
194-
underlying.delete(VersionedKey(key, version).toString)
198+
underlying.delete(VersionedKey.asString(key, version))
195199
}
196200

197-
def listKeys(rocksIt: RocksIterator, limit: Option[Int], startAfterKey: Option[String]): Seq[String] = {
198-
val iterator = new KeyOnlyIterator(rocksIt, startAfterKey)
201+
def listKeys(rocksIt: RocksIterator, limit: Option[Int], startAfterKey: Option[String], prefix: Option[String]): Seq[String] = {
202+
val iterator = new KeyOnlyIterator(rocksIt, startAfterKey, prefix)
199203
iterator.take(limit.getOrElse(Int.MaxValue)).toSeq
200204
}
201205

@@ -207,4 +211,5 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
207211
private def requireValidKey(key: String): Unit = {
208212
require(!key.contains(VersionedKey.versionSeparator), s"keys cannot contain the char ${VersionedKey.versionSeparator}")
209213
}
214+
210215
}

src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,17 @@ class FossilDBSuite extends AnyFlatSpec with BeforeAndAfterEach with TestHelpers
9797
assert(reply.actualVersion == 0)
9898
}
9999

100+
"PutMultipleKeysWithMultipleVersions" should "write all versions of all specified keys" in {
101+
client.putMultipleKeysWithMultipleVersions(PutMultipleKeysWithMultipleVersionsRequest(collectionA, Seq(VersionedKeyValuePairProto(aKey, 0, testData1), VersionedKeyValuePairProto(aKey, 2, testData2), VersionedKeyValuePairProto(aNotherKey, 5, testData3))))
102+
val reply = client.get(GetRequest(collectionA, aKey))
103+
assert(reply.actualVersion == 2)
104+
val reply2 = client.get(GetRequest(collectionA, aKey, version = Some(0)))
105+
assert(reply2.actualVersion == 0)
106+
val reply3 = client.get(GetRequest(collectionA, aNotherKey))
107+
assert(reply3.actualVersion == 5)
108+
assert(reply3.value == testData3)
109+
}
110+
100111
"Get" should "return matching value after matching Put" in {
101112
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
102113
val reply = client.get(GetRequest(collectionA, aKey, Some(0)))
@@ -195,6 +206,37 @@ class FossilDBSuite extends AnyFlatSpec with BeforeAndAfterEach with TestHelpers
195206
assert(reply.keys.length == 3)
196207
}
197208

209+
it should "respect prefix argument" in {
210+
client.put(PutRequest(collectionA, "123456", Some(1), testData1))
211+
client.put(PutRequest(collectionA, "123457", Some(123), testData2))
212+
client.put(PutRequest(collectionA, "12345800", Some(123), testData3))
213+
client.put(PutRequest(collectionA, "12345801", Some(123), testData3))
214+
client.put(PutRequest(collectionA, "12345802", Some(123), testData3))
215+
client.put(PutRequest(collectionA, "123458", Some(123), testData3))
216+
client.put(PutRequest(collectionA, "123459", Some(123), testData3))
217+
218+
val reply = client.listKeys(ListKeysRequest(collectionA, None, None, prefix = Some("123458")))
219+
assert(reply.keys.length == 4)
220+
assert(reply.keys(0) == "12345800")
221+
assert(reply.keys(1) == "12345801")
222+
}
223+
224+
it should "respect prefix argument and startAfterKey together" in {
225+
client.put(PutRequest(collectionA, "123456", Some(1), testData1))
226+
client.put(PutRequest(collectionA, "123457", Some(123), testData2))
227+
client.put(PutRequest(collectionA, "12345800", Some(123), testData3))
228+
client.put(PutRequest(collectionA, "12345801", Some(123), testData3))
229+
client.put(PutRequest(collectionA, "12345802", Some(123), testData3))
230+
client.put(PutRequest(collectionA, "123458", Some(123), testData3))
231+
client.put(PutRequest(collectionA, "123459", Some(123), testData3))
232+
233+
val reply = client.listKeys(ListKeysRequest(collectionA, None, startAfterKey = Some("12345800"), prefix = Some("123458")))
234+
assert(reply.keys.length == 3)
235+
assert(reply.keys(0) == "12345801")
236+
assert(reply.keys(1) == "12345802")
237+
assert(reply.keys(2) == "123458")
238+
}
239+
198240
"GetMultipleVersions" should "return all versions in decending order if called without limits" in {
199241
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
200242
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
@@ -348,6 +390,46 @@ class FossilDBSuite extends AnyFlatSpec with BeforeAndAfterEach with TestHelpers
348390
assert(reply.keys.isEmpty)
349391
}
350392

393+
"GetMultipleKeysByListWithVersions" should "return selected keys with versions in descending order" in {
394+
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
395+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
396+
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
397+
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
398+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
399+
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
400+
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
401+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
402+
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
403+
val reply = client.getMultipleKeysByListWithMultipleVersions(GetMultipleKeysByListWithMultipleVersionsRequest(collectionA, keys = Seq(aNotherKey, aThirdKey)))
404+
assert(reply.keyVersionsValuesPairs.map(_.key) == Seq(aNotherKey, aThirdKey))
405+
assert(reply.keyVersionsValuesPairs(0).versionValuePairs.length == 3)
406+
assert(reply.keyVersionsValuesPairs(1).versionValuePairs.length == 3)
407+
assert(reply.keyVersionsValuesPairs(0).versionValuePairs(0) == VersionValuePairProto(2L, testData3))
408+
assert(reply.keyVersionsValuesPairs(0).versionValuePairs(1) == VersionValuePairProto(1L, testData2))
409+
assert(reply.keyVersionsValuesPairs(0).versionValuePairs(2) == VersionValuePairProto(0L, testData1))
410+
assert(reply.keyVersionsValuesPairs(1).versionValuePairs(0) == VersionValuePairProto(2L, testData3))
411+
assert(reply.keyVersionsValuesPairs(1).versionValuePairs(1) == VersionValuePairProto(1L, testData2))
412+
assert(reply.keyVersionsValuesPairs(1).versionValuePairs(2) == VersionValuePairProto(0L, testData1))
413+
}
414+
415+
it should "limit the versions if specified" in {
416+
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
417+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
418+
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
419+
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
420+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
421+
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
422+
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
423+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
424+
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
425+
val reply = client.getMultipleKeysByListWithMultipleVersions(GetMultipleKeysByListWithMultipleVersionsRequest(collectionA, keys = Seq(aNotherKey, aThirdKey), newestVersion = Some(1), oldestVersion = Some(1)))
426+
assert(reply.keyVersionsValuesPairs.map(_.key) == Seq(aNotherKey, aThirdKey))
427+
assert(reply.keyVersionsValuesPairs(0).versionValuePairs.length == 1)
428+
assert(reply.keyVersionsValuesPairs(1).versionValuePairs.length == 1)
429+
assert(reply.keyVersionsValuesPairs(0).versionValuePairs(0) == VersionValuePairProto(1L, testData2))
430+
assert(reply.keyVersionsValuesPairs(1).versionValuePairs(0) == VersionValuePairProto(1L, testData2))
431+
}
432+
351433
"Backup" should "create non-empty backup directory" in {
352434
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
353435
client.backup(BackupRequest())

0 commit comments

Comments
 (0)