@@ -5,13 +5,15 @@ import scala.util.Try
55
66
77case class VersionedKey (key : String , version : Long ) {
8- override def toString : String = s " $key@ ${ (~ version).toHexString.toUpperCase}@ $version"
8+ override def toString : String = s " $key${ VersionedKey .versionSeparator}${ (~ version).toHexString.toUpperCase}${ VersionedKey .versionSeparator} $version"
99}
1010
1111object VersionedKey {
1212
13+ val versionSeparator : Char = '@'
14+
1315 def apply (key : String ): Option [VersionedKey ] = {
14- val parts = key.split('@' )
16+ val parts = key.split(versionSeparator )
1517 for {
1618 key <- parts.headOption
1719 versionString <- parts.lastOption
@@ -91,8 +93,6 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
9193 def get (key : String , version : Option [Long ] = None ): Option [VersionedKeyValuePair [Array [Byte ]]] =
9294 scanVersionValuePairs(key, version).toStream.headOption
9395
94- def getUnderlying : RocksDBStore = underlying
95-
9696 def getMultipleVersions (key : String , oldestVersion : Option [Long ] = None , newestVersion : Option [Long ] = None ): (List [Array [Byte ]], List [Long ]) = {
9797
9898 @ tailrec
@@ -113,33 +113,54 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
113113
114114 private def scanVersionValuePairs (key : String , version : Option [Long ] = None ): Iterator [VersionedKeyValuePair [Array [Byte ]]] = {
115115 requireValidKey(key)
116- val prefix = s " $key@ "
116+ val prefix = s " $key${ VersionedKey .versionSeparator} "
117117 underlying.scan(version.map(VersionedKey (key, _).toString).getOrElse(prefix), Some (prefix)).flatMap { pair =>
118118 VersionedKey (pair.key).map(VersionedKeyValuePair (_, pair.value))
119119 }
120120 }
121121
122122 private def scanVersionsOnly (key : String , version : Option [Long ] = None ): Iterator [VersionedKey ] = {
123123 requireValidKey(key)
124- val prefix = s " $key@ "
124+ val prefix = s " $key${ VersionedKey .versionSeparator} "
125125 underlying.scanKeysOnly(version.map(VersionedKey (key, _).toString).getOrElse(prefix), Some (prefix)).flatMap { key =>
126126 VersionedKey (key)
127127 }
128128 }
129129
130- def getMultipleKeys (key : String , prefix : Option [String ] = None , version : Option [Long ] = None , limit : Option [Int ]): (Seq [String ], Seq [Array [Byte ]], Seq [Long ]) = {
131- requireValidKey(key )
130+ def getMultipleKeys (startAfterKey : Option [ String ] , prefix : Option [String ] = None , version : Option [Long ] = None , limit : Option [Int ]): (Seq [String ], Seq [Array [Byte ]], Seq [Long ]) = {
131+ startAfterKey.foreach(requireValidKey )
132132 prefix.foreach{ p => requireValidKey(p)}
133- val iterator : VersionFilterIterator = scanKeys(key, prefix, version)
134- val asSequence = iterator.take(limit.getOrElse(Int .MaxValue )).toSeq
135- val keys = asSequence.map(_.key)
136- val values = asSequence.map(_.value)
137- val versions = asSequence.map(_.version)
133+ val iterator : VersionFilterIterator = scanKeys(startAfterKey, prefix, version)
134+
135+ /*
136+ Note that seek in the underlying iterators either hits precisely or goes to the
137+ lexicographically *next* key. To achieve correct behavior with startAfterKey,
138+ we have to advance once in case of the exact hit.
139+ */
140+ val firstItemOpt : Option [VersionedKeyValuePair [Array [Byte ]]] = if (iterator.hasNext) {
141+ val firstItem = iterator.next()
142+ if (startAfterKey.contains(firstItem.key)) {
143+ None
144+ } else {
145+ Some (firstItem)
146+ }
147+ } else None
148+
149+ val limitPadded = limit.map(_ - 1 ).getOrElse(Int .MaxValue )
150+ val asVector = iterator.take(limitPadded).toVector
151+ val asSequenceAdvancedIfNeeded = firstItemOpt.map(_ +: asVector).getOrElse(asVector).take(limit.getOrElse(Int .MaxValue ))
152+ val keys = asSequenceAdvancedIfNeeded.map(_.key)
153+ println(s " Returning keys ${keys.toList}" )
154+ val values = asSequenceAdvancedIfNeeded.map(_.value)
155+ val versions = asSequenceAdvancedIfNeeded.map(_.version)
138156 (keys, values, versions)
139157 }
140158
141- private def scanKeys (key : String , prefix : Option [String ] = None , version : Option [Long ] = None ): VersionFilterIterator =
142- new VersionFilterIterator (underlying.scan(key, prefix), version)
159+ private def scanKeys (startAfterKey : Option [String ], prefix : Option [String ] = None , version : Option [Long ] = None ): VersionFilterIterator = {
160+ val fullKey = startAfterKey.map(key => s " $key${VersionedKey .versionSeparator}" ).getOrElse(" " )
161+ println(s " Scanning to $fullKey" )
162+ new VersionFilterIterator (underlying.scan(fullKey, prefix), version)
163+ }
143164
144165 def deleteMultipleVersions (key : String , oldestVersion : Option [Long ] = None , newestVersion : Option [Long ] = None ): Unit = {
145166 @ tailrec
@@ -178,6 +199,6 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
178199 }
179200
180201 private def requireValidKey (key : String ): Unit = {
181- require(! ( key contains " @ " ), " keys cannot contain the char @ " )
202+ require(! key. contains( VersionedKey .versionSeparator ), s " keys cannot contain the char ${ VersionedKey .versionSeparator} " )
182203 }
183204}
0 commit comments