1 change: 0 additions & 1 deletion .gitignore
@@ -103,4 +103,3 @@ __pycache__/

include/csv2/
debug.*
*.log
11 changes: 11 additions & 0 deletions Makefile
@@ -14,6 +14,10 @@ ifeq ($(origin PYTHON), undefined)
endif
export PYTHON

# EXTRA_CXXFLAGS += -I$(HOME)/local/include -gdwarf-4 -fsanitize=address
EXTRA_CXXFLAGS += -I$(HOME)/local/include -gdwarf-4 -DGFLAGS
LDFLAGS += -L$(HOME)/local/lib -lsocket++ -lgflags

CLEAN_FILES = # deliberately empty, so we can append below.
CFLAGS += ${EXTRA_CFLAGS}
CXXFLAGS += ${EXTRA_CXXFLAGS}
@@ -112,6 +116,8 @@ ifneq ($(findstring rocksdbjava, $(MAKECMDGOALS)),)
endif
endif

# DEBUG_LEVEL=1
DEBUG_LEVEL=1
$(info $$DEBUG_LEVEL is ${DEBUG_LEVEL})

# Lite build flag.
@@ -1310,6 +1316,11 @@ $(STATIC_LIBRARY): $(LIB_OBJECTS)
$(AM_V_AR)rm -f $@ $(SHARED1) $(SHARED2) $(SHARED3) $(SHARED4)
$(AM_V_at)$(AR) $(ARFLAGS) $@ $(LIB_OBJECTS)

# Add a target to build bloom_test with ART_PLUS defined
bloom_test_plus: util/bloom_test.cc $(LIBRARY) $(GTEST)
$(AM_V_CCLD)$(CXX) $(CXXFLAGS) -DART_PLUS -c util/bloom_test.cc -o $(OBJ_DIR)/util/bloom_test_plus.o
$(AM_V_CCLD)$(CXX) -o bloom_test_plus $(OBJ_DIR)/util/bloom_test_plus.o $(GTEST) $(LIBRARY) $(EXEC_LDFLAGS) $(LDFLAGS) $(COVERAGEFLAGS)

$(STATIC_TEST_LIBRARY): $(TEST_OBJECTS)
$(AM_V_AR)rm -f $@ $(SHARED_TEST_LIBRARY)
$(AM_V_at)$(AR) $(ARFLAGS) $@ $^
11 changes: 7 additions & 4 deletions YCSB/Makefile
@@ -9,16 +9,19 @@

#---------------------build config-------------------------

CXXFLAGS += -I$(HOME)/local/include -gdwarf-4
LDFLAGS += -lstdc++
LDFLAGS += -L$(HOME)/local/lib -lsocket++

DEBUG_BUILD ?= 0
EXTRA_CXXFLAGS ?= -I../include -I../include/rocksdb
EXTRA_LDFLAGS ?= -L../ -lpmem -ldl
# EXTRA_CXXFLAGS += -I../include -I../include/rocksdb -fsanitize=address
EXTRA_CXXFLAGS += -I../include -I../include/rocksdb
EXTRA_LDFLAGS += -L../ -lpmem -ldl

BIND_ROCKSDB ?= 1
BIND_LEVELDB ?= 0
BIND_LMDB ?= 0

EXTRA_LDFLAGS += -lstdc++
EXTRA_LDFLAGS += -lsocket++
# EXTRA_LDFLAGS += -lpython3.12
# EXTRA_CXXFLAGS += -I$(PYTHON_INCLUDE_PATH)
# EXTRA_CXXFLAGS += -L$(PYTHON_LIBRARY_PATH)
7 changes: 7 additions & 0 deletions YCSB/buildall.sh
@@ -0,0 +1,7 @@
cd ..
# make clean
make static_lib -j32

cd YCSB
make clean && make DEBUG_BUILD=1

1 change: 1 addition & 0 deletions YCSB/core/core_workload.cc
@@ -68,6 +68,7 @@ const string CoreWorkload::REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution"
const string CoreWorkload::REQUEST_DISTRIBUTION_DEFAULT = "uniform";

const string CoreWorkload::ZERO_PADDING_PROPERTY = "zeropadding";
// const string CoreWorkload::ZERO_PADDING_DEFAULT = "96";
const string CoreWorkload::ZERO_PADDING_DEFAULT = "1";

const string CoreWorkload::MIN_SCAN_LENGTH_PROPERTY = "minscanlength";
32 changes: 18 additions & 14 deletions YCSB/rocksdb/rocksdb.properties
@@ -1,29 +1,33 @@
rocksdb.dbname=/mnt/walsm/tmp/tmp_data/db_test_art
rocksdb.nvm_path=/mnt/walsm/node_memory
rocksdb.dbname=/mnt/nvme0n1/guoteng/walsmtest/tmp/db_nvm_l0
rocksdb.nvm_path=/mnt/pmem0.7/guoteng/nodememory
rocksdb.format=single
rocksdb.destroy=false
# rocksdb.destroy=false

# Load options from file
#rocksdb.optionsfile=rocksdb/options.ini

# Below options are ignored if options file is used
rocksdb.compression=no
rocksdb.max_background_jobs=2
rocksdb.target_file_size_base=67108864
rocksdb.target_file_size_multiplier=1
rocksdb.max_bytes_for_level_base=268435456
rocksdb.write_buffer_size=67108864
# rocksdb.compression=no
rocksdb.max_background_jobs=4
# rocksdb.target_file_size_base=67108864
# rocksdb.target_file_size_multiplier=1
# rocksdb.max_bytes_for_level_base=268435456
# rocksdb.write_buffer_size=67108864
rocksdb.max_open_files=-1
rocksdb.max_write_buffer_number=2
# rocksdb.max_write_buffer_number=2
rocksdb.use_direct_io_for_flush_compaction=true
rocksdb.use_direct_reads=true
rocksdb.allow_mmap_writes=false
rocksdb.allow_mmap_reads=false
rocksdb.cache_size=8388608
# rocksdb.allow_mmap_writes=false
# rocksdb.allow_mmap_reads=false
rocksdb.cache_size=134217728
rocksdb.compressed_cache_size=0
rocksdb.bloom_bits=0
rocksdb.bloom_bits=2

# set total_threads to 32, see rocksdb_db.cc
rocksdb.increase_parallelism=true
# rocksdb.optimize_level_style_compaction=true
rocksdb.optimize_universal_style_compaction=true

rocksdb.block_size=4096
rocksdb.metadata_size=8192
rocksdb.max_subcompactions=4
53 changes: 51 additions & 2 deletions YCSB/rocksdb/rocksdb_db.cc
@@ -18,7 +18,15 @@
#include <rocksdb/status.h>
#include <rocksdb/utilities/options_util.h>
#include <rocksdb/write_batch.h>
#include <atomic>
#include <fstream>
#include <iostream>
#include <mutex>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

#include <unistd.h>

namespace {
const std::string PROP_NAME = "rocksdb.dbname";
@@ -111,6 +119,15 @@ namespace {
const std::string PROP_FS_URI = "rocksdb.fs_uri";
const std::string PROP_FS_URI_DEFAULT = "";

const std::string PROP_BLOCK_SIZE = "rocksdb.block_size";
const std::string PROP_BLOCK_SIZE_DEFAULT = "0";

const std::string PROP_METADATA_SIZE = "rocksdb.metadata_size";
const std::string PROP_METADATA_SIZE_DEFAULT = "0";

const std::string PROP_MAX_SUBCOMPACTION = "rocksdb.max_subcompactions";
const std::string PROP_MAX_SUBCOMPACTION_DEFAULT = "0";

static std::shared_ptr<rocksdb::Env> env_guard;
static std::shared_ptr<rocksdb::Cache> block_cache;
static std::shared_ptr<rocksdb::Cache> block_cache_compressed;
@@ -121,6 +138,7 @@ namespace ycsbc {
rocksdb::DB *RocksdbDB::db_ = nullptr;
int RocksdbDB::ref_cnt_ = 0;
std::mutex RocksdbDB::mu_;
rocksdb::Options opt;

void RocksdbDB::Init() {
// merge operator disabled by default due to link error
@@ -198,7 +216,6 @@ void RocksdbDB::Init() {
throw utils::Exception("RocksDB db path is missing");
}

rocksdb::Options opt;
opt.create_if_missing = true;
opt.nvm_path = nvm_path;
std::vector<rocksdb::ColumnFamilyDescriptor> cf_descs;
@@ -230,6 +247,8 @@ void RocksdbDB::Cleanup() {
if (--ref_cnt_) {
return;
}
std::cout << "Statistics: " << opt.statistics->ToString() << std::endl;
sleep(5); // sleep 5 seconds to wait for final reports
Copilot AI May 23, 2025

Using a fixed sleep duration during cleanup can unnecessarily delay shutdown and block the process. Consider replacing the sleep with a more robust synchronization mechanism or reducing its duration.

Suggested change
sleep(5); // sleep 5 seconds to wait for final reports
{
std::lock_guard<std::mutex> cv_lock(cv_mu_);
reports_ready_ = true;
}
cv_.notify_all();
{
std::unique_lock<std::mutex> cv_lock(cv_mu_);
cv_.wait(cv_lock, [this]() { return reports_ready_; });
}

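A minimal sketch of the kind of bounded wait being suggested, using plain C++11 primitives; the names report_mu, report_cv, reports_done, SignalReportsDone, and WaitForReports are hypothetical and not part of this PR:

#include <chrono>
#include <condition_variable>
#include <mutex>

namespace {
// Hypothetical state owned by whichever thread produces the final reports.
std::mutex report_mu;
std::condition_variable report_cv;
bool reports_done = false;
}  // namespace

// Called by the reporting thread once the final reports have been written.
void SignalReportsDone() {
  {
    std::lock_guard<std::mutex> lock(report_mu);
    reports_done = true;
  }
  report_cv.notify_all();
}

// Called from Cleanup() in place of sleep(5): returns as soon as the reports
// are done, and after at most five seconds otherwise.
void WaitForReports() {
  std::unique_lock<std::mutex> lock(report_mu);
  report_cv.wait_for(lock, std::chrono::seconds(5), [] { return reports_done; });
}

The point of the sketch is that the flag is set on the reporting thread and waited on during cleanup, so shutdown is gated on actual completion (with a five-second upper bound) rather than on a fixed delay.
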
delete db_;
}

@@ -309,6 +328,10 @@ void RocksdbDB::GetOptions(const utils::Properties &props, rocksdb::Options *opt
if (val != 0) {
opt->max_open_files = val;
}
val = std::stoi(props.GetProperty(PROP_MAX_SUBCOMPACTION, PROP_MAX_SUBCOMPACTION_DEFAULT));
if (val != 0) {
opt->max_subcompactions = val;
}

val = std::stoi(props.GetProperty(PROP_L0_COMPACTION_TRIGGER, PROP_L0_COMPACTION_TRIGGER_DEFAULT));
if (val != 0) {
@@ -337,6 +360,21 @@ void RocksdbDB::GetOptions(const utils::Properties &props, rocksdb::Options *opt
}

rocksdb::BlockBasedTableOptions table_options;
table_options.pin_top_level_index_and_filter = false;
table_options.pin_l0_filter_and_index_blocks_in_cache = false;
table_options.cache_index_and_filter_blocks_with_high_priority = true;
table_options.index_type = rocksdb::BlockBasedTableOptions::kTwoLevelIndexSearch;
table_options.partition_filters = true;
table_options.cache_index_and_filter_blocks = true;
table_options.index_shortening = rocksdb::BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
size_t block_size = std::stoul(props.GetProperty(PROP_BLOCK_SIZE, PROP_BLOCK_SIZE_DEFAULT));
if (block_size > 0) {
table_options.block_size = block_size;
}
size_t metadata_block_size = std::stoul(props.GetProperty(PROP_METADATA_SIZE, PROP_METADATA_SIZE_DEFAULT));
if (metadata_block_size > 0) {
table_options.metadata_block_size = metadata_block_size;
}
size_t cache_size = std::stoul(props.GetProperty(PROP_CACHE_SIZE, PROP_CACHE_SIZE_DEFAULT));
if (cache_size > 0) {
block_cache = rocksdb::NewLRUCache(cache_size);
@@ -348,13 +386,16 @@ void RocksdbDB::GetOptions(const utils::Properties &props, rocksdb::Options *opt
block_cache_compressed = rocksdb::NewLRUCache(cache_size);
table_options.block_cache_compressed = rocksdb::NewLRUCache(compressed_cache_size);
}
int bloom_bits = std::stoul(props.GetProperty(PROP_BLOOM_BITS, PROP_BLOOM_BITS_DEFAULT));
int bloom_bits = std::stoul(props.GetProperty(PROP_BLOOM_BITS, PROP_BLOOM_BITS_DEFAULT));
if (bloom_bits > 0) {
table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bloom_bits));
}
opt->table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));

if (props.GetProperty(PROP_INCREASE_PARALLELISM, PROP_INCREASE_PARALLELISM_DEFAULT) == "true") {
// remove the limits on compaction and flush thread counts; let them depend on the total thread count: 32
opt->max_background_compactions = -1;
opt->max_background_flushes = -1;
opt->IncreaseParallelism(32);
}
if (props.GetProperty(PROP_OPTIMIZE_LEVELCOMP, PROP_OPTIMIZE_LEVELCOMP_DEFAULT) == "true") {
Expand All @@ -363,6 +404,7 @@ void RocksdbDB::GetOptions(const utils::Properties &props, rocksdb::Options *opt
if (props.GetProperty(PROP_OPTIMIZE_UNIVERSALCOMP, PROP_OPTIMIZE_UNIVERSALCOMP_DEFAULT) == "true") {
opt->OptimizeUniversalStyleCompaction();
}
opt->statistics = rocksdb::CreateDBStatistics();
}
}

@@ -524,6 +566,13 @@ DB::Status RocksdbDB::MergeSingle(const std::string &table, const std::string &k

DB::Status RocksdbDB::InsertSingle(const std::string &table, const std::string &key,
std::vector<Field> &values) {
// static std::ofstream ofile("first_keys.txt");
// static std::atomic<int> key_counter{0};
// static std::mutex mutex;
// if (key_counter.fetch_add(1) < 10500000) {
// std::lock_guard<std::mutex> lock_guard(mutex);
// ofile << (key + "\n");
// }
std::string data;
SerializeRow(values, data);
rocksdb::WriteOptions wopt;
36 changes: 19 additions & 17 deletions db/art/clf_model.cc
@@ -7,15 +7,9 @@
#include <map>
#include <random>
#include <chrono>
#include "port/likely.h"

namespace ROCKSDB_NAMESPACE {

uint16_t ClfModel::feature_num_;
std::string ClfModel::dataset_name_;
std::string ClfModel::dataset_path_;
std::string ClfModel::host_, ClfModel::port_;
size_t ClfModel::buffer_size_;

void ClfModel::write_debug_dataset() {
assert(feature_num_ > 0);
// ready for writer
@@ -125,23 +119,28 @@ void ClfModel::write_real_dataset(std::vector<std::vector<uint32_t>>& datas, std

void ClfModel::write_dataset(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& tags, std::vector<uint32_t>& get_cnts) {
assert(feature_num_ > 0);
if (datas.empty()) {
write_debug_dataset();
// dataset_cnt_ += 1;
return;
}
if (UNLIKELY(datas.empty())) return;
assert(datas.size() > 0);
// if (datas.empty()) {
// assert(false); // we have to write dataset
// write_debug_dataset();
// // dataset_cnt_ += 1;
// return;
// }

assert(feature_num_ % 2 != 0); // features num: 2r + 1
assert(feature_num_ >= 3);

write_real_dataset(datas, tags, get_cnts);
// dataset_cnt_ += 1;
return;
}

void ClfModel::make_train(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& tags, std::vector<uint32_t>& get_cnts) {
assert(feature_num_ > 0);
assert(feature_num_ > 0); // model is ready
write_dataset(datas, tags, get_cnts);

// // TODO: avoid python model training
// the dataset has already been written at this point
// send msg to LightGBM server, let server read dataset and train new model
libsocket::inet_stream sock(host_, port_, LIBSOCKET_IPv4);
@@ -153,6 +152,7 @@
sock << message;
sock >> recv_buffer; // wait for training end
// will destroy sock when leaving this func scope
std::cout << "[MODEL] model training end, message: " << recv_buffer << std::endl;
}

void ClfModel::make_predict_samples(std::vector<std::vector<uint32_t>>& datas) {
@@ -194,7 +194,7 @@ void ClfModel::make_real_predict(std::vector<std::vector<uint32_t>>& datas, std:
libsocket::inet_stream sock(host_, port_, LIBSOCKET_IPv4);
std::string message, recv_buffer;
for (std::vector<uint32_t>& data : datas) {
if (!data.empty()) {
if (LIKELY(!data.empty())) {
prepare_data(data);
message.clear();
recv_buffer.clear();
@@ -220,10 +220,12 @@ void ClfModel::make_real_predict(std::vector<std::vector<uint32_t>>& datas, std:
void ClfModel::make_predict(std::vector<std::vector<uint32_t>>& datas, std::vector<uint16_t>& preds) {
preds.clear();

if (UNLIKELY(datas.empty())) return;
assert(datas.size() > 0);
// datas empty means we are debugging class ClfModel
if (datas.empty()) {
make_predict_samples(datas);
}
// if (datas.empty()) {
// make_predict_samples(datas);
// }
// only write pred result to vector preds, and return nothing
make_real_predict(datas, preds);
return;