Skip to content

Commit d851fd3

Browse files
authored
Merge pull request #46 from scalableminds/close-all-iterators
Close all RocksIterators
2 parents 475c34b + f8efd58 commit d851fd3

File tree

3 files changed

+63
-45
lines changed

3 files changed

+63
-45
lines changed

src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.scalableminds.fossildb
22

33
import java.io.{PrintWriter, StringWriter}
4-
54
import com.google.protobuf.ByteString
65
import com.scalableminds.fossildb.db.StoreManager
76
import com.scalableminds.fossildb.proto.fossildbapi._
@@ -20,7 +19,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
2019

2120
override def get(req: GetRequest): Future[GetReply] = withExceptionHandler(req) {
2221
val store = storeManager.getStore(req.collection)
23-
val versionedKeyValuePairOpt = store.get(req.key, req.version)
22+
val versionedKeyValuePairOpt = store.withRawRocksIterator{rocksIt => store.get(rocksIt, req.key, req.version)}
2423
versionedKeyValuePairOpt match {
2524
case Some(pair) => GetReply(success = true, None, ByteString.copyFrom(pair.value), pair.version)
2625
case None =>
@@ -31,7 +30,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
3130

3231
override def put(req: PutRequest): Future[PutReply] = withExceptionHandler(req) {
3332
val store = storeManager.getStore(req.collection)
34-
val version = req.version.getOrElse(store.get(req.key, None).map(_.version + 1).getOrElse(0L))
33+
val version = store.withRawRocksIterator{rocksIt => req.version.getOrElse(store.get(rocksIt, req.key, None).map(_.version + 1).getOrElse(0L))}
3534
require(version >= 0, "Version numbers must be non-negative")
3635
store.put(req.key, version, req.value.toByteArray)
3736
PutReply(success = true)
@@ -45,31 +44,31 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
4544

4645
override def getMultipleVersions(req: GetMultipleVersionsRequest): Future[GetMultipleVersionsReply] = withExceptionHandler(req) {
4746
val store = storeManager.getStore(req.collection)
48-
val (values, versions) = store.getMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
47+
val (values, versions) = store.withRawRocksIterator{rocksIt => store.getMultipleVersions(rocksIt, req.key, req.oldestVersion, req.newestVersion)}
4948
GetMultipleVersionsReply(success = true, None, values.map(ByteString.copyFrom), versions)
5049
} { errorMsg => GetMultipleVersionsReply(success = false, errorMsg) }
5150

5251
override def getMultipleKeys(req: GetMultipleKeysRequest): Future[GetMultipleKeysReply] = withExceptionHandler(req) {
5352
val store = storeManager.getStore(req.collection)
54-
val (keys, values, versions) = store.getMultipleKeys(req.startAfterKey, req.prefix, req.version, req.limit)
53+
val (keys, values, versions) = store.withRawRocksIterator{rocksIt => store.getMultipleKeys(rocksIt, req.startAfterKey, req.prefix, req.version, req.limit)}
5554
GetMultipleKeysReply(success = true, None, keys, values.map(ByteString.copyFrom), versions)
5655
} { errorMsg => GetMultipleKeysReply(success = false, errorMsg) }
5756

5857
override def deleteMultipleVersions(req: DeleteMultipleVersionsRequest): Future[DeleteMultipleVersionsReply] = withExceptionHandler(req) {
5958
val store = storeManager.getStore(req.collection)
60-
store.deleteMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
59+
store.withRawRocksIterator{rocksIt => store.deleteMultipleVersions(rocksIt, req.key, req.oldestVersion, req.newestVersion)}
6160
DeleteMultipleVersionsReply(success = true)
6261
} { errorMsg => DeleteMultipleVersionsReply(success = false, errorMsg) }
6362

6463
override def listKeys(req: ListKeysRequest): Future[ListKeysReply] = withExceptionHandler(req) {
6564
val store = storeManager.getStore(req.collection)
66-
val keys = store.listKeys(req.limit, req.startAfterKey)
65+
val keys = store.withRawRocksIterator{rocksIt => store.listKeys(rocksIt, req.limit, req.startAfterKey)}
6766
ListKeysReply(success = true, None, keys)
6867
} { errorMsg => ListKeysReply(success = false, errorMsg) }
6968

7069
override def listVersions(req: ListVersionsRequest): Future[ListVersionsReply] = withExceptionHandler(req) {
7170
val store = storeManager.getStore(req.collection)
72-
val versions = store.listVersions(req.key, req.limit, req.offset)
71+
val versions = store.withRawRocksIterator{rocksIt => store.listVersions(rocksIt, req.key, req.limit, req.offset)}
7372
ListVersionsReply(success = true, None, versions)
7473
} { errorMsg => ListVersionsReply(success = false, errorMsg) }
7574

src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import java.nio.file.{Files, Path}
77
import java.util
88
import scala.collection.mutable
99
import scala.concurrent.Future
10-
import scala.jdk.CollectionConverters.{ListHasAsScala, BufferHasAsJava, SeqHasAsJava}
10+
import scala.jdk.CollectionConverters.{BufferHasAsJava, ListHasAsScala, SeqHasAsJava}
1111
import scala.language.postfixOps
1212

1313
case class BackupInfo(id: Int, timestamp: Long, size: Long)
@@ -35,7 +35,8 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
3535
}
3636
}
3737
options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
38-
val defaultColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(columnOptions)
38+
val defaultColumnFamilyOptions: ColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(columnOptions)
39+
println(defaultColumnFamilyOptions)
3940
val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions))
4041
val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors
4142
logger.info("Opening RocksDB at " + dataDir.toAbsolutePath)
@@ -84,8 +85,11 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
8485
logger.info(s"Exporting to new DB at ${newDataDir.toString} with options file $newOptionsFilePathOpt")
8586
val newManager = new RocksDBManager(newDataDir, columnFamilies, newOptionsFilePathOpt)
8687
newManager.columnFamilyHandles.foreach { case (name, handle) =>
87-
val dataIterator = getStoreForColumnFamily(name).get.scan("", None)
88-
dataIterator.foreach(el => newManager.db.put(handle, el.key.getBytes, el.value))
88+
val store = getStoreForColumnFamily(name).get
89+
store.withRawRocksIterator { rocksIt =>
90+
val dataIterator = RocksDBStore.scan(rocksIt, "", None)
91+
dataIterator.foreach(el => newManager.db.put(handle, el.key.getBytes, el.value))
92+
}
8993
}
9094
logger.info("Writing data completed. Start compaction")
9195
newManager.db.compactRange()
@@ -128,20 +132,17 @@ class RocksDBIterator(it: RocksIterator, prefix: Option[String]) extends Iterato
128132

129133
class RocksDBStore(db: RocksDB, handle: ColumnFamilyHandle) extends LazyLogging {
130134

131-
def get(key: String): Array[Byte] = {
132-
db.get(handle, key.getBytes())
133-
}
134-
135-
def scan(key: String, prefix: Option[String]): RocksDBIterator = {
136-
val it = db.newIterator(handle)
137-
it.seek(key.getBytes())
138-
new RocksDBIterator(it, prefix)
135+
def withRawRocksIterator[T](block: RocksIterator => T): T = {
136+
val rocksIt = db.newIterator(handle)
137+
try {
138+
block(rocksIt)
139+
} finally {
140+
rocksIt.close()
141+
}
139142
}
140143

141-
def scanKeysOnly(key: String, prefix: Option[String]): RocksDBKeyIterator = {
142-
val it = db.newIterator(handle)
143-
it.seek(key.getBytes())
144-
new RocksDBKeyIterator(it, prefix)
144+
def get(key: String): Array[Byte] = {
145+
db.get(handle, key.getBytes())
145146
}
146147

147148
def put(key: String, value: Array[Byte]): Unit = {
@@ -153,3 +154,17 @@ class RocksDBStore(db: RocksDB, handle: ColumnFamilyHandle) extends LazyLogging
153154
}
154155

155156
}
157+
158+
object RocksDBStore {
159+
160+
def scan(rocksIt: RocksIterator, key: String, prefix: Option[String]): RocksDBIterator = {
161+
rocksIt.seek(key.getBytes())
162+
new RocksDBIterator(rocksIt, prefix)
163+
}
164+
165+
def scanKeysOnly(rocksIt: RocksIterator, key: String, prefix: Option[String]): RocksDBKeyIterator = {
166+
rocksIt.seek(key.getBytes())
167+
new RocksDBKeyIterator(rocksIt, prefix)
168+
}
169+
170+
}

src/main/scala/com/scalableminds/fossildb/db/VersionedKeyValueStore.scala

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.scalableminds.fossildb.db
22

3+
import org.rocksdb.RocksIterator
4+
35
import scala.annotation.tailrec
46
import scala.util.Try
57

@@ -56,7 +58,7 @@ class VersionFilterIterator(it: RocksDBIterator, version: Option[Long]) extends
5658

5759
}
5860

59-
class KeyOnlyIterator[T](underlying: RocksDBStore, startAfterKey: Option[String]) extends Iterator[String] {
61+
class KeyOnlyIterator[T](rocksIt: RocksIterator, startAfterKey: Option[String]) extends Iterator[String] {
6062

6163
/*
6264
Note that seek in the underlying iterators either hits precisely or goes to the
@@ -72,13 +74,13 @@ class KeyOnlyIterator[T](underlying: RocksDBStore, startAfterKey: Option[String]
7274
}
7375

7476
override def hasNext: Boolean = {
75-
val it = underlying.scanKeysOnly(compositeKeyFor(currentKey), None)
77+
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), None)
7678
if (it.hasNext && currentKey.isDefined && currentKey.contains(VersionedKey(it.peek).get.key)) it.next()
7779
it.hasNext
7880
}
7981

8082
override def next(): String = {
81-
val it = underlying.scanKeysOnly(compositeKeyFor(currentKey), None)
83+
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), None)
8284
if (it.hasNext && currentKey.isDefined && currentKey.contains(VersionedKey(it.peek).get.key)) it.next()
8385
val nextKey = VersionedKey(it.next()).get.key
8486
currentKey = Some(nextKey)
@@ -90,10 +92,12 @@ class KeyOnlyIterator[T](underlying: RocksDBStore, startAfterKey: Option[String]
9092

9193
class VersionedKeyValueStore(underlying: RocksDBStore) {
9294

93-
def get(key: String, version: Option[Long] = None): Option[VersionedKeyValuePair[Array[Byte]]] =
94-
scanVersionValuePairs(key, version).nextOption()
95+
def withRawRocksIterator[T](block: RocksIterator => T): T = underlying.withRawRocksIterator(block)
96+
97+
def get(rocksIt: RocksIterator, key: String, version: Option[Long] = None): Option[VersionedKeyValuePair[Array[Byte]]] =
98+
scanVersionValuePairs(rocksIt, key, version).nextOption()
9599

96-
def getMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): (List[Array[Byte]], List[Long]) = {
100+
def getMultipleVersions(rocksIt: RocksIterator, key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): (List[Array[Byte]], List[Long]) = {
97101

98102
@tailrec
99103
def toListIter(versionIterator: Iterator[VersionedKeyValuePair[Array[Byte]]],
@@ -106,31 +110,31 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
106110
}
107111
}
108112

109-
val iterator = scanVersionValuePairs(key, newestVersion)
113+
val iterator = scanVersionValuePairs(rocksIt, key, newestVersion)
110114
val (versions, keys) = toListIter(iterator, List(), List())
111115
(versions.reverse, keys.reverse)
112116
}
113117

114-
private def scanVersionValuePairs(key: String, version: Option[Long] = None): Iterator[VersionedKeyValuePair[Array[Byte]]] = {
118+
private def scanVersionValuePairs(rocksIt: RocksIterator, key: String, version: Option[Long] = None): Iterator[VersionedKeyValuePair[Array[Byte]]] = {
115119
requireValidKey(key)
116120
val prefix = s"$key${VersionedKey.versionSeparator}"
117-
underlying.scan(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { pair =>
121+
RocksDBStore.scan(rocksIt, version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { pair =>
118122
VersionedKey(pair.key).map(VersionedKeyValuePair(_, pair.value))
119123
}
120124
}
121125

122-
private def scanVersionsOnly(key: String, version: Option[Long] = None): Iterator[VersionedKey] = {
126+
private def scanVersionsOnly(rocksIt: RocksIterator, key: String, version: Option[Long] = None): Iterator[VersionedKey] = {
123127
requireValidKey(key)
124128
val prefix = s"$key${VersionedKey.versionSeparator}"
125-
underlying.scanKeysOnly(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { key =>
129+
RocksDBStore.scanKeysOnly(rocksIt, version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { key =>
126130
VersionedKey(key)
127131
}
128132
}
129133

130-
def getMultipleKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
134+
def getMultipleKeys(rocksIt: RocksIterator, startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
131135
startAfterKey.foreach(requireValidKey)
132136
prefix.foreach{ p => requireValidKey(p)}
133-
val iterator: VersionFilterIterator = scanKeys(startAfterKey, prefix, version)
137+
val iterator: VersionFilterIterator = scanKeys(rocksIt, startAfterKey, prefix, version)
134138

135139
/*
136140
Note that seek in the underlying iterators either hits precisely or goes to the
@@ -155,12 +159,12 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
155159
(keys, values, versions)
156160
}
157161

158-
private def scanKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator = {
162+
private def scanKeys(rocksIt: RocksIterator, startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator = {
159163
val fullKey = startAfterKey.map(key => s"$key${VersionedKey.versionSeparator}").orElse(prefix).getOrElse("")
160-
new VersionFilterIterator(underlying.scan(fullKey, prefix), version)
164+
new VersionFilterIterator(RocksDBStore.scan(rocksIt, fullKey, prefix), version)
161165
}
162166

163-
def deleteMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): Unit = {
167+
def deleteMultipleVersions(rocksIt: RocksIterator, key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): Unit = {
164168
@tailrec
165169
def deleteIter(versionIterator: Iterator[VersionedKey]): Unit = {
166170
if (versionIterator.hasNext) {
@@ -172,7 +176,7 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
172176
}
173177
}
174178

175-
val versionsIterator = scanVersionsOnly(key, newestVersion)
179+
val versionsIterator = scanVersionsOnly(rocksIt, key, newestVersion)
176180
deleteIter(versionsIterator)
177181
}
178182

@@ -186,13 +190,13 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
186190
underlying.delete(VersionedKey(key, version).toString)
187191
}
188192

189-
def listKeys(limit: Option[Int], startAfterKey: Option[String]): Seq[String] = {
190-
val iterator = new KeyOnlyIterator(underlying, startAfterKey)
193+
def listKeys(rocksIt: RocksIterator, limit: Option[Int], startAfterKey: Option[String]): Seq[String] = {
194+
val iterator = new KeyOnlyIterator(rocksIt, startAfterKey)
191195
iterator.take(limit.getOrElse(Int.MaxValue)).toSeq
192196
}
193197

194-
def listVersions(key: String, limit: Option[Int], offset: Option[Int]): Seq[Long] = {
195-
val iterator = scanVersionsOnly(key)
198+
def listVersions(rocksIt: RocksIterator, key: String, limit: Option[Int], offset: Option[Int]): Seq[Long] = {
199+
val iterator = scanVersionsOnly(rocksIt, key)
196200
iterator.map(_.version).drop(offset.getOrElse(0)).take(limit.getOrElse(Int.MaxValue)).toSeq
197201
}
198202

0 commit comments

Comments
 (0)