Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/merlin/allocator.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class BaseAllocator {

class DefaultAllocator : public virtual BaseAllocator {
public:
DefaultAllocator() {};
~DefaultAllocator() override {};
DefaultAllocator(){};
~DefaultAllocator() override{};

void alloc(const MemoryType type, void** ptr, size_t size,
unsigned int pinned_flags = cudaHostAllocDefault) override {
Expand Down
40 changes: 32 additions & 8 deletions include/merlin/core_kernels/lookup_ptr.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ namespace nv {
namespace merlin {

// Use 1 thread to deal with a KV-pair, including copying value.
template <typename K, typename V, typename S>
template <typename K, typename V, typename S, int Strategy>
__global__ void tlp_lookup_ptr_kernel_with_filter(
Bucket<K, V, S>* __restrict__ buckets, const uint64_t buckets_num,
uint32_t bucket_capacity, const uint32_t dim, const K* __restrict__ keys,
V** __restrict values, S* __restrict scores, bool* __restrict founds,
uint64_t n) {
uint64_t n, bool update_score, const S global_epoch) {
using BUCKET = Bucket<K, V, S>;
using ScoreFunctor = ScoreFunctor<K, V, S, Strategy>;
// Load `STRIDE` digests every time.
constexpr uint32_t STRIDE = sizeof(VecD_Load) / sizeof(D);

Expand All @@ -43,6 +44,9 @@ __global__ void tlp_lookup_ptr_kernel_with_filter(
uint32_t key_pos = {0};
if (kv_idx < n) {
key = keys[kv_idx];
if (update_score) {
score = ScoreFunctor::desired_when_missed(scores, kv_idx, global_epoch);
}
if (!IS_RESERVED_KEY<K>(key)) {
const K hashed_key = Murmur3HashDevice(key);
target_digests = digests_from_hashed<K>(hashed_key);
Expand Down Expand Up @@ -86,12 +90,32 @@ __global__ void tlp_lookup_ptr_kernel_with_filter(
uint32_t index = (__ffs(cmp_result) - 1) >> 3;
cmp_result &= (cmp_result - 1);
possible_pos = pos_cur + i * 4 + index;
auto current_key = bucket_keys_ptr[possible_pos];
score = *BUCKET::scores(bucket_keys_ptr, bucket_capacity, possible_pos);
if (current_key == key) {
key_pos = possible_pos;
occupy_result = OccupyResult::DUPLICATE;
goto WRITE_BACK;
if (update_score) {
auto current_key = BUCKET::keys(bucket_keys_ptr, possible_pos);
K expected_key = key;
// Modifications to the bucket will not be reordered before this instruction.
bool result = current_key->compare_exchange_strong(
expected_key, static_cast<K>(LOCKED_KEY),
cuda::std::memory_order_acquire, cuda::std::memory_order_relaxed);
if (result) {
occupy_result = OccupyResult::DUPLICATE;
key_pos = possible_pos;
ScoreFunctor::update_with_digest(bucket_keys_ptr, key_pos, scores,
kv_idx, score, bucket_capacity,
get_digest<K>(key), false);
current_key->store(key, cuda::std::memory_order_release);
score = *BUCKET::scores(bucket_keys_ptr, bucket_capacity, key_pos);
goto WRITE_BACK;
}
} else {
auto current_key = bucket_keys_ptr[possible_pos];
score =
*BUCKET::scores(bucket_keys_ptr, bucket_capacity, possible_pos);
if (current_key == key) {
key_pos = possible_pos;
occupy_result = OccupyResult::DUPLICATE;
goto WRITE_BACK;
}
}
} while (true);
VecD_Comp empty_digests_ = empty_digests<K>();
Expand Down
91 changes: 89 additions & 2 deletions include/merlin_hashtable.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,35 @@ class HashTableBase {
score_type* scores = nullptr, // (n)
cudaStream_t stream = 0, bool unique_key = true) const = 0;

/**
* @brief Searches the hash table for the specified keys and returns address
* of the values, and will update the scores.
*
* @note When a key is missing, the data in @p values won't change.
* @warning This API returns internal addresses for high performance, but it is
* not thread-safe. The caller is responsible for guaranteeing data consistency.
*
* @param n The number of key-value-score tuples to search.
* @param keys The keys to search on GPU-accessible memory with shape (n).
* @param values The addresses of values to search on GPU-accessible memory
* with shape (n).
* @param founds The status that indicates if the keys are found on
* GPU-accessible memory with shape (n).
* @param scores The scores to search on GPU-accessible memory with shape (n).
* @parblock
* If @p scores is `nullptr`, the score for each key will not be returned.
* @endparblock
* @param stream The CUDA stream that is used to execute the operation.
* @param unique_key If all keys in the same batch are unique.
*
*/
virtual void find_and_update(const size_type n, const key_type* keys, // (n)
value_type** values, // (n)
bool* founds, // (n)
score_type* scores = nullptr, // (n)
cudaStream_t stream = 0,
bool unique_key = true) = 0;

/**
* @brief Checks if there are elements with key equivalent to `keys` in the
* table.
Expand Down Expand Up @@ -2575,10 +2604,12 @@ class HashTable : public HashTableBase<K, V, S> {
constexpr uint32_t MinBucketCapacityFilter = sizeof(VecD_Load) / sizeof(D);
if (unique_key && options_.max_bucket_size >= MinBucketCapacityFilter) {
constexpr uint32_t BLOCK_SIZE = 128U;
tlp_lookup_ptr_kernel_with_filter<key_type, value_type, score_type>
tlp_lookup_ptr_kernel_with_filter<key_type, value_type, score_type,
evict_strategy>
<<<(n + BLOCK_SIZE - 1) / BLOCK_SIZE, BLOCK_SIZE, 0, stream>>>(
table_->buckets, table_->buckets_num, options_.max_bucket_size,
options_.dim, keys, values, scores, founds, n);
options_.dim, keys, values, scores, founds, n, false,
global_epoch_);
} else {
using Selector = SelectLookupPtrKernel<key_type, value_type, score_type>;
static thread_local int step_counter = 0;
Expand All @@ -2597,6 +2628,62 @@ class HashTable : public HashTableBase<K, V, S> {
CudaCheckError();
}

/**
* @brief Searches the hash table for the specified keys and returns address
* of the values, and will update the scores.
*
* @note When a key is missing, the data in @p values won't change.
* @warning This API returns internal addresses for high performance, but it is
* not thread-safe. The caller is responsible for guaranteeing data consistency.
*
* @param n The number of key-value-score tuples to search.
* @param keys The keys to search on GPU-accessible memory with shape (n).
* @param values The addresses of values to search on GPU-accessible memory
* with shape (n).
* @param founds The status that indicates if the keys are found on
* GPU-accessible memory with shape (n).
* @param scores The scores to search on GPU-accessible memory with shape (n).
* @parblock
* If @p scores is `nullptr`, the score for each key will not be returned.
* @endparblock
* @param stream The CUDA stream that is used to execute the operation.
* @param unique_key If all keys in the same batch are unique.
*
*/
void find_and_update(const size_type n, const key_type* keys,  // (n)
                     value_type** values,                      // (n)
                     bool* founds,                             // (n)
                     score_type* scores = nullptr,             // (n)
                     cudaStream_t stream = 0, bool unique_key = true) {
  // Looks up `keys`, writes the per-key results through `values`, `founds`
  // and `scores`, and asks the lookup kernel to update scores of found keys.
  // Throws std::runtime_error when `unique_key` is false or the configured
  // bucket capacity is below the filter kernel's minimum.

  // An empty batch needs neither the lock nor a kernel launch.
  if (n == 0) return;

  // The guard is only created when API-level locking is enabled; it lives
  // until this function returns (or throws).
  // NOTE(review): the kernel is launched with update_score == true, yet the
  // lock taken here is `read_shared_lock` -- confirm this is the intended
  // lock class for an operation that modifies scores.
  std::unique_ptr<read_shared_lock> api_guard;
  if (options_.api_lock) {
    api_guard = std::make_unique<read_shared_lock>(mutex_, stream);
  }

  check_evict_strategy(scores);

  // Only the digest-filter kernel supports score updates. It is used when the
  // batch has unique keys and a bucket holds at least
  // sizeof(VecD_Load) / sizeof(D) entries.
  constexpr uint32_t kMinFilterCapacity = sizeof(VecD_Load) / sizeof(D);
  const bool filter_kernel_usable =
      unique_key && options_.max_bucket_size >= kMinFilterCapacity;
  if (!filter_kernel_usable) {
    throw std::runtime_error(
        "Not support update score when keys are not unique or bucket "
        "capacity is small.");
  }

  // One thread per key: ceil(n / threads-per-block) blocks.
  constexpr uint32_t kThreadsPerBlock = 128U;
  tlp_lookup_ptr_kernel_with_filter<key_type, value_type, score_type,
                                    evict_strategy>
      <<<(n + kThreadsPerBlock - 1) / kThreadsPerBlock, kThreadsPerBlock, 0,
         stream>>>(table_->buckets, table_->buckets_num,
                   options_.max_bucket_size, options_.dim, keys, values,
                   scores, founds, n, /*update_score=*/true, global_epoch_);

  CudaCheckError();
}

/**
* @brief Checks if there are elements with key equivalent to `keys` in the
* table.
Expand Down
60 changes: 36 additions & 24 deletions tests/accum_or_assign_test.cc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -972,14 +972,16 @@ void test_evict_strategy_lru_basic(size_t max_hbm_for_vectors) {

CUDA_CHECK(cudaMalloc(&d_accum_or_assigns_temp, TEMP_KEY_NUM * sizeof(bool)));

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_base.data()),
BASE_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_base.data()), BASE_KEY_NUM,
true_ratio);
test_util::create_keys_in_one_buckets<K, S, V, DIM>(
h_keys_base.data(), h_scores_base.data(), h_vectors_base.data(),
BASE_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0, 0x3FFFFFFFFFFFFFFF);

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_test.data()),
TEST_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_test.data()), TEST_KEY_NUM,
true_ratio);
test_util::create_keys_in_one_buckets<K, S, V, DIM>(
h_keys_test.data(), h_scores_test.data(), h_vectors_test.data(),
TEST_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0x3FFFFFFFFFFFFFFF,
Expand Down Expand Up @@ -1189,11 +1191,13 @@ void test_evict_strategy_lfu_basic(size_t max_hbm_for_vectors, int key_start) {

CUDA_CHECK(cudaMalloc(&d_accum_or_assigns_temp, TEMP_KEY_NUM * sizeof(bool)));

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_base.data()),
BASE_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_base.data()), BASE_KEY_NUM,
true_ratio);

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_test.data()),
TEST_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_test.data()), TEST_KEY_NUM,
true_ratio);

for (int i = 0; i < TEST_TIMES; i++) {
test_util::create_keys_in_one_buckets_lfu<K, S, V, DIM>(
Expand Down Expand Up @@ -1416,14 +1420,16 @@ void test_evict_strategy_epochlru_basic(size_t max_hbm_for_vectors,

CUDA_CHECK(cudaMalloc(&d_accum_or_assigns_temp, TEMP_KEY_NUM * sizeof(bool)));

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_base.data()),
BASE_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_base.data()), BASE_KEY_NUM,
true_ratio);
test_util::create_keys_in_one_buckets<K, S, V, DIM>(
h_keys_base.data(), h_scores_base.data(), h_vectors_base.data(),
BASE_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0, 0x3FFFFFFFFFFFFFFF);

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_test.data()),
TEST_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_test.data()), TEST_KEY_NUM,
true_ratio);
test_util::create_keys_in_one_buckets<K, S, V, DIM>(
h_keys_test.data(), h_scores_test.data(), h_vectors_test.data(),
TEST_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0x3FFFFFFFFFFFFFFF,
Expand Down Expand Up @@ -1645,11 +1651,13 @@ void test_evict_strategy_epochlfu_basic(size_t max_hbm_for_vectors,

CUDA_CHECK(cudaMalloc(&d_accum_or_assigns_temp, TEMP_KEY_NUM * sizeof(bool)));

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_base.data()),
BASE_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_base.data()), BASE_KEY_NUM,
true_ratio);

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_test.data()),
TEST_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_test.data()), TEST_KEY_NUM,
true_ratio);

test_util::create_keys_in_one_buckets_lfu<K, S, V, DIM>(
h_keys_base.data(), h_scores_base.data(), h_vectors_base.data(),
Expand Down Expand Up @@ -1913,10 +1921,12 @@ void test_evict_strategy_customized_basic(size_t max_hbm_for_vectors,
CUDA_CHECK(cudaMalloc(&d_accum_or_assigns_temp, TEMP_KEY_NUM * sizeof(bool)));
CUDA_CHECK(cudaMalloc(&d_found_temp, TEMP_KEY_NUM * sizeof(bool)));

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_base.data()),
BASE_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_test.data()),
TEST_KEY_NUM, true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_base.data()), BASE_KEY_NUM,
true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_test.data()), TEST_KEY_NUM,
true_ratio);

test_util::create_keys_in_one_buckets<K, S, V, DIM>(
h_keys_base.data(), h_scores_base.data(), h_vectors_base.data(),
Expand Down Expand Up @@ -2171,8 +2181,9 @@ void test_evict_strategy_customized_advanced(size_t max_hbm_for_vectors,
cudaMalloc(&d_vectors_temp, TEMP_KEY_NUM * sizeof(V) * options.dim));
CUDA_CHECK(cudaMalloc(&d_accum_or_assigns_temp, TEMP_KEY_NUM * sizeof(bool)));

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_base.data()),
BASE_KEY_NUM, base_true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_base.data()), BASE_KEY_NUM,
base_true_ratio);
test_util::create_keys_in_one_buckets<K, S, V, DIM>(
h_keys_base.data(), h_scores_base.data(), h_vectors_base.data(),
BASE_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0, 0x3FFFFFFFFFFFFFFF);
Expand All @@ -2182,8 +2193,9 @@ void test_evict_strategy_customized_advanced(size_t max_hbm_for_vectors,
h_scores_base[i] = base_score_start + i;
}

test_util::create_random_bools<K>(reinterpret_cast<bool*>(h_accum_or_assigns_test.data()),
TEST_KEY_NUM, test_true_ratio);
test_util::create_random_bools<K>(
reinterpret_cast<bool*>(h_accum_or_assigns_test.data()), TEST_KEY_NUM,
test_true_ratio);
test_util::create_keys_in_one_buckets<K, S, V, DIM>(
h_keys_test.data(), h_scores_test.data(), h_vectors_test.data(),
TEST_KEY_NUM, INIT_CAPACITY, BUCKET_MAX_SIZE, 1, 0x3FFFFFFFFFFFFFFF,
Expand Down
15 changes: 11 additions & 4 deletions tests/assign_score_test.cc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,9 @@ void test_evict_strategy_customized_basic(size_t max_hbm_for_vectors,
std::sort(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end());

auto expected_range = test_util::range<S, TEMP_KEY_NUM>(base_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end(), expected_range.begin()));
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(),
h_scores_temp_sorted.end(),
expected_range.begin()));
for (int i = 0; i < dump_counter; i++) {
for (int j = 0; j < options.dim; j++) {
ASSERT_EQ(h_vectors_temp[i * options.dim + j],
Expand Down Expand Up @@ -958,8 +960,11 @@ void test_evict_strategy_customized_basic(size_t max_hbm_for_vectors,
std::vector<S> h_scores_temp_sorted(h_scores_temp);
std::sort(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end());

auto expected_range_test = test_util::range<S, TEST_KEY_NUM>(test_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end(), expected_range_test.begin()));
auto expected_range_test =
test_util::range<S, TEST_KEY_NUM>(test_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(),
h_scores_temp_sorted.end(),
expected_range_test.begin()));
for (int i = 0; i < dump_counter; i++) {
for (int j = 0; j < options.dim; j++) {
ASSERT_EQ(h_vectors_temp[i * options.dim + j],
Expand Down Expand Up @@ -1104,7 +1109,9 @@ void test_evict_strategy_customized_advanced(size_t max_hbm_for_vectors,
std::sort(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end());

auto expected_range = test_util::range<S, TEMP_KEY_NUM>(base_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end(), expected_range.begin()));
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(),
h_scores_temp_sorted.end(),
expected_range.begin()));
for (int i = 0; i < dump_counter; i++) {
for (int j = 0; j < options.dim; j++) {
ASSERT_EQ(h_vectors_temp[i * options.dim + j],
Expand Down
15 changes: 11 additions & 4 deletions tests/find_or_insert_ptr_lock_test.cc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -2153,7 +2153,9 @@ void test_evict_strategy_customized_basic(size_t max_hbm_for_vectors,
std::sort(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end());

auto expected_range = test_util::range<S, TEMP_KEY_NUM>(base_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end(), expected_range.begin()));
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(),
h_scores_temp_sorted.end(),
expected_range.begin()));
for (int i = 0; i < dump_counter; i++) {
for (int j = 0; j < options.dim; j++) {
ASSERT_EQ(h_vectors_temp[i * options.dim + j],
Expand Down Expand Up @@ -2197,8 +2199,11 @@ void test_evict_strategy_customized_basic(size_t max_hbm_for_vectors,
std::vector<S> h_scores_temp_sorted(h_scores_temp);
std::sort(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end());

auto expected_range_test = test_util::range<S, TEST_KEY_NUM>(test_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end(), expected_range_test.begin()));
auto expected_range_test =
test_util::range<S, TEST_KEY_NUM>(test_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(),
h_scores_temp_sorted.end(),
expected_range_test.begin()));
for (int i = 0; i < dump_counter; i++) {
for (int j = 0; j < options.dim; j++) {
ASSERT_EQ(h_vectors_temp[i * options.dim + j],
Expand Down Expand Up @@ -2344,7 +2349,9 @@ void test_evict_strategy_customized_advanced(size_t max_hbm_for_vectors,
std::sort(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end());

auto expected_range = test_util::range<S, TEMP_KEY_NUM>(base_score_start);
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(), h_scores_temp_sorted.end(), expected_range.begin()));
ASSERT_TRUE(std::equal(h_scores_temp_sorted.begin(),
h_scores_temp_sorted.end(),
expected_range.begin()));
for (int i = 0; i < dump_counter; i++) {
for (int j = 0; j < options.dim; j++) {
ASSERT_EQ(h_vectors_temp[i * options.dim + j],
Expand Down
Loading