Skip to content

Commit 7a68fd2

Browse files
author
Youri K
committed
implement new rpc route for db export and remove unnecessary code
1 parent 2af7eef commit 7a68fd2

File tree

4 files changed

+58
-83
lines changed

4 files changed

+58
-83
lines changed

src/main/protobuf/fossildbapi.proto

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,23 @@ message RestoreFromBackupReply {
130130
optional string errorMessage = 2;
131131
}
132132

133-
message CompactAllDataRequest {optional uint32 type = 1;}
133+
message CompactAllDataRequest {}
134134

135135
message CompactAllDataReply {
136136
required bool success = 1;
137137
optional string errorMessage = 2;
138138
}
139139

140+
message ExportDBRequest {
141+
required string newDataDir = 1;
142+
optional string optionsFile = 2;
143+
}
144+
145+
message ExportDBReply {
146+
required bool success = 1;
147+
optional string errorMessage = 2;
148+
}
149+
140150

141151
service FossilDB {
142152
rpc Health (HealthRequest) returns (HealthReply) {}
@@ -151,4 +161,5 @@ service FossilDB {
151161
rpc Backup (BackupRequest) returns (BackupReply) {}
152162
rpc RestoreFromBackup (RestoreFromBackupRequest) returns (RestoreFromBackupReply) {}
153163
rpc CompactAllData (CompactAllDataRequest) returns (CompactAllDataReply) {}
164+
rpc ExportDB (ExportDBRequest) returns (ExportDBReply) {}
154165
}

src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ import scala.concurrent.Future
1515

1616
class FossilDBGrpcImpl(storeManager: StoreManager)
1717
extends FossilDBGrpc.FossilDB
18-
with LazyLogging {
18+
with LazyLogging {
1919

2020
override def health(req: HealthRequest) = withExceptionHandler(req) {
2121
HealthReply(true)
22-
} {errorMsg => HealthReply(false, errorMsg)}
22+
} { errorMsg => HealthReply(false, errorMsg) }
2323

2424
override def get(req: GetRequest) = withExceptionHandler(req) {
2525
val store = storeManager.getStore(req.collection)
@@ -31,71 +31,76 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
3131
GetReply(false, Some("No such element"), ByteString.EMPTY, 0)
3232
}
3333
}
34-
} {errorMsg => GetReply(false, errorMsg, ByteString.EMPTY, 0)}
34+
} { errorMsg => GetReply(false, errorMsg, ByteString.EMPTY, 0) }
3535

3636
override def put(req: PutRequest) = withExceptionHandler(req) {
3737
val store = storeManager.getStore(req.collection)
3838
val version = req.version.getOrElse(store.get(req.key, None).map(_.version + 1).getOrElse(0L))
3939
require(version >= 0, "Version numbers must be non-negative")
4040
store.put(req.key, version, req.value.toByteArray)
4141
PutReply(true)
42-
} {errorMsg => PutReply(false, errorMsg)}
42+
} { errorMsg => PutReply(false, errorMsg) }
4343

4444
override def delete(req: DeleteRequest) = withExceptionHandler(req) {
4545
val store = storeManager.getStore(req.collection)
4646
store.delete(req.key, req.version)
4747
DeleteReply(true)
48-
} {errorMsg => DeleteReply(false, errorMsg)}
48+
} { errorMsg => DeleteReply(false, errorMsg) }
4949

5050
override def getMultipleVersions(req: GetMultipleVersionsRequest) = withExceptionHandler(req) {
5151
val store = storeManager.getStore(req.collection)
5252
val (values, versions) = store.getMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
5353
GetMultipleVersionsReply(true, None, values.map(ByteString.copyFrom(_)), versions)
54-
} {errorMsg => GetMultipleVersionsReply(false, errorMsg)}
54+
} { errorMsg => GetMultipleVersionsReply(false, errorMsg) }
5555

5656
override def getMultipleKeys(req: GetMultipleKeysRequest) = withExceptionHandler(req) {
5757
val store = storeManager.getStore(req.collection)
5858
val (keys, values, versions) = store.getMultipleKeys(req.key, req.prefix, req.version, req.limit)
5959
GetMultipleKeysReply(true, None, keys, values.map(ByteString.copyFrom(_)), versions)
60-
} {errorMsg => GetMultipleKeysReply(false, errorMsg)}
60+
} { errorMsg => GetMultipleKeysReply(false, errorMsg) }
6161

6262
override def deleteMultipleVersions(req: DeleteMultipleVersionsRequest) = withExceptionHandler(req) {
6363
val store = storeManager.getStore(req.collection)
6464
store.deleteMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
6565
DeleteMultipleVersionsReply(true)
66-
} {errorMsg => DeleteMultipleVersionsReply(false, errorMsg)}
66+
} { errorMsg => DeleteMultipleVersionsReply(false, errorMsg) }
6767

6868
override def listKeys(req: ListKeysRequest) = withExceptionHandler(req) {
6969
val store = storeManager.getStore(req.collection)
7070
val keys = store.listKeys(req.limit, req.startAfterKey)
7171
ListKeysReply(true, None, keys)
72-
} {errorMsg => ListKeysReply(false, errorMsg)}
72+
} { errorMsg => ListKeysReply(false, errorMsg) }
7373

7474
override def listVersions(req: ListVersionsRequest) = withExceptionHandler(req) {
7575
val store = storeManager.getStore(req.collection)
7676
val versions = store.listVersions(req.key, req.limit, req.offset)
7777
ListVersionsReply(true, None, versions)
78-
} {errorMsg => ListVersionsReply(false, errorMsg)}
78+
} { errorMsg => ListVersionsReply(false, errorMsg) }
7979

8080
override def backup(req: BackupRequest) = withExceptionHandler(req) {
8181
val backupInfoOpt = storeManager.backup
8282
backupInfoOpt match {
8383
case Some(backupInfo) => BackupReply(true, None, backupInfo.id, backupInfo.timestamp, backupInfo.size)
8484
case _ => throw new Exception("Backup did not return valid BackupInfo")
8585
}
86-
} {errorMsg => BackupReply(false, errorMsg, 0, 0, 0)}
86+
} { errorMsg => BackupReply(false, errorMsg, 0, 0, 0) }
8787

8888
override def restoreFromBackup(req: RestoreFromBackupRequest) = withExceptionHandler(req) {
8989
storeManager.restoreFromBackup
9090
RestoreFromBackupReply(true)
91-
} {errorMsg => RestoreFromBackupReply(false, errorMsg)}
91+
} { errorMsg => RestoreFromBackupReply(false, errorMsg) }
9292

9393
override def compactAllData(req: CompactAllDataRequest) = withExceptionHandler(req) {
94-
storeManager.compactAllData(req.`type`)
94+
storeManager.compactAllData()
9595
CompactAllDataReply(true)
96-
} {errorMsg => CompactAllDataReply(false, errorMsg)}
96+
} { errorMsg => CompactAllDataReply(false, errorMsg) }
9797

98-
private def withExceptionHandler [T, R <: GeneratedMessage](request: R)(tryBlock: => T)(onErrorBlock: Option[String] => T): Future[T] = {
98+
override def exportDB(req: ExportDBRequest) = withExceptionHandler(req) {
99+
storeManager.exportDB(req.newDataDir, req.optionsFile)
100+
ExportDBReply(true)
101+
} { errorMsg => ExportDBReply(false, errorMsg) }
102+
103+
private def withExceptionHandler[T, R <: GeneratedMessage](request: R)(tryBlock: => T)(onErrorBlock: Option[String] => T): Future[T] = {
99104
try {
100105
logger.debug("received " + requestToString(request))
101106
Future.successful(tryBlock)
@@ -112,7 +117,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)
112117
}
113118

114119
private def requestToString[R <: GeneratedMessage](request: R) =
115-
request.getClass.getSimpleName + "(" + request.toString.replaceAll("\n"," ") + ")"
120+
request.getClass.getSimpleName + "(" + request.toString.replaceAll("\n", " ") + ")"
116121

117122
private def getStackTraceAsString(t: Throwable) = {
118123
val sw = new StringWriter

src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala

Lines changed: 15 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -76,77 +76,29 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
7676
logger.info("Restoring from backup complete. Reopening RocksDB")
7777
}
7878

79-
def compactAllData(idx: Option[Int]) = {
79+
def compactAllData() = {
8080
logger.info("Compacting all data")
8181
RocksDB.loadLibrary()
82-
idx.getOrElse(0) match {
83-
case 0 => db.compactRange()
84-
case 1 => writeAllSSts()
85-
case 2 => ingestFiles()
86-
case 3 => writeToNewDB()
87-
}
82+
db.compactRange()
8883
logger.info("All data has been compacted to last level containing data")
8984
}
9085

91-
def close(): Future[Unit] = {
92-
logger.info("Closing RocksDB handle")
93-
Future.successful(db.close())
94-
}
95-
96-
def ingestFiles() = {
97-
val ifo = new IngestExternalFileOptions()
98-
ifo.setMoveFiles(true)
99-
val fileNames = new File("toIngest").listFiles.filter(_.isFile).filter(_.getName.endsWith("sst")).map(_.getPath)
100-
val asd: mutable.Buffer[String] = fileNames.toBuffer
101-
val handle = columnFamilyHandles("skeletons")
102-
db.ingestExternalFile(handle, asd.asJava, ifo)
103-
}
104-
105-
def writeAllSSts() = {
106-
val (dbOptions, columnFamilyDescriptors) = loadOptions("config/options.ini")
107-
val descriptor = columnFamilyDescriptors.find(_.getName sameElements "skeletons".getBytes)
108-
val options = new Options(dbOptions, descriptor.get.getOptions)
109-
val writer = new SstFileWriter(new EnvOptions(), options)
110-
val store = getStoreForColumnFamily("skeletons")
111-
val it = store.get.scan("", None)
112-
var idx = 0
113-
writer.open(s"data/export/test${idx}.sst")
114-
it.foreach { el =>
115-
if (new File(s"data/export/test${idx}.sst").length() + el.key.getBytes.length + el.value.length > options.targetFileSizeBase()) {
116-
writer.finish()
117-
idx += 1
118-
writer.open(s"data/export/test${idx}.sst")
119-
}
120-
writer.put(el.key.getBytes, el.value)
86+
def exportToNewDB(newDataDir: Path, newOptionsFilePathOpt: Option[String]) = {
87+
RocksDB.loadLibrary()
88+
logger.info(s"Exporting to new DB at ${newDataDir.toString} with options file ${newOptionsFilePathOpt}")
89+
val newManager = new RocksDBManager(newDataDir, columnFamilies, newOptionsFilePathOpt)
90+
newManager.columnFamilyHandles.foreach { case (name, handle) =>
91+
val dataIterator = getStoreForColumnFamily(name).get.scan("", None)
92+
dataIterator.foreach(el => newManager.db.put(handle, el.key.getBytes, el.value))
12193
}
122-
writer.finish()
123-
}
124-
125-
def writeToNewDB() = {
126-
val manager = new RocksDBManager(Paths.get("data_new"), columnFamilies, Some("config/options.ini"))
127-
val skeletonHandle = manager.columnFamilyHandles("skeletons")
128-
val it = getStoreForColumnFamily("skeletons").get.scan("", None)
129-
it.foreach { el => manager.db.put(skeletonHandle, el.key.getBytes, el.value) }
94+
logger.info("Writing data completed. Start compaction")
95+
newManager.db.compactRange()
96+
logger.info("Compaction finished")
13097
}
13198

132-
def loadOptions(optionFilepath: String) = {
133-
val options = new DBOptions()
134-
val cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer()
135-
optionsFilePathOpt.foreach { optionsFilePath =>
136-
try {
137-
org.rocksdb.OptionsUtil.loadOptionsFromFile(optionsFilePath, Env.getDefault, options, cfListRef.asJava)
138-
logger.info("successfully loaded rocksdb options from " + optionsFilePath)
139-
} catch {
140-
case e: Exception => {
141-
throw new Exception("Failed to load rocksdb options from file " + optionsFilePath, e)
142-
}
143-
}
144-
}
145-
options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
146-
val defaultColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(new ColumnFamilyOptions())
147-
val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions))
148-
val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors
149-
(options, columnFamilyDescriptors)
99+
def close(): Future[Unit] = {
100+
logger.info("Closing RocksDB handle")
101+
Future.successful(db.close())
150102
}
151103
}
152104

src/main/scala/com/scalableminds/fossildb/db/StoreManager.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
package com.scalableminds.fossildb.db
55

6-
import java.nio.file.Path
6+
import java.nio.file.{Path, Paths}
77
import java.util.concurrent.atomic.AtomicBoolean
88

99
class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String], rocksdbOptions: Option[String]) {
@@ -66,11 +66,18 @@ class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String],
6666
}
6767
}
6868

69-
def compactAllData(idx: Option[Int]) = {
69+
def compactAllData() = {
7070
failDuringBackup
7171
failDuringRestore
7272
try {
73-
rocksDBManager.get.compactAllData(idx)
73+
rocksDBManager.get.compactAllData()
74+
}
75+
}
76+
77+
def exportDB(newDataDir: String, newOptionsFilePathOpt: Option[String]) = {
78+
failDuringRestore
79+
try {
80+
rocksDBManager.get.exportToNewDB(Paths.get(newDataDir), newOptionsFilePathOpt)
7481
}
7582
}
7683

0 commit comments

Comments (0)