Skip to content
Open
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ if(ENABLE_DPCPP)
join_helpers_lib
cuckoo_hash_build
join
groupby
groupby_global
groupby_local
groupby_perfect
hash_build_non_bitmask
)
if(ENABLE_EXPERIMENTAL)
Expand Down
13 changes: 9 additions & 4 deletions bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ int main(int argc, char *argv[]) {

std::unique_ptr<RunOptions> opts = std::make_unique<RunOptions>();
size_t groups_count = 1;
size_t executors = 1;
size_t threads_count = 1;
size_t work_group_size = 1;

opts->root_path = helpers::get_kernels_root_env(argv[0]);
std::cout
Expand All @@ -43,8 +44,11 @@ int main(int argc, char *argv[]) {
desc.add_options()(
"groups_count", po::value<size_t>(&groups_count),
"Number of unique keys for dwarfs with keys (groupby, hash build etc.).");
desc.add_options()("executors", po::value<size_t>(&executors),
"Number of executors for GroupByLocal.");
desc.add_options()("threads_count", po::value<size_t>(&threads_count),
"Number of threads for GroupBy dwarfs.");
desc.add_options()("work_group_size", po::value<size_t>(&work_group_size),
"Work group size for GroupBy dwarfs. threads_count must "
"be divisible by work_group_size.");
po::positional_options_description pos_opts;
pos_opts.add("dwarf", 1);

Expand Down Expand Up @@ -84,7 +88,8 @@ int main(int argc, char *argv[]) {

if (isGroupBy(dwarf_name)) {
std::unique_ptr<GroupByRunOptions> tmpPtr =
std::make_unique<GroupByRunOptions>(*opts, groups_count, executors);
std::make_unique<GroupByRunOptions>(*opts, groups_count,
threads_count, work_group_size);
opts.reset();
opts = std::move(tmpPtr);
}
Expand Down
45 changes: 45 additions & 0 deletions common/dpcpp/perfect_hashtable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <CL/sycl.hpp>

/// Non-owning view over a directly addressed ("perfect") hash table.
///
/// A key is mapped to slot `key - min_key` of a caller-provided value array,
/// so there is no key storage and no collision handling. Every slot access
/// goes through sycl::atomic.
///
/// @tparam Key   Key type; `key - min_key` must be convertible to size_t.
/// @tparam T     Type of the values stored in the slot array.
/// @tparam Space Address space the slot array lives in.
template <class Key, class T,
          sycl::access::address_space Space =
              sycl::access::address_space::global_space>
class PerfectHashTable {
public:
  /// @param hash_size Number of slots. It is stored but never used for bounds
  ///                  checking, so callers must pass in-range keys.
  /// @param vals      Pointer to the slot array; not owned by this object.
  /// @param min_key   Key that maps to slot 0.
  PerfectHashTable(size_t hash_size, sycl::multi_ptr<T, Space> vals,
                   Key min_key)
      // Initializers are listed in member declaration order, which is the
      // order in which they actually run.
      : _hasher(hash_size, min_key), _hash_size(hash_size), _vals(vals) {}

  /// Atomically adds `val` to the slot selected by `key`.
  /// @return Always true.
  bool add(Key key, T val) {
    // The atomic wraps a pointer to T (the slot type), not to Key.
    sycl::atomic<T, Space>(_vals + _hasher(key)).fetch_add(val);
    return true;
  }

  /// Atomically overwrites the slot selected by `key` with `val`.
  /// @return Always true.
  bool insert(Key key, T val) {
    sycl::atomic<T, Space>(_vals + _hasher(key)).store(val);
    return true;
  }

  /// Atomically loads and returns the value in the slot selected by `key`.
  const T at(const Key &key) const {
    return sycl::atomic<T, Space>(_vals + _hasher(key)).load();
  }

private:
  /// Maps a key to its slot index by subtracting the minimum key.
  class PerfectHashFunction {
  public:
    PerfectHashFunction(size_t hash_size, Key min_key)
        : hash_size(hash_size), min_key(min_key) {}

    /// @return `key - min_key` converted to size_t; no range check is done.
    size_t operator()(Key key) const { return key - min_key; }

  private:
    size_t hash_size; // stored only; not read by operator()
    Key min_key;
  };

  PerfectHashFunction _hasher;
  size_t _hash_size;

  sycl::multi_ptr<T, Space> _vals;
};
8 changes: 5 additions & 3 deletions common/options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ struct RunOptions {

struct GroupByRunOptions : public RunOptions {
GroupByRunOptions(const RunOptions &opts, size_t groups_count,
size_t executors)
: RunOptions(opts), groups_count(groups_count), executors(executors){};
size_t threads_count, size_t work_group_size)
: RunOptions(opts), groups_count(groups_count),
threads_count(threads_count), work_group_size(work_group_size){};
size_t groups_count;
size_t executors;
size_t threads_count;
size_t work_group_size;
};

std::istream &operator>>(std::istream &in, RunOptions::DeviceType &dt);
Expand Down
23 changes: 21 additions & 2 deletions groupby/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,23 @@
if(ENABLE_DPCPP)
add_dpcpp_lib(groupby groupby.cpp)
add_dpcpp_lib(groupby_local groupby_local.cpp)
set(groupby_sources
groupby.cpp
)
set(groupby_global_sources
groupby_global.cpp
${groupby_sources}
)

set(groupby_local_sources
groupby_local.cpp
${groupby_sources}
)

set(groupby_perfect_sources
perfect_groupby.cpp
${groupby_sources}
)

add_dpcpp_lib(groupby_global "${groupby_global_sources}")
add_dpcpp_lib(groupby_local "${groupby_local_sources}")
add_dpcpp_lib(groupby_perfect "${groupby_perfect_sources}")
endif()
134 changes: 30 additions & 104 deletions groupby/groupby.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,118 +2,44 @@
#include "common/dpcpp/hashtable.hpp"
#include <limits>

namespace {
using Func = std::function<uint32_t(uint32_t, uint32_t)>;
// Builds the dwarf name by appending `suffix` to "GroupBy" and passes it to
// the Dwarf base-class constructor.
GroupBy::GroupBy(const std::string &suffix) : Dwarf("GroupBy" + suffix) {}

std::vector<uint32_t> expected_GroupBy(const std::vector<uint32_t> &keys,
const std::vector<uint32_t> &vals,
size_t groups_count, Func f) {
std::vector<uint32_t> result(groups_count);
size_t data_size = keys.size();

for (int i = 0; i < data_size; i++) {
result[keys[i]] = f(result[keys[i]], vals[i]);
void GroupBy::run(const RunOptions &opts) {
for (auto size : opts.input_size) {
_run(size, meter());
}

return result;
}
} // namespace

GroupBy::GroupBy() : Dwarf("GroupBy") {}

void GroupBy::_run(const size_t buf_size, Meter &meter) {
constexpr uint32_t empty_element = std::numeric_limits<uint32_t>::max();
auto opts = static_cast<const GroupByRunOptions &>(meter.opts());

const int groups_count = opts.groups_count;
const std::vector<uint32_t> host_src_vals =
helpers::make_random<uint32_t>(buf_size);
const std::vector<uint32_t> host_src_keys =
helpers::make_random<uint32_t>(buf_size, 0, groups_count - 1);

std::vector<uint32_t> expected =
expected_GroupBy(host_src_keys, host_src_vals, groups_count,
[](uint32_t x, uint32_t y) { return x + y; });

auto sel = get_device_selector(opts);
sycl::queue q{*sel};
std::cout << "Selected device: "
<< q.get_device().get_info<sycl::info::device::name>() << "\n";

PolynomialHasher hasher(buf_size);

for (auto it = 0; it < opts.iterations; ++it) {
std::vector<uint32_t> data(buf_size, 0);
std::vector<uint32_t> keys(buf_size, empty_element);
std::vector<uint32_t> output(groups_count, 0);

sycl::buffer<uint32_t> data_buf(data);
sycl::buffer<uint32_t> keys_buf(keys);
sycl::buffer<uint32_t> src_vals(host_src_vals);
sycl::buffer<uint32_t> src_keys(host_src_keys);

auto host_start = std::chrono::steady_clock::now();
q.submit([&](sycl::handler &h) {
auto sv = src_vals.get_access(h);
auto sk = src_keys.get_access(h);

auto data_acc = data_buf.get_access(h);
auto keys_acc = keys_buf.get_access(h);

h.parallel_for<class hash_build>(buf_size, [=](auto &idx) {
NonOwningHashTableNonBitmask<uint32_t, uint32_t, PolynomialHasher> ht(
buf_size, keys_acc.get_pointer(), data_acc.get_pointer(), hasher,
empty_element);

ht.add(sk[idx], sv[idx]);
});
}).wait();

sycl::buffer<uint32_t> out_buf(output);

q.submit([&](sycl::handler &h) {
auto sv = src_vals.get_access(h);
auto sk = src_keys.get_access(h);
auto o = out_buf.get_access(h);

auto data_acc = data_buf.get_access(h);
auto keys_acc = keys_buf.get_access(h);

h.parallel_for<class hash_build_check>(buf_size, [=](auto &idx) {
NonOwningHashTableNonBitmask<uint32_t, uint32_t, PolynomialHasher> ht(
buf_size, keys_acc.get_pointer(), data_acc.get_pointer(), hasher,
empty_element);

std::pair<uint32_t, bool> sum_for_group = ht.at(sk[idx]);
sycl::atomic<uint32_t>(o.get_pointer() + sk[idx])
.store(sum_for_group.first);
});
}).wait();
auto host_end = std::chrono::steady_clock::now();
auto host_exe_time = std::chrono::duration_cast<std::chrono::microseconds>(
host_end - host_start)
.count();
std::unique_ptr<Result> result = std::make_unique<Result>();
result->host_time = host_end - host_start;
out_buf.get_access<sycl::access::mode::read>();
// Prepares the dwarf for a run: sets the reporting header, hands the run
// options to the meter and records the selected device type as a parameter.
void GroupBy::init(const RunOptions &opts) {
  reporting_header_ = "total_time,group_by_time,reduction_time";

  meter().set_opts(opts);

  // The only parameter recorded here is the device type, in string form.
  DwarfParams params{{"device_type", to_string(opts.device_ty)}};
  meter().set_params(params);
}

if (output != expected) {
std::cerr << "Incorrect results" << std::endl;
result->valid = false;
}
void GroupBy::generate_expected(size_t groups_count, AggregationFunc f) {
expected.resize(groups_count);
size_t data_size = src_keys.size();

DwarfParams params{{"buf_size", std::to_string(buf_size)}};
meter.add_result(std::move(params), std::move(result));
for (int i = 0; i < data_size; i++) {
expected[src_keys[i]] = f(expected[src_keys[i]], src_vals[i]);
}
}

void GroupBy::run(const RunOptions &opts) {
for (auto size : opts.input_size) {
_run(size, meter());
// Compares `result` element-wise against the stored `expected` vector.
// On a mismatch, "Incorrect results" is written to std::cerr.
// Returns true when both vectors are equal, false otherwise.
bool GroupBy::check_correctness(const std::vector<uint32_t> &result) {
  const bool matches = (result == expected);
  if (!matches) {
    std::cerr << "Incorrect results" << std::endl;
  }
  return matches;
}
void GroupBy::init(const RunOptions &opts) {
meter().set_opts(opts);
DwarfParams params = {{"device_type", to_string(opts.device_ty)}};
meter().set_params(params);
}

// Replaces src_keys with the output of helpers::make_random<uint32_t>, called
// with buf_size, a lower bound of 0 and an upper bound of groups_count - 1.
// NOTE(review): groups_count == 0 makes the upper bound wrap around to the
// largest size_t value — confirm callers never pass 0.
void GroupBy::generate_keys(size_t buf_size, size_t groups_count) {
  const size_t max_key = groups_count - 1;
  src_keys = helpers::make_random<uint32_t>(buf_size, 0, max_key);
}

// Replaces src_vals with the output of helpers::make_random<uint32_t>(buf_size).
// Presumably this yields buf_size random values — confirm against the helper.
void GroupBy::generate_vals(size_t buf_size) {
src_vals = helpers::make_random<uint32_t>(buf_size);
}

// Returns twice buf_size.
// NOTE(review): the factor 2 presumably accounts for the key and value
// arrays — confirm against callers and overrides.
size_t GroupBy::get_size(size_t buf_size) { return buf_size * 2; }
25 changes: 22 additions & 3 deletions groupby/groupby.hpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
#pragma once
#include "common/common.hpp"
#include <functional>

class GroupBy : public Dwarf {
public:
GroupBy();
GroupBy(const std::string &suffix);
void run(const RunOptions &opts) override;
void init(const RunOptions &opts) override;

private:
void _run(const size_t buffer_size, Meter &meter);
protected:
using AggregationFunc = std::function<uint32_t(uint32_t, uint32_t)>;
AggregationFunc add = [](uint32_t acc, uint32_t x) { return acc + x; };
AggregationFunc mul = [](uint32_t acc, uint32_t x) { return acc * x; };
AggregationFunc count = [](uint32_t acc, uint32_t) { return acc + 1; };

const uint32_t _empty_element = std::numeric_limits<uint32_t>::max();

virtual size_t get_size(size_t buf_size);
virtual void _run(const size_t buffer_size, Meter &meter) = 0;

std::vector<uint32_t> src_keys;
std::vector<uint32_t> src_vals;
std::vector<uint32_t> expected;

void generate_keys(size_t buf_size, size_t groups_count);
void generate_vals(size_t buf_size);
void generate_expected(size_t groups_count, AggregationFunc f);

bool check_correctness(const std::vector<uint32_t> &result);
};
Loading