Skip to content

Commit 9b0367b

Browse files
committed
Add db.getSync() method
Also optimizes `db.get()` to avoid a copy of the key if it's a buffer. Relies on a patch to LevelDB that replaces use of `std::string` with a new ValueSink struct that gives us more control over the memory. Ref: Level/community#144 Category: addition
1 parent 6aeb739 commit 9b0367b

File tree

14 files changed

+539
-40
lines changed

14 files changed

+539
-40
lines changed

binding.cc

Lines changed: 215 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <vector>
1414
#include <mutex>
1515
#include <atomic>
16+
#include <optional>
1617

1718
/**
1819
* Forward declarations.
@@ -105,6 +106,8 @@ static std::map<std::string, LevelDbHandle> db_handles;
105106
} \
106107
}
107108

109+
#define undefined NULL
110+
108111
/**
109112
* Bit fields.
110113
*/
@@ -162,6 +165,25 @@ static napi_value CreateCodeError (napi_env env, const char* code, const char* m
162165
return error;
163166
}
164167

168+
static void ThrowError(napi_env env, leveldb::Status status) {
169+
if (status.IsCorruption()) {
170+
napi_throw_error(env, "LEVEL_CORRUPTION", status.ToString().c_str());
171+
} else if (!status.ok()) {
172+
napi_throw_error(env, NULL, status.ToString().c_str());
173+
} else {
174+
napi_throw_error(env, NULL, "Operation failed");
175+
}
176+
}
177+
178+
static napi_value ErrorOrNotFound(napi_env env, leveldb::Status status) {
179+
if (status.IsNotFound()) {
180+
return undefined;
181+
} else {
182+
ThrowError(env, status);
183+
return undefined;
184+
}
185+
}
186+
165187
/**
166188
* Returns true if 'obj' has a property 'key'.
167189
*/
@@ -354,6 +376,14 @@ static std::vector<std::string> KeyArray (napi_env env, napi_value arr) {
354376
return result;
355377
}
356378

379+
// TODO: use in more places
380+
enum Flags : uint32_t {
381+
FILL_CACHE = 1,
382+
KEY_AS_BUFFER = 2,
383+
VALUE_AS_BUFFER = 4,
384+
SHARED_KEY = 8
385+
};
386+
357387
/**
358388
* Whether to yield entries, keys or values.
359389
*/
@@ -538,11 +568,13 @@ struct BaseWorker {
538568
struct Database {
539569
Database ()
540570
: db_(NULL),
571+
sharedBuffer_(NULL),
541572
blockCache_(NULL),
542573
filterPolicy_(leveldb::NewBloomFilterPolicy(10)),
543574
resourceSequence_(0),
544575
pendingCloseWorker_(NULL),
545576
ref_(NULL),
577+
sharedBufferRef_(NULL),
546578
priorityWork_(0) {}
547579

548580
~Database () {
@@ -576,7 +608,7 @@ struct Database {
576608

577609
leveldb::Status Get (const leveldb::ReadOptions& options,
578610
leveldb::Slice key,
579-
std::string& value) {
611+
leveldb::ValueSink& value) {
580612
return db_->Get(options, key, &value);
581613
}
582614

@@ -635,7 +667,40 @@ struct Database {
635667
return priorityWork_ > 0;
636668
}
637669

670+
bool SetSharedBuffer (napi_env env, napi_value value) {
671+
// Delete reference to previous buffer if any
672+
if (sharedBufferRef_) {
673+
napi_delete_reference(env, sharedBufferRef_);
674+
sharedBufferRef_ = NULL;
675+
}
676+
677+
// Get underlying data (length is separately communicated, on use)
678+
if (napi_get_buffer_info(env, value, (void**)&sharedBuffer_, NULL) != napi_ok) {
679+
sharedBuffer_ = NULL;
680+
return false;
681+
}
682+
683+
// Create reference in order to keep buffer alive
684+
if (napi_create_reference(env, value, 1, &sharedBufferRef_) != napi_ok) {
685+
sharedBufferRef_ = NULL;
686+
return false;
687+
}
688+
689+
return true;
690+
}
691+
692+
void ReleaseReferences (napi_env env) {
693+
if (ref_ != NULL) napi_reference_unref(env, ref_, NULL);
694+
if (sharedBufferRef_ != NULL) napi_reference_unref(env, sharedBufferRef_, NULL);
695+
}
696+
697+
void DeleteReferences (napi_env env) {
698+
if (ref_ != NULL) napi_delete_reference(env, ref_);
699+
if (sharedBufferRef_ != NULL) napi_delete_reference(env, sharedBufferRef_);
700+
}
701+
638702
leveldb::DB* db_;
703+
char* sharedBuffer_;
639704
leveldb::Cache* blockCache_;
640705
const leveldb::FilterPolicy* filterPolicy_;
641706
uint32_t resourceSequence_;
@@ -644,6 +709,7 @@ struct Database {
644709
napi_ref ref_;
645710

646711
private:
712+
napi_ref sharedBufferRef_;
647713
std::atomic<uint32_t> priorityWork_;
648714
std::string location_;
649715

@@ -1107,7 +1173,7 @@ static void FinalizeDatabase (napi_env env, void* data, void* hint) {
11071173
if (data) {
11081174
Database* database = (Database*)data;
11091175
napi_remove_env_cleanup_hook(env, env_cleanup_hook, database);
1110-
if (database->ref_ != NULL) napi_delete_reference(env, database->ref_);
1176+
database->DeleteReferences(env);
11111177
delete database;
11121178
}
11131179
}
@@ -1233,7 +1299,7 @@ struct CloseWorker final : public BaseWorker {
12331299
}
12341300

12351301
void DoFinally (napi_env env) override {
1236-
napi_reference_unref(env, database_->ref_, NULL);
1302+
database_->ReleaseReferences(env);
12371303
BaseWorker::DoFinally(env);
12381304
}
12391305
};
@@ -1318,14 +1384,15 @@ struct GetWorker final : public PriorityWorker {
13181384
GetWorker (napi_env env,
13191385
Database* database,
13201386
napi_deferred deferred,
1387+
uint32_t flags,
13211388
leveldb::Slice key,
1322-
const Encoding encoding,
1323-
const bool fillCache,
1389+
napi_ref keyRef,
13241390
ExplicitSnapshot* snapshot)
13251391
: PriorityWorker(env, database, deferred, "classic_level.db.get"),
1392+
flags_(flags),
13261393
key_(key),
1327-
encoding_(encoding) {
1328-
options_.fill_cache = fillCache;
1394+
keyRef_(keyRef) {
1395+
options_.fill_cache = (flags & Flags::FILL_CACHE) != 0;
13291396

13301397
if (snapshot == NULL) {
13311398
implicitSnapshot_ = database->NewSnapshot();
@@ -1337,54 +1404,178 @@ struct GetWorker final : public PriorityWorker {
13371404
}
13381405

13391406
~GetWorker () {
1340-
DisposeSliceBuffer(key_);
1407+
if (!keyRef_) DisposeSliceBuffer(key_);
13411408
}
13421409

13431410
void DoExecute () override {
1344-
SetStatus(database_->Get(options_, key_, value_));
1411+
leveldb::StringValueSink wrapped(&value_);
1412+
SetStatus(database_->Get(options_, key_, wrapped));
13451413

13461414
if (implicitSnapshot_) {
13471415
database_->ReleaseSnapshot(implicitSnapshot_);
13481416
}
13491417
}
13501418

1419+
void DoFinally (napi_env env) override {
1420+
if (keyRef_) napi_delete_reference(env, keyRef_);
1421+
PriorityWorker::DoFinally(env);
1422+
}
1423+
13511424
void HandleOKCallback (napi_env env, napi_deferred deferred) override {
13521425
napi_value argv;
1353-
Entry::Convert(env, &value_, encoding_, argv);
1426+
1427+
if ((flags_ & Flags::VALUE_AS_BUFFER) != 0) {
1428+
napi_create_buffer_copy(env, value_.size(), value_.data(), NULL, &argv);
1429+
} else {
1430+
napi_create_string_utf8(env, value_.data(), value_.size(), &argv);
1431+
}
1432+
13541433
napi_resolve_deferred(env, deferred, argv);
13551434
}
13561435

13571436
private:
13581437
leveldb::ReadOptions options_;
1438+
uint32_t flags_;
13591439
leveldb::Slice key_;
1440+
napi_ref keyRef_;
13601441
std::string value_;
1361-
const Encoding encoding_;
13621442
const leveldb::Snapshot* implicitSnapshot_;
13631443
};
13641444

13651445
/**
13661446
* Gets a value from a database.
13671447
*/
1368-
NAPI_METHOD(db_get) {
1369-
NAPI_ARGV(5);
1448+
NAPI_METHOD(db_get) {
1449+
NAPI_ARGV(4);
13701450
NAPI_DB_CONTEXT();
13711451
NAPI_PROMISE();
13721452

1373-
leveldb::Slice key = ToSlice(env, argv[1]);
1374-
const Encoding encoding = GetEncoding(env, argv[2]);
1375-
const bool fillCache = BooleanValue(env, argv[3], true);
1453+
uint32_t flags;
1454+
NAPI_STATUS_THROWS(napi_get_value_uint32(env, argv[1], &flags));
13761455

13771456
ExplicitSnapshot* snapshot = NULL;
1378-
napi_get_value_external(env, argv[4], (void**)&snapshot);
1457+
napi_get_value_external(env, argv[3], (void**)&snapshot);
13791458

1380-
GetWorker* worker = new GetWorker(
1381-
env, database, deferred, key, encoding, fillCache, snapshot
1382-
);
1459+
char* keyBuffer;
1460+
size_t keySize;
1461+
GetWorker* worker;
1462+
1463+
if ((flags & Flags::KEY_AS_BUFFER) != 0) {
1464+
// Instead of copying the memory, create a reference so that it stays valid
1465+
napi_ref keyRef;
1466+
NAPI_STATUS_THROWS(napi_create_reference(env, argv[2], 1, &keyRef));
1467+
NAPI_STATUS_THROWS(napi_get_typedarray_info(env, argv[2], NULL, &keySize, (void**)&keyBuffer, NULL, NULL));
1468+
leveldb::Slice keySlice(keyBuffer, keySize);
1469+
worker = new GetWorker(env, database, deferred, flags, keySlice, keyRef, snapshot);
1470+
} else {
1471+
NAPI_STATUS_THROWS(napi_get_value_string_utf8(env, argv[2], NULL, 0, &keySize));
1472+
keyBuffer = new char[keySize + 1];
1473+
NAPI_STATUS_THROWS(napi_get_value_string_utf8(env, argv[2], keyBuffer, keySize + 1, NULL));
1474+
keyBuffer[keySize] = '\0';
1475+
leveldb::Slice keySlice(keyBuffer, keySize);
1476+
1477+
// A null keyRef implies that keyBuffer needs to be deleted after the read
1478+
// TODO: solve in a more obvious way like a subclass
1479+
worker = new GetWorker(env, database, deferred, flags, keySlice, NULL, snapshot);
1480+
}
13831481

13841482
worker->Queue(env);
13851483
return promise;
13861484
}
13871485

1486+
struct NapiValueSink : public leveldb::ValueSink {
1487+
NapiValueSink (napi_env env)
1488+
: leveldb::ValueSink(),
1489+
result(NULL),
1490+
env_(env),
1491+
status_(napi_generic_failure) {}
1492+
1493+
virtual ~NapiValueSink() = default;
1494+
1495+
const bool valid() {
1496+
return status_ == napi_ok;
1497+
}
1498+
1499+
napi_value result;
1500+
1501+
protected:
1502+
napi_env env_;
1503+
napi_status status_;
1504+
};
1505+
1506+
struct NapiStringValueSink : public NapiValueSink {
1507+
NapiStringValueSink (napi_env env)
1508+
: NapiValueSink(env) {}
1509+
1510+
public:
1511+
void assign(const char* data, size_t size) override {
1512+
status_ = napi_create_string_utf8(env_, data, size, &result);
1513+
}
1514+
};
1515+
1516+
struct NapiBufferValueSink : public NapiValueSink {
1517+
NapiBufferValueSink (napi_env env)
1518+
: NapiValueSink(env) {}
1519+
1520+
public:
1521+
void assign(const char* data, size_t size) override {
1522+
status_ = napi_create_buffer_copy(env_, size, data, NULL, &result);
1523+
}
1524+
};
1525+
1526+
/**
1527+
* Get a value from a database synchronously.
1528+
*/
1529+
NAPI_METHOD(db_get_sync) {
1530+
NAPI_ARGV(4);
1531+
NAPI_DB_CONTEXT();
1532+
1533+
uint32_t flags;
1534+
NAPI_STATUS_THROWS(napi_get_value_uint32(env, argv[1], &flags));
1535+
1536+
std::optional<leveldb::Slice> keySlice;
1537+
1538+
if ((flags & Flags::SHARED_KEY) != 0) {
1539+
uint32_t keySize;
1540+
NAPI_STATUS_THROWS(napi_get_value_uint32(env, argv[2], &keySize));
1541+
keySlice.emplace(database->sharedBuffer_, keySize);
1542+
} else {
1543+
char* keyBuffer;
1544+
size_t keySize;
1545+
NAPI_STATUS_THROWS(napi_get_typedarray_info(env, argv[2], NULL, &keySize, (void**)&keyBuffer, NULL, NULL));
1546+
keySlice.emplace(keyBuffer, keySize);
1547+
}
1548+
1549+
ExplicitSnapshot* snapshot = NULL;
1550+
napi_get_value_external(env, argv[3], (void**)&snapshot);
1551+
1552+
leveldb::ReadOptions options;
1553+
options.fill_cache = (flags & Flags::FILL_CACHE) != 0;
1554+
options.snapshot = snapshot != NULL ? snapshot->nut : NULL;
1555+
1556+
if ((flags & Flags::VALUE_AS_BUFFER) != 0) {
1557+
NapiBufferValueSink valueSink(env);
1558+
leveldb::Status status = database->Get(options, *keySlice, valueSink);
1559+
return status.ok() && valueSink.valid() ? valueSink.result : ErrorOrNotFound(env, status);
1560+
} else {
1561+
NapiStringValueSink valueSink(env);
1562+
leveldb::Status status = database->Get(options, *keySlice, valueSink);
1563+
return status.ok() && valueSink.valid() ? valueSink.result : ErrorOrNotFound(env, status);
1564+
}
1565+
}
1566+
1567+
NAPI_METHOD(db_set_shared_buffer) {
1568+
NAPI_ARGV(2);
1569+
NAPI_DB_CONTEXT();
1570+
1571+
if (!database->SetSharedBuffer(env, argv[1])) {
1572+
napi_throw_error(env, NULL, "SetSharedBuffer failed");
1573+
return undefined;
1574+
}
1575+
1576+
return undefined;
1577+
}
1578+
13881579
/**
13891580
* Worker class for db.has().
13901581
*/
@@ -1482,7 +1673,8 @@ struct GetManyWorker final : public PriorityWorker {
14821673

14831674
for (const std::string& key: keys_) {
14841675
std::string* value = new std::string();
1485-
leveldb::Status status = database_->Get(options_, key, *value);
1676+
leveldb::StringValueSink wrapped(value);
1677+
leveldb::Status status = database_->Get(options_, key, wrapped);
14861678

14871679
if (status.ok()) {
14881680
cache_.push_back(value);
@@ -2424,10 +2616,12 @@ NAPI_METHOD(snapshot_close) {
24242616
*/
24252617
NAPI_INIT() {
24262618
NAPI_EXPORT_FUNCTION(db_init);
2619+
NAPI_EXPORT_FUNCTION(db_set_shared_buffer)
24272620
NAPI_EXPORT_FUNCTION(db_open);
24282621
NAPI_EXPORT_FUNCTION(db_close);
24292622
NAPI_EXPORT_FUNCTION(db_put);
24302623
NAPI_EXPORT_FUNCTION(db_get);
2624+
NAPI_EXPORT_FUNCTION(db_get_sync);
24312625
NAPI_EXPORT_FUNCTION(db_get_many);
24322626
NAPI_EXPORT_FUNCTION(db_has);
24332627
NAPI_EXPORT_FUNCTION(db_has_many);

deps/leveldb/leveldb-1.20/db/db_impl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1108,7 +1108,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
11081108

11091109
Status DBImpl::Get(const ReadOptions& options,
11101110
const Slice& key,
1111-
std::string* value) {
1111+
ValueSink* value) {
11121112
Status s;
11131113
MutexLock l(&mutex_);
11141114
SequenceNumber snapshot;

0 commit comments

Comments
 (0)