diff --git a/ext/leveldb/leveldb.cc b/ext/leveldb/leveldb.cc index d2dd9fa..8b6c429 100644 --- a/ext/leveldb/leveldb.cc +++ b/ext/leveldb/leveldb.cc @@ -17,6 +17,7 @@ static VALUE c_no_compression; static VALUE c_snappy_compression; static VALUE k_fill; static VALUE k_verify; +static VALUE k_snapshot; static VALUE k_sync; static VALUE k_from; static VALUE k_to; @@ -27,6 +28,7 @@ static ID k_to_s; static leveldb::ReadOptions uncached_read_options; static VALUE c_db_options; +static VALUE c_snapshot; static VALUE k_create_if_missing; static VALUE k_error_if_exists; static VALUE k_paranoid_checks; @@ -54,6 +56,11 @@ typedef struct bound_db { leveldb::DB* db; } bound_db; +typedef struct bound_snapshot { + const leveldb::Snapshot* snapshot; + VALUE v_db; +} bound_snapshot; + static void db_free(bound_db* db) { if(db->db != NULL) { delete db->db; @@ -233,9 +240,16 @@ static leveldb::ReadOptions parse_read_options(VALUE options) { VALUE v_fill = rb_hash_aref(options, k_fill); VALUE v_verify = rb_hash_aref(options, k_verify); + VALUE v_snapshot = rb_hash_aref(options, k_snapshot); if(!NIL_P(v_fill)) readOptions.fill_cache = RTEST(v_fill); if(!NIL_P(v_verify)) readOptions.verify_checksums = RTEST(v_verify); + + if(!NIL_P(v_snapshot)) { + bound_snapshot* sn; + Data_Get_Struct(v_snapshot, bound_snapshot, sn); + readOptions.snapshot = sn->snapshot; + } } return readOptions; @@ -274,6 +288,9 @@ static leveldb::WriteOptions parse_write_options(VALUE options) { * verified against corresponding checksums. * * Default: false + * [options[ :snapshot ]] If value is a Snapshot instance, read from that version of DB. + * + * Default: nil * [return] value of stored db */ static VALUE db_get(int argc, VALUE* argv, VALUE self) { @@ -416,11 +433,14 @@ static VALUE iter_make(VALUE klass, VALUE db, VALUE options) { bound_db* b_db; Data_Get_Struct(db, bound_db, b_db); + leveldb::ReadOptions read_options = parse_read_options(options); + read_options.fill_cache = false; + current_iteration* iter = new current_iteration; iter->passed_limit = false; iter->check_limit = false; iter->checked_valid = 0; - iter->iterator = b_db->db->NewIterator(uncached_read_options); + iter->iterator = b_db->db->NewIterator(read_options); VALUE o_iter = Data_Wrap_Struct(klass, NULL, current_iteration_free, iter); @@ -636,10 +656,123 @@ static VALUE db_batch(int argc, VALUE* argv, VALUE self) { return Qtrue; } +static void bound_snapshot_mark(bound_snapshot* b_sn) { + rb_gc_mark(b_sn->v_db); +} + +static void bound_snapshot_free(bound_snapshot* b_sn) { + if (b_sn->snapshot && rb_during_gc()) { + bound_db* b_db; + Data_Get_Struct(b_sn->v_db, bound_db, b_db); + b_db->db->ReleaseSnapshot(b_sn->snapshot); + } + // If not rb_during_gc, then ruby vm is finalizing, and db either has been freed + // (in which case we can't call ReleaseSnapshot) or is about to be freed (in which + // case we don't need to). + delete b_sn; +} + +static VALUE snapshot_make(VALUE klass, VALUE v_db) { + if (c_db != rb_funcall(v_db, k_class, 0)) { + rb_raise(rb_eArgError, "db must be a LevelDB::DB"); + } + + bound_db* b_db; + Data_Get_Struct(v_db, bound_db, b_db); + + bound_snapshot* b_sn = new bound_snapshot; + b_sn->snapshot = b_db->db->GetSnapshot(); + b_sn->v_db = v_db; + VALUE o_snapshot = Data_Wrap_Struct(klass, bound_snapshot_mark, bound_snapshot_free, b_sn); + + VALUE argv[1]; + argv[0] = v_db; + rb_obj_call_init(o_snapshot, 1, argv); + + return o_snapshot; +} + +static VALUE snapshot_init(VALUE self, VALUE v_db) { + return self; +} + +/* + * call-seq: + * db() + * + * [return] the db that the snapshot references. + */ +static VALUE snapshot_db(VALUE self) { + bound_snapshot* b_sn; + Data_Get_Struct(self, bound_snapshot, b_sn); + return b_sn->v_db; +} + +/* + * call-seq: + * release() + * + * Release the snapshot; after calling this method, the snapshot can still be used, + * but it reads from the current database state. + * + * [return] self. + */ +static VALUE snapshot_release(VALUE self) { + bound_snapshot* b_sn; + Data_Get_Struct(self, bound_snapshot, b_sn); + + if (b_sn->snapshot) { + bound_db* b_db; + Data_Get_Struct(b_sn->v_db, bound_db, b_db); + b_db->db->ReleaseSnapshot(b_sn->snapshot); + b_sn->snapshot = NULL; + } + + return self; +} + +/* + * call-seq: + * released?() + * + * [return] true if the snapshot has been released, false otherwise. + */ +static VALUE snapshot_released(VALUE self) { + bound_snapshot* b_sn; + Data_Get_Struct(self, bound_snapshot, b_sn); + return b_sn->snapshot ? Qfalse : Qtrue; +} + +/* + * call-seq: + * exists?() + * + * [return] true if the key exists in the snapshot of the db, false otherwise. + */ +static VALUE snapshot_exists(VALUE self, VALUE v_key) { + Check_Type(v_key, T_STRING); + + bound_snapshot* b_sn; + Data_Get_Struct(self, bound_snapshot, b_sn); + + leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key); + std::string value; + leveldb::ReadOptions options; + options.snapshot = b_sn->snapshot; + + bound_db* b_db; + Data_Get_Struct(b_sn->v_db, bound_db, b_db); + leveldb::Status status = b_db->db->Get(options, key, &value); + + if(status.IsNotFound()) return Qfalse; + return Qtrue; +} + extern "C" { void Init_leveldb() { k_fill = ID2SYM(rb_intern("fill_cache")); k_verify = ID2SYM(rb_intern("verify_checksums")); + k_snapshot = ID2SYM(rb_intern("snapshot")); k_sync = ID2SYM(rb_intern("sync")); k_from = ID2SYM(rb_intern("from")); k_to = ID2SYM(rb_intern("to")); @@ -689,6 +822,14 @@ void Init_leveldb() { c_db_options = rb_define_class_under(m_leveldb, "Options", rb_cObject); + c_snapshot = rb_define_class_under(m_leveldb, "Snapshot", rb_cObject); + rb_define_singleton_method(c_snapshot, "make", RUBY_METHOD_FUNC(snapshot_make), 1); + rb_define_method(c_snapshot, "initialize", RUBY_METHOD_FUNC(snapshot_init), 1); + rb_define_method(c_snapshot, "db", RUBY_METHOD_FUNC(snapshot_db), 0); + rb_define_method(c_snapshot, "release", RUBY_METHOD_FUNC(snapshot_release), 0); + rb_define_method(c_snapshot, "released?", RUBY_METHOD_FUNC(snapshot_released), 0); + rb_define_method(c_snapshot, "exists?", RUBY_METHOD_FUNC(snapshot_exists), 1); + VALUE m_ctype = rb_define_module_under(m_leveldb, "CompressionType"); VALUE c_base = rb_define_class_under(m_ctype, "Base", rb_cObject); c_no_compression = rb_define_class_under(m_ctype, "NoCompression", c_base); diff --git a/lib/leveldb.rb b/lib/leveldb.rb index 5d6209c..9b949de 100644 --- a/lib/leveldb.rb +++ b/lib/leveldb.rb @@ -59,6 +59,19 @@ def iterator(*args); Iterator.new self, *args end def keys; map { |k, v| k } end def values; map { |k, v| v } end + def snapshot(*args) + sn = Snapshot.new self, *args + if block_given? + begin + yield sn + ensure + sn.release + end + else + sn + end + end + def inspect %(<#{self.class} #{@pathname.inspect}>) end @@ -85,6 +98,38 @@ class << self end end +# Snapshot has the same API as DB, restricted to read access. +class Snapshot + include Enumerable + + def self.new(db) + make db + end + + def each(*args, &block) + i = iterator(*args) + i.each(&block) if block + i + end + + def get(*args) + db.get(*args, snapshot: self) + end + + alias :[] :get + alias :includes? :exists? + alias :contains? :exists? + alias :member? :exists? + + def iterator(*args); db.iterator *args, snapshot: self end + def keys; map { |k, v| k } end + def values; map { |k, v| v } end + + def inspect + %(<#{self.class} #{db.inspect} #{' (released)' if released?}>) + end +end + class Options DEFAULT_MAX_OPEN_FILES = 1000 DEFAULT_WRITE_BUFFER_SIZE = 4 * 1024 * 1024 diff --git a/test/snapshot_test.rb b/test/snapshot_test.rb new file mode 100644 index 0000000..0d40ed1 --- /dev/null +++ b/test/snapshot_test.rb @@ -0,0 +1,75 @@ +require 'test/unit' +require File.expand_path("../../lib/leveldb", __FILE__) +require 'fileutils' + +class SnapshotTest < Test::Unit::TestCase + DB_PATH = "/tmp/snapshot-%s.db" + + attr_reader :db + + def initialize(name) + super + @path = DB_PATH%name + FileUtils.rm_rf @path + @db = LevelDB::DB.new @path + end + + KEYS = %w{ k1 k2 k3 } + + def setup + KEYS.each do |k| + db[k] = "0" + end + end + + def test_get + sn = db.snapshot + KEYS.each do |k| + db[k] = "1" + end + KEYS.each do |k| + assert_equal "0", sn[k] + end + end + + def test_iterator + sn = db.snapshot + KEYS.each do |k| + db[k] = "1" + end + sn.each do |k,v| + assert_equal "0", v + end + assert_equal KEYS, sn.keys + assert_equal ["0"], sn.values.uniq + end + + def test_exists + sn = db.snapshot + db["new"] = "new" + db.delete "k1" + assert_equal false, sn.exists?( "new" ) + assert_equal true, sn.exists?( "k1" ) + assert_equal true, db.exists?( "new" ) + assert_equal false, db.exists?( "k1" ) + end + + def test_release + sn = db.snapshot + KEYS.each do |k| + db[k] = "1" + end + + assert_equal false, sn.released? + sn.release + assert_equal true, sn.released? + + sn.each do |k,v| + assert_equal "1", v + end + end + + def teardown + FileUtils.rm_rf @path + end +end