Skip to content

Commit 18a119e

Browse files
authored
Merge pull request #38 from scalableminds/fix-get-multiple-keys-pagination
Fix pagination for GetMultipleKeys, introducing startAfterKey
2 parents 7477ec1 + e928433 commit 18a119e

File tree

5 files changed

+133
-47
lines changed

5 files changed

+133
-47
lines changed

Changelog.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Changelog
2+
3+
## Breaking Changes
4+
5+
- The `GetMultipleKeys` call now takes an optional `startAfterKey` instead of the previously required `key` for pagination. The returned list starts strictly *after* this key; the key itself is not included. [#38](https://github.com/scalableminds/fossildb/pull/38)
6+
7+
## Fixes
8+
9+
- Fixed a bug where the pagination for `GetMultipleKeys` could lead to an endless loop if some keys are prefixes of others. [#38](https://github.com/scalableminds/fossildb/pull/38)

src/main/protobuf/fossildbapi.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ message GetMultipleVersionsReply {
6161

6262
message GetMultipleKeysRequest {
6363
required string collection = 1;
64-
required string key = 2;
64+
optional string startAfterKey = 2;
6565
optional string prefix = 3;
6666
optional uint64 version = 4;
6767
optional uint32 limit = 5;

src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
5151

5252
override def getMultipleKeys(req: GetMultipleKeysRequest): Future[GetMultipleKeysReply] = withExceptionHandler(req) {
5353
val store = storeManager.getStore(req.collection)
54-
val (keys, values, versions) = store.getMultipleKeys(req.key, req.prefix, req.version, req.limit)
54+
val (keys, values, versions) = store.getMultipleKeys(req.startAfterKey, req.prefix, req.version, req.limit)
5555
GetMultipleKeysReply(success = true, None, keys, values.map(ByteString.copyFrom), versions)
5656
} { errorMsg => GetMultipleKeysReply(success = false, errorMsg) }
5757

src/main/scala/com/scalableminds/fossildb/db/VersionedKeyValueStore.scala

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import scala.util.Try
55

66

77
case class VersionedKey(key: String, version: Long) {
8-
override def toString: String = s"$key@${(~version).toHexString.toUpperCase}@$version"
8+
override def toString: String = s"$key${VersionedKey.versionSeparator}${(~version).toHexString.toUpperCase}${VersionedKey.versionSeparator}$version"
99
}
1010

1111
object VersionedKey {
1212

13+
val versionSeparator: Char = '@'
14+
1315
def apply(key: String): Option[VersionedKey] = {
14-
val parts = key.split('@')
16+
val parts = key.split(versionSeparator)
1517
for {
1618
key <- parts.headOption
1719
versionString <- parts.lastOption
@@ -91,8 +93,6 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
9193
def get(key: String, version: Option[Long] = None): Option[VersionedKeyValuePair[Array[Byte]]] =
9294
scanVersionValuePairs(key, version).toStream.headOption
9395

94-
def getUnderlying: RocksDBStore = underlying
95-
9696
def getMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): (List[Array[Byte]], List[Long]) = {
9797

9898
@tailrec
@@ -113,33 +113,52 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
113113

114114
private def scanVersionValuePairs(key: String, version: Option[Long] = None): Iterator[VersionedKeyValuePair[Array[Byte]]] = {
115115
requireValidKey(key)
116-
val prefix = s"$key@"
116+
val prefix = s"$key${VersionedKey.versionSeparator}"
117117
underlying.scan(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { pair =>
118118
VersionedKey(pair.key).map(VersionedKeyValuePair(_, pair.value))
119119
}
120120
}
121121

122122
private def scanVersionsOnly(key: String, version: Option[Long] = None): Iterator[VersionedKey] = {
123123
requireValidKey(key)
124-
val prefix = s"$key@"
124+
val prefix = s"$key${VersionedKey.versionSeparator}"
125125
underlying.scanKeysOnly(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { key =>
126126
VersionedKey(key)
127127
}
128128
}
129129

130-
def getMultipleKeys(key: String, prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
131-
requireValidKey(key)
130+
def getMultipleKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
131+
startAfterKey.foreach(requireValidKey)
132132
prefix.foreach{ p => requireValidKey(p)}
133-
val iterator: VersionFilterIterator = scanKeys(key, prefix, version)
134-
val asSequence = iterator.take(limit.getOrElse(Int.MaxValue)).toSeq
135-
val keys = asSequence.map(_.key)
136-
val values = asSequence.map(_.value)
137-
val versions = asSequence.map(_.version)
133+
val iterator: VersionFilterIterator = scanKeys(startAfterKey, prefix, version)
134+
135+
/*
136+
Note that seek in the underlying iterators either hits precisely or goes to the
137+
lexicographically *next* key. To achieve correct behavior with startAfterKey,
138+
we have to advance once in case of the exact hit.
139+
*/
140+
val firstItemOpt: Option[VersionedKeyValuePair[Array[Byte]]] = if (iterator.hasNext) {
141+
val firstItem = iterator.next()
142+
if (startAfterKey.contains(firstItem.key)) {
143+
None
144+
} else {
145+
Some(firstItem)
146+
}
147+
} else None
148+
149+
val limitPadded = limit.map(_ + 1).getOrElse(Int.MaxValue)
150+
val asVector = iterator.take(limitPadded).toVector
151+
val asSequenceAdvancedIfNeeded = firstItemOpt.map(_ +: asVector).getOrElse(asVector).take(limit.getOrElse(Int.MaxValue))
152+
val keys = asSequenceAdvancedIfNeeded.map(_.key)
153+
val values = asSequenceAdvancedIfNeeded.map(_.value)
154+
val versions = asSequenceAdvancedIfNeeded.map(_.version)
138155
(keys, values, versions)
139156
}
140157

141-
private def scanKeys(key: String, prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator =
142-
new VersionFilterIterator(underlying.scan(key, prefix), version)
158+
private def scanKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator = {
159+
val fullKey = startAfterKey.map(key => s"$key${VersionedKey.versionSeparator}").orElse(prefix).getOrElse("")
160+
new VersionFilterIterator(underlying.scan(fullKey, prefix), version)
161+
}
143162

144163
def deleteMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): Unit = {
145164
@tailrec
@@ -178,6 +197,6 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
178197
}
179198

180199
private def requireValidKey(key: String): Unit = {
181-
require(!(key contains "@"), "keys cannot contain the char @")
200+
require(!key.contains(VersionedKey.versionSeparator), s"keys cannot contain the char ${VersionedKey.versionSeparator}")
182201
}
183202
}

src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala

Lines changed: 87 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
3030
private val testData3 = ByteString.copyFromUtf8("testData3")
3131

3232
private val aKey = "aKey"
33-
private val anotherKey = "anotherKey"
33+
private val aNotherKey = "aNotherKey"
3434
private val aThirdKey = "aThirdKey"
3535

3636
override def beforeEach: Unit = {
@@ -114,7 +114,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
114114
}
115115

116116
it should "fail after Put with other key" in {
117-
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
117+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
118118
val reply = client.get(GetRequest(collectionA, aKey))
119119
assert(!reply.success)
120120
}
@@ -136,24 +136,24 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
136136
"ListKeys" should "list all keys of a collection" in {
137137
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
138138
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
139-
client.put(PutRequest(collectionA, anotherKey, Some(4), testData2))
139+
client.put(PutRequest(collectionA, aNotherKey, Some(4), testData2))
140140
client.put(PutRequest(collectionB, aThirdKey, Some(1), testData1))
141141
val reply = client.listKeys(ListKeysRequest(collectionA))
142142
assert(reply.keys.contains(aKey))
143-
assert(reply.keys.contains(anotherKey))
143+
assert(reply.keys.contains(aNotherKey))
144144
assert(reply.keys.length == 2)
145145
}
146146

147147
it should "support pagination with startAfterKey" in {
148148
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
149149
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
150-
client.put(PutRequest(collectionA, anotherKey, Some(4), testData2))
150+
client.put(PutRequest(collectionA, aNotherKey, Some(4), testData2))
151151
client.put(PutRequest(collectionB, aThirdKey, Some(1), testData1))
152152
val reply = client.listKeys(ListKeysRequest(collectionA, Some(1)))
153153
assert(reply.keys.length == 1)
154154
assert(reply.keys.contains(aKey))
155155
val reply2 = client.listKeys(ListKeysRequest(collectionA, Some(1), Some(reply.keys.last)))
156-
assert(reply2.keys.contains(anotherKey))
156+
assert(reply2.keys.contains(aNotherKey))
157157
assert(reply2.keys.length == 1)
158158
}
159159

@@ -173,7 +173,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
173173
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
174174
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
175175
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
176-
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
176+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
177177
val reply = client.getMultipleVersions(GetMultipleVersionsRequest(collectionA, aKey))
178178
assert(reply.versions(0) == 2)
179179
assert(reply.versions(1) == 1)
@@ -191,7 +191,7 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
191191
client.put(PutRequest(collectionA, aKey, Some(3), testData3))
192192
client.put(PutRequest(collectionA, aKey, Some(4), testData1))
193193
client.put(PutRequest(collectionA, aKey, Some(5), testData1))
194-
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
194+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
195195

196196
val reply = client.getMultipleVersions(GetMultipleVersionsRequest(collectionA, aKey, Some(4), Some(2)))
197197
assert(reply.versions(0) == 4)
@@ -202,68 +202,126 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers wi
202202
assert(reply.values.length == 2)
203203
}
204204

205-
"GetMultipleKeys" should "return keys starting with initial one (no prefix)" in {
205+
"GetMultipleKeys" should "return all keys" in {
206206
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
207-
client.put(PutRequest(collectionA, anotherKey, Some(0), testData2))
207+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData2))
208208
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData3))
209-
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aThirdKey))
210-
assert(reply.keys.length == 2)
211-
assert(reply.keys.contains(anotherKey))
209+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA))
210+
assert(reply.keys.length == 3)
211+
assert(reply.keys.contains(aNotherKey))
212212
assert(reply.keys.contains(aThirdKey))
213-
assert(reply.values.length == 2)
213+
assert(reply.values.length == 3)
214214
assert(reply.values.contains(testData2))
215215
assert(reply.values.contains(testData3))
216-
assert(reply.actualVersions.length == 2)
216+
assert(reply.actualVersions.length == 3)
217217
assert(reply.actualVersions.contains(0))
218218
}
219219

220-
it should "return keys of matching version (sorted alphabetically)" in {
220+
it should "return keys of matching version" in {
221221
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
222-
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
222+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
223223
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
224224
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
225-
client.put(PutRequest(collectionA, anotherKey, Some(1), testData2))
225+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
226226
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
227227
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
228-
client.put(PutRequest(collectionA, anotherKey, Some(2), testData3))
228+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
229229
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
230-
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aKey, None, Some(1)))
230+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, None, Some(1)))
231231
assert(reply.keys.length == 3)
232232
assert(reply.values.contains(testData2))
233233
}
234234

235-
it should "return keys of matching version, matching prefix (sorted alphabetically)" in {
235+
it should "return keys of matching version, matching prefix" in {
236236
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
237-
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
237+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
238238
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
239239
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
240-
client.put(PutRequest(collectionA, anotherKey, Some(1), testData2))
240+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
241241
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
242242
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
243-
client.put(PutRequest(collectionA, anotherKey, Some(2), testData3))
243+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
244244
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
245-
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aKey, Some("aK"), Some(1)))
245+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, Some("aN"), Some(1)))
246246
assert(reply.keys.length == 1)
247+
assert(reply.keys.contains(aNotherKey))
248+
assert(reply.values.contains(testData2))
249+
assert(reply.actualVersions.contains(1))
250+
}
251+
252+
it should "return keys of matching version, matching prefix even if it is exact match" in {
253+
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
254+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
255+
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
256+
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
257+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
258+
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
259+
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
260+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
261+
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
262+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, Some(aNotherKey), Some(1)))
263+
assert(reply.keys.length == 1)
264+
assert(reply.keys.contains(aNotherKey))
247265
assert(reply.values.contains(testData2))
248266
assert(reply.actualVersions.contains(1))
249267
}
250268

251269
it should "with limit return only the first n keys of matching version " in {
252270
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
253-
client.put(PutRequest(collectionA, anotherKey, Some(0), testData1))
271+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
254272
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
255273
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
256-
client.put(PutRequest(collectionA, anotherKey, Some(1), testData2))
274+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
257275
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
258276
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
259-
client.put(PutRequest(collectionA, anotherKey, Some(2), testData3))
277+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
260278
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
261-
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, aKey, None, Some(1), Some(2)))
279+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, None, None, Some(1), Some(2)))
262280
assert(reply.keys.length == 2)
263281
assert(reply.values.contains(testData2))
264282
assert(reply.actualVersions.contains(1))
265283
}
266284

285+
it should "support pagination with startAfterKey" in {
286+
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
287+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
288+
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
289+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, Some(aKey), None, None, Some(2)))
290+
assert(reply.keys.length == 2)
291+
assert(reply.values.contains(testData1))
292+
assert(reply.actualVersions.contains(0))
293+
}
294+
295+
it should "support pagination with startAfterKey, with prefix and version" in {
296+
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
297+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
298+
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
299+
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
300+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
301+
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
302+
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
303+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
304+
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
305+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, Some(aKey), Some("a"), Some(1), Some(1)))
306+
assert(reply.keys.length == 1)
307+
assert(reply.values.contains(testData2))
308+
assert(reply.actualVersions.contains(1))
309+
}
310+
311+
it should "support pagination with startAfterKey, with prefix and version where no keys match the prefix" in {
312+
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
313+
client.put(PutRequest(collectionA, aNotherKey, Some(0), testData1))
314+
client.put(PutRequest(collectionA, aThirdKey, Some(0), testData1))
315+
client.put(PutRequest(collectionA, aKey, Some(1), testData2))
316+
client.put(PutRequest(collectionA, aNotherKey, Some(1), testData2))
317+
client.put(PutRequest(collectionA, aThirdKey, Some(1), testData2))
318+
client.put(PutRequest(collectionA, aKey, Some(2), testData3))
319+
client.put(PutRequest(collectionA, aNotherKey, Some(2), testData3))
320+
client.put(PutRequest(collectionA, aThirdKey, Some(2), testData3))
321+
val reply = client.getMultipleKeys(GetMultipleKeysRequest(collectionA, Some(aKey), Some("BogusPrefix"), Some(1), Some(2)))
322+
assert(reply.keys.isEmpty)
323+
}
324+
267325
"Backup" should "create non-empty backup directory" in {
268326
client.put(PutRequest(collectionA, aKey, Some(0), testData1))
269327
client.backup(BackupRequest())

0 commit comments

Comments
 (0)