Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions src/binding/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,31 +206,41 @@ napi_value Transaction::Commit(napi_env env, napi_callback_info info) {
state->status = rocksdb::Status::Aborted("Database closed during transaction commit operation");
} else {
auto descriptor = txnHandle->dbHandle->descriptor;
LogPosition committedPosition; // To record the log position of the committed transaction, for tracking of visible commits available in transaction log

if (txnHandle->logEntryBatch) {
DEBUG_LOG("%p Transaction::Commit Committing log entries for transaction %u\n",
txnHandle.get(), txnHandle->id);
auto store = txnHandle->boundLogStore.lock();
if (store) {
// write the batch to the store
store->writeBatch(*txnHandle->logEntryBatch, committedPosition);
store->writeBatch(*txnHandle->logEntryBatch, txnHandle->committedPosition);
} else {
DEBUG_LOG("%p Transaction::Commit ERROR: Log store not found for transaction %u\n", txnHandle.get(), txnHandle->id);
state->status = rocksdb::Status::Aborted("Log store not found for transaction");
}
}

state->status = txnHandle->txn->Commit();
if (txnHandle->logEntryBatch) {
if (txnHandle->committedPosition.logSequenceNumber > 0 && !state->status.IsBusy()) {
auto store = txnHandle->boundLogStore.lock();
store->commitFinished(committedPosition, descriptor->db->GetLatestSequenceNumber());
if (store) {
store->commitFinished(txnHandle->committedPosition, descriptor->db->GetLatestSequenceNumber());
} else {
DEBUG_LOG("%p Transaction::Commit ERROR: Log store not found for transaction, log number: %u id: %u\n", txnHandle.get(), txnHandle->committedPosition.logSequenceNumber, txnHandle->id);
state->status = rocksdb::Status::Aborted("Log store not found for transaction");
}
}

if (state->status.ok()) {
DEBUG_LOG("%p Transaction::Commit Emitted committed event (txnId=%u)\n", txnHandle.get(), txnHandle->id);
txnHandle->state = TransactionState::Committed;
descriptor->notify("committed", nullptr);
} else if (state->status.IsBusy()) {
// clear/delete the previous transaction and create a new transaction so that it can be retried
txnHandle->txn->ClearSnapshot();
delete txnHandle->txn;
txnHandle->logEntryBatch = nullptr;
txnHandle->createTransaction();
}
}
// signal that execute handler is complete
Expand Down Expand Up @@ -294,13 +304,12 @@ napi_value Transaction::CommitSync(napi_env env, napi_callback_info info) {
}
(*txnHandle)->state = TransactionState::Committing;

LogPosition committedPosition; // To record the log position of the committed transaction, for tracking of visible commits available in transaction log
if ((*txnHandle)->logEntryBatch) {
DEBUG_LOG("%p Transaction::CommitSync Committing log entries for transaction %u\n",
(*txnHandle).get(), (*txnHandle)->id);
auto store = (*txnHandle)->boundLogStore.lock();
if (store) {
store->writeBatch(*(*txnHandle)->logEntryBatch, committedPosition);
store->writeBatch(*(*txnHandle)->logEntryBatch, (*txnHandle)->committedPosition);
} else {
DEBUG_LOG("%p Transaction::CommitSync ERROR: Log store not found for transaction %u\n", (*txnHandle).get(), (*txnHandle)->id);
NAPI_THROW_JS_ERROR("ERR_LOG_STORE_NOT_FOUND", "Log store not found for transaction");
Expand All @@ -309,10 +318,13 @@ napi_value Transaction::CommitSync(napi_env env, napi_callback_info info) {

rocksdb::Status status = (*txnHandle)->txn->Commit();

if ((*txnHandle)->logEntryBatch) {
if ((*txnHandle)->committedPosition.logSequenceNumber > 0 && !status.IsBusy()) {
auto store = (*txnHandle)->boundLogStore.lock();
if (store) {
store->commitFinished(committedPosition, (*txnHandle)->dbHandle->descriptor->db->GetLatestSequenceNumber());
store->commitFinished((*txnHandle)->committedPosition, (*txnHandle)->dbHandle->descriptor->db->GetLatestSequenceNumber());
} else {
DEBUG_LOG("%p Transaction::Commit ERROR: Log store not found for transaction, log number: %u id: %u\n", txnHandle.get(), txnHandle->committedPosition.logSequenceNumber, txnHandle->id);
status = rocksdb::Status::Aborted("Log store not found for transaction");
}
}

Expand All @@ -324,6 +336,13 @@ napi_value Transaction::CommitSync(napi_env env, napi_callback_info info) {
DEBUG_LOG("%p Transaction::CommitSync Closing transaction (txnId=%u)\n", (*txnHandle).get(), (*txnHandle)->id);
(*txnHandle)->close();
} else {
if (status.IsBusy()) {
// clear/delete the previous transaction and create a new transaction so that it can be retried
(*txnHandle)->txn->ClearSnapshot();
delete (*txnHandle)->txn;
(*txnHandle)->logEntryBatch = nullptr;
(*txnHandle)->createTransaction();
}
(*txnHandle)->state = TransactionState::Pending;
napi_value error;
ROCKSDB_CREATE_ERROR_LIKE_VOID(error, status, "Transaction commit failed");
Expand Down
17 changes: 12 additions & 5 deletions src/binding/transaction_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,21 @@ TransactionHandle::TransactionHandle(
disableSnapshot(disableSnapshot),
snapshotSet(false),
state(TransactionState::Pending),
txn(nullptr)
{
txn(nullptr),
committedPosition(0, 0) {
this->createTransaction();
this->id = this->dbHandle->descriptor->transactionGetNextId();

this->startTimestamp = rocksdb_js::getMonotonicTimestamp();
}

void TransactionHandle::createTransaction(){
this->snapshotSet = false; // snapshot flag so it will be reapplied

rocksdb::WriteOptions writeOptions;
writeOptions.disableWAL = dbHandle->disableWAL;

auto dbHandle = this->dbHandle;
if (dbHandle->descriptor->mode == DBMode::Pessimistic) {
auto* tdb = static_cast<rocksdb::TransactionDB*>(dbHandle->descriptor->db.get());
rocksdb::TransactionOptions txnOptions;
Expand All @@ -40,9 +50,6 @@ TransactionHandle::TransactionHandle(
throw std::runtime_error("Invalid database");
}

this->id = this->dbHandle->descriptor->transactionGetNextId();

this->startTimestamp = rocksdb_js::getMonotonicTimestamp();
}

/**
Expand Down
8 changes: 8 additions & 0 deletions src/binding/transaction_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ struct TransactionHandle final : Closable, AsyncWorkHandle, std::enable_shared_f
*/
std::weak_ptr<TransactionLogStore> boundLogStore;

/**
* The position of the beginning of the log entries that were written for this transaction.
* This is used for tracking of visible commits available in transaction log, once the transaction is successfully committed.
*/
LogPosition committedPosition;

TransactionHandle(
std::shared_ptr<DBHandle> dbHandle,
napi_env env,
Expand All @@ -108,6 +114,8 @@ struct TransactionHandle final : Closable, AsyncWorkHandle, std::enable_shared_f
);
~TransactionHandle();

void createTransaction();

void addLogEntry(std::unique_ptr<TransactionLogEntry> entry);

void close() override;
Expand Down
3 changes: 3 additions & 0 deletions test/load-rocks.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
let start = performance.now();
require('..');
console.log(performance.now() - start);
42 changes: 41 additions & 1 deletion test/transaction-log.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { join } from 'node:path';
import { setTimeout as delay } from 'node:timers/promises';
import { Worker } from 'node:worker_threads';
import { describe, expect, it } from 'vitest';
import { RocksDatabase } from '../src/index.js';
import { RocksDatabase, Transaction } from '../src/index.js';
import { constants, type TransactionLog } from '../src/load-binding.js';
import { parseTransactionLog } from '../src/parse-transaction-log.js';
import { withResolvers } from '../src/util.js';
Expand Down Expand Up @@ -126,6 +126,46 @@ describe('Transaction Log', () => {
}));
});

describe('Transaction log visibility after commits', () => {
it('Should not treat transaction logs as visible until successfully committed', () =>
dbRunner(async ({ db }) => {
const log = db.useLog('foo');
db.putSync('key1', 'value1');
let firstTransactionCompletions: Promise<void>[] = [];
let fullTransactionCompletions: Promise<void>[] = [];
for (let i = 0; i < 3; i++) {
let transaction = new Transaction(db.store);
let firstTxnCommit = (async () => {
db.getSync('key1', { transaction });
const value = Buffer.alloc(10, i.toString());
log.addEntry(value, transaction.id);
await delay(10);
// should be a conflicted write
db.putSync('key1', 'updated' + i, { transaction });
await transaction.commit();
})();
firstTransactionCompletions.push(firstTxnCommit);
const catchFailedCommit = async (error) => {
if (error.code === 'ERR_BUSY') {
db.getSync('key1', { transaction });
await delay(10);
db.putSync('key1', 'updated' + i, { transaction });
await transaction.commit().catch(catchFailedCommit);
} else throw error;
};
let fullTxnCompletion = firstTxnCommit.catch(catchFailedCommit);

fullTransactionCompletions.push(fullTxnCompletion);
}
let transactionResults = await Promise.allSettled(firstTransactionCompletions);
expect(transactionResults.filter(result => result.status === 'rejected').length).toBeGreaterThanOrEqual(1); // at least one should fail
expect(Array.from(log.query({ start: 0 })).length).toBeLessThan(3); // The entries should not be all visible at this point (only one)
await Promise.all(fullTransactionCompletions); // wait for all the retries to finish
expect(Array.from(log.query({ start: 0 })).length).toBe(3); // now all the transactions should be visible in the log
})
);
});

describe('query() from TransactionLog', () => {
it('should query an empty transaction log', () =>
dbRunner(async ({ db }) => {
Expand Down