Skip to content

Commit 273a251

Browse files
authored
Merge pull request #32 from scalableminds/fix-columnFamily-options
Export To New DB
2 parents 28687a6 + 7a68fd2 commit 273a251

File tree

4 files changed

+61
-20
lines changed

4 files changed

+61
-20
lines changed

src/main/protobuf/fossildbapi.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,16 @@ message CompactAllDataReply {
137137
optional string errorMessage = 2;
138138
}
139139

140+
message ExportDBRequest {
141+
required string newDataDir = 1;
142+
optional string optionsFile = 2;
143+
}
144+
145+
message ExportDBReply {
146+
required bool success = 1;
147+
optional string errorMessage = 2;
148+
}
149+
140150

141151
service FossilDB {
142152
rpc Health (HealthRequest) returns (HealthReply) {}
@@ -151,4 +161,5 @@ service FossilDB {
151161
rpc Backup (BackupRequest) returns (BackupReply) {}
152162
rpc RestoreFromBackup (RestoreFromBackupRequest) returns (RestoreFromBackupReply) {}
153163
rpc CompactAllData (CompactAllDataRequest) returns (CompactAllDataReply) {}
164+
rpc ExportDB (ExportDBRequest) returns (ExportDBReply) {}
154165
}

src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ import scala.concurrent.Future
1515

1616
class FossilDBGrpcImpl(storeManager: StoreManager)
1717
extends FossilDBGrpc.FossilDB
18-
with LazyLogging {
18+
with LazyLogging {
1919

2020
override def health(req: HealthRequest) = withExceptionHandler(req) {
2121
HealthReply(true)
22-
} {errorMsg => HealthReply(false, errorMsg)}
22+
} { errorMsg => HealthReply(false, errorMsg) }
2323

2424
override def get(req: GetRequest) = withExceptionHandler(req) {
2525
val store = storeManager.getStore(req.collection)
@@ -31,71 +31,76 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
3131
GetReply(false, Some("No such element"), ByteString.EMPTY, 0)
3232
}
3333
}
34-
} {errorMsg => GetReply(false, errorMsg, ByteString.EMPTY, 0)}
34+
} { errorMsg => GetReply(false, errorMsg, ByteString.EMPTY, 0) }
3535

3636
override def put(req: PutRequest) = withExceptionHandler(req) {
3737
val store = storeManager.getStore(req.collection)
3838
val version = req.version.getOrElse(store.get(req.key, None).map(_.version + 1).getOrElse(0L))
3939
require(version >= 0, "Version numbers must be non-negative")
4040
store.put(req.key, version, req.value.toByteArray)
4141
PutReply(true)
42-
} {errorMsg => PutReply(false, errorMsg)}
42+
} { errorMsg => PutReply(false, errorMsg) }
4343

4444
override def delete(req: DeleteRequest) = withExceptionHandler(req) {
4545
val store = storeManager.getStore(req.collection)
4646
store.delete(req.key, req.version)
4747
DeleteReply(true)
48-
} {errorMsg => DeleteReply(false, errorMsg)}
48+
} { errorMsg => DeleteReply(false, errorMsg) }
4949

5050
override def getMultipleVersions(req: GetMultipleVersionsRequest) = withExceptionHandler(req) {
5151
val store = storeManager.getStore(req.collection)
5252
val (values, versions) = store.getMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
5353
GetMultipleVersionsReply(true, None, values.map(ByteString.copyFrom(_)), versions)
54-
} {errorMsg => GetMultipleVersionsReply(false, errorMsg)}
54+
} { errorMsg => GetMultipleVersionsReply(false, errorMsg) }
5555

5656
override def getMultipleKeys(req: GetMultipleKeysRequest) = withExceptionHandler(req) {
5757
val store = storeManager.getStore(req.collection)
5858
val (keys, values, versions) = store.getMultipleKeys(req.key, req.prefix, req.version, req.limit)
5959
GetMultipleKeysReply(true, None, keys, values.map(ByteString.copyFrom(_)), versions)
60-
} {errorMsg => GetMultipleKeysReply(false, errorMsg)}
60+
} { errorMsg => GetMultipleKeysReply(false, errorMsg) }
6161

6262
override def deleteMultipleVersions(req: DeleteMultipleVersionsRequest) = withExceptionHandler(req) {
6363
val store = storeManager.getStore(req.collection)
6464
store.deleteMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
6565
DeleteMultipleVersionsReply(true)
66-
} {errorMsg => DeleteMultipleVersionsReply(false, errorMsg)}
66+
} { errorMsg => DeleteMultipleVersionsReply(false, errorMsg) }
6767

6868
override def listKeys(req: ListKeysRequest) = withExceptionHandler(req) {
6969
val store = storeManager.getStore(req.collection)
7070
val keys = store.listKeys(req.limit, req.startAfterKey)
7171
ListKeysReply(true, None, keys)
72-
} {errorMsg => ListKeysReply(false, errorMsg)}
72+
} { errorMsg => ListKeysReply(false, errorMsg) }
7373

7474
override def listVersions(req: ListVersionsRequest) = withExceptionHandler(req) {
7575
val store = storeManager.getStore(req.collection)
7676
val versions = store.listVersions(req.key, req.limit, req.offset)
7777
ListVersionsReply(true, None, versions)
78-
} {errorMsg => ListVersionsReply(false, errorMsg)}
78+
} { errorMsg => ListVersionsReply(false, errorMsg) }
7979

8080
override def backup(req: BackupRequest) = withExceptionHandler(req) {
8181
val backupInfoOpt = storeManager.backup
8282
backupInfoOpt match {
8383
case Some(backupInfo) => BackupReply(true, None, backupInfo.id, backupInfo.timestamp, backupInfo.size)
8484
case _ => throw new Exception("Backup did not return valid BackupInfo")
8585
}
86-
} {errorMsg => BackupReply(false, errorMsg, 0, 0, 0)}
86+
} { errorMsg => BackupReply(false, errorMsg, 0, 0, 0) }
8787

8888
override def restoreFromBackup(req: RestoreFromBackupRequest) = withExceptionHandler(req) {
8989
storeManager.restoreFromBackup
9090
RestoreFromBackupReply(true)
91-
} {errorMsg => RestoreFromBackupReply(false, errorMsg)}
91+
} { errorMsg => RestoreFromBackupReply(false, errorMsg) }
9292

9393
override def compactAllData(req: CompactAllDataRequest) = withExceptionHandler(req) {
94-
storeManager.compactAllData
94+
storeManager.compactAllData()
9595
CompactAllDataReply(true)
96-
} {errorMsg => CompactAllDataReply(false, errorMsg)}
96+
} { errorMsg => CompactAllDataReply(false, errorMsg) }
9797

98-
private def withExceptionHandler [T, R <: GeneratedMessage](request: R)(tryBlock: => T)(onErrorBlock: Option[String] => T): Future[T] = {
98+
override def exportDB(req: ExportDBRequest) = withExceptionHandler(req) {
99+
storeManager.exportDB(req.newDataDir, req.optionsFile)
100+
ExportDBReply(true)
101+
} { errorMsg => ExportDBReply(false, errorMsg) }
102+
103+
private def withExceptionHandler[T, R <: GeneratedMessage](request: R)(tryBlock: => T)(onErrorBlock: Option[String] => T): Future[T] = {
99104
try {
100105
logger.debug("received " + requestToString(request))
101106
Future.successful(tryBlock)
@@ -112,7 +117,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
112117
}
113118

114119
private def requestToString[R <: GeneratedMessage](request: R) =
115-
request.getClass.getSimpleName + "(" + request.toString.replaceAll("\n"," ") + ")"
120+
request.getClass.getSimpleName + "(" + request.toString.replaceAll("\n", " ") + ")"
116121

117122
private def getStackTraceAsString(t: Throwable) = {
118123
val sw = new StringWriter

src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
*/
44
package com.scalableminds.fossildb.db
55

6-
import java.nio.file.{Files, Path}
6+
import java.io.File
7+
import java.nio.file.{Files, Path, Paths}
78
import java.util
89

910
import com.typesafe.scalalogging.LazyLogging
@@ -21,6 +22,10 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
2122

2223
val (db: RocksDB, columnFamilyHandles) = {
2324
RocksDB.loadLibrary()
25+
val columnOptions = new ColumnFamilyOptions()
26+
.setArenaBlockSize(4 * 1024 * 1024) // 4MB
27+
.setTargetFileSizeBase(1024 * 1024 * 1024) // 1GB
28+
.setMaxBytesForLevelBase(10 * 1024 * 1024 * 1024) // 10GB
2429
val options = new DBOptions()
2530
val cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer()
2631
optionsFilePathOpt.foreach { optionsFilePath =>
@@ -34,7 +39,7 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
3439
}
3540
}
3641
options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
37-
val defaultColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(new ColumnFamilyOptions())
42+
val defaultColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(columnOptions)
3843
val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions))
3944
val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors
4045
logger.info("Opening RocksDB at " + dataDir.toAbsolutePath)
@@ -78,6 +83,19 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
7883
logger.info("All data has been compacted to last level containing data")
7984
}
8085

86+
def exportToNewDB(newDataDir: Path, newOptionsFilePathOpt: Option[String]) = {
87+
RocksDB.loadLibrary()
88+
logger.info(s"Exporting to new DB at ${newDataDir.toString} with options file ${newOptionsFilePathOpt}")
89+
val newManager = new RocksDBManager(newDataDir, columnFamilies, newOptionsFilePathOpt)
90+
newManager.columnFamilyHandles.foreach { case (name, handle) =>
91+
val dataIterator = getStoreForColumnFamily(name).get.scan("", None)
92+
dataIterator.foreach(el => newManager.db.put(handle, el.key.getBytes, el.value))
93+
}
94+
logger.info("Writing data completed. Start compaction")
95+
newManager.db.compactRange()
96+
logger.info("Compaction finished")
97+
}
98+
8199
def close(): Future[Unit] = {
82100
logger.info("Closing RocksDB handle")
83101
Future.successful(db.close())

src/main/scala/com/scalableminds/fossildb/db/StoreManager.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
package com.scalableminds.fossildb.db
55

6-
import java.nio.file.Path
6+
import java.nio.file.{Path, Paths}
77
import java.util.concurrent.atomic.AtomicBoolean
88

99
class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String], rocksdbOptions: Option[String]) {
@@ -66,14 +66,21 @@ class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String],
6666
}
6767
}
6868

69-
def compactAllData = {
69+
def compactAllData() = {
7070
failDuringBackup
7171
failDuringRestore
7272
try {
7373
rocksDBManager.get.compactAllData()
7474
}
7575
}
7676

77+
def exportDB(newDataDir: String, newOptionsFilePathOpt: Option[String]) = {
78+
failDuringRestore
79+
try {
80+
rocksDBManager.get.exportToNewDB(Paths.get(newDataDir), newOptionsFilePathOpt)
81+
}
82+
}
83+
7784
def close = {
7885
rocksDBManager.map(_.close)
7986
}

0 commit comments

Comments
 (0)