Skip to content

Commit ac44c4f

Browse files
authored
supports checkpoint (#721)
* support checkpoint Signed-off-by: nolouch <[email protected]> * suppport titan checkpoint Signed-off-by: nolouch <[email protected]> * address comments Signed-off-by: nolouch <[email protected]> * address comments Signed-off-by: nolouch <[email protected]> Signed-off-by: nolouch <[email protected]>
1 parent 4c859a2 commit ac44c4f

File tree

9 files changed

+277
-4
lines changed

9 files changed

+277
-4
lines changed

librocksdb_sys/crocksdb/c.cc

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "rocksdb/types.h"
4949
#include "rocksdb/universal_compaction.h"
5050
#include "rocksdb/utilities/backupable_db.h"
51+
#include "rocksdb/utilities/checkpoint.h"
5152
#include "rocksdb/utilities/db_ttl.h"
5253
#include "rocksdb/utilities/debug.h"
5354
#include "rocksdb/utilities/options_util.h"
@@ -57,6 +58,7 @@
5758
#include "table/block_based/block_based_table_factory.h"
5859
#include "table/sst_file_writer_collectors.h"
5960
#include "table/table_reader.h"
61+
#include "titan/checkpoint.h"
6062
#include "titan/db.h"
6163
#include "titan/options.h"
6264
#include "util/coding.h"
@@ -77,6 +79,7 @@ using rocksdb::BackupInfo;
7779
using rocksdb::BlockBasedTableOptions;
7880
using rocksdb::BlockCipher;
7981
using rocksdb::Cache;
82+
using rocksdb::Checkpoint;
8083
using rocksdb::ColumnFamilyDescriptor;
8184
using rocksdb::ColumnFamilyHandle;
8285
using rocksdb::ColumnFamilyOptions;
@@ -171,6 +174,7 @@ using rocksdb::ExternalSstFilePropertyNames;
171174
using rocksdb::IOStatsContext;
172175
using rocksdb::LDBTool;
173176
using rocksdb::LevelMetaData;
177+
using rocksdb::MemoryAllocator;
174178
using rocksdb::PerfContext;
175179
using rocksdb::PerfLevel;
176180
using rocksdb::PutFixed64;
@@ -193,8 +197,7 @@ using rocksdb::titandb::TitanDB;
193197
using rocksdb::titandb::TitanDBOptions;
194198
using rocksdb::titandb::TitanOptions;
195199
using rocksdb::titandb::TitanReadOptions;
196-
197-
using rocksdb::MemoryAllocator;
200+
using TitanCheckpoint = rocksdb::titandb::Checkpoint;
198201

199202
#ifdef OPENSSL
200203
using rocksdb::encryption::EncryptionMethod;
@@ -223,6 +226,9 @@ struct crocksdb_backup_engine_t {
223226
struct crocksdb_backup_engine_info_t {
224227
std::vector<BackupInfo> rep;
225228
};
229+
struct crocksdb_checkpoint_t {
230+
Checkpoint* rep;
231+
};
226232
struct crocksdb_restore_options_t {
227233
RestoreOptions rep;
228234
};
@@ -756,6 +762,29 @@ void crocksdb_resume(crocksdb_t* db, char** errptr) {
756762
SaveError(errptr, db->rep->Resume());
757763
}
758764

765+
crocksdb_checkpoint_t* crocksdb_checkpoint_object_create(crocksdb_t* db,
766+
char** errptr) {
767+
Checkpoint* checkpoint;
768+
if (SaveError(errptr, Checkpoint::Create(db->rep, &checkpoint))) {
769+
return nullptr;
770+
}
771+
crocksdb_checkpoint_t* result = new crocksdb_checkpoint_t;
772+
result->rep = checkpoint;
773+
return result;
774+
}
775+
776+
void crocksdb_checkpoint_create(crocksdb_checkpoint_t* checkpoint,
777+
const char* checkpoint_dir,
778+
uint64_t log_size_for_flush, char** errptr) {
779+
SaveError(errptr, checkpoint->rep->CreateCheckpoint(
780+
std::string(checkpoint_dir), log_size_for_flush));
781+
}
782+
783+
void crocksdb_checkpoint_object_destroy(crocksdb_checkpoint_t* checkpoint) {
784+
delete checkpoint->rep;
785+
delete checkpoint;
786+
}
787+
759788
crocksdb_backup_engine_t* crocksdb_backup_engine_open(
760789
const crocksdb_options_t* options, const char* path, char** errptr) {
761790
BackupEngine* be;
@@ -6383,10 +6412,40 @@ void crocksdb_run_sst_dump_tool(int argc, char** argv,
63836412
}
63846413

63856414
/* Titan */
6415+
struct ctitandb_checkpoint_t {
6416+
TitanCheckpoint* rep;
6417+
};
6418+
63866419
struct ctitandb_options_t {
63876420
TitanOptions rep;
63886421
};
63896422

6423+
ctitandb_checkpoint_t* ctitandb_checkpoint_object_create(crocksdb_t* db,
6424+
char** errptr) {
6425+
TitanCheckpoint* checkpoint;
6426+
if (SaveError(errptr, TitanCheckpoint::Create(static_cast<TitanDB*>(db->rep),
6427+
&checkpoint))) {
6428+
return nullptr;
6429+
}
6430+
ctitandb_checkpoint_t* result = new ctitandb_checkpoint_t;
6431+
result->rep = checkpoint;
6432+
return result;
6433+
}
6434+
6435+
void ctitandb_checkpoint_create(ctitandb_checkpoint_t* checkpoint,
6436+
const char* basedb_checkpoint_dir,
6437+
const char* titan_checkpoint_dir,
6438+
uint64_t log_size_for_flush, char** errptr) {
6439+
SaveError(errptr, checkpoint->rep->CreateCheckpoint(
6440+
std::string(basedb_checkpoint_dir),
6441+
std::string(titan_checkpoint_dir), log_size_for_flush));
6442+
}
6443+
6444+
void ctitandb_checkpoint_object_destroy(ctitandb_checkpoint_t* checkpoint) {
6445+
delete checkpoint->rep;
6446+
delete checkpoint;
6447+
}
6448+
63906449
crocksdb_t* ctitandb_open_column_families(
63916450
const char* name, const ctitandb_options_t* tdb_options,
63926451
int num_column_families, const char** column_family_names,

librocksdb_sys/crocksdb/crocksdb/c.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ typedef struct crocksdb_lru_cache_options_t crocksdb_lru_cache_options_t;
8080
typedef struct crocksdb_cache_t crocksdb_cache_t;
8181
typedef struct crocksdb_memory_allocator_t crocksdb_memory_allocator_t;
8282
typedef struct crocksdb_compactionfilter_t crocksdb_compactionfilter_t;
83+
typedef struct crocksdb_checkpoint_t crocksdb_checkpoint_t;
8384
enum {
8485
crocksdb_table_file_creation_reason_flush = 0,
8586
crocksdb_table_file_creation_reason_compaction = 1,
@@ -245,6 +246,16 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_status_ptr_get_error(
245246

246247
extern C_ROCKSDB_LIBRARY_API void rocksdb_resume(crocksdb_t* db, char** errptr);
247248

249+
extern C_ROCKSDB_LIBRARY_API crocksdb_checkpoint_t*
250+
crocksdb_checkpoint_object_create(crocksdb_t* db, char** errptr);
251+
252+
extern C_ROCKSDB_LIBRARY_API void crocksdb_checkpoint_create(
253+
crocksdb_checkpoint_t* checkpoint, const char* checkpoint_dir,
254+
uint64_t log_size_for_flush, char** errptr);
255+
256+
extern C_ROCKSDB_LIBRARY_API void crocksdb_checkpoint_object_destroy(
257+
crocksdb_checkpoint_t* checkpoint);
258+
248259
extern C_ROCKSDB_LIBRARY_API crocksdb_backup_engine_t*
249260
crocksdb_backup_engine_open(const crocksdb_options_t* options, const char* path,
250261
char** errptr);
@@ -2533,6 +2544,7 @@ struct ctitandb_blob_index_t {
25332544
typedef struct ctitandb_options_t ctitandb_options_t;
25342545
typedef struct ctitandb_readoptions_t ctitandb_readoptions_t;
25352546
typedef struct ctitandb_blob_index_t ctitandb_blob_index_t;
2547+
typedef struct ctitandb_checkpoint_t ctitandb_checkpoint_t;
25362548

25372549
extern C_ROCKSDB_LIBRARY_API crocksdb_t* ctitandb_open_column_families(
25382550
const char* name, const ctitandb_options_t* tdb_options,
@@ -2545,6 +2557,17 @@ ctitandb_create_column_family(
25452557
crocksdb_t* db, const ctitandb_options_t* titan_column_family_options,
25462558
const char* column_family_name, char** errptr);
25472559

2560+
extern C_ROCKSDB_LIBRARY_API ctitandb_checkpoint_t*
2561+
ctitandb_checkpoint_object_create(crocksdb_t* db, char** errptr);
2562+
2563+
extern C_ROCKSDB_LIBRARY_API void ctitandb_checkpoint_create(
2564+
ctitandb_checkpoint_t* checkpoint, const char* basedb_checkpoint_dir,
2565+
const char* titan_checkpoint_dir, uint64_t log_size_for_flush,
2566+
char** errptr);
2567+
2568+
extern C_ROCKSDB_LIBRARY_API void ctitandb_checkpoint_object_destroy(
2569+
ctitandb_checkpoint_t* checkpoint);
2570+
25482571
/* TitanDBOptions */
25492572

25502573
extern C_ROCKSDB_LIBRARY_API ctitandb_options_t* ctitandb_options_create();

librocksdb_sys/src/lib.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ pub struct DBCompactionFilter(c_void);
8585
pub struct DBCompactionFilterFactory(c_void);
8686
#[repr(C)]
8787
pub struct DBCompactionFilterContext(c_void);
88-
88+
#[repr(C)]
89+
pub struct DBCheckpoint(c_void);
8990
#[repr(C)]
9091
pub struct EnvOptions(c_void);
9192
#[repr(C)]
@@ -2335,6 +2336,20 @@ extern "C" {
23352336

23362337
pub fn crocksdb_keyversions_value(kvs: *mut DBKeyVersions, index: usize) -> *const c_char;
23372338

2339+
pub fn crocksdb_checkpoint_object_create(
2340+
db: *mut DBInstance,
2341+
errptr: *mut *mut c_char,
2342+
) -> *mut DBCheckpoint;
2343+
2344+
pub fn crocksdb_checkpoint_create(
2345+
check_point: *mut DBCheckpoint,
2346+
check_point_dir: *const c_char,
2347+
log_size_for_flush: u64,
2348+
errptr: *mut *mut c_char,
2349+
);
2350+
2351+
pub fn crocksdb_checkpoint_object_destroy(check_point: *mut DBCheckpoint);
2352+
23382353
pub fn crocksdb_keyversions_seq(kvs: *mut DBKeyVersions, index: usize) -> u64;
23392354

23402355
pub fn crocksdb_keyversions_type(kvs: *mut DBKeyVersions, index: usize) -> c_int;
@@ -2813,6 +2828,21 @@ extern "C" {
28132828
include_end: bool,
28142829
errptr: *mut *mut c_char,
28152830
);
2831+
2832+
pub fn ctitandb_checkpoint_object_create(
2833+
db: *mut DBInstance,
2834+
errptr: *mut *mut c_char,
2835+
) -> *mut DBCheckpoint;
2836+
2837+
pub fn ctitandb_checkpoint_create(
2838+
checkpoint: *mut DBCheckpoint,
2839+
basedb_checkpoint_dir: *const c_char,
2840+
titan_checkpoint_dir: *const c_char,
2841+
log_size_for_flush: u64,
2842+
errptr: *mut *mut c_char,
2843+
);
2844+
2845+
pub fn ctitandb_checkpoint_object_destroy(check_point: *mut DBCheckpoint);
28162846
}
28172847

28182848
#[cfg(test)]

src/checkpoint.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
use std::ffi::CString;
4+
use std::path::Path;
5+
6+
pub struct Checkpointer {
7+
ptr: *mut librocksdb_sys::DBCheckpoint,
8+
is_titan: bool,
9+
}
10+
11+
impl Checkpointer {
12+
/// Creates new checkpoint object for specific DB.
13+
pub(crate) fn new(
14+
db: *mut librocksdb_sys::DBInstance,
15+
is_titan: bool,
16+
) -> Result<Checkpointer, String> {
17+
let ptr = if is_titan {
18+
unsafe { ffi_try!(ctitandb_checkpoint_object_create(db)) }
19+
} else {
20+
unsafe { ffi_try!(crocksdb_checkpoint_object_create(db)) }
21+
};
22+
Ok(Checkpointer { ptr, is_titan })
23+
}
24+
/// Creates new physical DB checkpoint in directory specified by `path`.
25+
///
26+
/// Builds an openable snapshot of RocksDB on the same disk, which
27+
/// accepts an output directory on the same disk, and under the directory
28+
/// (1) hard-linked SST files pointing to existing live SST files
29+
/// SST files will be copied if output directory is on a different filesystem
30+
/// (2) a copied manifest files and other files
31+
/// The directory should not already exist and will be created by this API.
32+
/// The directory will be an absolute path
33+
/// log_size_for_flush: if the total log file size is equal or larger than
34+
/// this value, then a flush is triggered for all the column families. The
35+
/// default value is 0, which means flush is always triggered. If you move
36+
/// away from the default, the checkpoint may not contain up-to-date data
37+
/// if WAL writing is not always enabled.
38+
/// Flush will always trigger if it is 2PC.
39+
///
40+
/// basedb_out_dir: the checkpoint path about rocksdb
41+
/// titan_out_dir: the checkpoint path about titan's files. if titan_out_dir
42+
/// is None, the path will be "{basedb_out_dir}/titandb".
43+
pub fn create_at(
44+
&mut self,
45+
basedb_out_dir: &Path,
46+
titan_out_dir: Option<&Path>,
47+
log_size_for_flush: u64,
48+
) -> Result<(), String> {
49+
let basedb_out_dir = match basedb_out_dir.to_str().and_then(|s| CString::new(s).ok()) {
50+
Some(s) => s,
51+
None => {
52+
return Err(format!(
53+
"{} is not a valid directory",
54+
basedb_out_dir.display()
55+
))
56+
}
57+
};
58+
let mut titan_out_dir_str = CString::new("").ok().unwrap();
59+
if let Some(titan_out_dir) = titan_out_dir {
60+
match titan_out_dir.to_str().and_then(|s| CString::new(s).ok()) {
61+
Some(s) => titan_out_dir_str = s,
62+
None => {
63+
return Err(format!(
64+
"{} is not a valid directory",
65+
titan_out_dir.display()
66+
))
67+
}
68+
};
69+
}
70+
71+
if self.is_titan {
72+
unsafe {
73+
ffi_try!(ctitandb_checkpoint_create(
74+
self.ptr,
75+
basedb_out_dir.as_ptr(),
76+
titan_out_dir_str.as_ptr(),
77+
log_size_for_flush
78+
));
79+
}
80+
} else {
81+
unsafe {
82+
ffi_try!(crocksdb_checkpoint_create(
83+
self.ptr,
84+
basedb_out_dir.as_ptr(),
85+
log_size_for_flush
86+
));
87+
}
88+
}
89+
90+
Ok(())
91+
}
92+
}
93+
94+
impl Drop for Checkpointer {
95+
fn drop(&mut self) {
96+
if self.is_titan {
97+
unsafe {
98+
librocksdb_sys::ctitandb_checkpoint_object_destroy(self.ptr);
99+
}
100+
return;
101+
}
102+
unsafe {
103+
librocksdb_sys::crocksdb_checkpoint_object_destroy(self.ptr);
104+
}
105+
}
106+
}

src/compaction_filter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,6 @@ mod tests {
523523
db.compact_range_cf(cfh, None, None);
524524
let sk = rx.recv().unwrap();
525525
let ek = rx.recv().unwrap();
526-
println!("sk:{:?} ek:{:?}", sk, ek);
527526
let sk = str::from_utf8(&sk).unwrap();
528527
let ek = str::from_utf8(&ek).unwrap();
529528
assert_eq!("key0", sk);

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ extern crate tempfile;
2727
#[macro_use]
2828
extern crate lazy_static;
2929

30+
pub use checkpoint::Checkpointer;
3031
pub use compaction_filter::{
3132
new_compaction_filter, new_compaction_filter_factory, new_compaction_filter_raw,
3233
CompactionFilter, CompactionFilterContext, CompactionFilterDecision, CompactionFilterFactory,
@@ -83,6 +84,7 @@ pub use write_batch::{WriteBatch, WriteBatchIter, WriteBatchRef};
8384
#[allow(deprecated)]
8485
pub use rocksdb::Kv;
8586

87+
mod checkpoint;
8688
mod compaction_filter;
8789
pub mod comparator;
8890
#[cfg(feature = "encryption")]

src/rocksdb.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1856,6 +1856,10 @@ impl DB {
18561856
DB::open_default(restore_db_path)
18571857
}
18581858

1859+
pub fn new_checkpointer(&self) -> Result<crate::Checkpointer, String> {
1860+
crate::Checkpointer::new(self.inner, self.is_titan())
1861+
}
1862+
18591863
pub fn get_block_cache_usage(&self) -> u64 {
18601864
self.get_options().get_block_cache_usage()
18611865
}

tests/cases/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod test_checkpoint;
12
mod test_column_family;
23
mod test_compact_range;
34
mod test_compaction_filter;

0 commit comments

Comments
 (0)