Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 215 additions & 21 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <vector>
#include <mutex>
#include <atomic>
#include <optional>

/**
* Forward declarations.
Expand Down Expand Up @@ -105,6 +106,8 @@ static std::map<std::string, LevelDbHandle> db_handles;
} \
}

#define undefined NULL

/**
* Bit fields.
*/
Expand Down Expand Up @@ -162,6 +165,25 @@ static napi_value CreateCodeError (napi_env env, const char* code, const char* m
return error;
}

static void ThrowError(napi_env env, leveldb::Status status) {
if (status.IsCorruption()) {
napi_throw_error(env, "LEVEL_CORRUPTION", status.ToString().c_str());
} else if (!status.ok()) {
napi_throw_error(env, NULL, status.ToString().c_str());
} else {
napi_throw_error(env, NULL, "Operation failed");
}
}

static napi_value ErrorOrNotFound(napi_env env, leveldb::Status status) {
if (status.IsNotFound()) {
return undefined;
} else {
ThrowError(env, status);
return undefined;
}
}

/**
* Returns true if 'obj' has a property 'key'.
*/
Expand Down Expand Up @@ -354,6 +376,14 @@ static std::vector<std::string> KeyArray (napi_env env, napi_value arr) {
return result;
}

// TODO: use in more places
enum Flags : uint32_t {
FILL_CACHE = 1,
KEY_AS_BUFFER = 2,
VALUE_AS_BUFFER = 4,
SHARED_KEY = 8
};

/**
* Whether to yield entries, keys or values.
*/
Expand Down Expand Up @@ -538,11 +568,13 @@ struct BaseWorker {
struct Database {
Database ()
: db_(NULL),
sharedBuffer_(NULL),
blockCache_(NULL),
filterPolicy_(leveldb::NewBloomFilterPolicy(10)),
resourceSequence_(0),
pendingCloseWorker_(NULL),
ref_(NULL),
sharedBufferRef_(NULL),
priorityWork_(0) {}

~Database () {
Expand Down Expand Up @@ -576,7 +608,7 @@ struct Database {

leveldb::Status Get (const leveldb::ReadOptions& options,
leveldb::Slice key,
std::string& value) {
leveldb::ValueSink& value) {
return db_->Get(options, key, &value);
}

Expand Down Expand Up @@ -635,7 +667,40 @@ struct Database {
return priorityWork_ > 0;
}

bool SetSharedBuffer (napi_env env, napi_value value) {
// Delete reference to previous buffer if any
if (sharedBufferRef_) {
napi_delete_reference(env, sharedBufferRef_);
sharedBufferRef_ = NULL;
}

// Get underlying data (length is separately communicated, on use)
if (napi_get_buffer_info(env, value, (void**)&sharedBuffer_, NULL) != napi_ok) {
sharedBuffer_ = NULL;
return false;
}

// Create reference in order to keep buffer alive
if (napi_create_reference(env, value, 1, &sharedBufferRef_) != napi_ok) {
sharedBufferRef_ = NULL;
return false;
}

return true;
}

void ReleaseReferences (napi_env env) {
if (ref_ != NULL) napi_reference_unref(env, ref_, NULL);
if (sharedBufferRef_ != NULL) napi_reference_unref(env, sharedBufferRef_, NULL);
}

void DeleteReferences (napi_env env) {
if (ref_ != NULL) napi_delete_reference(env, ref_);
if (sharedBufferRef_ != NULL) napi_delete_reference(env, sharedBufferRef_);
}

leveldb::DB* db_;
char* sharedBuffer_;
leveldb::Cache* blockCache_;
const leveldb::FilterPolicy* filterPolicy_;
uint32_t resourceSequence_;
Expand All @@ -644,6 +709,7 @@ struct Database {
napi_ref ref_;

private:
napi_ref sharedBufferRef_;
std::atomic<uint32_t> priorityWork_;
std::string location_;

Expand Down Expand Up @@ -1107,7 +1173,7 @@ static void FinalizeDatabase (napi_env env, void* data, void* hint) {
if (data) {
Database* database = (Database*)data;
napi_remove_env_cleanup_hook(env, env_cleanup_hook, database);
if (database->ref_ != NULL) napi_delete_reference(env, database->ref_);
database->DeleteReferences(env);
delete database;
}
}
Expand Down Expand Up @@ -1233,7 +1299,7 @@ struct CloseWorker final : public BaseWorker {
}

void DoFinally (napi_env env) override {
napi_reference_unref(env, database_->ref_, NULL);
database_->ReleaseReferences(env);
BaseWorker::DoFinally(env);
}
};
Expand Down Expand Up @@ -1318,14 +1384,15 @@ struct GetWorker final : public PriorityWorker {
GetWorker (napi_env env,
Database* database,
napi_deferred deferred,
uint32_t flags,
leveldb::Slice key,
const Encoding encoding,
const bool fillCache,
napi_ref keyRef,
ExplicitSnapshot* snapshot)
: PriorityWorker(env, database, deferred, "classic_level.db.get"),
flags_(flags),
key_(key),
encoding_(encoding) {
options_.fill_cache = fillCache;
keyRef_(keyRef) {
options_.fill_cache = (flags & Flags::FILL_CACHE) != 0;

if (snapshot == NULL) {
implicitSnapshot_ = database->NewSnapshot();
Expand All @@ -1337,54 +1404,178 @@ struct GetWorker final : public PriorityWorker {
}

~GetWorker () {
DisposeSliceBuffer(key_);
if (!keyRef_) DisposeSliceBuffer(key_);
}

void DoExecute () override {
SetStatus(database_->Get(options_, key_, value_));
leveldb::StringValueSink wrapped(&value_);
SetStatus(database_->Get(options_, key_, wrapped));

if (implicitSnapshot_) {
database_->ReleaseSnapshot(implicitSnapshot_);
}
}

void DoFinally (napi_env env) override {
if (keyRef_) napi_delete_reference(env, keyRef_);
PriorityWorker::DoFinally(env);
}

void HandleOKCallback (napi_env env, napi_deferred deferred) override {
napi_value argv;
Entry::Convert(env, &value_, encoding_, argv);

if ((flags_ & Flags::VALUE_AS_BUFFER) != 0) {
napi_create_buffer_copy(env, value_.size(), value_.data(), NULL, &argv);
} else {
napi_create_string_utf8(env, value_.data(), value_.size(), &argv);
}

napi_resolve_deferred(env, deferred, argv);
}

private:
leveldb::ReadOptions options_;
uint32_t flags_;
leveldb::Slice key_;
napi_ref keyRef_;
std::string value_;
const Encoding encoding_;
const leveldb::Snapshot* implicitSnapshot_;
};

/**
* Gets a value from a database.
*/
NAPI_METHOD(db_get) {
NAPI_ARGV(5);
NAPI_METHOD(db_get) {
NAPI_ARGV(4);
NAPI_DB_CONTEXT();
NAPI_PROMISE();

leveldb::Slice key = ToSlice(env, argv[1]);
const Encoding encoding = GetEncoding(env, argv[2]);
const bool fillCache = BooleanValue(env, argv[3], true);
uint32_t flags;
NAPI_STATUS_THROWS(napi_get_value_uint32(env, argv[1], &flags));

ExplicitSnapshot* snapshot = NULL;
napi_get_value_external(env, argv[4], (void**)&snapshot);
napi_get_value_external(env, argv[3], (void**)&snapshot);

GetWorker* worker = new GetWorker(
env, database, deferred, key, encoding, fillCache, snapshot
);
char* keyBuffer;
size_t keySize;
GetWorker* worker;

if ((flags & Flags::KEY_AS_BUFFER) != 0) {
// Instead of copying the memory, create a reference so that it stays valid
napi_ref keyRef;
NAPI_STATUS_THROWS(napi_create_reference(env, argv[2], 1, &keyRef));
NAPI_STATUS_THROWS(napi_get_typedarray_info(env, argv[2], NULL, &keySize, (void**)&keyBuffer, NULL, NULL));
leveldb::Slice keySlice(keyBuffer, keySize);
worker = new GetWorker(env, database, deferred, flags, keySlice, keyRef, snapshot);
} else {
NAPI_STATUS_THROWS(napi_get_value_string_utf8(env, argv[2], NULL, 0, &keySize));
keyBuffer = new char[keySize + 1];
NAPI_STATUS_THROWS(napi_get_value_string_utf8(env, argv[2], keyBuffer, keySize + 1, NULL));
keyBuffer[keySize] = '\0';
leveldb::Slice keySlice(keyBuffer, keySize);

// A null keyRef implies that keyBuffer needs to be deleted after the read
// TODO: solve in a more obvious way like a subclass
worker = new GetWorker(env, database, deferred, flags, keySlice, NULL, snapshot);
}

worker->Queue(env);
return promise;
}

struct NapiValueSink : public leveldb::ValueSink {
NapiValueSink (napi_env env)
: leveldb::ValueSink(),
result(NULL),
env_(env),
status_(napi_generic_failure) {}

virtual ~NapiValueSink() = default;

const bool valid() {
return status_ == napi_ok;
}

napi_value result;

protected:
napi_env env_;
napi_status status_;
};

struct NapiStringValueSink : public NapiValueSink {
NapiStringValueSink (napi_env env)
: NapiValueSink(env) {}

public:
void assign(const char* data, size_t size) override {
status_ = napi_create_string_utf8(env_, data, size, &result);
}
};

struct NapiBufferValueSink : public NapiValueSink {
NapiBufferValueSink (napi_env env)
: NapiValueSink(env) {}

public:
void assign(const char* data, size_t size) override {
status_ = napi_create_buffer_copy(env_, size, data, NULL, &result);
}
};

/**
* Get a value from a database synchronously.
*/
NAPI_METHOD(db_get_sync) {
NAPI_ARGV(4);
NAPI_DB_CONTEXT();

uint32_t flags;
NAPI_STATUS_THROWS(napi_get_value_uint32(env, argv[1], &flags));

std::optional<leveldb::Slice> keySlice;

if ((flags & Flags::SHARED_KEY) != 0) {
uint32_t keySize;
NAPI_STATUS_THROWS(napi_get_value_uint32(env, argv[2], &keySize));
keySlice.emplace(database->sharedBuffer_, keySize);
} else {
char* keyBuffer;
size_t keySize;
NAPI_STATUS_THROWS(napi_get_typedarray_info(env, argv[2], NULL, &keySize, (void**)&keyBuffer, NULL, NULL));
keySlice.emplace(keyBuffer, keySize);
}

ExplicitSnapshot* snapshot = NULL;
napi_get_value_external(env, argv[3], (void**)&snapshot);

leveldb::ReadOptions options;
options.fill_cache = (flags & Flags::FILL_CACHE) != 0;
options.snapshot = snapshot != NULL ? snapshot->nut : NULL;

if ((flags & Flags::VALUE_AS_BUFFER) != 0) {
NapiBufferValueSink valueSink(env);
leveldb::Status status = database->Get(options, *keySlice, valueSink);
return status.ok() && valueSink.valid() ? valueSink.result : ErrorOrNotFound(env, status);
} else {
NapiStringValueSink valueSink(env);
leveldb::Status status = database->Get(options, *keySlice, valueSink);
return status.ok() && valueSink.valid() ? valueSink.result : ErrorOrNotFound(env, status);
}
}

NAPI_METHOD(db_set_shared_buffer) {
NAPI_ARGV(2);
NAPI_DB_CONTEXT();

if (!database->SetSharedBuffer(env, argv[1])) {
napi_throw_error(env, NULL, "SetSharedBuffer failed");
return undefined;
}

return undefined;
}

/**
* Worker class for db.has().
*/
Expand Down Expand Up @@ -1482,7 +1673,8 @@ struct GetManyWorker final : public PriorityWorker {

for (const std::string& key: keys_) {
std::string* value = new std::string();
leveldb::Status status = database_->Get(options_, key, *value);
leveldb::StringValueSink wrapped(value);
leveldb::Status status = database_->Get(options_, key, wrapped);

if (status.ok()) {
cache_.push_back(value);
Expand Down Expand Up @@ -2424,10 +2616,12 @@ NAPI_METHOD(snapshot_close) {
*/
NAPI_INIT() {
NAPI_EXPORT_FUNCTION(db_init);
NAPI_EXPORT_FUNCTION(db_set_shared_buffer)
NAPI_EXPORT_FUNCTION(db_open);
NAPI_EXPORT_FUNCTION(db_close);
NAPI_EXPORT_FUNCTION(db_put);
NAPI_EXPORT_FUNCTION(db_get);
NAPI_EXPORT_FUNCTION(db_get_sync);
NAPI_EXPORT_FUNCTION(db_get_many);
NAPI_EXPORT_FUNCTION(db_has);
NAPI_EXPORT_FUNCTION(db_has_many);
Expand Down
2 changes: 1 addition & 1 deletion deps/leveldb/leveldb-1.20/db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {

Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
ValueSink* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
Expand Down
Loading