diff --git a/core/client.h b/core/client.h index 2a97aa2c..974b8a93 100644 --- a/core/client.h +++ b/core/client.h @@ -9,94 +9,221 @@ #ifndef YCSB_C_CLIENT_H_ #define YCSB_C_CLIENT_H_ -#include -#include "db.h" #include "core_workload.h" +#include "db.h" +#include "transaction.h" #include "utils.h" +#include +#include namespace ycsbc { class Client { - public: +public: Client(DB &db, CoreWorkload &wl) : db_(db), workload_(wl) { workload_.InitKeyBuffer(key); workload_.InitPairs(pairs); + + abort_cnt = 0; } - + virtual bool DoInsert(); virtual bool DoTransaction(); - - virtual ~Client() { } - - protected: - - virtual int TransactionRead(); - virtual int TransactionReadModifyWrite(); - virtual int TransactionScan(); - virtual int TransactionUpdate(); - virtual int TransactionInsert(); - + + virtual ~Client() { Client::total_abort_cnt += abort_cnt; } + + static std::atomic total_abort_cnt; + +protected: + virtual int TransactionRead(Transaction *txn); + virtual int TransactionReadModifyWrite(Transaction *txn); + virtual int TransactionScan(Transaction *txn); + virtual int TransactionUpdate(Transaction *txn); + virtual int TransactionInsert(Transaction *txn); + + virtual int TransactionReadRetry(Transaction *txn, TransactionOperation &top); + virtual int TransactionReadModifyWriteRetry(Transaction *txn, + TransactionOperation &top); + virtual int TransactionScanRetry(Transaction *txn, TransactionOperation &top); + virtual int TransactionUpdateRetry(Transaction *txn, + TransactionOperation &top); + virtual int TransactionInsertRetry(Transaction *txn, + TransactionOperation &top); + DB &db_; CoreWorkload &workload_; std::string key; std::vector pairs; + + unsigned long abort_cnt; }; inline bool Client::DoInsert() { workload_.NextSequenceKey(key); workload_.UpdateValues(pairs); - return (db_.Insert(workload_.NextTable(), key, pairs) == DB::kOK); + int status = -1; + Transaction *txn = NULL; + db_.Begin(&txn); + status = db_.Insert(txn, workload_.NextTable(), key, pairs); + db_.Commit(&txn); + return (status == DB::kOK); } inline bool Client::DoTransaction() { int status = -1; - switch (workload_.NextOperation()) { + Transaction *txn = NULL; + + db_.Begin(&txn); + + if (txn != NULL) { + txn->ReadyToRecordOperations(workload_.ops_per_transaction()); + } + for (int i = 0; i < workload_.ops_per_transaction(); ++i) { + switch (workload_.NextOperation()) { case READ: - status = TransactionRead(); + status = TransactionRead(txn); break; case UPDATE: - status = TransactionUpdate(); + status = TransactionUpdate(txn); break; case INSERT: - status = TransactionInsert(); + status = TransactionInsert(txn); break; case SCAN: - status = TransactionScan(); + status = TransactionScan(txn); break; case READMODIFYWRITE: - status = TransactionReadModifyWrite(); + status = TransactionReadModifyWrite(txn); break; default: throw utils::Exception("Operation request is not recognized!"); + } + assert(status >= 0); + + if (status == DB::kErrorConflict) { + txn->SetAborted(true); + } } - assert(status >= 0); - return (status == DB::kOK); + + bool need_retry = db_.Commit(&txn) == DB::kErrorConflict; + + while (need_retry) { + ++abort_cnt; + + db_.Begin(&txn); + + txn->SetAborted(false); + + for (unsigned long i = 0; i < txn->GetTransactionOperationsSize(); ++i) { + TransactionOperation &top = txn->GetOperation(i); + switch (top.op) { + case READ: + status = TransactionReadRetry(txn, top); + break; + case UPDATE: + status = TransactionUpdateRetry(txn, top); + break; + case INSERT: + status = TransactionInsertRetry(txn, top); 
+ break; + case SCAN: + status = TransactionScanRetry(txn, top); + break; + case READMODIFYWRITE: + status = TransactionReadModifyWriteRetry(txn, top); + break; + default: + throw utils::Exception("Operation request is not recognized!"); + } + assert(status >= 0); + + if (status == DB::kErrorConflict) { + txn->SetAborted(true); + break; + } + } + + need_retry = db_.Commit(&txn) == DB::kErrorConflict; + } + + return true; } -inline int Client::TransactionRead() { +inline int Client::TransactionRead(Transaction *txn) { + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = READ; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + std::vector result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Read(txn, top.table, top.key, &fields, result); + } else { + return db_.Read(txn, top.table, top.key, NULL, result); + } + } + const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); + std::vector result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Read(table, key, &fields, result); + return db_.Read(txn, table, key, &fields, result); } else { - return db_.Read(table, key, NULL, result); + return db_.Read(txn, table, key, NULL, result); } } -inline int Client::TransactionReadModifyWrite() { +inline int Client::TransactionReadModifyWrite(Transaction *txn) { + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = READMODIFYWRITE; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + + if (workload_.write_all_fields()) { + workload_.BuildValues(top.values); + } else { + workload_.BuildUpdate(top.values); + } + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + std::vector result; + + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + db_.Read(txn, top.table, top.key, &fields, result); + } else { + db_.Read(txn, top.table, top.key, NULL, result); + } + + return db_.Update(txn, top.table, top.key, top.values); + } + const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); + std::vector result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - db_.Read(table, key, &fields, result); + db_.Read(txn, table, key, &fields, result); } else { - db_.Read(table, key, NULL, result); + db_.Read(txn, table, key, NULL, result); } std::vector values; @@ -105,43 +232,181 @@ inline int Client::TransactionReadModifyWrite() { } else { workload_.BuildUpdate(values); } - return db_.Update(table, key, values); + + return db_.Update(txn, table, key, values); } -inline int Client::TransactionScan() { +inline int Client::TransactionScan(Transaction *txn) { + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = SCAN; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + top.len = workload_.NextScanLength(); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + std::vector> result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Scan(txn, top.table, top.key, top.len, &fields, result); + } else { + 
return db_.Scan(txn, top.table, top.key, top.len, NULL, result); + } + } + const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); - int len = workload_.NextScanLength(); + uint64_t len = workload_.NextScanLength(); + std::vector> result; if (!workload_.read_all_fields()) { std::vector fields; fields.push_back("field" + workload_.NextFieldName()); - return db_.Scan(table, key, len, &fields, result); + return db_.Scan(txn, table, key, len, &fields, result); } else { - return db_.Scan(table, key, len, NULL, result); + return db_.Scan(txn, table, key, len, NULL, result); } } -inline int Client::TransactionUpdate() { +inline int Client::TransactionUpdate(Transaction *txn) { + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = UPDATE; + top.table = workload_.NextTable(); + top.key = workload_.NextTransactionKey(); + if (workload_.write_all_fields()) { + workload_.BuildValues(top.values); + } else { + workload_.BuildUpdate(top.values); + } + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + return db_.Update(txn, top.table, top.key, top.values); + } + const std::string &table = workload_.NextTable(); const std::string &key = workload_.NextTransactionKey(); + std::vector values; if (workload_.write_all_fields()) { workload_.BuildValues(values); } else { workload_.BuildUpdate(values); } - return db_.Update(table, key, values); + + return db_.Update(txn, table, key, values); } -inline int Client::TransactionInsert() { +inline int Client::TransactionInsert(Transaction *txn) { + if (txn != NULL) { + TransactionOperation &top = txn->GetNextOperation(); + top.op = INSERT; + top.table = workload_.NextTable(); + workload_.NextSequenceKey(key); + top.key = key; + workload_.BuildValues(top.values); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + return db_.Insert(txn, top.table, top.key, top.values); + } + const std::string &table = workload_.NextTable(); workload_.NextSequenceKey(key); std::vector values; workload_.BuildValues(values); - return db_.Insert(table, key, values); -} -} // ycsbc + return db_.Insert(txn, table, key, values); +} + +inline int Client::TransactionReadRetry(Transaction *txn, + TransactionOperation &top) { + assert(txn != NULL); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + std::vector result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Read(txn, top.table, top.key, &fields, result); + } else { + return db_.Read(txn, top.table, top.key, NULL, result); + } +} + +inline int Client::TransactionReadModifyWriteRetry(Transaction *txn, + TransactionOperation &top) { + assert(txn != NULL); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + std::vector result; + + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + db_.Read(txn, top.table, top.key, &fields, result); + } else { + db_.Read(txn, top.table, top.key, NULL, result); + } + + return db_.Update(txn, top.table, top.key, top.values); +} + +inline int Client::TransactionScanRetry(Transaction *txn, + TransactionOperation &top) { + assert(txn != NULL); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + std::vector> result; + if (!workload_.read_all_fields()) { + std::vector fields; + fields.push_back("field" + workload_.NextFieldName()); + return db_.Scan(txn, top.table, top.key, top.len, &fields, result); + } else { + return 
db_.Scan(txn, top.table, top.key, top.len, NULL, result); + } +} + +inline int Client::TransactionUpdateRetry(Transaction *txn, + TransactionOperation &top) { + assert(txn != NULL); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + return db_.Update(txn, top.table, top.key, top.values); +} + +inline int Client::TransactionInsertRetry(Transaction *txn, + TransactionOperation &top) { + assert(txn != NULL); + + if (txn->IsAborted()) { + return DB::kErrorConflict; + } + + return db_.Insert(txn, top.table, top.key, top.values); +} + +} // namespace ycsbc #endif // YCSB_C_CLIENT_H_ diff --git a/core/core_workload.cc b/core/core_workload.cc index f5567d72..6ccb2dac 100644 --- a/core/core_workload.cc +++ b/core/core_workload.cc @@ -76,6 +76,9 @@ const string CoreWorkload::INSERT_START_DEFAULT = "0"; const string CoreWorkload::RECORD_COUNT_PROPERTY = "recordcount"; const string CoreWorkload::OPERATION_COUNT_PROPERTY = "operationcount"; +const string CoreWorkload::OPS_PER_TRANSACTION_PROPERTY = "opspertransaction"; +const string CoreWorkload::OPS_PER_TRANSACTION_DEFAULT = "1"; + void CoreWorkload::InitLoadWorkload(const utils::Properties &p, unsigned int nthreads, unsigned int this_thread, BatchedCounterGenerator *key_generator) { table_name_ = p.GetProperty(TABLENAME_PROPERTY,TABLENAME_DEFAULT); @@ -178,6 +181,8 @@ void CoreWorkload::InitRunWorkload(const utils::Properties &p, unsigned int nthr } //batch_size_ = 1; + + ops_per_transaction_ = std::stoi(p.GetProperty(OPS_PER_TRANSACTION_PROPERTY, OPS_PER_TRANSACTION_DEFAULT)); } ycsbc::Generator *CoreWorkload::GetFieldLenGenerator( diff --git a/core/core_workload.h b/core/core_workload.h index c8bfe923..2e363917 100644 --- a/core/core_workload.h +++ b/core/core_workload.h @@ -141,6 +141,9 @@ class CoreWorkload { static const std::string RECORD_COUNT_PROPERTY; static const std::string OPERATION_COUNT_PROPERTY; + static const std::string OPS_PER_TRANSACTION_PROPERTY; + static const std::string OPS_PER_TRANSACTION_DEFAULT; + /// /// Initialize the scenario. /// Called once, in the main client thread, before any operations are started. @@ -164,6 +167,7 @@ class CoreWorkload { bool read_all_fields() const { return read_all_fields_; } bool write_all_fields() const { return write_all_fields_; } + int ops_per_transaction() const { return ops_per_transaction_; } CoreWorkload() : generator_(), @@ -214,7 +218,7 @@ class CoreWorkload { bool ordered_inserts_; size_t record_count_; int zero_padding_; - + int ops_per_transaction_; std::uniform_int_distribution uniform_letter_dist_; }; diff --git a/core/db.h b/core/db.h index 27a2a83f..ce10dd6c 100644 --- a/core/db.h +++ b/core/db.h @@ -9,27 +9,32 @@ #ifndef YCSB_C_DB_H_ #define YCSB_C_DB_H_ -#include #include +#include namespace ycsbc { +class Transaction; + class DB { - public: +public: typedef std::pair KVPair; static const int kOK = 0; static const int kErrorNoData = 1; static const int kErrorConflict = 2; + static const int kErrorNotSupport = 3; /// /// Initializes any state for accessing this DB. - /// Called once per DB client (thread); there is a single DB instance globally. + /// Called once per DB client (thread); there is a single DB instance + /// globally. /// - virtual void Init() { } + virtual void Init() {} /// /// Clears any state for accessing this DB. - /// Called once per DB client (thread); there is a single DB instance globally. + /// Called once per DB client (thread); there is a single DB instance + /// globally. 
/// - virtual void Close() { } + virtual void Close() {} /// /// Reads a record from the database. /// Field/value pairs from the result are stored in a vector. @@ -89,10 +94,93 @@ class DB { /// @return Zero on success, a non-zero error code on error. /// virtual int Delete(const std::string &table, const std::string &key) = 0; - - virtual ~DB() { } + + virtual ~DB() {} + + virtual void Begin(Transaction **txn) {} + + virtual int Commit(Transaction **txn) { return 0; } + + /// + /// Reads a record from the database. + /// Field/value pairs from the result are stored in a vector. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to read. + /// @param fields The list of fields to read, or NULL for all of them. + /// @param result A vector of field/value pairs for the result. + /// @return Zero on success, or a non-zero error code on error/record-miss. + /// + virtual int Read(Transaction *txn, const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { + return Read(table, key, fields, result); + } + /// + /// Performs a range scan for a set of records in the database. + /// Field/value pairs from the result are stored in a vector. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the first record to read. + /// @param record_count The number of records to read. + /// @param fields The list of fields to read, or NULL for all of them. + /// @param result A vector of vector, where each vector contains field/value + /// pairs for one record + /// @return Zero on success, or a non-zero error code on error. + /// + virtual int Scan(Transaction *txn, const std::string &table, + const std::string &key, int record_count, + const std::vector *fields, + std::vector> &result) { + return Scan(table, key, record_count, fields, result); + } + + /// + /// Updates a record in the database. + /// Field/value pairs in the specified vector are written to the record, + /// overwriting any existing values with the same field names. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to write. + /// @param values A vector of field/value pairs to update in the record. + /// @return Zero on success, a non-zero error code on error. + /// + virtual int Update(Transaction *txn, const std::string &table, + const std::string &key, std::vector &values) { + return Update(table, key, values); + } + /// + /// Inserts a record into the database. + /// Field/value pairs in the specified vector are written into the record. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to insert. + /// @param values A vector of field/value pairs to insert in the record. + /// @return Zero on success, a non-zero error code on error. + /// + virtual int Insert(Transaction *txn, const std::string &table, + const std::string &key, std::vector &values) { + return Insert(table, key, values); + } + /// + /// Deletes a record from the database. + /// + /// @param txn The current transaction. + /// @param table The name of the table. + /// @param key The key of the record to delete. + /// @return Zero on success, a non-zero error code on error. 
+  ///
+  virtual int Delete(Transaction *txn, const std::string &table,
+                     const std::string &key) {
+    return Delete(table, key);
+  }
 };
 
-} // ycsbc
+} // namespace ycsbc
 
 #endif // YCSB_C_DB_H_
diff --git a/core/transaction.h b/core/transaction.h
new file mode 100644
index 00000000..22ce8d57
--- /dev/null
+++ b/core/transaction.h
@@ -0,0 +1,49 @@
+#ifndef YCSB_C_TRANSACTION_H_
+#define YCSB_C_TRANSACTION_H_
+
+#include <vector>
+
+#include "core_workload.h"
+#include "db.h"
+
+namespace ycsbc {
+
+struct TransactionOperation {
+  enum Operation op;
+  std::string table;
+  std::string key;
+  int len;
+  std::vector<DB::KVPair> values;
+};
+
+class Transaction {
+public:
+  Transaction() : next_op(0), is_aborted(false){};
+
+  virtual ~Transaction(){};
+
+  void ReadyToRecordOperations(unsigned long size) {
+    ops.resize(size);
+    next_op = 0;
+  };
+
+  unsigned long GetTransactionOperationsSize() { return ops.size(); };
+
+  TransactionOperation &GetNextOperation() { return ops[next_op++]; }
+
+  TransactionOperation &GetOperation(unsigned long i) { return ops[i]; }
+
+  void SetAborted(bool aborted) { is_aborted = aborted; };
+
+  bool IsAborted() { return is_aborted; };
+
+protected:
+  std::vector<TransactionOperation> ops;
+  unsigned long next_op;
+
+  bool is_aborted;
+};
+
+} // namespace ycsbc
+
+#endif // YCSB_C_TRANSACTION_H_
diff --git a/db/db_factory.cc b/db/db_factory.cc
index 7ccd1bb9..61f27de9 100644
--- a/db/db_factory.cc
+++ b/db/db_factory.cc
@@ -15,7 +15,9 @@
 #include "db/tbb_rand_db.h"
 #include "db/tbb_scan_db.h"
 #include "db/splinter_db.h"
+#include "db/transactional_splinter_db.h"
 #include "db/rocks_db.h"
+#include "db/transaction_rocks_db.h"
 
 using namespace std;
 using ycsbc::DB;
@@ -33,8 +35,14 @@ DB* DBFactory::CreateDB(utils::Properties &props, bool preloaded) {
     return new RedisDB(props["host"].c_str(), port, slaves);
   } else if (props["dbname"] == "rocksdb") {
     return new RocksDB(props, preloaded);
+  } else if (props["dbname"] == "transaction_rocksdb") {
+    return new TransactionRocksDB(props, preloaded);
+  } else if (props["dbname"] == "optimistic_transaction_rocksdb") {
+    return new OptimisticTransactionRocksDB(props, preloaded);
   } else if (props["dbname"] == "splinterdb") {
     return new SplinterDB(props, preloaded);
+  } else if (props["dbname"] == "transactional_splinterdb") {
+    return new TransactionalSplinterDB(props, preloaded);
   } else if (props["dbname"] == "tbb_rand") {
     assert(!preloaded);
     return new TbbRandDB;
diff --git a/db/optimistic_transaction_rocks_db.cc b/db/optimistic_transaction_rocks_db.cc
new file mode 100644
index 00000000..a5ceb15d
--- /dev/null
+++ b/db/optimistic_transaction_rocks_db.cc
@@ -0,0 +1,196 @@
+//
+// optimistic_transaction_rocks_db.cc
+// YCSB-C
+//
+// Created by Rob Johnson on 3/20/2022.
+// Copyright (c) 2022 VMware.
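+// Optimistic transactional wrapper around RocksDB, built on
+// rocksdb::OptimisticTransactionDB; conflicts detected at Commit() are
+// reported to the client as DB::kErrorConflict.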
+// + +#include "db/transaction_rocks_db.h" +#include +#include +#include +#include + +using std::string; +using std::vector; + +namespace ycsbc { + +void OptimisticTransactionRocksDB::InitializeOptions(utils::Properties &props) { + const std::map &m = + (const std::map &)props; + rocksdb::ConfigOptions copts; + + if (m.count("rocksdb.config_file")) { + std::vector cf_descs; + assert(LoadOptionsFromFile(copts, m.at("rocksdb.config_file"), &options, + &cf_descs) == rocksdb::Status::OK()); + } + + std::unordered_map options_map; + for (auto tuple : m) { + if (tuple.first.find("rocksdb.options.") == 0) { + auto key = + tuple.first.substr(strlen("rocksdb.options."), std::string::npos); + options_map[key] = tuple.second; + + } else if (tuple.first == "rocksdb.write_options.sync") { + long int sync = props.GetIntProperty("rocksdb.write_options.sync"); + woptions.sync = sync; + } else if (tuple.first == "rocksdb.write_options.disableWAL") { + long int disableWAL = + props.GetIntProperty("rocksdb.write_options.disableWAL"); + woptions.disableWAL = disableWAL; + } else if (tuple.first == "rocksdb.config_file") { + // ignore it here -- loaded above + } else if (tuple.first == "rocksdb.database_filename") { + // ignore it, used in constructor + } else if (tuple.first == "rocksdb.isolation_level") { + isol_level = (RocksDBIsolationLevel)props.GetIntProperty( + "rocksdb.isolation_level"); + } else if (tuple.first.find("rocksdb.") == 0) { + std::cout << "Unknown rocksdb config option " << tuple.first << std::endl; + assert(0); + } + } + rocksdb::Options new_options; + assert(GetDBOptionsFromMap(copts, options, options_map, &new_options) == + rocksdb::Status::OK()); + options = new_options; +} + +OptimisticTransactionRocksDB::OptimisticTransactionRocksDB( + utils::Properties &props, bool preloaded) { + InitializeOptions(props); + std::string database_filename = + props.GetProperty("rocksdb.database_filename"); + options.create_if_missing = !preloaded; + options.error_if_exists = !preloaded; + rocksdb::Status status = + rocksdb::OptimisticTransactionDB::Open(options, database_filename, &db); + assert(status.ok()); + assert(isol_level != ROCKSDB_ISOLATION_LEVEL_INVALID); +} + +OptimisticTransactionRocksDB::~OptimisticTransactionRocksDB() { delete db; } + +void OptimisticTransactionRocksDB::Init() {} + +void OptimisticTransactionRocksDB::Close() {} + +void OptimisticTransactionRocksDB::Begin(Transaction **txn) { + if (*txn == NULL) { + *txn = new RocksDBTransaction(); + } + + rocksdb::Transaction *rt = db->BeginTransaction(woptions); + ((RocksDBTransaction *)*txn)->handle = rt; + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + rt->SetSnapshot(); + } +} + +int OptimisticTransactionRocksDB::Commit(Transaction **txn) { + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; + rocksdb::Status s = txn_handle->Commit(); + delete txn_handle; + + if (s.ok()) { + delete *txn; + *txn = NULL; + + return DB::kOK; + } + + if (s.IsBusy()) { + return DB::kErrorConflict; + } + + // FIXME: this error type might not be correct + return DB::kErrorNotSupport; +} + +int OptimisticTransactionRocksDB::Read(Transaction *txn, + const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { + assert(txn != NULL); + string value; + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + 
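+    // Under monotonic atomic views the transaction snapshot is refreshed at
+    // the start of every read; snapshot isolation keeps the snapshot taken
+    // in Begin().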
txn_handle->SetSnapshot(); + } + rocksdb::ReadOptions roptions_ = roptions; + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + roptions_.snapshot = db->GetSnapshot(); + } + rocksdb::Status status = + txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); + assert(status.ok() || status.IsNotFound()); // TODO is it expected we're + // querying non-existing keys? + return DB::kOK; +} + +int OptimisticTransactionRocksDB::Scan( + Transaction *txn, const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result) { + return DB::kErrorNotSupport; + // rocksdb::Iterator* it = db->NewIterator(roptions); + // int i = 0; + // for (it->Seek(key); i < len && it->Valid(); it->Next()) { + // i++; + // } + // delete it; + // return DB::kOK; +} + +int OptimisticTransactionRocksDB::Update(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values) { + return Insert(txn, table, key, values); +} + +int OptimisticTransactionRocksDB::Insert(Transaction *txn, + const std::string &table, + const std::string &key, + std::vector &values) { + assert(txn != NULL); + assert(values.size() == 1); + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = + txn_handle->Put(rocksdb::Slice(key), rocksdb::Slice(values[0].second)); + assert(status.ok()); + return DB::kOK; +} + +int OptimisticTransactionRocksDB::Delete(Transaction *txn, + const std::string &table, + const std::string &key) { + assert(txn != NULL); + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = txn_handle->Delete(rocksdb::Slice(key)); + assert(status.ok()); + return DB::kOK; +} + +} // namespace ycsbc + +// Might want this for later: +// +// +// inline void sync(bool /*fullSync*/) { +// static struct rocksdb::FlushOptions foptions = +// rocksdb::FlushOptions(); rocksdb::Status status = db.Flush(foptions); +// assert(status.ok()); +// } diff --git a/db/transaction_rocks_db.cc b/db/transaction_rocks_db.cc new file mode 100644 index 00000000..1f8ddafc --- /dev/null +++ b/db/transaction_rocks_db.cc @@ -0,0 +1,228 @@ +// +// rocks_db.cc +// YCSB-C +// +// Created by Rob Johnson on 3/20/2022. +// Copyright (c) 2022 VMware. 
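+// Pessimistic transactional wrapper around RocksDB, built on
+// rocksdb::TransactionDB; lock timeouts and busy statuses are reported as
+// DB::kErrorConflict so the client can retry the transaction.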
+// + +#include "db/transaction_rocks_db.h" +#include +#include +#include +#include + +using std::string; +using std::vector; + +namespace ycsbc { + +void TransactionRocksDB::InitializeOptions(utils::Properties &props) { + const std::map &m = + (const std::map &)props; + rocksdb::ConfigOptions copts; + + if (m.count("rocksdb.config_file")) { + std::vector cf_descs; + assert(LoadOptionsFromFile(copts, m.at("rocksdb.config_file"), &options, + &cf_descs) == rocksdb::Status::OK()); + } + + std::unordered_map options_map; + for (auto tuple : m) { + if (tuple.first.find("rocksdb.options.") == 0) { + auto key = + tuple.first.substr(strlen("rocksdb.options."), std::string::npos); + options_map[key] = tuple.second; + + } else if (tuple.first == "rocksdb.write_options.sync") { + long int sync = props.GetIntProperty("rocksdb.write_options.sync"); + woptions.sync = sync; + } else if (tuple.first == "rocksdb.write_options.disableWAL") { + long int disableWAL = + props.GetIntProperty("rocksdb.write_options.disableWAL"); + woptions.disableWAL = disableWAL; + } else if (tuple.first == "rocksdb.config_file") { + // ignore it here -- loaded above + } else if (tuple.first == "rocksdb.database_filename") { + // ignore it, used in constructor + } else if (tuple.first == + "rocksdb.txndb_options.transaction_lock_timeout") { + long transaction_lock_timeout = props.GetIntProperty( + "rocksdb.txndb_options.transaction_lock_timeout"); + txndb_options.transaction_lock_timeout = transaction_lock_timeout; + } else if (tuple.first == "rocksdb.isolation_level") { + isol_level = (RocksDBIsolationLevel)props.GetIntProperty( + "rocksdb.isolation_level"); + } else if (tuple.first.find("rocksdb.") == 0) { + std::cout << "Unknown rocksdb config option " << tuple.first << std::endl; + assert(0); + } + } + rocksdb::Options new_options; + assert(GetDBOptionsFromMap(copts, options, options_map, &new_options) == + rocksdb::Status::OK()); + options = new_options; +} + +TransactionRocksDB::TransactionRocksDB(utils::Properties &props, + bool preloaded) { + InitializeOptions(props); + std::string database_filename = + props.GetProperty("rocksdb.database_filename"); + options.create_if_missing = !preloaded; + options.error_if_exists = !preloaded; + rocksdb::Status status = rocksdb::TransactionDB::Open(options, txndb_options, + database_filename, &db); + assert(status.ok()); + assert(isol_level != ROCKSDB_ISOLATION_LEVEL_INVALID); +} + +TransactionRocksDB::~TransactionRocksDB() { delete db; } + +void TransactionRocksDB::Init() {} + +void TransactionRocksDB::Close() {} + +void TransactionRocksDB::Begin(Transaction **txn) { + if (*txn == NULL) { + *txn = new RocksDBTransaction(); + } + ((RocksDBTransaction *)*txn)->handle = db->BeginTransaction(woptions); + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + ((RocksDBTransaction *)*txn)->handle->SetSnapshot(); + ((RocksDBTransaction *)*txn)->handle->SetSavePoint(); + } +} + +int TransactionRocksDB::Commit(Transaction **txn) { + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)*txn)->handle; + + if ((*txn)->IsAborted()) { + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + txn_handle->RollbackToSavePoint(); + } + rocksdb::Status s = txn_handle->Commit(); + assert(s.ok()); + delete txn_handle; + return DB::kErrorConflict; + } + + rocksdb::Status s = txn_handle->Commit(); + delete txn_handle; + + if (s.ok()) { + delete 
*txn; + *txn = NULL; + + return DB::kOK; + } + + if (s.IsBusy()) { + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + txn_handle->Rollback(); + } + + return DB::kErrorConflict; + } + + // FIXME: this error type might not be correct + return DB::kErrorNotSupport; +} + +int TransactionRocksDB::Read(Transaction *txn, const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { + assert(txn != NULL); + string value; + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + txn_handle->SetSnapshot(); + } + rocksdb::ReadOptions roptions_ = roptions; + + if (isol_level == ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION || + isol_level == ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW) { + roptions_.snapshot = db->GetSnapshot(); + } + rocksdb::Status status = + txn_handle->GetForUpdate(roptions_, rocksdb::Slice(key), &value); + + if (status.IsTimedOut() || status.IsBusy()) { + return DB::kErrorConflict; + } + + assert(status.ok() || status.IsNotFound()); // TODO is it expected we're + // querying non-existing keys? + return DB::kOK; +} + +int TransactionRocksDB::Scan(Transaction *txn, const std::string &table, + const std::string &key, int len, + const std::vector *fields, + std::vector> &result) { + return DB::kErrorNotSupport; + // rocksdb::Iterator* it = db->NewIterator(roptions); + // int i = 0; + // for (it->Seek(key); i < len && it->Valid(); it->Next()) { + // i++; + // } + // delete it; + // return DB::kOK; +} + +int TransactionRocksDB::Update(Transaction *txn, const std::string &table, + const std::string &key, + std::vector &values) { + return Insert(txn, table, key, values); +} + +int TransactionRocksDB::Insert(Transaction *txn, const std::string &table, + const std::string &key, + std::vector &values) { + assert(txn != NULL); + assert(values.size() == 1); + + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = + txn_handle->Put(rocksdb::Slice(key), rocksdb::Slice(values[0].second)); + + if (status.IsTimedOut() || status.IsBusy()) { + return DB::kErrorConflict; + } + + assert(status.ok()); + return DB::kOK; +} + +int TransactionRocksDB::Delete(Transaction *txn, const std::string &table, + const std::string &key) { + assert(txn != NULL); + rocksdb::Transaction *txn_handle = ((RocksDBTransaction *)txn)->handle; + rocksdb::Status status = txn_handle->Delete(rocksdb::Slice(key)); + + if (status.IsTimedOut() || status.IsBusy()) { + return DB::kErrorConflict; + } + + assert(status.ok()); + return DB::kOK; +} + +} // namespace ycsbc + +// Might want this for later: +// +// +// inline void sync(bool /*fullSync*/) { +// static struct rocksdb::FlushOptions foptions = +// rocksdb::FlushOptions(); rocksdb::Status status = db.Flush(foptions); +// assert(status.ok()); +// } diff --git a/db/transaction_rocks_db.h b/db/transaction_rocks_db.h new file mode 100644 index 00000000..4cbfbd74 --- /dev/null +++ b/db/transaction_rocks_db.h @@ -0,0 +1,172 @@ +// +// rocks_db.h +// YCSB-C +// + +#ifndef YCSB_C_TRANSACTION_ROCKS_DB_H_ +#define YCSB_C_TRANSACTION_ROCKS_DB_H_ + +#include "core/db.h" +#include "core/properties.h" +#include "core/transaction.h" +#include "rocksdb/db.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/transaction_db.h" +#include +#include + +using std::cout; +using std::endl; + +namespace ycsbc 
{ + +typedef enum { + ROCKSDB_ISOLATION_LEVEL_INVALID = 0, + ROCKSDB_ISOLATION_LEVEL_READ_COMMITTED, + ROCKSDB_ISOLATION_LEVEL_SNAPSHOT_ISOLATION, + ROCKSDB_ISOLATION_LEVEL_MONOTONIC_ATOMIC_VIEW, +} RocksDBIsolationLevel; + +class TransactionRocksDB : public DB { +public: + TransactionRocksDB(utils::Properties &props, bool preloaded); + ~TransactionRocksDB(); + + void Init(); + void Close(); + + int Read(const std::string &table, const std::string &key, + const std::vector *fields, + std::vector &result) { + return DB::kErrorNotSupport; + } + + int Scan(const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result) { + return DB::kErrorNotSupport; + } + + int Update(const std::string &table, const std::string &key, + std::vector &values) { + return DB::kErrorNotSupport; + } + + int Insert(const std::string &table, const std::string &key, + std::vector &values) { + return DB::kErrorNotSupport; + } + + int Delete(const std::string &table, const std::string &key) { + return DB::kErrorNotSupport; + } + + void Begin(Transaction **txn); + + int Commit(Transaction **txn); + + int Read(Transaction *txn, const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(Transaction *txn, const std::string &table, const std::string &key, + int len, const std::vector *fields, + std::vector> &result); + + int Update(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Insert(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Delete(Transaction *txn, const std::string &table, + const std::string &key); + +private: + void InitializeOptions(utils::Properties &props); + + rocksdb::TransactionDB *db; + rocksdb::Options options; + rocksdb::ReadOptions roptions; + rocksdb::WriteOptions woptions; + rocksdb::TransactionDBOptions txndb_options; + RocksDBIsolationLevel isol_level; +}; + +class OptimisticTransactionRocksDB : public DB { +public: + OptimisticTransactionRocksDB(utils::Properties &props, bool preloaded); + ~OptimisticTransactionRocksDB(); + + void Init(); + void Close(); + + int Read(const std::string &table, const std::string &key, + const std::vector *fields, + std::vector &result) { + return DB::kErrorNotSupport; + } + + int Scan(const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result) { + return DB::kErrorNotSupport; + } + + int Update(const std::string &table, const std::string &key, + std::vector &values) { + return DB::kErrorNotSupport; + } + + int Insert(const std::string &table, const std::string &key, + std::vector &values) { + return DB::kErrorNotSupport; + } + + int Delete(const std::string &table, const std::string &key) { + return DB::kErrorNotSupport; + } + + void Begin(Transaction **txn); + + int Commit(Transaction **txn); + + int Read(Transaction *txn, const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(Transaction *txn, const std::string &table, const std::string &key, + int len, const std::vector *fields, + std::vector> &result); + + int Update(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Insert(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Delete(Transaction *txn, const std::string &table, + const std::string &key); + +private: + void InitializeOptions(utils::Properties 
&props); + + rocksdb::OptimisticTransactionDB *db; + rocksdb::Options options; + rocksdb::ReadOptions roptions; + rocksdb::WriteOptions woptions; + RocksDBIsolationLevel isol_level; +}; + +class RocksDBTransaction : public Transaction { +public: + RocksDBTransaction() : Transaction(){}; + +private: + rocksdb::Transaction *handle; + + friend TransactionRocksDB; + friend OptimisticTransactionRocksDB; +}; + +} // namespace ycsbc + +#endif // YCSB_C_TRANSACTION_ROCKS_DB_H_ diff --git a/db/transactional_splinter_db.cc b/db/transactional_splinter_db.cc new file mode 100644 index 00000000..64ae4396 --- /dev/null +++ b/db/transactional_splinter_db.cc @@ -0,0 +1,237 @@ +// +// splinter_db.cc +// YCSB-C +// +// Created by Rob Johnson on 3/20/2022. +// Copyright (c) 2022 VMware. +// + +#include "db/transactional_splinter_db.h" +extern "C" { +#include "splinterdb/default_data_config.h" +} + +#include +#include + +using std::string; +using std::vector; + +namespace ycsbc { + +TransactionalSplinterDB::TransactionalSplinterDB(utils::Properties &props, + bool preloaded) { + cout << "This is TransacionalSplinterDB\n"; + + uint64_t max_key_size = props.GetIntProperty("splinterdb.max_key_size"); + + default_data_config_init(max_key_size, &data_cfg); + splinterdb_cfg.filename = props.GetProperty("splinterdb.filename").c_str(); + splinterdb_cfg.cache_size = + props.GetIntProperty("splinterdb.cache_size_mb") * 1024 * 1024; + splinterdb_cfg.disk_size = + props.GetIntProperty("splinterdb.disk_size_gb") * 1024 * 1024 * 1024; + splinterdb_cfg.data_cfg = &data_cfg; + splinterdb_cfg.heap_handle = NULL; + splinterdb_cfg.heap_id = NULL; + splinterdb_cfg.page_size = props.GetIntProperty("splinterdb.page_size"); + splinterdb_cfg.extent_size = props.GetIntProperty("splinterdb.extent_size"); + splinterdb_cfg.io_flags = props.GetIntProperty("splinterdb.io_flags"); + splinterdb_cfg.io_perms = props.GetIntProperty("splinterdb.io_perms"); + splinterdb_cfg.io_async_queue_depth = + props.GetIntProperty("splinterdb.io_async_queue_depth"); + splinterdb_cfg.cache_use_stats = + props.GetIntProperty("splinterdb.cache_use_stats"); + splinterdb_cfg.cache_logfile = + props.GetProperty("splinterdb.cache_logfile").c_str(); + splinterdb_cfg.btree_rough_count_height = + props.GetIntProperty("splinterdb.btree_rough_count_height"); + splinterdb_cfg.filter_remainder_size = + props.GetIntProperty("splinterdb.filter_remainder_size"); + splinterdb_cfg.filter_index_size = + props.GetIntProperty("splinterdb.filter_index_size"); + splinterdb_cfg.use_log = props.GetIntProperty("splinterdb.use_log"); + splinterdb_cfg.memtable_capacity = + props.GetIntProperty("splinterdb.memtable_capacity"); + splinterdb_cfg.fanout = props.GetIntProperty("splinterdb.fanout"); + splinterdb_cfg.max_branches_per_node = + props.GetIntProperty("splinterdb.max_branches_per_node"); + splinterdb_cfg.use_stats = props.GetIntProperty("splinterdb.use_stats"); + splinterdb_cfg.reclaim_threshold = + props.GetIntProperty("splinterdb.reclaim_threshold"); + + if (preloaded) { + assert(!transactional_splinterdb_open(&splinterdb_cfg, &spl)); + } else { + assert(!transactional_splinterdb_create(&splinterdb_cfg, &spl)); + } + + transactional_splinterdb_set_isolation_level( + spl, (transaction_isolation_level)props.GetIntProperty( + "splinterdb.isolation_level")); +} + +TransactionalSplinterDB::~TransactionalSplinterDB() { + transactional_splinterdb_close(&spl); +} + +void TransactionalSplinterDB::Init() { + transactional_splinterdb_register_thread(spl); +} + +void 
TransactionalSplinterDB::Close() { + transactional_splinterdb_deregister_thread(spl); +} + +int TransactionalSplinterDB::Read(Transaction *txn, const string &table, + const string &key, + const vector *fields, + vector &result) { + assert(txn != NULL); + + splinterdb_lookup_result lookup_result; + transactional_splinterdb_lookup_result_init(spl, &lookup_result, 0, NULL); + slice key_slice = slice_create(key.size(), key.c_str()); + // cout << "lookup " << key << endl; + + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert(!transactional_splinterdb_lookup(spl, txn_handle, key_slice, + &lookup_result)); + if (!splinterdb_lookup_found(&lookup_result)) { + cout << "FAILED lookup " << key << endl; + assert(0); + } + // cout << "done lookup " << key << endl; + splinterdb_lookup_result_deinit(&lookup_result); + return DB::kOK; +} + +int TransactionalSplinterDB::Scan(Transaction *txn, const string &table, + const string &key, int len, + const vector *fields, + vector> &result) { + assert(txn != NULL); + assert(fields == NULL); + + return DB::kErrorNotSupport; +} + +int TransactionalSplinterDB::Update(Transaction *txn, const string &table, + const string &key, vector &values) { + return Insert(txn, table, key, values); +} + +int TransactionalSplinterDB::Insert(Transaction *txn, const string &table, + const string &key, vector &values) { + assert(txn != NULL); + assert(values.size() == 1); + + std::string val = values[0].second; + slice key_slice = slice_create(key.size(), key.c_str()); + slice val_slice = slice_create(val.size(), val.c_str()); + // cout << "insert " << key << endl; + + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert( + !transactional_splinterdb_insert(spl, txn_handle, key_slice, val_slice)); + // cout << "done insert " << key << endl; + + return DB::kOK; +} + +int TransactionalSplinterDB::Delete(Transaction *txn, const string &table, + const string &key) { + slice key_slice = slice_create(key.size(), key.c_str()); + + transaction *txn_handle = &((SplinterDBTransaction *)txn)->handle; + assert(!transactional_splinterdb_delete(spl, txn_handle, key_slice)); + + return DB::kOK; +} + +void TransactionalSplinterDB::Begin(Transaction **txn) { + if (*txn == NULL) { + *txn = new SplinterDBTransaction(); + } + transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; + transactional_splinterdb_begin(spl, txn_handle); +} + +int TransactionalSplinterDB::Commit(Transaction **txn) { + transaction *txn_handle = &((SplinterDBTransaction *)*txn)->handle; + if (transactional_splinterdb_commit(spl, txn_handle) < 0) { + return DB::kErrorConflict; + } + + delete *txn; + *txn = NULL; + return DB::kOK; +} + +int TransactionalSplinterDB::Read(const std::string &table, + const std::string &key, + const std::vector *fields, + std::vector &result) { + return Read(NULL, table, key, fields, result); +} +/// +/// Performs a range scan for a set of records in the database. +/// Field/value pairs from the result are stored in a vector. +/// +/// @param table The name of the table. +/// @param key The key of the first record to read. +/// @param record_count The number of records to read. +/// @param fields The list of fields to read, or NULL for all of them. +/// @param result A vector of vector, where each vector contains field/value +/// pairs for one record +/// @return Zero on success, or a non-zero error code on error. 
+/// +int TransactionalSplinterDB::Scan(const std::string &table, + const std::string &key, int record_count, + const std::vector *fields, + std::vector> &result) { + return Scan(NULL, table, key, record_count, fields, result); +} + +/// +/// Updates a record in the database. +/// Field/value pairs in the specified vector are written to the record, +/// overwriting any existing values with the same field names. +/// +/// @param table The name of the table. +/// @param key The key of the record to write. +/// @param values A vector of field/value pairs to update in the record. +/// @return Zero on success, a non-zero error code on error. +/// +int TransactionalSplinterDB::Update(const std::string &table, + const std::string &key, + std::vector &values) { + return Update(NULL, table, key, values); +} +/// +/// Inserts a record into the database. +/// Field/value pairs in the specified vector are written into the record. +/// +/// @param table The name of the table. +/// @param key The key of the record to insert. +/// @param values A vector of field/value pairs to insert in the record. +/// @return Zero on success, a non-zero error code on error. +/// +int TransactionalSplinterDB::Insert(const std::string &table, + const std::string &key, + std::vector &values) { + return Insert(NULL, table, key, values); +} +/// +/// Deletes a record from the database. +/// +/// @param table The name of the table. +/// @param key The key of the record to delete. +/// @return Zero on success, a non-zero error code on error. +/// +int TransactionalSplinterDB::Delete(const std::string &table, + const std::string &key) { + return Delete(NULL, table, key); +} + +} // namespace ycsbc diff --git a/db/transactional_splinter_db.h b/db/transactional_splinter_db.h new file mode 100644 index 00000000..b69d6320 --- /dev/null +++ b/db/transactional_splinter_db.h @@ -0,0 +1,89 @@ +// +// splinter_db.h +// YCSB-C +// + +#ifndef YCSB_C_TRANSACTIONAL_SPLINTER_DB_H_ +#define YCSB_C_TRANSACTIONAL_SPLINTER_DB_H_ + +#include +#include +#include + +#include "core/db.h" +#include "core/properties.h" +#include "core/transaction.h" + +extern "C" { +#include "splinterdb/transaction.h" +} + +using std::cout; +using std::endl; + +namespace ycsbc { + +class TransactionalSplinterDB : public DB { +public: + TransactionalSplinterDB(utils::Properties &props, bool preloaded); + ~TransactionalSplinterDB(); + + void Init(); + void Close(); + + int Read(const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(const std::string &table, const std::string &key, int len, + const std::vector *fields, + std::vector> &result); + + int Update(const std::string &table, const std::string &key, + std::vector &values); + + int Insert(const std::string &table, const std::string &key, + std::vector &values); + + int Delete(const std::string &table, const std::string &key); + + void Begin(Transaction **txn); + + int Commit(Transaction **txn); + + int Read(Transaction *txn, const std::string &table, const std::string &key, + const std::vector *fields, std::vector &result); + + int Scan(Transaction *txn, const std::string &table, const std::string &key, + int len, const std::vector *fields, + std::vector> &result); + + int Update(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Insert(Transaction *txn, const std::string &table, const std::string &key, + std::vector &values); + + int Delete(Transaction *txn, const std::string &table, + const std::string 
&key);
+
+private:
+  splinterdb_config splinterdb_cfg;
+  data_config data_cfg;
+  transactional_splinterdb *spl;
+};
+
+class SplinterDBTransaction : public Transaction {
+public:
+  SplinterDBTransaction() : Transaction(){};
+
+  ~SplinterDBTransaction(){};
+
+private:
+  transaction handle;
+
+  friend TransactionalSplinterDB;
+};
+
+} // namespace ycsbc
+
+#endif // YCSB_C_TRANSACTIONAL_SPLINTER_DB_H_
diff --git a/plot_thoughput.py b/plot_thoughput.py
new file mode 100644
index 00000000..257e925c
--- /dev/null
+++ b/plot_thoughput.py
@@ -0,0 +1,57 @@
+import sys
+import matplotlib.pyplot as plt
+
+outputfile = sys.argv[1]
+
+f = open(outputfile, "r")
+
+lines = f.readlines()
+
+load_threads = []
+load_tputs = []
+run_threads = []
+run_tputs = []
+
+abort_counts = []
+
+load_data = False
+run_data = False
+
+for line in lines:
+    if load_data:
+        fields = line.split()
+        load_threads.append(fields[-2])
+        load_tputs.append(fields[-1])
+        load_data = False
+
+    if line.startswith("# Load throughput (KTPS)"):
+        load_data = True
+
+    if run_data:
+        fields = line.split()
+        run_threads.append(fields[-2])
+        run_tputs.append(fields[-1])
+        run_data = False
+
+    if line.startswith("# Transaction throughput (KTPS)"):
+        run_data = True
+
+    if line.startswith("# Abort count"):
+        fields = line.split()
+        abort_counts.append(fields[-1])
+
+# print csv
+print("threads,load,workload,aborts")
+for i in range(0, len(load_threads)):
+    print(load_threads[i], load_tputs[i],
+          run_tputs[i], abort_counts[i], sep=',')
+
+plt.plot(load_threads, load_tputs, label='load')
+plt.plot(run_threads, run_tputs, label='run')
+
+plt.ylabel("Throughput (KTPS)")
+plt.xlabel("# of threads")
+
+plt.legend()
+
+plt.show()
diff --git a/run_individual.sh b/run_individual.sh
new file mode 100644
index 00000000..35212207
--- /dev/null
+++ b/run_individual.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+DB=${1:-"transactional_splinterdb"}
+THREADS=${2:-1}
+DIST=${3:-"uniform"}
+FIELDLENGTH=1024
+
+RECORDCOUNT=84000000
+TXNPERTHREAD=1000000
+OPSPERTRANSACTION=2
+OPERATIONCOUNT=$(($OPSPERTRANSACTION * $THREADS * $TXNPERTHREAD))
+
+./ycsbc -db $DB -threads $THREADS \
+-L workloads/load.spec \
+-w fieldlength $FIELDLENGTH \
+-w recordcount $RECORDCOUNT \
+-W workloads/workloada.spec \
+-w requestdistribution $DIST \
+-w operationcount $OPERATIONCOUNT \
+-w opspertransaction $OPSPERTRANSACTION
\ No newline at end of file
diff --git a/run_rocksdb.sh b/run_rocksdb.sh
new file mode 100755
index 00000000..e44ed2b1
--- /dev/null
+++ b/run_rocksdb.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+NTHREADS=1
+DB=optimistic_transaction_rocksdb
+WL=workloads/myworkload.spec
+
+PARAMS="-W $WL -L $WL"
+
+if [ `uname` = FreeBSD ]; then
+    GETOPT=/usr/local/bin/getopt
+else
+    GETOPT=getopt
+fi
+
+eval set -- "$(${GETOPT} -o i:t:ph -- $@)"
+
+while true ; do
+    case "$1" in
+        -p) DB=transaction_rocksdb; shift 1 ;;
+        -t) NTHREADS=$2; shift 2 ;;
+        -i) PARAMS+=" -p rocksdb.isolation_level $2"; shift 2 ;;
+        -h) printf "$0 options:\n\t-t [# threads]\n\t-i [isolation_level: 1=read_committed, 2=snapshot_isolation, 3=monotonic_atomic_views(default)]\n\t-p: pessimistic transaction db\nExample: $0 -t 4 -i 3 -p\n"; exit ;;
+        --) shift ; break ;;
+    esac
+done
+
+./ycsbc -db $DB -threads $NTHREADS $PARAMS
+
+if [[ -d rocksdb.db ]]; then
+    rm -rf rocksdb.db
+fi
diff --git a/run_splinterdb.sh b/run_splinterdb.sh
new file mode 100755
index 00000000..6fc70976
--- /dev/null
+++ b/run_splinterdb.sh
@@ -0,0 +1,33 @@
+#!/bin/bash -x
+
+if [[ "x$1" == "xsplinterdb" || "x$1" == "xtransactional_splinterdb" ]]
+then
+ DB=$1 +else + echo "Usage: $0 [splinterdb|transactional_splinterdb]" + exit 1 +fi + +THREADS=(1 2 4 8) + +echo "Run for the uniform distribution" + +OUT=uniform.out + +rm -f $OUT + +for t in ${THREADS[@]} +do + bash run_individual.sh $DB $t uniform >> $OUT 2>&1 +done + +OUT=zipf.out + +rm -f $OUT + +echo "Run for the zipfian distribution" + +for t in ${THREADS[@]} +do + bash run_individual.sh $DB $t zipfian >> $OUT 2>&1 +done \ No newline at end of file diff --git a/workloads/myworkload.spec b/workloads/myworkload.spec new file mode 100644 index 00000000..a338f32c --- /dev/null +++ b/workloads/myworkload.spec @@ -0,0 +1,22 @@ +# Yahoo! Cloud System Benchmark +# Workload A: Update heavy workload +# Application example: Session store recording recent actions +# +# Read/update ratio: 50/50 +# Default data size: 1 KB records (10 fields, 100 bytes each, plus key) +# Request distribution: zipfian + +recordcount=1024 +operationcount=8192 +workload=com.yahoo.ycsb.workloads.CoreWorkload +fieldcount=1 +fieldlength=1024 + +opspertransaction=16 + +readallfields=true +requestdistribution=zipfian +readproportion=0.5 +updateproportion=0.5 +scanproportion=0 +insertproportion=0 diff --git a/ycsbc.cc b/ycsbc.cc index 42ae4480..7b823afb 100644 --- a/ycsbc.cc +++ b/ycsbc.cc @@ -6,16 +6,16 @@ // Copyright (c) 2014 Jinglei Ren . // -#include -#include -#include -#include -#include -#include "core/utils.h" -#include "core/timer.h" #include "core/client.h" #include "core/core_workload.h" +#include "core/timer.h" +#include "core/utils.h" #include "db/db_factory.h" +#include +#include +#include +#include +#include using namespace std; @@ -26,49 +26,52 @@ typedef struct WorkloadProperties { } WorkloadProperties; std::map default_props = { - {"threadcount", "1"}, - {"dbname", "basic"}, - {"progress", "none"}, - - // - // Basicdb config defaults - // - {"basicdb.verbose", "0"}, - - // - // splinterdb config defaults - // - {"splinterdb.filename", "splinterdb.db"}, - {"splinterdb.cache_size_mb", "4096"}, - {"splinterdb.disk_size_gb", "128"}, - - {"splinterdb.max_key_size", "24"}, - {"splinterdb.use_log", "1"}, - - // All these options use splinterdb's internal defaults - {"splinterdb.page_size", "0"}, - {"splinterdb.extent_size", "0"}, - {"splinterdb.io_flags", "0"}, - {"splinterdb.io_perms", "0"}, - {"splinterdb.io_async_queue_depth", "0"}, - {"splinterdb.cache_use_stats", "0"}, - {"splinterdb.cache_logfile", "0"}, - {"splinterdb.btree_rough_count_height", "0"}, - {"splinterdb.filter_remainder_size", "0"}, - {"splinterdb.filter_index_size", "0"}, - {"splinterdb.memtable_capacity", "0"}, - {"splinterdb.fanout", "0"}, - {"splinterdb.max_branches_per_node", "0"}, - {"splinterdb.use_stats", "0"}, - {"splinterdb.reclaim_threshold", "0"}, - - {"rocksdb.database_filename", "rocksdb.db"}, + {"threadcount", "1"}, + {"dbname", "basic"}, + {"progress", "none"}, + + // + // Basicdb config defaults + // + {"basicdb.verbose", "0"}, + + // + // splinterdb config defaults + // + {"splinterdb.filename", "splinterdb.db"}, + {"splinterdb.cache_size_mb", "4096"}, + {"splinterdb.disk_size_gb", "128"}, + + {"splinterdb.max_key_size", "24"}, + {"splinterdb.use_log", "1"}, + + // All these options use splinterdb's internal defaults + {"splinterdb.page_size", "0"}, + {"splinterdb.extent_size", "0"}, + {"splinterdb.io_flags", "0"}, + {"splinterdb.io_perms", "0"}, + {"splinterdb.io_async_queue_depth", "0"}, + {"splinterdb.cache_use_stats", "0"}, + {"splinterdb.cache_logfile", "0"}, + {"splinterdb.btree_rough_count_height", "0"}, + 
{"splinterdb.filter_remainder_size", "0"}, + {"splinterdb.filter_index_size", "0"}, + {"splinterdb.memtable_capacity", "0"}, + {"splinterdb.fanout", "0"}, + {"splinterdb.max_branches_per_node", "0"}, + {"splinterdb.use_stats", "0"}, + {"splinterdb.reclaim_threshold", "0"}, + {"splinterdb.isolation_level", "1"}, + + {"rocksdb.database_filename", "rocksdb.db"}, +// {"rocksdb.isolation_level", "3"}, }; - void UsageMessage(const char *command); bool StrStartWith(const char *str, const char *pre); -void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, WorkloadProperties &load_workload, vector &run_workloads); +void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, + WorkloadProperties &load_workload, + vector &run_workloads); typedef enum progress_mode { no_progress, @@ -76,8 +79,10 @@ typedef enum progress_mode { percent_progress, } progress_mode; -static inline void ReportProgress(progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, uint64_t stepsize, volatile uint64_t *last_printed) -{ +static inline void ReportProgress(progress_mode pmode, uint64_t total_ops, + volatile uint64_t *global_op_counter, + uint64_t stepsize, + volatile uint64_t *last_printed) { uint64_t old_counter = __sync_fetch_and_add(global_op_counter, stepsize); uint64_t new_counter = old_counter + stepsize; if (100 * old_counter / total_ops != 100 * new_counter / total_ops) { @@ -85,29 +90,36 @@ static inline void ReportProgress(progress_mode pmode, uint64_t total_ops, volat cout << "#" << flush; } else if (pmode == percent_progress) { uint64_t my_percent = 100 * new_counter / total_ops; - while (*last_printed + 1 != my_percent) {} + while (*last_printed + 1 != my_percent) { + } cout << 100 * new_counter / total_ops << "%\r" << flush; *last_printed = my_percent; } } } -static inline void ProgressUpdate(progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, uint64_t i, volatile uint64_t *last_printed) -{ +static inline void ProgressUpdate(progress_mode pmode, uint64_t total_ops, + volatile uint64_t *global_op_counter, + uint64_t i, volatile uint64_t *last_printed) { uint64_t sync_interval = 0 < total_ops / 1000 ? total_ops / 1000 : 1; if ((i % sync_interval) == 0) { - ReportProgress(pmode, total_ops, global_op_counter, sync_interval, last_printed); + ReportProgress(pmode, total_ops, global_op_counter, sync_interval, + last_printed); } } -static inline void ProgressFinish(progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, uint64_t i, volatile uint64_t *last_printed) -{ +static inline void ProgressFinish(progress_mode pmode, uint64_t total_ops, + volatile uint64_t *global_op_counter, + uint64_t i, volatile uint64_t *last_printed) { uint64_t sync_interval = 0 < total_ops / 1000 ? 
-int DelegateClient(ycsbc::DB *db, ycsbc::CoreWorkload *wl, const uint64_t num_ops, bool is_loading,
-                   progress_mode pmode, uint64_t total_ops, volatile uint64_t *global_op_counter, volatile uint64_t *last_printed) {
+int DelegateClient(ycsbc::DB *db, ycsbc::CoreWorkload *wl,
+                   const uint64_t num_ops, bool is_loading, progress_mode pmode,
+                   uint64_t total_ops, volatile uint64_t *global_op_counter,
+                   volatile uint64_t *last_printed) {
  db->Init();
  ycsbc::Client client(*db, *wl);
  uint64_t oks = 0;
@@ -128,6 +140,8 @@ int DelegateClient(ycsbc::DB *db, ycsbc::CoreWorkload *wl, const uint64_t num_op
  return oks;
 }
 
+std::atomic<unsigned long> ycsbc::Client::total_abort_cnt = 0;
+
 int main(const int argc, const char *argv[]) {
  utils::Properties props;
  WorkloadProperties load_workload;
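The total_abort_cnt definition above is the counter that main() later prints as "# Abort count", so it has to be an atomic: the client threads launched via async can all fold their per-thread abort tallies into it around the time they finish. A minimal sketch of that aggregation pattern follows; the names and numbers are illustrative, not the benchmark's own.

// Illustrative only: several worker threads folding local abort counts into
// one shared std::atomic, the pattern total_abort_cnt is used for.
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<unsigned long> total_abort_cnt{0};

void client_thread(unsigned long local_aborts) {
  // ... run transactions, counting aborts locally ...
  total_abort_cnt += local_aborts; // one atomic add when the thread is done
}

int main() {
  std::vector<std::thread> workers;
  for (unsigned long t = 1; t <= 4; ++t)
    workers.emplace_back(client_thread, t); // pretend abort counts 1..4
  for (auto &w : workers)
    w.join();
  std::cout << "# Abort count:\t" << total_abort_cnt << '\n'; // prints 10
  return 0;
}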
@@ -153,17 +167,20 @@ int main(const int argc, const char *argv[]) {
    exit(0);
  }
 
-  record_count = stoi(load_workload.props[ycsbc::CoreWorkload::RECORD_COUNT_PROPERTY]);
+  record_count =
+      stoi(load_workload.props[ycsbc::CoreWorkload::RECORD_COUNT_PROPERTY]);
  uint64_t batch_size = sqrt(record_count);
  if (record_count / batch_size < num_threads)
    batch_size = record_count / num_threads;
  if (batch_size < 1)
    batch_size = 1;
-  ycsbc::BatchedCounterGenerator key_generator(load_workload.preloaded ? record_count : 0, batch_size);
+  ycsbc::BatchedCounterGenerator key_generator(
+      load_workload.preloaded ? record_count : 0, batch_size);
  ycsbc::CoreWorkload wls[num_threads];
  for (unsigned int i = 0; i < num_threads; ++i) {
-    wls[i].InitLoadWorkload(load_workload.props, num_threads, i, &key_generator);
+    wls[i].InitLoadWorkload(load_workload.props, num_threads, i,
+                            &key_generator);
  }
 
  // Perform the Load phase
@@ -176,7 +193,9 @@ int main(const int argc, const char *argv[]) {
    for (unsigned int i = 0; i < num_threads; ++i) {
      uint64_t start_op = (record_count * i) / num_threads;
      uint64_t end_op = (record_count * (i + 1)) / num_threads;
-      actual_ops.emplace_back(async(launch::async, DelegateClient, db, &wls[i], end_op - start_op, true, pmode, record_count, &load_progress, &last_printed));
+      actual_ops.emplace_back(
+          async(launch::async, DelegateClient, db, &wls[i], end_op - start_op,
+                true, pmode, record_count, &load_progress, &last_printed));
    }
    assert(actual_ops.size() == num_threads);
    sum = 0;
@@ -190,11 +209,11 @@ int main(const int argc, const char *argv[]) {
    }
    double load_duration = timer.End();
    cerr << "# Load throughput (KTPS)" << endl;
-    cerr << props["dbname"] << '\t' << load_workload.filename << '\t' << num_threads << '\t';
+    cerr << props["dbname"] << '\t' << load_workload.filename << '\t'
+         << num_threads << '\t';
    cerr << sum / load_duration / 1000 << endl;
  }
-
 
  // Perform any Run phases
  for (unsigned int i = 0; i < run_workloads.size(); i++) {
    auto workload = run_workloads[i];
@@ -202,7 +221,11 @@ int main(const int argc, const char *argv[]) {
      wls[i].InitRunWorkload(workload.props, num_threads, i);
    }
    actual_ops.clear();
-    total_ops = stoi(workload.props[ycsbc::CoreWorkload::OPERATION_COUNT_PROPERTY]);
+    total_ops =
+        stoi(workload.props[ycsbc::CoreWorkload::OPERATION_COUNT_PROPERTY]);
+    uint64_t ops_per_transactions =
+        stoi(workload.props.GetProperty(ycsbc::CoreWorkload::OPS_PER_TRANSACTION_PROPERTY,
+                                        ycsbc::CoreWorkload::OPS_PER_TRANSACTION_DEFAULT));
    timer.Start();
    {
      cerr << "# Transaction count:\t" << total_ops << endl;
@@ -211,14 +234,16 @@ int main(const int argc, const char *argv[]) {
      for (unsigned int i = 0; i < num_threads; ++i) {
        uint64_t start_op = (total_ops * i) / num_threads;
        uint64_t end_op = (total_ops * (i + 1)) / num_threads;
-        actual_ops.emplace_back(async(launch::async,
-            DelegateClient, db, &wls[i], end_op - start_op, false, pmode, total_ops, &run_progress, &last_printed));
+        uint64_t num_transactions = (end_op - start_op) / ops_per_transactions;
+        actual_ops.emplace_back(async(launch::async, DelegateClient, db,
+                                      &wls[i], num_transactions, false, pmode,
+                                      total_ops, &run_progress, &last_printed));
      }
      assert(actual_ops.size() == num_threads);
      sum = 0;
      for (auto &n : actual_ops) {
        assert(n.valid());
-        sum += n.get();
+        sum += n.get() * ops_per_transactions;
      }
      if (pmode != no_progress) {
        cout << "\n";
@@ -227,19 +252,24 @@ int main(const int argc, const char *argv[]) {
      double run_duration = timer.End();
      cerr << "# Transaction throughput (KTPS)" << endl;
-      cerr << props["dbname"] << '\t' << workload.filename << '\t' << num_threads << '\t';
+      cerr << props["dbname"] << '\t' << workload.filename << '\t' << num_threads
+           << '\t';
      cerr << sum / run_duration / 1000 << endl;
+
+      cerr << "# Abort count:\t" << ycsbc::Client::total_abort_cnt << '\n';
    }
    delete db;
 }
 
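To make the Run-phase accounting above concrete: each thread is handed (end_op - start_op) / ops_per_transactions whole transactions, DelegateClient reports completed transactions, and the sum is scaled back to operations before the throughput line is printed; operations that do not fill a whole transaction are dropped by the integer division. A small standalone sketch of that arithmetic, using the myworkload.spec numbers (illustrative, not benchmark code):

// Illustrative only: per-thread transaction split and the scale-back to
// operations used when reporting throughput.
#include <cstdint>
#include <iostream>

int main() {
  const uint64_t total_ops = 8192;         // operationcount
  const uint64_t ops_per_transaction = 16; // opspertransaction
  const unsigned int num_threads = 4;      // one entry of THREADS
  uint64_t reported_ops = 0;
  for (unsigned int i = 0; i < num_threads; ++i) {
    uint64_t start_op = (total_ops * i) / num_threads;
    uint64_t end_op = (total_ops * (i + 1)) / num_threads;
    uint64_t num_transactions = (end_op - start_op) / ops_per_transaction;
    reported_ops += num_transactions * ops_per_transaction; // sum scaled back
  }
  std::cout << reported_ops << " of " << total_ops << " ops counted\n"; // 8192 of 8192
  return 0;
}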
-void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, WorkloadProperties &load_workload, vector<WorkloadProperties> &run_workloads) {
+void ParseCommandLine(int argc, const char *argv[], utils::Properties &props,
+                      WorkloadProperties &load_workload,
+                      vector<WorkloadProperties> &run_workloads) {
  bool saw_load_workload = false;
  WorkloadProperties *last_workload = NULL;
  int argindex = 1;
 
-  for (auto const & [key, val] : default_props) {
+  for (auto const &[key, val] : default_props) {
    props.SetProperty(key, val);
  }
@@ -292,9 +322,9 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo
      }
      props.SetProperty("slaves", argv[argindex]);
      argindex++;
-    } else if (strcmp(argv[argindex], "-W") == 0
-               || strcmp(argv[argindex], "-P") == 0
-               || strcmp(argv[argindex], "-L") == 0) {
+    } else if (strcmp(argv[argindex], "-W") == 0 ||
+               strcmp(argv[argindex], "-P") == 0 ||
+               strcmp(argv[argindex], "-L") == 0) {
      WorkloadProperties workload;
      workload.preloaded = strcmp(argv[argindex], "-P") == 0;
      argindex++;
@@ -312,9 +342,9 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo
      }
      input.close();
      argindex++;
-      if (strcmp(argv[argindex-2], "-W") == 0) {
+      if (strcmp(argv[argindex - 2], "-W") == 0) {
        run_workloads.push_back(workload);
-        last_workload = &run_workloads[run_workloads.size()-1];
+        last_workload = &run_workloads[run_workloads.size() - 1];
      } else if (saw_load_workload) {
        UsageMessage(argv[0]);
        exit(0);
@@ -323,8 +353,8 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo
        load_workload = workload;
        last_workload = &load_workload;
      }
-    } else if (strcmp(argv[argindex], "-p") == 0
-               || strcmp(argv[argindex], "-w") == 0) {
+    } else if (strcmp(argv[argindex], "-p") == 0 ||
+               strcmp(argv[argindex], "-w") == 0) {
      argindex++;
      if (argindex >= argc) {
        UsageMessage(argv[0]);
@@ -337,7 +367,7 @@ void ParseCommandLine(int argc, const char *argv[], utils::Properties &props, Wo
        exit(0);
      }
      std::string propval = argv[argindex];
-      if (strcmp(argv[argindex-2], "-w") == 0) {
+      if (strcmp(argv[argindex - 2], "-w") == 0) {
        if (last_workload) {
          last_workload->props.SetProperty(propkey, propval);
        } else {
@@ -361,23 +391,39 @@
 }
 
 void UsageMessage(const char *command) {
-  cout << "Usage: " << command << " [options]" << "-L [-W run-workload.spec] ..." << endl;
-  cout << "       Perform the given Load workload, then each Run workload" << endl;
-  cout << "Usage: " << command << " [options]" << "-P [-W run-workload.spec] ... " << endl;
-  cout << "       Perform each given Run workload on a database that has been preloaded with the given Load workload" << endl;
+  cout << "Usage: " << command << " [options]"
+       << "-L [-W run-workload.spec] ..." << endl;
+  cout << "       Perform the given Load workload, then each Run workload"
+       << endl;
+  cout << "Usage: " << command << " [options]"
+       << "-P [-W run-workload.spec] ... " << endl;
+  cout << "       Perform each given Run workload on a database that has been "
+          "preloaded with the given Load workload"
+       << endl;
  cout << "Options:" << endl;
-  cout << "  -threads : execute using threads (default: " << default_props["threadcount"] << ")" << endl;
-  cout << "  -db : specify the name of the DB to use (default: " << default_props["dbname"] << ")" << endl;
-  cout << "  -L : Initialize the database with the specified Load workload" << endl;
-  cout << "  -P : Indicates that the database has been preloaded with the specified Load workload" << endl;
+  cout << "  -threads : execute using threads (default: "
+       << default_props["threadcount"] << ")" << endl;
+  cout << "  -db : specify the name of the DB to use (default: "
+       << default_props["dbname"] << ")" << endl;
+  cout
+      << "  -L : Initialize the database with the specified Load workload"
+      << endl;
+  cout << "  -P : Indicates that the database has been preloaded with "
+          "the specified Load workload"
+       << endl;
  cout << "  -W : Perform the Run workload specified in " << endl;
  cout << "  -p : set property to value " << endl;
-  cout << "  -w : set a property in the previously specified workload" << endl;
-  cout << "Exactly one Load workload is allowed, but multiple Run workloads may be given.." << endl;
-  cout << "Run workloads will be executed in the order given on the command line." << endl;
+  cout << "  -w : set a property in the previously specified "
+          "workload"
+       << endl;
+  cout << "Exactly one Load workload is allowed, but multiple Run workloads "
+          "may be given."
+       << endl;
+  cout << "Run workloads will be executed in the order given on the command "
+          "line."
+       << endl;
 }
 
 inline bool StrStartWith(const char *str, const char *pre) {
  return strncmp(str, pre, strlen(pre)) == 0;
 }
-