Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 142 additions & 1 deletion ext/leveldb/leveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ static VALUE c_no_compression;
static VALUE c_snappy_compression;
static VALUE k_fill;
static VALUE k_verify;
static VALUE k_snapshot;
static VALUE k_sync;
static VALUE k_from;
static VALUE k_to;
Expand All @@ -27,6 +28,7 @@ static ID k_to_s;
static leveldb::ReadOptions uncached_read_options;

static VALUE c_db_options;
static VALUE c_snapshot;
static VALUE k_create_if_missing;
static VALUE k_error_if_exists;
static VALUE k_paranoid_checks;
Expand Down Expand Up @@ -54,6 +56,11 @@ typedef struct bound_db {
leveldb::DB* db;
} bound_db;

typedef struct bound_snapshot {
const leveldb::Snapshot* snapshot;
VALUE v_db;
} bound_snapshot;

static void db_free(bound_db* db) {
if(db->db != NULL) {
delete db->db;
Expand Down Expand Up @@ -233,9 +240,16 @@ static leveldb::ReadOptions parse_read_options(VALUE options) {

VALUE v_fill = rb_hash_aref(options, k_fill);
VALUE v_verify = rb_hash_aref(options, k_verify);
VALUE v_snapshot = rb_hash_aref(options, k_snapshot);

if(!NIL_P(v_fill)) readOptions.fill_cache = RTEST(v_fill);
if(!NIL_P(v_verify)) readOptions.verify_checksums = RTEST(v_verify);

if(!NIL_P(v_snapshot)) {
bound_snapshot* sn;
Data_Get_Struct(v_snapshot, bound_snapshot, sn);
readOptions.snapshot = sn->snapshot;
}
}

return readOptions;
Expand Down Expand Up @@ -274,6 +288,9 @@ static leveldb::WriteOptions parse_write_options(VALUE options) {
* verified against corresponding checksums.
*
* Default: false
* [options[ :snapshot ]] If value is a Snapshot instance, read from that version of DB.
*
* Default: nil
* [return] value of stored db
*/
static VALUE db_get(int argc, VALUE* argv, VALUE self) {
Expand Down Expand Up @@ -416,11 +433,14 @@ static VALUE iter_make(VALUE klass, VALUE db, VALUE options) {
bound_db* b_db;
Data_Get_Struct(db, bound_db, b_db);

leveldb::ReadOptions read_options = parse_read_options(options);
read_options.fill_cache = false;

current_iteration* iter = new current_iteration;
iter->passed_limit = false;
iter->check_limit = false;
iter->checked_valid = 0;
iter->iterator = b_db->db->NewIterator(uncached_read_options);
iter->iterator = b_db->db->NewIterator(read_options);

VALUE o_iter = Data_Wrap_Struct(klass, NULL, current_iteration_free, iter);

Expand Down Expand Up @@ -636,10 +656,123 @@ static VALUE db_batch(int argc, VALUE* argv, VALUE self) {
return Qtrue;
}

static void bound_snapshot_mark(bound_snapshot* b_sn) {
rb_gc_mark(b_sn->v_db);
}

static void bound_snapshot_free(bound_snapshot* b_sn) {
if (b_sn->snapshot && rb_during_gc()) {
bound_db* b_db;
Data_Get_Struct(b_sn->v_db, bound_db, b_db);
b_db->db->ReleaseSnapshot(b_sn->snapshot);
}
// If not rb_during_gc, then ruby vm is finalizing, and db either has been freed
// (in which case we can't call ReleaseSnapshot) or is about to be freed (in which
// case we don't need to).
delete b_sn;
}

static VALUE snapshot_make(VALUE klass, VALUE v_db) {
if (c_db != rb_funcall(v_db, k_class, 0)) {
rb_raise(rb_eArgError, "db must be a LevelDB::DB");
}

bound_db* b_db;
Data_Get_Struct(v_db, bound_db, b_db);

bound_snapshot* b_sn = new bound_snapshot;
b_sn->snapshot = b_db->db->GetSnapshot();
b_sn->v_db = v_db;
VALUE o_snapshot = Data_Wrap_Struct(klass, bound_snapshot_mark, bound_snapshot_free, b_sn);

VALUE argv[1];
argv[0] = v_db;
rb_obj_call_init(o_snapshot, 1, argv);

return o_snapshot;
}

static VALUE snapshot_init(VALUE self, VALUE v_db) {
return self;
}

/*
* call-seq:
* db()
*
* [return] the db that the snapshot references.
*/
static VALUE snapshot_db(VALUE self) {
bound_snapshot* b_sn;
Data_Get_Struct(self, bound_snapshot, b_sn);
return b_sn->v_db;
}

/*
* call-seq:
* release()
*
* Release the snapshot; after calling this method, the snapshot can still be used,
* but it reads from the current database state.
*
* [return] self.
*/
static VALUE snapshot_release(VALUE self) {
bound_snapshot* b_sn;
Data_Get_Struct(self, bound_snapshot, b_sn);

if (b_sn->snapshot) {
bound_db* b_db;
Data_Get_Struct(b_sn->v_db, bound_db, b_db);
b_db->db->ReleaseSnapshot(b_sn->snapshot);
b_sn->snapshot = NULL;
}

return self;
}

/*
* call-seq:
* released?()
*
* [return] true if the snapshot has been released, false otherwise.
*/
static VALUE snapshot_released(VALUE self) {
bound_snapshot* b_sn;
Data_Get_Struct(self, bound_snapshot, b_sn);
return b_sn->snapshot ? Qfalse : Qtrue;
}

/*
* call-seq:
* exists?()
*
* [return] true if the key exists in the snapshot of the db, false otherwise.
*/
static VALUE snapshot_exists(VALUE self, VALUE v_key) {
Check_Type(v_key, T_STRING);

bound_snapshot* b_sn;
Data_Get_Struct(self, bound_snapshot, b_sn);

leveldb::Slice key = RUBY_STRING_TO_SLICE(v_key);
std::string value;
leveldb::ReadOptions options;
options.snapshot = b_sn->snapshot;

bound_db* b_db;
Data_Get_Struct(b_sn->v_db, bound_db, b_db);
leveldb::Status status = b_db->db->Get(options, key, &value);

if(status.IsNotFound()) return Qfalse;
return Qtrue;
}

extern "C" {
void Init_leveldb() {
k_fill = ID2SYM(rb_intern("fill_cache"));
k_verify = ID2SYM(rb_intern("verify_checksums"));
k_snapshot = ID2SYM(rb_intern("snapshot"));
k_sync = ID2SYM(rb_intern("sync"));
k_from = ID2SYM(rb_intern("from"));
k_to = ID2SYM(rb_intern("to"));
Expand Down Expand Up @@ -689,6 +822,14 @@ void Init_leveldb() {

c_db_options = rb_define_class_under(m_leveldb, "Options", rb_cObject);

c_snapshot = rb_define_class_under(m_leveldb, "Snapshot", rb_cObject);
rb_define_singleton_method(c_snapshot, "make", RUBY_METHOD_FUNC(snapshot_make), 1);
rb_define_method(c_snapshot, "initialize", RUBY_METHOD_FUNC(snapshot_init), 1);
rb_define_method(c_snapshot, "db", RUBY_METHOD_FUNC(snapshot_db), 0);
rb_define_method(c_snapshot, "release", RUBY_METHOD_FUNC(snapshot_release), 0);
rb_define_method(c_snapshot, "released?", RUBY_METHOD_FUNC(snapshot_released), 0);
rb_define_method(c_snapshot, "exists?", RUBY_METHOD_FUNC(snapshot_exists), 1);

VALUE m_ctype = rb_define_module_under(m_leveldb, "CompressionType");
VALUE c_base = rb_define_class_under(m_ctype, "Base", rb_cObject);
c_no_compression = rb_define_class_under(m_ctype, "NoCompression", c_base);
Expand Down
45 changes: 45 additions & 0 deletions lib/leveldb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ def iterator(*args); Iterator.new self, *args end
def keys; map { |k, v| k } end
def values; map { |k, v| v } end

def snapshot(*args)
sn = Snapshot.new self, *args
if block_given?
begin
yield sn
ensure
sn.release
end
else
sn
end
end

def inspect
%(<#{self.class} #{@pathname.inspect}>)
end
Expand All @@ -85,6 +98,38 @@ class << self
end
end

# Snapshot has the same API as DB, restricted to read access.
class Snapshot
include Enumerable

def self.new(db)
make db
end

def each(*args, &block)
i = iterator(*args)
i.each(&block) if block
i
end

def get(*args)
db.get(*args, snapshot: self)
end

alias :[] :get
alias :includes? :exists?
alias :contains? :exists?
alias :member? :exists?

def iterator(*args); db.iterator *args, snapshot: self end
def keys; map { |k, v| k } end
def values; map { |k, v| v } end

def inspect
%(<#{self.class} #{db.inspect} #{' (released)' if released?}>)
end
end

class Options
DEFAULT_MAX_OPEN_FILES = 1000
DEFAULT_WRITE_BUFFER_SIZE = 4 * 1024 * 1024
Expand Down
75 changes: 75 additions & 0 deletions test/snapshot_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
require 'test/unit'
require File.expand_path("../../lib/leveldb", __FILE__)
require 'fileutils'

class SnapshotTest < Test::Unit::TestCase
DB_PATH = "/tmp/snapshot-%s.db"

attr_reader :db

def initialize(name)
super
@path = DB_PATH%name
FileUtils.rm_rf @path
@db = LevelDB::DB.new @path
end

KEYS = %w{ k1 k2 k3 }

def setup
KEYS.each do |k|
db[k] = "0"
end
end

def test_get
sn = db.snapshot
KEYS.each do |k|
db[k] = "1"
end
KEYS.each do |k|
assert_equal "0", sn[k]
end
end

def test_iterator
sn = db.snapshot
KEYS.each do |k|
db[k] = "1"
end
sn.each do |k,v|
assert_equal "0", v
end
assert_equal KEYS, sn.keys
assert_equal ["0"], sn.values.uniq
end

def test_exists
sn = db.snapshot
db["new"] = "new"
db.delete "k1"
assert_equal false, sn.exists?( "new" )
assert_equal true, sn.exists?( "k1" )
assert_equal true, db.exists?( "new" )
assert_equal false, db.exists?( "k1" )
end

def test_release
sn = db.snapshot
KEYS.each do |k|
db[k] = "1"
end

assert_equal false, sn.released?
sn.release
assert_equal true, sn.released?

sn.each do |k,v|
assert_equal "1", v
end
end

def teardown
FileUtils.rm_rf @path
end
end