diff --git a/cpp/TooManyCooks/threads_sweep.cpp b/cpp/TooManyCooks/threads_sweep.cpp
index 230eafc..60e42ff 100644
--- a/cpp/TooManyCooks/threads_sweep.cpp
+++ b/cpp/TooManyCooks/threads_sweep.cpp
@@ -11,39 +11,50 @@
 #include <cstddef>
 #include <cstdio>
 #include <vector>
+#include <algorithm>
+
+
+// Generates thread counts to benchmark based on the target hardware.
+// Starts with a doubling progression [1, 2, 4, ..., topo.core_count()].
+// Also inserts values for each different type of core without SMT, and the max with SMT.
+// e.g. on a 13600K it inserts:
+// 6 (number of P-cores)
+// 14 (number of P-cores + E-cores)
+// 20 (6*2 for P-cores with SMT + 8 for E-cores without)
 int main(int argc, char* argv[]) {
+  // Get the breakpoints (number of physical cores of each kind, and the max with SMT)
   auto topo = tmc::topology::query();
   std::vector<size_t> breakpoints;
-  size_t max = topo.pu_count();
-  if (max > 64) {
-    max = 64;
-  }
   {
     size_t count = 0;
     for (auto c : topo.cpu_kind_counts) {
       auto n = count + c;
-      if (n >= max) {
-        breakpoints.push_back(max);
-        break;
-      }
       breakpoints.push_back(n);
       count += c;
     }
   }
-  if (breakpoints.back() != max) {
-    breakpoints.push_back(max);
+  auto puCount = topo.pu_count();
+  if (breakpoints.back() != puCount) {
+    breakpoints.push_back(puCount);
   }
-  std::printf("[1");
-  size_t idx = 0;
-  size_t count = 2;
-  while (count < max) {
-    while (count > breakpoints[idx]) {
-      std::printf(",%zu", breakpoints[idx]);
-      ++idx;
+
+  // Calculate the doubling progression
+  std::vector<size_t> doubling{};
+  for (size_t count = 1; count <= topo.core_count(); count *= 2) {
+    doubling.push_back(count);
+  }
+
+  // Merge the two lists together
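+  // Both inputs are already sorted ascending, as std::set_union requires; a
+  // value present in both lists (a breakpoint that is also a power of two)
+  // appears only once in the merged output.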
+  std::vector<size_t> merged{};
+  std::set_union(doubling.begin(), doubling.end(), breakpoints.begin(), breakpoints.end(), std::back_inserter(merged));
+
+  std::printf("[%zu", merged[0]);
+  for (size_t i = 1; i < merged.size(); ++i) {
+    if (merged[i] > 64) {
+      break;
     }
-    std::printf(",%zu", count);
-    count *= 2;
+    std::printf(",%zu", merged[i]);
   }
-  std::printf(",%zu]\n", max);
+  std::printf("]\n");
 }
diff --git a/cpp/taskflow/CMakeLists.txt b/cpp/taskflow/CMakeLists.txt
index 2885e0f..7a3314a 100644
--- a/cpp/taskflow/CMakeLists.txt
+++ b/cpp/taskflow/CMakeLists.txt
@@ -10,6 +10,7 @@ set(CMAKE_CXX_STANDARD 20)
 
 add_definitions(
   "-march=native"
+  "-DTF_ENABLE_ATOMIC_NOTIFIER"
 )
 
 include(../1CMake/CPM.cmake)
@@ -17,7 +18,7 @@ include(../1CMake/CPM.cmake)
 CPMAddPackage(
   NAME taskflow
   GIT_REPOSITORY https://github.com/taskflow/taskflow.git
-  GIT_TAG v3.10.0
+  GIT_TAG v4.0.0
   DOWNLOAD_ONLY)
 
 include_directories(
diff --git a/cpp/taskflow/fib.cpp b/cpp/taskflow/fib.cpp
index 6ea47e0..75d4466 100644
--- a/cpp/taskflow/fib.cpp
+++ b/cpp/taskflow/fib.cpp
@@ -10,20 +10,26 @@
 #include <chrono>
 #include <cstdio>
 #include <cstdlib>
+#include <optional>
 #include <thread>
 
 static size_t thread_count = std::thread::hardware_concurrency() / 2;
 static const size_t iter_count = 1;
+std::optional<tf::Executor> executor;
 
-size_t fib(size_t n, tf::Runtime& rt) {
+size_t fib(size_t n) {
   if (n < 2) {
     return n;
   }
+
+  tf::TaskGroup tg = executor->task_group();
+
   size_t x, y;
-  rt.silent_async([&x, n](tf::Runtime& s) { x = fib(n - 1, s); });
-  rt.silent_async([&y, n](tf::Runtime& s) { y = fib(n - 2, s); });
-  rt.corun_all();
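+  // Spawning only one child and computing the other branch inline halves the
+  // number of tasks created, and this worker does useful work instead of waiting.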
+  tg.silent_async([n, &x]() { x = fib(n - 1); });
+  y = fib(n - 2); // compute one branch synchronously
+
+  tg.corun();
   return x + y;
 }
 
@@ -37,21 +43,19 @@ int main(int argc, char* argv[]) {
   }
   size_t n = static_cast<size_t>(atoi(argv[1]));
 
-  tf::Executor executor(thread_count);
-  tf::Taskflow taskflow("fibonacci");
+  executor.emplace(thread_count);
 
   std::printf("threads: %zu\n", thread_count);
 
   size_t result = 0;
-  taskflow.emplace([&result, n](tf::Runtime& rt) { result = fib(n, rt); });
-  executor.run(taskflow).wait();
+  executor->async([&result, n]() { result = fib(n); }).get();
 
   auto startTime = std::chrono::high_resolution_clock::now();
   for (size_t i = 0; i < iter_count; ++i) {
     result = 0;
-    executor.run(taskflow).wait();
+    executor->async([&result, n]() { result = fib(n); }).get();
     std::printf("output: %zu\n", result);
   }
diff --git a/cpp/taskflow/matmul.cpp b/cpp/taskflow/matmul.cpp
index cf371f9..86079cd 100644
--- a/cpp/taskflow/matmul.cpp
+++ b/cpp/taskflow/matmul.cpp
@@ -15,12 +15,14 @@
 #include <cstdio>
 #include <cstdlib>
 #include <memory>
+#include <optional>
 #include <thread>
 
 static size_t thread_count = std::thread::hardware_concurrency() / 2;
 static const size_t iter_count = 1;
+std::optional<tf::Executor> executor;
 
-void matmul(tf::Runtime& rt, int* a, int* b, int* c, int n, int N) {
+void matmul(int* a, int* b, int* c, int n, int N) {
   if (n <= 32) {
     // Base case: Use simple triple-loop multiplication for small matrices
     matmul_small(a, b, c, n, N);
@@ -28,31 +30,24 @@
     // Recursive case: Divide the matrices into 4 submatrices and multiply them
     int k = n / 2;
+    tf::TaskGroup tg = executor->task_group();
+
     // Split the execution into 2 sections to ensure output locations are not
     // written in parallel
-    rt.silent_async([=](tf::Runtime& s) { matmul(s, a, b, c, k, N); });
-    rt.silent_async([=](tf::Runtime& s) { matmul(s, a, b + k, c + k, k, N); });
-    rt.silent_async([=](tf::Runtime& s) {
-      matmul(s, a + k * N, b, c + k * N, k, N);
+    tg.silent_async([=]() { matmul(a, b, c, k, N); });
+    tg.silent_async([=]() { matmul(a, b + k, c + k, k, N); });
+    tg.silent_async([=]() { matmul(a + k * N, b, c + k * N, k, N); });
+    // Compute one branch synchronously
+    matmul(a + k * N, b + k, c + k * N + k, k, N);
+    tg.corun();
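+    // corun() joins section 1 before section 2 starts, so the two products
+    // that accumulate into each output quadrant never run concurrently.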
+
+    tg.silent_async([=]() { matmul(a + k, b + k * N, c, k, N); });
+    tg.silent_async([=]() { matmul(a + k, b + k * N + k, c + k, k, N); });
+    tg.silent_async([=]() {
+      matmul(a + k * N + k, b + k * N, c + k * N, k, N);
     });
-    rt.silent_async([=](tf::Runtime& s) {
-      matmul(s, a + k * N, b + k, c + k * N + k, k, N);
-    });
-    rt.corun_all();
-
-    rt.silent_async([=](tf::Runtime& s) {
-      matmul(s, a + k, b + k * N, c, k, N);
-    });
-    rt.silent_async([=](tf::Runtime& s) {
-      matmul(s, a + k, b + k * N + k, c + k, k, N);
-    });
-    rt.silent_async([=](tf::Runtime& s) {
-      matmul(s, a + k * N + k, b + k * N, c + k * N, k, N);
-    });
-    rt.silent_async([=](tf::Runtime& s) {
-      matmul(s, a + k * N + k, b + k * N + k, c + k * N + k, k, N);
-    });
-    rt.corun_all();
+    // Compute one branch synchronously
+    matmul(a + k * N + k, b + k * N + k, c + k * N + k, k, N);
+    tg.corun();
   }
 }
 
@@ -73,9 +68,7 @@ std::vector run_matmul(tf::Executor& executor, int N) {
     }
   }
 
-  tf::Taskflow taskflow;
-  taskflow.emplace([=](tf::Runtime& rt) { matmul(rt, a, b, c, N, N); });
-  executor.run(taskflow).wait();
+  executor.async([=]() { matmul(a, b, c, N, N); }).get();
 
   return C;
 }
@@ -117,11 +110,11 @@ int main(int argc, char* argv[]) {
   }
   int n = atoi(argv[1]);
   std::printf("threads: %zu\n", thread_count);
-  tf::Executor executor(thread_count);
+  executor.emplace(thread_count);
 
-  run_matmul(executor, n); // warmup
+  run_matmul(*executor, n); // warmup
 
   std::printf("runs:\n");
-  run_one(executor, n);
+  run_one(*executor, n);
 }
diff --git a/cpp/taskflow/nqueens.cpp b/cpp/taskflow/nqueens.cpp
index ac3ddc8..0de1db9 100644
--- a/cpp/taskflow/nqueens.cpp
+++ b/cpp/taskflow/nqueens.cpp
@@ -16,10 +16,12 @@
 #include <cstdio>
 #include <cstdlib>
 #include <functional>
+#include <optional>
 #include <thread>
 
 static size_t thread_count = std::thread::hardware_concurrency() / 2;
 static const size_t iter_count = 1;
+std::optional<tf::Executor> executor;
 
 inline constexpr int nqueens_work = 14;
 
@@ -35,8 +37,7 @@ void check_answer(int result) {
   }
 }
 
-template <int N>
-void nqueens(tf::Runtime& rt, int xMax, std::array<int, N> buf, int& out) {
+template <int N> void nqueens(int xMax, std::array<int, N> buf, int& out) {
   if (N == xMax) {
     out = 1;
     return;
@@ -60,15 +61,16 @@ void nqueens(tf::Runtime& rt, int xMax, std::array<int, N> buf, int& out) {
     buf[xMax] = y;
     size_t idx = taskCount;
     ++taskCount;
-    return [xMax, buf, idx, &results](tf::Runtime& s) {
-      nqueens(s, xMax + 1, buf, results[idx]);
-    };
+    return
+      [xMax, buf, idx, &results]() { nqueens(xMax + 1, buf, results[idx]); };
   });
 
+  tf::TaskGroup tg = executor->task_group();
+
   for (auto&& t : tasks) {
-    rt.silent_async(t);
+    tg.silent_async(t);
   }
-  rt.corun_all();
+  tg.corun();
 
   int ret = 0;
   for (size_t i = 0; i < taskCount; ++i) {
@@ -83,14 +85,12 @@ int main(int argc, char* argv[]) {
     thread_count = static_cast<size_t>(atoi(argv[1]));
   }
   std::printf("threads: %zu\n", thread_count);
-  tf::Executor executor(thread_count);
-  tf::Taskflow taskflow;
+  executor.emplace(thread_count);
 
   {
     std::array<int, nqueens_work> buf{};
     int result;
-    taskflow.emplace([&](tf::Runtime& rt) { nqueens(rt, 0, buf, result); });
-    executor.run(taskflow).wait();
+    executor->async([&]() { nqueens(0, buf, result); }).get();
     check_answer(result);
   }
 
@@ -99,8 +99,7 @@
   for (size_t i = 0; i < iter_count; ++i) {
     std::array<int, nqueens_work> buf{};
     int result;
-    taskflow.emplace([&](tf::Runtime& rt) { nqueens(rt, 0, buf, result); });
-    executor.run(taskflow).wait();
+    executor->async([&]() { nqueens(0, buf, result); }).get();
     check_answer(result);
     std::printf("output: %d\n", result);
   }
diff --git a/cpp/taskflow/skynet.cpp b/cpp/taskflow/skynet.cpp
index bc0250c..418ab74 100644
--- a/cpp/taskflow/skynet.cpp
+++ b/cpp/taskflow/skynet.cpp
@@ -34,12 +34,13 @@
 #include <cstdio>
 #include <cstdlib>
 #include <thread>
+#include <optional>
 
 static size_t thread_count = std::thread::hardware_concurrency() / 2;
 static const size_t iter_count = 1;
+std::optional<tf::Executor> executor;
 
-template <size_t DepthMax>
-size_t skynet_one(tf::Runtime& rt, size_t BaseNum, size_t Depth) {
+template <size_t DepthMax> size_t skynet_one(size_t BaseNum, size_t Depth) {
   if (Depth == DepthMax) {
     return BaseNum;
   }
@@ -50,13 +51,17 @@ size_t skynet_one(size_t BaseNum, size_t Depth) {
 
   std::array<size_t, 10> results;
 
-  for (size_t i = 0; i < 10; ++i) {
-    rt.silent_async([=, &results, idx = i](tf::Runtime& s) {
+  tf::TaskGroup tg = executor->task_group();
+
+  for (size_t i = 0; i < 9; ++i) {
+    tg.silent_async([=, &results, idx = i]() {
       results[idx] =
-        skynet_one<DepthMax>(s, BaseNum + depthOffset * idx, Depth + 1);
+        skynet_one<DepthMax>(BaseNum + depthOffset * idx, Depth + 1);
     });
   }
-  rt.corun_all();
+  // compute one branch synchronously
+  results[9] = skynet_one<DepthMax>(BaseNum + depthOffset * 9, Depth + 1);
+  tg.corun();
 
   size_t count = 0;
   for (size_t idx = 0; idx < 10; ++idx) {
@@ -65,12 +70,8 @@
   return count;
 }
 template <size_t DepthMax> void skynet(tf::Executor& executor) {
-  tf::Taskflow taskflow;
   size_t count;
-  taskflow.emplace([&](tf::Runtime& rt) {
-    count = skynet_one<DepthMax>(rt, 0, 0);
-  });
-  executor.run(taskflow).wait();
+  executor.async([&]() { count = skynet_one<DepthMax>(0, 0); }).get();
   if (count != 4999999950000000) {
     std::printf("ERROR: wrong result - %" PRIu64 "\n", count);
   }
@@ -94,7 +95,7 @@ int main(int argc, char* argv[]) {
     thread_count = static_cast<size_t>(atoi(argv[1]));
   }
   std::printf("threads: %zu\n", thread_count);
-  tf::Executor executor(thread_count);
-  skynet<8>(executor);
-  loop_skynet<8>(executor);
+  executor.emplace(thread_count);
+  skynet<8>(*executor);
+  loop_skynet<8>(*executor);
 }