diff --git a/.github/workflows/master-test-workflow.yml b/.github/workflows/master-test-workflow.yml index 1116b42..75468ca 100644 --- a/.github/workflows/master-test-workflow.yml +++ b/.github/workflows/master-test-workflow.yml @@ -42,7 +42,8 @@ jobs: - name: Build Pybind11 run: | - /usr/bin/python3 -m pip install --upgrade pip pytest + sudo apt-get update + sudo apt-get install -y python3-pytest cd extern/pybind11 mkdir -p build diff --git a/.github/workflows/pr-development-workflow.yml b/.github/workflows/pr-development-workflow.yml index 2c24a6d..38bf6ac 100644 --- a/.github/workflows/pr-development-workflow.yml +++ b/.github/workflows/pr-development-workflow.yml @@ -272,7 +272,8 @@ jobs: - name: Build Pybind11 run: | - /usr/bin/python3 -m pip install --upgrade pip pytest + sudo apt-get update + sudo apt-get install -y python3-pytest cd extern/pybind11 mkdir -p build diff --git a/examples/abcTasks/cpp/abcTasks.hpp b/examples/abcTasks/cpp/abcTasks.hpp index c77f168..799ce5b 100644 --- a/examples/abcTasks/cpp/abcTasks.hpp +++ b/examples/abcTasks/cpp/abcTasks.hpp @@ -27,9 +27,9 @@ void abcTasks(taskr::Runtime &taskr) taskr.setTaskCallbackHandler(HiCR::tasking::Task::callback_t::onTaskFinish, [&taskr](taskr::Task *task) { delete task; }); // Creating the execution units (functions that the tasks will run) - auto taskAfc = taskr::Function([](taskr::Task *task) { printf("Task A %ld\n", task->getLabel()); }); - auto taskBfc = taskr::Function([](taskr::Task *task) { printf("Task B %ld\n", task->getLabel()); }); - auto taskCfc = taskr::Function([](taskr::Task *task) { printf("Task C %ld\n", task->getLabel()); }); + auto taskAfc = taskr::Function([](taskr::Task *task) { printf("Task A %ld\n", task->getTaskId()); }); + auto taskBfc = taskr::Function([](taskr::Task *task) { printf("Task B %ld\n", task->getTaskId()); }); + auto taskCfc = taskr::Function([](taskr::Task *task) { printf("Task C %ld\n", task->getTaskId()); }); // Initializing taskr taskr.initialize(); @@ -38,7 +38,7 @@ void abcTasks(taskr::Runtime &taskr) for (size_t r = 0; r < REPETITIONS; r++) { // Calculating the base task id for this repetition - auto repetitionLabel = r * ITERATIONS * 3; + auto repetitionTaskId = r * ITERATIONS * 3; // Our connection with the previous iteration is the last task C, null in the first iteration taskr::Task *prevTaskC = nullptr; @@ -46,9 +46,9 @@ void abcTasks(taskr::Runtime &taskr) // Each run consists of several iterations of ABC for (size_t i = 0; i < ITERATIONS; i++) { - auto taskA = new taskr::Task(repetitionLabel + i * 3 + 0, &taskAfc); - auto taskB = new taskr::Task(repetitionLabel + i * 3 + 1, &taskBfc); - auto taskC = new taskr::Task(repetitionLabel + i * 3 + 2, &taskCfc); + auto taskA = new taskr::Task(repetitionTaskId + i * 3 + 0, &taskAfc); + auto taskB = new taskr::Task(repetitionTaskId + i * 3 + 1, &taskBfc); + auto taskC = new taskr::Task(repetitionTaskId + i * 3 + 2, &taskCfc); // Creating dependencies if (i > 0) taskA->addDependency(prevTaskC); diff --git a/examples/abcTasks/python/abcTasks.py b/examples/abcTasks/python/abcTasks.py index 00aa1af..854b660 100644 --- a/examples/abcTasks/python/abcTasks.py +++ b/examples/abcTasks/python/abcTasks.py @@ -6,9 +6,9 @@ def abcTasks(runtime): # Create the taskr Tasks - taskAfc = taskr.Function(lambda task : print(f"Task A {task.getLabel()}")) - taskBfc = taskr.Function(lambda task : print(f"Task B {task.getLabel()}")) - taskCfc = taskr.Function(lambda task : print(f"Task C {task.getLabel()}")) + taskAfc = taskr.Function(lambda task : 
print(f"Task A {task.getTaskId()}")) + taskBfc = taskr.Function(lambda task : print(f"Task B {task.getTaskId()}")) + taskCfc = taskr.Function(lambda task : print(f"Task C {task.getTaskId()}")) # Initializing taskr runtime.initialize() @@ -19,13 +19,13 @@ def abcTasks(runtime): # Creating the execution units (functions that the tasks will run) for r in range(REPETITIONS): # Calculating the base task id for this repetition - repetitionLabel = r * ITERATIONS * 3 + repetitionTaskId = r * ITERATIONS * 3 for i in range(ITERATIONS): - taskA = taskr.Task(repetitionLabel + i * 3 + 0, taskAfc) - taskB = taskr.Task(repetitionLabel + i * 3 + 1, taskBfc) - taskC = taskr.Task(repetitionLabel + i * 3 + 2, taskCfc) + taskA = taskr.Task(repetitionTaskId + i * 3 + 0, taskAfc) + taskB = taskr.Task(repetitionTaskId + i * 3 + 1, taskBfc) + taskC = taskr.Task(repetitionTaskId + i * 3 + 2, taskCfc) # Creating dependencies if i > 0: taskA.addDependency(prevTaskC) diff --git a/examples/conditionVariable/cpp/conditionVariableWaitForCondition.hpp b/examples/conditionVariable/cpp/conditionVariableWaitForCondition.hpp index ebffc8d..5e79a65 100644 --- a/examples/conditionVariable/cpp/conditionVariableWaitForCondition.hpp +++ b/examples/conditionVariable/cpp/conditionVariableWaitForCondition.hpp @@ -57,7 +57,8 @@ void conditionVariableWaitForCondition(taskr::Runtime &taskr) { printf("Thread 1: I wait (forever) for the value to turn 2\n"); mutex.lock(task); - bool wasNotified = cv.waitFor(task, mutex, [&]() { return value == 2; }, forever); + bool wasNotified = cv.waitFor( + task, mutex, [&]() { return value == 2; }, forever); mutex.unlock(task); if (wasNotified == false) { @@ -72,7 +73,8 @@ void conditionVariableWaitForCondition(taskr::Runtime &taskr) printf("Thread 1: I wait (with timeout) for the value to turn 3 (won't happen)\n"); mutex.lock(task); auto startTime = std::chrono::high_resolution_clock::now(); - bool wasNotified = cv.waitFor(task, mutex, [&]() { return value == 3; }, timeoutTimeUs); + bool wasNotified = cv.waitFor( + task, mutex, [&]() { return value == 3; }, timeoutTimeUs); auto currentTime = std::chrono::high_resolution_clock::now(); auto elapsedTime = (size_t)std::chrono::duration_cast(currentTime - startTime).count(); mutex.unlock(task); @@ -99,7 +101,8 @@ void conditionVariableWaitForCondition(taskr::Runtime &taskr) // Waiting for the other thread to set the first value printf("Thread 2: First, I'll wait for the value to become 1\n"); mutex.lock(task); - bool wasNotified = cv.waitFor(task, mutex, [&]() { return value == 1; }, forever); + bool wasNotified = cv.waitFor( + task, mutex, [&]() { return value == 1; }, forever); mutex.unlock(task); if (wasNotified == false) { diff --git a/examples/jacobi3d/meson.build b/examples/jacobi3d/meson.build index 276796d..f9e4f3e 100644 --- a/examples/jacobi3d/meson.build +++ b/examples/jacobi3d/meson.build @@ -1,17 +1,64 @@ testSuite = [ 'examples', 'jacobi3d' ] -if 'boost' in get_option('executionStateType') and 'pthreads' in get_option('processingUnitType') - threading = executable('threading', [ 'source/pthreads.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) +if distributedEngine == 'mpi' + TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_MPI' - if get_option('buildTests') - test('threading', threading, args : [ '-n', '64', '-i', '10' ], suite: testSuite, workdir: threading.path() + '.p' ) + mpirunExecutable = HiCRProject.get_variable('mpirunExecutable') + + if 'boost' in 
get_option('executionStateType') and 'pthreads' in get_option('processingUnitType') + threading = executable('threading', [ 'source/pthreads.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) + + if get_option('buildTests') + test('threading', mpirunExecutable, args : [ '-n', '2', '--oversubscribe', threading.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], suite: testSuite, workdir: threading.path() + '.p' ) + endif + endif + + # TODO: deadlocking (even for mpirun -n 1) + if 'nosv' in get_option('executionStateType') and 'nosv' in get_option('processingUnitType') + nosv = executable('nosv', [ 'source/nosv.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) + + if get_option('buildTests') + test('nosv', mpirunExecutable, args : [ '-n', '2', '--oversubscribe', nosv.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], is_parallel : false, suite: testSuite, workdir: nosv.path() + '.p' ) + endif + endif + +elif distributedEngine == 'lpf' + TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_LPF' + + lpfrunExecutable = HiCRProject.get_variable('lpfrunExecutable') + + if 'boost' in get_option('executionStateType') and 'pthreads' in get_option('processingUnitType') + threading = executable('threading', [ 'source/pthreads.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) + + if get_option('buildTests') + test('threading', lpfrunExecutable, args : [ '-n', '2', '-engine', 'zero', threading.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], suite: testSuite, workdir: threading.path() + '.p' ) + endif + endif + + + if 'nosv' in get_option('executionStateType') and 'nosv' in get_option('processingUnitType') + nosv = executable('nosv', [ 'source/nosv.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) + + if get_option('buildTests') + test('nosv', lpfrunExecutable, args : [ '-n', '2', '-engine', 'zero', nosv.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], is_parallel : false, suite: testSuite, workdir: nosv.path() + '.p' ) + endif + endif +elif distributedEngine == 'none' # Atm these are segfaulting (TODO fix!) 
+ TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_NONE' + + if 'boost' in get_option('executionStateType') and 'pthreads' in get_option('processingUnitType') + threading = executable('threading', [ 'source/pthreads.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) + + if get_option('buildTests') + test('threading', threading, args : [ '-n', '64', '-i', '10' ], suite: testSuite, workdir: threading.path() + '.p' ) + endif endif -endif -if 'nosv' in get_option('executionStateType') and 'nosv' in get_option('processingUnitType') - nosv = executable('nosv', [ 'source/nosv.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) + if 'nosv' in get_option('executionStateType') and 'nosv' in get_option('processingUnitType') + nosv = executable('nosv', [ 'source/nosv.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] ) - if get_option('buildTests') - test('nosv', nosv, args : [ '-n', '64', '-i', '10' ], is_parallel : false, suite: testSuite, workdir: nosv.path() + '.p' ) + if get_option('buildTests') + test('nosv', nosv, args : [ '-n', '64', '-i', '10' ], is_parallel : false, suite: testSuite, workdir: nosv.path() + '.p' ) + endif endif endif \ No newline at end of file diff --git a/examples/jacobi3d/mpi/Makefile b/examples/jacobi3d/mpi/Makefile index d1433f1..97ca9a6 100755 --- a/examples/jacobi3d/mpi/Makefile +++ b/examples/jacobi3d/mpi/Makefile @@ -1,20 +1,17 @@ -CFLAGS = -O3 -g -Wfatal-errors -LIBS = -MPICXX = mpicxx +CXX = g++ +CXXFLAGS = -O3 -g -Wfatal-errors +INCLUDES = -I/usr/local/include +LIBS = -L/usr/local/lib -lmpi -.SECONDARY: +.SECONDARY: BINARIES = jacobi -.PHONY: all stage +.PHONY: all clean + all: $(BINARIES) jacobi: jacobi.cpp grid.cpp - $(MPICXX) $(CFLAGS) $(LIBS) -o $@ $^ + $(CXX) $(CXXFLAGS) $(INCLUDES) $^ -o $@ $(LIBS) -.PHONY: clean clean: - $(RM) $(BINARIES) *.o - - - - + $(RM) $(BINARIES) *.o diff --git a/examples/jacobi3d/source/grid.hpp b/examples/jacobi3d/source/grid.hpp index c0e80bd..53edfc2 100644 --- a/examples/jacobi3d/source/grid.hpp +++ b/examples/jacobi3d/source/grid.hpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#pragma once + #include #include #include @@ -28,8 +30,8 @@ #include #define CHANNEL_DEPTH 10 -const int BLOCKZ = 96; -const int BLOCKY = 64; +constexpr int BLOCKZ = 96; +constexpr int BLOCKY = 64; extern std::unordered_map taskid_hashmap; diff --git a/examples/jacobi3d/source/jacobi3d.hpp b/examples/jacobi3d/source/jacobi3d.hpp new file mode 100644 index 0000000..4cc5f7a --- /dev/null +++ b/examples/jacobi3d/source/jacobi3d.hpp @@ -0,0 +1,205 @@ +/* + * Copyright 2025 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include 
+#include 
+#include 
+
+#include "grid.hpp"
+#include "task.hpp"
+
+void jacobi3d(HiCR::InstanceManager *instanceManager,
+              taskr::Runtime &taskr,
+              Grid *g,
+              size_t gDepth = 1,
+              size_t N = 128,
+              ssize_t nIters = 100,
+              D3 pt = D3({.x = 1, .y = 1, .z = 1}),
+              D3 lt = D3({.x = 1, .y = 1, .z = 1}))
+{
+  // Getting distributed instance information
+  const auto instanceCount = instanceManager->getInstances().size();
+  const auto myInstanceId = instanceManager->getCurrentInstance()->getId();
+  const auto rootInstanceId = instanceManager->getRootInstanceId();
+  const auto isRootInstance = myInstanceId == rootInstanceId;
+
+  // Initializing the Grid
+  bool success = g->initialize();
+  if (success == false) instanceManager->abort(-1);
+
+  // Creating grid processing functions
+  g->resetFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->reset(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k); });
+  g->computeFc =
+    std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->compute(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
+  g->receiveFc =
+    std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->receive(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
+  g->unpackFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->unpack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
+  g->packFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->pack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
+  g->sendFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->send(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
+  g->localResidualFc = std::make_unique<taskr::Function>(
+    [&g](taskr::Task *task) { g->calculateLocalResidual(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
+
+  // Task map
+  std::map<size_t, std::shared_ptr<Task>> _taskMap;
+
+  printf("Instance %lu: Executing...\n", myInstanceId);
+
+  // Creating tasks to reset the grid
+  for (ssize_t i = 0; i < lt.x; i++)
+    for (ssize_t j = 0; j < lt.y; j++)
+      for (ssize_t k = 0; k < lt.z; k++)
+      {
+        auto resetTask = new Task("Reset", i, j, k, 0, g->resetFc.get());
+        taskr.addTask(resetTask);
+      }
+
+  // Initializing TaskR
+  taskr.initialize();
+
+  // Running Taskr initially
+  taskr.run();
+
+  // Waiting for Taskr to finish
+  taskr.await();
+
+  // Creating and adding tasks (graph nodes)
+  for (ssize_t it = 0; it < nIters; it++)
+    for (ssize_t i = 0; i < lt.x; i++)
+      for (ssize_t j = 0; j < lt.y; j++)
+        for (ssize_t k = 0; k < lt.z; k++)
+        {
+          auto localId = g->localSubGridMapping[k][j][i];
+          auto &subGrid = g->subgrids[localId];
+
+          // create new specific tasks
+          auto computeTask = std::make_shared<Task>("Compute", i, j, k, it, g->computeFc.get());
+          auto packTask = std::make_shared<Task>("Pack", i, j, k, it, g->packFc.get());
+          auto sendTask = std::make_shared<Task>("Send", i, j, k, it, g->sendFc.get());
+          auto recvTask = std::make_shared<Task>("Receive", i, j, k, it, g->receiveFc.get());
+          auto unpackTask = std::make_shared<Task>("Unpack", i, j, k, it, g->unpackFc.get());
+
+          _taskMap[Task::encodeTaskName("Compute", i, j, k, it)] = computeTask;
+          _taskMap[Task::encodeTaskName("Pack", i, j, k, it)] = packTask;
+          _taskMap[Task::encodeTaskName("Send", i, j, k, it)] = sendTask;
+          _taskMap[Task::encodeTaskName("Receive", i, j, k, it)] = recvTask;
+          _taskMap[Task::encodeTaskName("Unpack", i, j, k, it)] = unpackTask;
+
+          // Creating and adding local compute task
dependencies + if (it > 0) + if (subGrid.X0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i - 1, j + 0, k + 0, it - 1)].get()); + if (it > 0) + if (subGrid.X1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 1, j + 0, k + 0, it - 1)].get()); + if (it > 0) + if (subGrid.Y0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j - 1, k + 0, it - 1)].get()); + if (it > 0) + if (subGrid.Y1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 1, k + 0, it - 1)].get()); + if (it > 0) + if (subGrid.Z0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k - 1, it - 1)].get()); + if (it > 0) + if (subGrid.Z1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 1, it - 1)].get()); + if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 0, it - 1)].get()); + + // Adding communication-related dependencies + if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it - 1)].get()); + if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get()); + + // Creating and adding receive task dependencies, from iteration 1 onwards + if (it > 0) recvTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get()); + + // Creating and adding unpack task dependencies + unpackTask->addDependency(_taskMap[Task::encodeTaskName("Receive", i, j, k, it)].get()); + unpackTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get()); + + // Creating and adding send task dependencies, from iteration 1 onwards + packTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get()); + if (it > 0) packTask->addDependency(_taskMap[Task::encodeTaskName("Send", i, j, k, it - 1)].get()); + + // Creating and adding send task dependencies, from iteration 1 onwards + sendTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it)].get()); + + // Adding tasks to taskr + taskr.addTask(computeTask.get()); + if (it < nIters - 1) taskr.addTask(packTask.get()); + if (it < nIters - 1) taskr.addTask(sendTask.get()); + if (it < nIters - 1) taskr.addTask(recvTask.get()); + if (it < nIters - 1) taskr.addTask(unpackTask.get()); + } + + // Setting start time as now + auto t0 = std::chrono::high_resolution_clock::now(); + + // Running Taskr + taskr.run(); + + // Waiting for Taskr to finish + taskr.await(); + + ////// Calculating residual + + // Reset local residual to zero + g->resetResidual(); + + // Calculating local residual + for (ssize_t i = 0; i < lt.x; i++) + for (ssize_t j = 0; j < lt.y; j++) + for (ssize_t k = 0; k < lt.z; k++) + { + auto residualTask = new Task("Residual", i, j, k, nIters, g->localResidualFc.get()); + taskr.addTask(residualTask); + } + + // Running Taskr + taskr.run(); + + // Waiting for Taskr to finish + taskr.await(); + + // Finalizing TaskR + taskr.finalize(); + + // If i'm not the root instance, simply send my locally calculated residual + if (isRootInstance == false) + { + *(double *)g->residualSendBuffer->getPointer() = g->_residual; + g->residualProducerChannel->push(g->residualSendBuffer, 1); + } + else + { + // Otherwise gather all the residuals and print the results + double globalRes = g->_residual; + + for (size_t i = 0; i < instanceCount - 1; i++) + { + while 
(g->residualConsumerChannel->isEmpty())
+        ;
+      double *residualPtr = (double *)g->residualConsumerChannel->getTokenBuffer()->getSourceLocalMemorySlot()->getPointer() + g->residualConsumerChannel->peek(0);
+      g->residualConsumerChannel->pop();
+      globalRes += *residualPtr;
+    }
+
+    // Setting final time now
+    auto tf = std::chrono::high_resolution_clock::now();
+    std::chrono::duration dt = tf - t0;
+    float execTime = dt.count();
+
+    double residual = sqrt(globalRes / ((double)(N - 1) * (double)(N - 1) * (double)(N - 1)));
+    double gflops = nIters * (double)N * (double)N * (double)N * (2 + gDepth * 8) / (1.0e9);
+    printf("%.4fs, %.3f GFlop/s (L2 Norm: %.10g)\n", execTime, gflops / execTime, residual);
+  }
+
+  // Finalizing grid
+  g->finalize();
+}
\ No newline at end of file
diff --git a/examples/jacobi3d/source/nosv.cpp b/examples/jacobi3d/source/nosv.cpp
index c70d260..63ddb28 100644
--- a/examples/jacobi3d/source/nosv.cpp
+++ b/examples/jacobi3d/source/nosv.cpp
@@ -17,15 +17,23 @@
 #include 
 #include 
 #include 
+#include 
+#include 
+#include 
+#include 
 #include 
 #include 
 #include 
-#include 
-#include 
-#include 
-#include 
+#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF
+  #include 
+  #include 
+  #include 
+  #include 
+  #include 
+  #include 
+#endif
 
 #ifdef _TASKR_DISTRIBUTED_ENGINE_MPI
   #include 
@@ -41,10 +49,16 @@
 #include "grid.hpp"
 #include "task.hpp"
+#include "jacobi3d.hpp"
 
-std::unordered_map taskIdHashmap;
+// Setting default values (global)
+size_t gDepth = 1;
+size_t N = 128;
+ssize_t nIters = 100;
+D3 pt = D3({.x = 1, .y = 1, .z = 1});
+D3 lt = D3({.x = 1, .y = 1, .z = 1});
 
-int main(int argc, char *argv[])
+void jacobiDriver(HiCR::InstanceManager *instanceManager, HiCR::CommunicationManager *communicationManager, HiCR::MemoryManager *memoryManager)
 {
   // Initialize nosv
   check(nosv_init());
@@ -55,33 +69,6 @@ int main(int argc, char *argv[])
   // Attaching the main thread
   check(nosv_attach(&mainTask, NULL, NULL, NOSV_ATTACH_NONE));
 
-  //// Instantiating distributed execution machinery
-
-  // Storage for the distributed engine's communication manager
-  std::unique_ptr communicationManager;
-
-  // Storage for the distributed engine's instance manager
-  std::unique_ptr instanceManager;
-
-  // Storage for the distributed engine's memory manager
-  std::unique_ptr memoryManager;
-
-#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF
-  #error "LPF backend not supported yet"
-#endif
-
-#ifdef _TASKR_DISTRIBUTED_ENGINE_MPI
-  instanceManager = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
-  communicationManager = std::make_unique();
-  memoryManager = std::make_unique();
-#endif
-
-#ifdef _TASKR_DISTRIBUTED_ENGINE_NONE
-  instanceManager = std::make_unique();
-  communicationManager = std::make_unique();
-  memoryManager = HiCR::backend::hwloc::MemoryManager::createDefault();
-#endif
-
   // Creating (local host) topology manager
   const auto topologyManager = HiCR::backend::hwloc::TopologyManager::createDefault();
 
@@ -93,9 +80,6 @@
 
   //// Setting up Taskr
 
-  // Initializing nosv-based compute manager to run tasks in parallel
-  HiCR::backend::nosv::ComputeManager computeManager;
-
   // Creating HWloc topology object
   hwloc_topology_t topology;
 
@@ -114,44 +98,75 @@
   printf("NUMA Domains per Node: %lu\n", numaDomains.size());
 
   // Assuming one process per numa domain
-  size_t numaDomainId = myInstanceId % numaDomains.size();
-  auto numaDomain = numaDomains[numaDomainId];
+  // Looking for Domains that are not zero (Slurm non --exclusive issue)
+  size_t numaDomainId;
+  for
(size_t i = 0; i < numaDomains.size(); ++i) + { + numaDomainId = (myInstanceId + i) % numaDomains.size(); + if (numaDomains[numaDomainId]->getComputeResourceList().size() > 0) { break; } + } + + auto numaDomain = numaDomains[numaDomainId]; printf("Instance %lu - Using NUMA domain: %lu\n", myInstanceId, numaDomainId); // Updating the compute resource list auto computeResources = numaDomain->getComputeResourceList(); - printf("PUs Per NUMA Domain: %lu\n", computeResources.size()); + + // Compute resources to use + HiCR::Device::computeResourceList_t cr; + int size; + MPI_Comm_size(MPI_COMM_WORLD, &size); + + // for(size_t i = 0; i < (size_t)(lt.x * lt.y * lt.z); i++) + // { + // cr.push_back(computeResources[(2+i)%(computeResources.size())]); //21,43,65,87 are broken for nOS-V + // } + + // cr.push_back(numaDomains[0]->getComputeResourceList()[0]); + + for (int i = 0; i < size; ++i) + { + if (myInstanceId == (size_t)i) + { + auto itr = computeResources.begin(); + for (size_t i = 0; i < computeResources.size(); i++) + { + // Getting up-casted pointer for the processing unit + auto c = dynamic_pointer_cast(*itr); + + // Checking whether the execution unit passed is compatible with this backend + if (c == nullptr) HICR_THROW_LOGIC("The passed compute resource is not supported by this processing unit type\n"); + + // Getting the logical processor ID of the compute resource + auto pid = c->getProcessorId(); + + if (pid != 21 && pid != 43 && pid != 65 && pid != 87) + { + printf("%u ", pid); + fflush(stdout); + + cr.push_back(*itr); + } + itr++; + } + printf("]\n"); + fflush(stdout); + } + MPI_Barrier(MPI_COMM_WORLD); + } + // printf("PUs Per NUMA Domain: %lu\n", computeResources.size()); + + // Initializing nosv-based compute manager to run tasks in parallel + HiCR::backend::nosv::ComputeManager computeManager; // Creating taskr object nlohmann::json taskrConfig; taskrConfig["Remember Finished Objects"] = true; - auto taskr = taskr::Runtime(&computeManager, &computeManager, computeResources, taskrConfig); + taskr::Runtime taskr(&computeManager, &computeManager, cr, taskrConfig); // Allowing tasks to immediately resume upon suspension -- they won't execute until their pending operation is finished taskr.setTaskCallbackHandler(HiCR::tasking::Task::callback_t::onTaskSuspend, [&taskr](taskr::Task *task) { taskr.resumeTask(task); }); - //// Setting up application configuration - - // Setting default values - size_t gDepth = 1; - size_t N = 128; - ssize_t nIters = 100; - D3 pt = D3({.x = 1, .y = 1, .z = 1}); - D3 lt = D3({.x = 1, .y = 1, .z = 1}); - - // Parsing user inputs - for (int i = 0; i < argc; i++) - { - if (!strcmp(argv[i], "-px")) pt.x = atoi(argv[++i]); - if (!strcmp(argv[i], "-py")) pt.y = atoi(argv[++i]); - if (!strcmp(argv[i], "-pz")) pt.z = atoi(argv[++i]); - if (!strcmp(argv[i], "-lx")) lt.x = atoi(argv[++i]); - if (!strcmp(argv[i], "-ly")) lt.y = atoi(argv[++i]); - if (!strcmp(argv[i], "-lz")) lt.z = atoi(argv[++i]); - if (!strcmp(argv[i], "-n")) N = atoi(argv[++i]); - if (!strcmp(argv[i], "-i")) nIters = atoi(argv[++i]); - } - if ((size_t)(pt.x * pt.y * pt.z) != instanceCount) { if (isRootInstance) printf("[Error] The specified px/py/pz geometry does not match the number of instances (-n %lu).\n", instanceCount); @@ -159,181 +174,139 @@ int main(int argc, char *argv[]) } // Creating and initializing Grid - auto g = new Grid(myInstanceId, N, nIters, gDepth, pt, lt, &taskr, memoryManager.get(), topologyManager.get(), communicationManager.get()); - bool success = g->initialize(); - if 
(success == false) instanceManager->abort(-1);
-
-  // Creating grid processing functions
-  g->resetFc = std::make_unique([&g](taskr::Task *task) { g->reset(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k); });
-  g->computeFc =
-    std::make_unique([&g](taskr::Task *task) { g->compute(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
-  g->receiveFc =
-    std::make_unique([&g](taskr::Task *task) { g->receive(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
-  g->unpackFc = std::make_unique([&g](taskr::Task *task) { g->unpack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
-  g->packFc = std::make_unique([&g](taskr::Task *task) { g->pack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
-  g->sendFc = std::make_unique([&g](taskr::Task *task) { g->send(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
-  g->localResidualFc = std::make_unique(
-    [&g](taskr::Task *task) { g->calculateLocalResidual(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
-
-  // Task map
-  std::map> _taskMap;
-
-  // printf("Instance %lu: Executing...\n", myInstanceId);
-
-  // Creating tasks to reset the grid
-  for (ssize_t i = 0; i < lt.x; i++)
-    for (ssize_t j = 0; j < lt.y; j++)
-      for (ssize_t k = 0; k < lt.z; k++)
-      {
-        auto resetTask = new Task("Reset", i, j, k, 0, g->resetFc.get());
-        taskr.addTask(resetTask);
-      }
+  auto g = std::make_unique<Grid>(myInstanceId, N, nIters, gDepth, pt, lt, &taskr, memoryManager, topologyManager.get(), communicationManager);
+
+  // Running the Jacobi3D example
+  jacobi3d(instanceManager, taskr, g.get(), gDepth, N, nIters, pt, lt);
+
+  // Finalizing instances
+  instanceManager->finalize();
 
-  // Initializing TaskR
-  taskr.initialize();
+  // Detaching the main thread
+  check(nosv_detach(NOSV_DETACH_NONE));
 
-  // Running Taskr initially
-  taskr.run();
+  // Shutdown nosv
+  // check(nosv_shutdown());
+}
 
-  // Waiting for Taskr to finish
-  taskr.await();
+#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF
 
-  // Creating and adding tasks (graph nodes)
-  for (ssize_t it = 0; it < nIters; it++)
-    for (ssize_t i = 0; i < lt.x; i++)
-      for (ssize_t j = 0; j < lt.y; j++)
-        for (ssize_t k = 0; k < lt.z; k++)
-        {
-          auto localId = g->localSubGridMapping[k][j][i];
-          auto &subGrid = g->subgrids[localId];
-
-          // create new specific tasks
-          auto computeTask = std::make_shared("Compute", i, j, k, it, g->computeFc.get());
-          auto packTask = std::make_shared("Pack", i, j, k, it, g->packFc.get());
-          auto sendTask = std::make_shared("Send", i, j, k, it, g->sendFc.get());
-          auto recvTask = std::make_shared("Receive", i, j, k, it, g->receiveFc.get());
-          auto unpackTask = std::make_shared("Unpack", i, j, k, it, g->unpackFc.get());
-
-          _taskMap[Task::encodeTaskName("Compute", i, j, k, it)] = computeTask;
-          _taskMap[Task::encodeTaskName("Pack", i, j, k, it)] = packTask;
-          _taskMap[Task::encodeTaskName("Send", i, j, k, it)] = sendTask;
-          _taskMap[Task::encodeTaskName("Receive", i, j, k, it)] = recvTask;
-          _taskMap[Task::encodeTaskName("Unpack", i, j, k, it)] = unpackTask;
-
-          // Creating and adding local compute task dependencies
-          if (it > 0)
-            if (subGrid.X0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i - 1, j + 0, k + 0, it - 1)].get());
-          if (it > 0)
-            if (subGrid.X1.type == LOCAL)
computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 1, j + 0, k + 0, it - 1)].get()); - if (it > 0) - if (subGrid.Y0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j - 1, k + 0, it - 1)].get()); - if (it > 0) - if (subGrid.Y1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 1, k + 0, it - 1)].get()); - if (it > 0) - if (subGrid.Z0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k - 1, it - 1)].get()); - if (it > 0) - if (subGrid.Z1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 1, it - 1)].get()); - if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 0, it - 1)].get()); - - // Adding communication-related dependencies - if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it - 1)].get()); - if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get()); - - // Creating and adding receive task dependencies, from iteration 1 onwards - if (it > 0) recvTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get()); - - // Creating and adding unpack task dependencies - unpackTask->addDependency(_taskMap[Task::encodeTaskName("Receive", i, j, k, it)].get()); - unpackTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get()); - - // Creating and adding send task dependencies, from iteration 1 onwards - packTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get()); - if (it > 0) packTask->addDependency(_taskMap[Task::encodeTaskName("Send", i, j, k, it - 1)].get()); - - // Creating and adding send task dependencies, from iteration 1 onwards - sendTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it)].get()); - - // Adding tasks to taskr - taskr.addTask(computeTask.get()); - if (it < nIters - 1) taskr.addTask(packTask.get()); - if (it < nIters - 1) taskr.addTask(sendTask.get()); - if (it < nIters - 1) taskr.addTask(recvTask.get()); - if (it < nIters - 1) taskr.addTask(unpackTask.get()); - } +// flag needed when using MPI to launch +const int LPF_MPI_AUTO_INITIALIZE = 0; + + /** + * #DEFAULT_MEMSLOTS The memory slots used by LPF + * in lpf_resize_memory_register . This value is currently + * guessed as sufficiently large for a program + */ + #define DEFAULT_MEMSLOTS 10000 - // Setting start time as now - auto t0 = std::chrono::high_resolution_clock::now(); + /** + * #DEFAULT_MSGSLOTS The message slots used by LPF + * in lpf_resize_message_queue . 
This value is currently
+  * guessed as sufficiently large for a program
+  */
+  #define DEFAULT_MSGSLOTS 10000
 
-  // Running Taskr
-  taskr.run();
+// Global pointer to the instance manager
+HiCR::InstanceManager *instanceManager;
 
-  // Waiting for Taskr to finish
-  taskr.await();
+void spmd(lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args)
+{
+  // Initializing LPF
+  CHECK(lpf_resize_message_queue(lpf, DEFAULT_MSGSLOTS));
+  CHECK(lpf_resize_memory_register(lpf, DEFAULT_MEMSLOTS));
+  CHECK(lpf_sync(lpf, LPF_SYNC_DEFAULT));
 
-  ////// Calculating residual
+  // Creating HWloc topology object
+  hwloc_topology_t topology;
 
-  // Reset local residual to zero
-  g->resetResidual();
+  // Reserving memory for hwloc
+  hwloc_topology_init(&topology);
 
-  // Calculating local residual
-  for (ssize_t i = 0; i < lt.x; i++)
-    for (ssize_t j = 0; j < lt.y; j++)
-      for (ssize_t k = 0; k < lt.z; k++)
-      {
-        auto residualTask = new Task("Residual", i, j, k, nIters, g->localResidualFc.get());
-        taskr.addTask(residualTask);
-      }
+  // Initializing host (CPU) topology manager
+  HiCR::backend::hwloc::TopologyManager tm(&topology);
 
-  // Running Taskr
-  taskr.run();
+  // Creating memory and communication managers
+  std::unique_ptr communicationManager = std::make_unique(nprocs, pid, lpf);
+  std::unique_ptr memoryManager = std::make_unique(lpf);
 
-  // Waiting for Taskr to finish
-  taskr.await();
+  // Running the Jacobi driver
+  jacobiDriver(instanceManager, communicationManager.get(), memoryManager.get());
+}
+#endif
 
-  // Finalizing TaskR
-  taskr.finalize();
+int main(int argc, char *argv[])
+{
+  //// Instantiating distributed execution machinery
 
-  // If i'm not the root instance, simply send my locally calculated reisdual
-  if (isRootInstance == false)
+#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF
+  // Initializing instance manager
+  auto im = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
+  instanceManager = im.get();
+
+  // Parsing user inputs
+  for (int i = 0; i < argc; i++)
   {
-    *(double *)g->residualSendBuffer->getPointer() = g->_residual;
-    g->residualProducerChannel->push(g->residualSendBuffer, 1);
+    if (!strcmp(argv[i], "-px")) pt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-py")) pt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-pz")) pt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lx")) lt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-ly")) lt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lz")) lt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-n")) N = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-i")) nIters = atoi(argv[++i]);
   }
-  else
-  {
-    // Otherwise gather all the residuals and print the results
-    double globalRes = g->_residual;
 
+  lpf_init_t init;
+  lpf_args_t args;
 
-    for (size_t i = 0; i < instanceCount - 1; i++)
-    {
-      while (g->residualConsumerChannel->isEmpty());
-      double *residualPtr = (double *)g->residualConsumerChannel->getTokenBuffer()->getSourceLocalMemorySlot()->getPointer() + g->residualConsumerChannel->peek(0);
-      g->residualConsumerChannel->pop();
-      globalRes += *residualPtr;
-    }
+  CHECK(lpf_mpi_initialize_with_mpicomm(MPI_COMM_WORLD, &init));
+  CHECK(lpf_hook(init, &spmd, args));
+  CHECK(lpf_mpi_finalize(init));
+#endif
 
-    // Setting final time now
-    auto tf = std::chrono::high_resolution_clock::now();
-    std::chrono::duration dt = tf - t0;
-    float execTime = dt.count();
 
-    double residual = sqrt(globalRes / ((double)(N - 1) * (double)(N - 1) * (double)(N - 1)));
-    double gflops = nIters * (double)N * (double)N * (double)N * (2 + gDepth * 8) / (1.0e9);
-    printf("%.4fs, %.3f GFlop/s (L2 Norm: 
%.10g)\n", execTime, gflops / execTime, residual);
-  }
 
+#ifdef _TASKR_DISTRIBUTED_ENGINE_MPI
+  std::unique_ptr instanceManager = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
+  std::unique_ptr communicationManager = std::make_unique();
+  std::unique_ptr memoryManager = std::make_unique();
+
+  // Parsing user inputs
+  for (int i = 0; i < argc; i++)
+  {
+    if (!strcmp(argv[i], "-px")) pt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-py")) pt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-pz")) pt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lx")) lt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-ly")) lt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lz")) lt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-n")) N = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-i")) nIters = atoi(argv[++i]);
+  }
 
-  // Finalizing grid
-  g->finalize();
+  // Running the Jacobi driver
+  jacobiDriver(instanceManager.get(), communicationManager.get(), memoryManager.get());
+#endif
 
-  // Freeing grid
-  delete g;
+#ifdef _TASKR_DISTRIBUTED_ENGINE_NONE // This one segfaults (check why)
+  std::unique_ptr instanceManager = std::make_unique();
+  std::unique_ptr communicationManager = std::make_unique();
+  std::unique_ptr memoryManager = HiCR::backend::hwloc::MemoryManager::createDefault();
 
-  // Finalizing instances
-  instanceManager->finalize();
-
-  // Detaching the main thread
-  check(nosv_detach(NOSV_DETACH_NONE));
+  // Parsing user inputs
+  for (int i = 0; i < argc; i++)
+  {
+    if (!strcmp(argv[i], "-px")) pt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-py")) pt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-pz")) pt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lx")) lt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-ly")) lt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lz")) lt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-n")) N = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-i")) nIters = atoi(argv[++i]);
+  }
 
-  // Shutdown nosv
-  check(nosv_shutdown());
+  // Running the Jacobi driver
+  jacobiDriver(instanceManager.get(), communicationManager.get(), memoryManager.get());
+#endif
 }
\ No newline at end of file
diff --git a/examples/jacobi3d/source/pthreads.cpp b/examples/jacobi3d/source/pthreads.cpp
index 531cb66..c63e28f 100644
--- a/examples/jacobi3d/source/pthreads.cpp
+++ b/examples/jacobi3d/source/pthreads.cpp
@@ -24,6 +24,15 @@
 #include 
 #include 
 
+#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF
+  #include 
+  #include 
+  #include 
+  #include 
+  #include 
+  #include 
+#endif
+
 #ifdef _TASKR_DISTRIBUTED_ENGINE_MPI
   #include 
   #include 
@@ -38,35 +47,20 @@
 #include "grid.hpp"
 #include "task.hpp"
+#include "jacobi3d.hpp"
 
-int main(int argc, char *argv[])
-{
-  //// Instantiating distributed execution machinery
-
-  // Storage for the distributed engine's communication manager
-  std::unique_ptr communicationManager;
-
-  // Storage for the distributed engine's instance manager
-  std::unique_ptr instanceManager;
-
-  // Storage for the distributed engine's memory manager
-  std::unique_ptr memoryManager;
-
-#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF
-  #error "LPF backend not supported yet"
-#endif
-
-#ifdef _TASKR_DISTRIBUTED_ENGINE_MPI
-  instanceManager = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
-  communicationManager = std::make_unique();
-  memoryManager = std::make_unique();
-#endif
+// Setting default values (global)
+size_t gDepth = 1;
+size_t N = 128;
+ssize_t nIters = 100;
+D3 pt = D3({.x = 1, .y = 1, .z = 1});
+D3 lt = D3({.x = 1, .y = 1, .z = 1});
 
-#ifdef _TASKR_DISTRIBUTED_ENGINE_NONE
- 
instanceManager = std::make_unique(); - communicationManager = std::make_unique(); - memoryManager = HiCR::backend::hwloc::MemoryManager::createDefault(); -#endif +void jacobiDriver(HiCR::InstanceManager *instanceManager, HiCR::CommunicationManager *communicationManager, HiCR::MemoryManager *memoryManager) +{ + int rank, size; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); // Creating (local host) topology manager const auto topologyManager = HiCR::backend::hwloc::TopologyManager::createDefault(); @@ -94,16 +88,66 @@ int main(int argc, char *argv[]) // Getting NUMA Domain information const auto &numaDomains = t.getDevices(); - printf("NUMA Domains per Node: %lu\n", numaDomains.size()); + // printf("NUMA Domains per Node: %lu\n", numaDomains.size()); // Assuming one process per numa domain - size_t numaDomainId = myInstanceId % numaDomains.size(); - auto numaDomain = numaDomains[numaDomainId]; + // Looking for Domains that are not zero (Slurm non --exclusive issue) + size_t numaDomainId; + for (size_t i = 0; i < numaDomains.size(); ++i) + { + numaDomainId = (myInstanceId + i) % numaDomains.size(); + if (numaDomains[numaDomainId]->getComputeResourceList().size() > 0) { break; } + } + + auto numaDomain = numaDomains[numaDomainId]; printf("Instance %lu - Using NUMA domain: %lu\n", myInstanceId, numaDomainId); // Updating the compute resource list auto computeResources = numaDomain->getComputeResourceList(); - printf("PUs Per NUMA Domain: %lu\n", computeResources.size()); + printf("NUMA Domain %lu: #PUs %lu and has PID [", numaDomainId, computeResources.size()); + + // Compute resources to use + HiCR::Device::computeResourceList_t cr; + + // for(size_t i = 0; i < (size_t)(lt.x * lt.y * lt.z); i++) + // { + // cr.push_back(computeResources[(i+2)%(computeResources.size())]); + // } + + // cr.push_back(numaDomains[0]->getComputeResourceList()[0]); + + for (int i = 0; i < size; ++i) + { + if (myInstanceId == (size_t)i) + { + auto itr = computeResources.begin(); + for (size_t i = 0; i < computeResources.size(); i++) + { + // Getting up-casted pointer for the processing unit + auto c = dynamic_pointer_cast(*itr); + + // Checking whether the execution unit passed is compatible with this backend + if (c == nullptr) HICR_THROW_LOGIC("The passed compute resource is not supported by this processing unit type\n"); + + // Getting the logical processor ID of the compute resource + auto pid = c->getProcessorId(); + + if (pid != 21 && pid != 43 && pid != 65 && pid != 87) //21,43,65,87 are broken for nOS-V + { + printf("%u ", pid); + fflush(stdout); + + cr.push_back(*itr); + } + + itr++; + } + printf("]\n"); + fflush(stdout); + } + MPI_Barrier(MPI_COMM_WORLD); + } + // printf("PUs Per NUMA Domain: %lu\n", computeResources.size()); // Initializing Boost-based compute manager to instantiate suspendable coroutines HiCR::backend::boost::ComputeManager boostComputeManager; @@ -114,33 +158,11 @@ int main(int argc, char *argv[]) // Creating taskr object nlohmann::json taskrConfig; taskrConfig["Remember Finished Objects"] = true; - taskr::Runtime taskr(&boostComputeManager, &pthreadsComputeManager, computeResources, taskrConfig); + taskr::Runtime taskr(&boostComputeManager, &pthreadsComputeManager, cr, taskrConfig); // Allowing tasks to immediately resume upon suspension -- they won't execute until their pending operation is finished taskr.setTaskCallbackHandler(HiCR::tasking::Task::callback_t::onTaskSuspend, [&taskr](taskr::Task *task) { taskr.resumeTask(task); }); - //// Setting up 
application configuration - - // Setting default values - size_t gDepth = 1; - size_t N = 128; - ssize_t nIters = 100; - D3 pt = D3({.x = 1, .y = 1, .z = 1}); - D3 lt = D3({.x = 1, .y = 1, .z = 1}); - - // Parsing user inputs - for (int i = 0; i < argc; i++) - { - if (!strcmp(argv[i], "-px")) pt.x = atoi(argv[++i]); - if (!strcmp(argv[i], "-py")) pt.y = atoi(argv[++i]); - if (!strcmp(argv[i], "-pz")) pt.z = atoi(argv[++i]); - if (!strcmp(argv[i], "-lx")) lt.x = atoi(argv[++i]); - if (!strcmp(argv[i], "-ly")) lt.y = atoi(argv[++i]); - if (!strcmp(argv[i], "-lz")) lt.z = atoi(argv[++i]); - if (!strcmp(argv[i], "-n")) N = atoi(argv[++i]); - if (!strcmp(argv[i], "-i")) nIters = atoi(argv[++i]); - } - if ((size_t)(pt.x * pt.y * pt.z) != instanceCount) { if (isRootInstance) printf("[Error] The specified px/py/pz geometry does not match the number of instances (-n %lu).\n", instanceCount); @@ -148,171 +170,106 @@ int main(int argc, char *argv[]) } // Creating and initializing Grid - auto g = std::make_unique(myInstanceId, N, nIters, gDepth, pt, lt, &taskr, memoryManager.get(), topologyManager.get(), communicationManager.get()); - bool success = g->initialize(); - if (success == false) instanceManager->abort(-1); - - // Creating grid processing functions - g->resetFc = std::make_unique([&g](taskr::Task *task) { g->reset(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k); }); - g->computeFc = - std::make_unique([&g](taskr::Task *task) { g->compute(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); }); - g->receiveFc = - std::make_unique([&g](taskr::Task *task) { g->receive(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); }); - g->unpackFc = std::make_unique([&g](taskr::Task *task) { g->unpack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); }); - g->packFc = std::make_unique([&g](taskr::Task *task) { g->pack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); }); - g->sendFc = std::make_unique([&g](taskr::Task *task) { g->send(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); }); - g->localResidualFc = std::make_unique( - [&g](taskr::Task *task) { g->calculateLocalResidual(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); }); - - // Task map - std::map> _taskMap; - - // printf("Instance %lu: Executing...\n", myInstanceId); - - // Creating tasks to reset the grid - for (ssize_t i = 0; i < lt.x; i++) - for (ssize_t j = 0; j < lt.y; j++) - for (ssize_t k = 0; k < lt.z; k++) - { - auto resetTask = new Task("Reset", i, j, k, 0, g->resetFc.get()); - taskr.addTask(resetTask); - } + auto g = std::make_unique(myInstanceId, N, nIters, gDepth, pt, lt, &taskr, memoryManager, topologyManager.get(), communicationManager); - // Initializing TaskR - taskr.initialize(); + // running the Jacobi3D example + jacobi3d(instanceManager, taskr, g.get(), gDepth, N, nIters, pt, lt); +} - // Running Taskr initially - taskr.run(); +#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF - // Waiting for Taskr to finish - taskr.await(); +// flag needed when using MPI to launch +const int LPF_MPI_AUTO_INITIALIZE = 0; - // Creating and adding tasks (graph nodes) - for (ssize_t it = 0; it < nIters; it++) - for (ssize_t i = 0; i < lt.x; i++) - for (ssize_t j = 0; j < lt.y; j++) - for (ssize_t k = 0; k < lt.z; k++) - { - auto localId = g->localSubGridMapping[k][j][i]; - auto &subGrid = 
g->subgrids[localId]; - - // create new specific tasks - auto computeTask = std::make_shared("Compute", i, j, k, it, g->computeFc.get()); - auto packTask = std::make_shared("Pack", i, j, k, it, g->packFc.get()); - auto sendTask = std::make_shared("Send", i, j, k, it, g->sendFc.get()); - auto recvTask = std::make_shared("Receive", i, j, k, it, g->receiveFc.get()); - auto unpackTask = std::make_shared("Unpack", i, j, k, it, g->unpackFc.get()); - - _taskMap[Task::encodeTaskName("Compute", i, j, k, it)] = computeTask; - _taskMap[Task::encodeTaskName("Pack", i, j, k, it)] = packTask; - _taskMap[Task::encodeTaskName("Send", i, j, k, it)] = sendTask; - _taskMap[Task::encodeTaskName("Receive", i, j, k, it)] = recvTask; - _taskMap[Task::encodeTaskName("Unpack", i, j, k, it)] = unpackTask; - - // Creating and adding local compute task dependencies - if (it > 0) - if (subGrid.X0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i - 1, j + 0, k + 0, it - 1)].get()); - if (it > 0) - if (subGrid.X1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 1, j + 0, k + 0, it - 1)].get()); - if (it > 0) - if (subGrid.Y0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j - 1, k + 0, it - 1)].get()); - if (it > 0) - if (subGrid.Y1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 1, k + 0, it - 1)].get()); - if (it > 0) - if (subGrid.Z0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k - 1, it - 1)].get()); - if (it > 0) - if (subGrid.Z1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 1, it - 1)].get()); - if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 0, it - 1)].get()); - - // Adding communication-related dependencies - if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it - 1)].get()); - if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get()); - - // Creating and adding receive task dependencies, from iteration 1 onwards - if (it > 0) recvTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get()); - - // Creating and adding unpack task dependencies - unpackTask->addDependency(_taskMap[Task::encodeTaskName("Receive", i, j, k, it)].get()); - unpackTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get()); - - // Creating and adding send task dependencies, from iteration 1 onwards - packTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get()); - if (it > 0) packTask->addDependency(_taskMap[Task::encodeTaskName("Send", i, j, k, it - 1)].get()); - - // Creating and adding send task dependencies, from iteration 1 onwards - sendTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it)].get()); - - // Adding tasks to taskr - taskr.addTask(computeTask.get()); - if (it < nIters - 1) taskr.addTask(packTask.get()); - if (it < nIters - 1) taskr.addTask(sendTask.get()); - if (it < nIters - 1) taskr.addTask(recvTask.get()); - if (it < nIters - 1) taskr.addTask(unpackTask.get()); - } + /** + * #DEFAULT_MEMSLOTS The memory slots used by LPF + * in lpf_resize_memory_register . 
This value is currently
+  * guessed as sufficiently large for a program
+  */
+  #define DEFAULT_MSGSLOTS 100000
 
-  // Running Taskr
-  taskr.run();
+// Global pointer to the instance manager
+HiCR::InstanceManager *instanceManager;
 
-  // Waiting for Taskr to finish
-  taskr.await();
+void spmd(lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args)
+{
+  // Initializing LPF
+  CHECK(lpf_resize_message_queue(lpf, DEFAULT_MSGSLOTS));
+  CHECK(lpf_resize_memory_register(lpf, DEFAULT_MEMSLOTS));
+  CHECK(lpf_sync(lpf, LPF_SYNC_DEFAULT));
 
-  ////// Calculating residual
+  // Creating HWloc topology object
+  hwloc_topology_t topology;
 
-  // Reset local residual to zero
-  g->resetResidual();
+  // Reserving memory for hwloc
+  hwloc_topology_init(&topology);
 
-  // Calculating local residual
-  for (ssize_t i = 0; i < lt.x; i++)
-    for (ssize_t j = 0; j < lt.y; j++)
-      for (ssize_t k = 0; k < lt.z; k++)
-      {
-        auto residualTask = new Task("Residual", i, j, k, nIters, g->localResidualFc.get());
-        taskr.addTask(residualTask);
-      }
+  // Initializing host (CPU) topology manager
+  HiCR::backend::hwloc::TopologyManager tm(&topology);
 
-  // Running Taskr
-  taskr.run();
+  // Creating memory and communication managers
+  std::unique_ptr communicationManager = std::make_unique(nprocs, pid, lpf);
+  std::unique_ptr memoryManager = std::make_unique(lpf);
 
-  // Waiting for Taskr to finish
-  taskr.await();
+  // Running the Jacobi driver
+  jacobiDriver(instanceManager, communicationManager.get(), memoryManager.get());
+}
+#endif
 
-  // Finalizing TaskR
-  taskr.finalize();
+int main(int argc, char *argv[])
+{
+  //// Instantiating distributed execution machinery
 
-  // If i'm not the root instance, simply send my locally calculated reisdual
-  if (isRootInstance == false)
+  // Parsing user inputs
+  for (int i = 0; i < argc; i++)
   {
-    *(double *)g->residualSendBuffer->getPointer() = g->_residual;
-    g->residualProducerChannel->push(g->residualSendBuffer, 1);
+    if (!strcmp(argv[i], "-px")) pt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-py")) pt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-pz")) pt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lx")) lt.x = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-ly")) lt.y = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-lz")) lt.z = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-n")) N = atoi(argv[++i]);
+    if (!strcmp(argv[i], "-i")) nIters = atoi(argv[++i]);
   }
-  else
-  {
-    // Otherwise gather all the residuals and print the results
-    double globalRes = g->_residual;
-    for (size_t i = 0; i < instanceCount - 1; i++)
-    {
-      while (g->residualConsumerChannel->isEmpty());
-      double *residualPtr = (double *)g->residualConsumerChannel->getTokenBuffer()->getSourceLocalMemorySlot()->getPointer() + g->residualConsumerChannel->peek(0);
-      g->residualConsumerChannel->pop();
-      globalRes += *residualPtr;
-    }
 
+#ifdef _TASKR_DISTRIBUTED_ENGINE_LPF
+  // Initializing instance manager
+  auto im = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
+  instanceManager = im.get();
 
+  lpf_init_t init;
+  lpf_args_t args;
 
-    // Setting final time now
-    auto tf = std::chrono::high_resolution_clock::now();
-    std::chrono::duration dt = tf - t0;
-    float execTime = dt.count();
 
+  CHECK(lpf_mpi_initialize_with_mpicomm(MPI_COMM_WORLD, &init));
+  CHECK(lpf_hook(init, &spmd, args));
+  CHECK(lpf_mpi_finalize(init));
+#endif
 
-    double residual = sqrt(globalRes / ((double)(N - 1) * (double)(N - 1) * 
(double)(N - 1)));
-    double gflops = nIters * (double)N * (double)N * (double)N * (2 + gDepth * 8) / (1.0e9);
-    printf("%.4fs, %.3f GFlop/s (L2 Norm: %.10g)\n", execTime, gflops / execTime, residual);
-  }
 
+#ifdef _TASKR_DISTRIBUTED_ENGINE_MPI
+  std::unique_ptr instanceManager = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
+  std::unique_ptr communicationManager = std::make_unique();
+  std::unique_ptr memoryManager = std::make_unique();
+
+  // Running the Jacobi driver
+  jacobiDriver(instanceManager.get(), communicationManager.get(), memoryManager.get());
+#endif
+
+#ifdef _TASKR_DISTRIBUTED_ENGINE_NONE // This one segfaults (check why)
+  std::unique_ptr instanceManager = std::make_unique();
+  std::unique_ptr communicationManager = std::make_unique();
+  std::unique_ptr memoryManager = HiCR::backend::hwloc::MemoryManager::createDefault();
+
+  // Running the Jacobi driver
+  jacobiDriver(instanceManager.get(), communicationManager.get(), memoryManager.get());
+#endif
 
-  // Finalizing grid
-  g->finalize();
 
   // Finalizing instances
   instanceManager->finalize();
diff --git a/examples/jacobi3d/source/task.hpp b/examples/jacobi3d/source/task.hpp
index 7360989..b236c18 100644
--- a/examples/jacobi3d/source/task.hpp
+++ b/examples/jacobi3d/source/task.hpp
@@ -54,17 +54,17 @@ class Task final : public taskr::Task
     const auto hashResult = hasher(buffer);
 
     // find if this hash already exists in the hashmap if not: add it
-    size_t tasklabel;
+    size_t taskId;
     auto it = taskid_hashmap.find(hashResult);
     if (it == taskid_hashmap.end())
     {
-      tasklabel = taskid_hashmap.size();
-
-      taskid_hashmap[hashResult] = tasklabel;
+      taskId = taskid_hashmap.size();
+
+      taskid_hashmap[hashResult] = taskId;
     }
-    else { tasklabel = it->second; }
+    else { taskId = it->second; }
 
-    return tasklabel;
+    return taskId;
   }
 };
\ No newline at end of file
diff --git a/examples/matmul/python/matmul.py b/examples/matmul/python/matmul.py
index de0aeeb..07848ca 100644
--- a/examples/matmul/python/matmul.py
+++ b/examples/matmul/python/matmul.py
@@ -60,8 +60,8 @@ def matmul_numpy(task):
             B[i, j] = 1.0/(i + 1)
             C[i, j] = 1.0/(j + 1)
 
-    B += task.getLabel()+1
-    C += task.getLabel()+1
+    B += task.getTaskId()+1
+    C += task.getTaskId()+1
 
     A = B @ C
diff --git a/examples/meson.build b/examples/meson.build
index cc852f8..5c464c9 100644
--- a/examples/meson.build
+++ b/examples/meson.build
@@ -1,16 +1,4 @@
-# Handling distributed engine options
-if distributedEngine == 'mpi'
-TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_MPI'
-endif
-
-if distributedEngine == 'lpf'
-TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_LPF'
-endif
-
-if distributedEngine == 'none'
-TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_NONE'
-endif
-
+# distributed
 subdir('jacobi3d')
 
 # local
diff --git a/examples/multiJob/cpp/job1.cpp b/examples/multiJob/cpp/job1.cpp
index c22ff85..fe619af 100644
--- a/examples/multiJob/cpp/job1.cpp
+++ b/examples/multiJob/cpp/job1.cpp
@@ -26,9 +26,9 @@ void job1(taskr::Runtime &taskr)
   std::vector tasks(3 * ITERATIONS);
 
   // Creating the execution units (functions that the tasks will run)
-  auto taskAfc = taskr::Function([&](taskr::Task *task) { printf("Job 1 - Task A %lu\n", task->getLabel()); });
-  auto taskBfc = taskr::Function([&](taskr::Task *task) { printf("Job 1 - Task B %lu\n", task->getLabel()); });
-  auto taskCfc = taskr::Function([&](taskr::Task
*task) { printf("Job 1 - Task C %lu\n", task->getLabel()); }); + auto taskAfc = taskr::Function([&](taskr::Task *task) { printf("Job 1 - Task A %lu\n", task->getTaskId()); }); + auto taskBfc = taskr::Function([&](taskr::Task *task) { printf("Job 1 - Task B %lu\n", task->getTaskId()); }); + auto taskCfc = taskr::Function([&](taskr::Task *task) { printf("Job 1 - Task C %lu\n", task->getTaskId()); }); // Now creating tasks for (size_t i = 0; i < ITERATIONS; i++) diff --git a/examples/multiJob/cpp/job2.cpp b/examples/multiJob/cpp/job2.cpp index a330453..5e21f50 100644 --- a/examples/multiJob/cpp/job2.cpp +++ b/examples/multiJob/cpp/job2.cpp @@ -26,9 +26,9 @@ void job2(taskr::Runtime &taskr) std::vector tasks(3 * ITERATIONS); // Creating the execution units (functions that the tasks will run) - auto taskAfc = taskr::Function([&](taskr::Task *task) { printf("Job 2 - Task A %lu\n", task->getLabel()); }); - auto taskBfc = taskr::Function([&](taskr::Task *task) { printf("Job 2 - Task B %lu\n", task->getLabel()); }); - auto taskCfc = taskr::Function([&](taskr::Task *task) { printf("Job 2 - Task C %lu\n", task->getLabel()); }); + auto taskAfc = taskr::Function([&](taskr::Task *task) { printf("Job 2 - Task A %lu\n", task->getTaskId()); }); + auto taskBfc = taskr::Function([&](taskr::Task *task) { printf("Job 2 - Task B %lu\n", task->getTaskId()); }); + auto taskCfc = taskr::Function([&](taskr::Task *task) { printf("Job 2 - Task C %lu\n", task->getTaskId()); }); // Now creating tasks for (size_t i = 0; i < ITERATIONS; i++) diff --git a/examples/multiJob/python/job1.py b/examples/multiJob/python/job1.py index 5ffd2bb..88e836f 100644 --- a/examples/multiJob/python/job1.py +++ b/examples/multiJob/python/job1.py @@ -23,9 +23,9 @@ def job1(runtime): tasks = [None] * (3 * ITERATIONS) # Creating the execution units (functions that the tasks will run) - taskAfc = taskr.Function(lambda task : print(f"Job 1 - Task A {task.getLabel()}")) - taskBfc = taskr.Function(lambda task : print(f"Job 1 - Task B {task.getLabel()}")) - taskCfc = taskr.Function(lambda task : print(f"Job 1 - Task C {task.getLabel()}")) + taskAfc = taskr.Function(lambda task : print(f"Job 1 - Task A {task.getTaskId()}")) + taskBfc = taskr.Function(lambda task : print(f"Job 1 - Task B {task.getTaskId()}")) + taskCfc = taskr.Function(lambda task : print(f"Job 1 - Task C {task.getTaskId()}")) # Now creating tasks for i in range(ITERATIONS): diff --git a/examples/multiJob/python/job2.py b/examples/multiJob/python/job2.py index 1a0adbd..7fef2a3 100644 --- a/examples/multiJob/python/job2.py +++ b/examples/multiJob/python/job2.py @@ -23,9 +23,9 @@ def job2(runtime): tasks = [None] * (3 * ITERATIONS) # Creating the execution units (functions that the tasks will run) - taskAfc = taskr.Function(lambda task : print(f"Job 1 - Task A {task.getLabel()}")) - taskBfc = taskr.Function(lambda task : print(f"Job 1 - Task B {task.getLabel()}")) - taskCfc = taskr.Function(lambda task : print(f"Job 1 - Task C {task.getLabel()}")) + taskAfc = taskr.Function(lambda task : print(f"Job 1 - Task A {task.getTaskId()}")) + taskBfc = taskr.Function(lambda task : print(f"Job 1 - Task B {task.getTaskId()}")) + taskCfc = taskr.Function(lambda task : print(f"Job 1 - Task C {task.getTaskId()}")) # Now creating tasks for i in range(ITERATIONS): diff --git a/examples/mutex/python/main.py b/examples/mutex/python/main.py index 7dd17f4..b318368 100644 --- a/examples/mutex/python/main.py +++ b/examples/mutex/python/main.py @@ -20,7 +20,7 @@ def main(): # Initialize taskr with the 
diff --git a/examples/simple/cpp/simple.hpp b/examples/simple/cpp/simple.hpp
index 0192905..99f35a4 100644
--- a/examples/simple/cpp/simple.hpp
+++ b/examples/simple/cpp/simple.hpp
@@ -26,7 +26,7 @@ void simple(taskr::Runtime *taskr)
   // Initializing taskr
   taskr->initialize();
 
-  auto fc = [](taskr::Task *task) { printf("Hello, I am task %ld\n", task->getLabel()); };
+  auto fc = [](taskr::Task *task) { printf("Hello, I am task %ld\n", task->getTaskId()); };
 
   // Create the taskr Tasks
   auto taskfc = taskr::Function(fc);
diff --git a/examples/simple/python/simple.py b/examples/simple/python/simple.py
index a69e605..1f36937 100644
--- a/examples/simple/python/simple.py
+++ b/examples/simple/python/simple.py
@@ -22,17 +22,17 @@ def simple(runtime):
     runtime.initialize()
 
     # Create tasks
-    fc = lambda task : print(f"Hello, I am task {task.getLabel()}")
+    fc = lambda task : print(f"Hello, I am task {task.getTaskId()}")
     taskfc = taskr.Function(fc)
 
-    # Adding to tasks to taskr
+    # Adding the tasks to taskr
     for i in range(NTASKS):
         runtime.addTask(taskr.Task(i, taskfc))
 
-    # Running taskr for the current repetition
+    # Running the tasks
    runtime.run()
 
-    # Waiting current repetition to end
+    # Waiting until all tasks have finished
    runtime.wait()
 
     # Finalizing taskr
diff --git a/examples/sleepsort/meson.build b/examples/sleepsort/meson.build
new file mode 100644
index 0000000..fe6055e
--- /dev/null
+++ b/examples/sleepsort/meson.build
@@ -0,0 +1,11 @@
+testSuite = [ 'examples', 'sleepsort' ]
+
+if get_option('buildPyTaskR') and get_option('buildTests')
+	test('pyTaskR',
+		py,
+		args : [ 'python/main.py' ],
+		is_parallel : false,
+		env: ['PYTHONPATH=' + meson.project_build_root() + '/include/pytaskr/'],
+		suite: testSuite,
+		workdir: meson.current_source_dir())
+endif
\ No newline at end of file
diff --git a/examples/sleepsort/python/main.py b/examples/sleepsort/python/main.py
new file mode 100644
index 0000000..d526e65
--- /dev/null
+++ b/examples/sleepsort/python/main.py
@@ -0,0 +1,61 @@
+"""
+  Copyright 2025 Huawei Technologies Co., Ltd.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+"""
+import taskr
+import time
+import numpy as np
+
+np.random.seed(42)
+
+def main():
+    t = taskr.create("threading")
+
+    t.initialize()
+
+    time_buffer_scale = 0.001
+    n = 100
+    shuffled_array = np.arange(n)
+    sorted_array = []
+
+    np.random.shuffle(shuffled_array)
+
+    def fc(task):
+        nonlocal sorted_array
+        nonlocal shuffled_array
+
+        taskId = task.getTaskId()
+
+        value = shuffled_array[taskId]
+
+        time.sleep(time_buffer_scale*value)
+
+        sorted_array.append(value)
+
+    taskfc = taskr.Function(fc)
+
+    for i in range(n):
+        t.addTask(taskr.Task(i, taskfc))
+
+    t_start = time.time()
+    t.run()
+    t.wait()
+    assert np.all(np.diff(sorted_array) >= 0), "Array is not sorted!"
+    print(f"Total time [s]: {time.time() - t_start:0.5}")
+
+    t.finalize()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
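A side note on the sortedness assertion: comparing plain Python lists with `<=` performs one lexicographic comparison (decided by the first differing element), not an element-wise one, which is why the check goes through np.diff. A two-line illustration:

```python
import numpy as np

# Lexicographic: decided by the first differing element, so the
# out-of-order tail [3, 2] goes unnoticed.
print([1, 3] <= [3, 2])                        # True (1 < 3 decides it)

# Element-wise: every step must be non-decreasing.
print(bool(np.all(np.diff([1, 3, 2]) >= 0)))   # False (3 -> 2 descends)
```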
diff --git a/examples/workerSpecific/cpp/workerSpecific.hpp b/examples/workerSpecific/cpp/workerSpecific.hpp
index 51ce4b4..57e5575 100644
--- a/examples/workerSpecific/cpp/workerSpecific.hpp
+++ b/examples/workerSpecific/cpp/workerSpecific.hpp
@@ -21,17 +21,17 @@
 void workFc(taskr::Task *currentTask)
 {
-  auto taskLabel = currentTask->getLabel();
+  auto taskId = currentTask->getTaskId();
 
   int currentCPUId = sched_getcpu();
 
   ////// First launched on even cpus
 
-  printf("Task %lu first run running on CPU %d\n", taskLabel, currentCPUId);
+  printf("Task %lu first run running on CPU %d\n", taskId, currentCPUId);
 
   // Sanity check
-  if (int(2 * taskLabel) != currentCPUId)
+  if (int(2 * taskId) != currentCPUId)
   {
-    fprintf(stderr, "Task label (%lu) does not coincide with the current CPU id! (%d)\n", taskLabel, currentCPUId);
+    fprintf(stderr, "Task ID (%lu) does not coincide with the current CPU id! (%d)\n", taskId, currentCPUId);
     std::abort();
   }
 
@@ -44,12 +44,12 @@ void workFc(taskr::Task *currentTask)
   ///// Now launched in odd cpus
   currentCPUId = sched_getcpu();
 
-  printf("Task %lu second run running on CPU %d\n", taskLabel, currentCPUId);
+  printf("Task %lu second run running on CPU %d\n", taskId, currentCPUId);
 
   // Sanity check
-  if (int(2 * taskLabel) + 1 != currentCPUId)
+  if (int(2 * taskId) + 1 != currentCPUId)
   {
-    fprintf(stderr, "Task label (%lu) + 1 does not coincide with the current CPU id! (%d)\n", taskLabel, currentCPUId);
+    fprintf(stderr, "Task ID (%lu) + 1 does not coincide with the current CPU id! (%d)\n", taskId, currentCPUId);
     std::abort();
   }
 }
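The workerSpecific examples depend on tasks landing on predictable workers. The pybind11 layer (further down in this diff) exposes the same knob through the optional workerAffinity argument. A small sketch, assuming the threading backend; which CPU a given worker actually occupies depends on how TaskR is configured, so the printed mapping is only indicative:

```python
import ctypes
import taskr

# sched_getcpu() from glibc reports the CPU the calling thread runs on
libc = ctypes.CDLL("libc.so.6", use_errno=True)

def report(task):
    print(f"Task {task.getTaskId()} runs on CPU {libc.sched_getcpu()}")

t = taskr.create("threading")
t.initialize()
fc = taskr.Function(report)
for i in range(4):
    # workerAffinity pins the task to one specific worker
    # (the default of -1 means any worker may pick it up)
    t.addTask(taskr.Task(i, fc, workerAffinity=i))
t.run()
t.wait()
t.finalize()
```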
(%d)\n", taskId, currentCPUId); std::abort(); } } diff --git a/examples/workerSpecific/python/workerSpecific.py b/examples/workerSpecific/python/workerSpecific.py index 4ecce9c..aead39a 100644 --- a/examples/workerSpecific/python/workerSpecific.py +++ b/examples/workerSpecific/python/workerSpecific.py @@ -24,16 +24,16 @@ import taskr def workFc(currentTask): - taskLabel = currentTask.getLabel() + taskId = currentTask.getTaskId() currentCPUId = libc.sched_getcpu() #### First launched on even cpus - print(f"Task {taskLabel} first run running on CPU {currentCPUId}") + print(f"Task {taskId} first run running on CPU {currentCPUId}") # Sanity check - if int(2 * taskLabel) != currentCPUId: - sys.stderr.write(f"Task label ({taskLabel}) does not coincide with the current CPU id! ({currentCPUId})") + if int(2 * taskId) != currentCPUId: + sys.stderr.write(f"Task ID ({taskId}) does not coincide with the current CPU id! ({currentCPUId})") sys.exit(1) # Changing to odd cpus @@ -45,11 +45,11 @@ def workFc(currentTask): #### Now launched in odd cpus currentCPUId = libc.sched_getcpu() - print(f"Task {taskLabel} second run running on CPU {currentCPUId}") + print(f"Task {taskId} second run running on CPU {currentCPUId}") # Sanity check - if int(2 * taskLabel) + 1 != currentCPUId: - sys.stderr.write(f"Task label ({taskLabel}) + 1 does not coincide with the current CPU id! ({currentCPUId})") + if int(2 * taskId) + 1 != currentCPUId: + sys.stderr.write(f"Task ID ({taskId}) + 1 does not coincide with the current CPU id! ({currentCPUId})") sys.exit(1) diff --git a/extern/hicr b/extern/hicr index a3408df..7e86acf 160000 --- a/extern/hicr +++ b/extern/hicr @@ -1 +1 @@ -Subproject commit a3408df876e3f41fc72cf5bed24c587c7c4e3788 +Subproject commit 7e86acf195d8d5de9b25979fe1d2be8142c80c5a diff --git a/extern/tracr b/extern/tracr index 54387c3..d37ea2a 160000 --- a/extern/tracr +++ b/extern/tracr @@ -1 +1 @@ -Subproject commit 54387c3c786095780c7a325162c5ebab3accbed4 +Subproject commit d37ea2af0c4505a99c59bccb416e8c119404f290 diff --git a/include/pytaskr/pytaskr.cpp b/include/pytaskr/pytaskr.cpp index fe00f1e..462fe6c 100644 --- a/include/pytaskr/pytaskr.cpp +++ b/include/pytaskr/pytaskr.cpp @@ -55,10 +55,10 @@ PYBIND11_MODULE(taskr, m) // TaskR's Task class py::class_(m, "Task") - .def(py::init(), py::arg("fc"), py::arg("workerAffinity") = -1) - .def(py::init(), py::arg("ID"), py::arg("taskfc"), py::arg("workerAffinity") = -1) - .def("getLabel", &Task::getLabel) - .def("setLabel", &Task::setLabel) + .def(py::init(), py::arg("taskfc"), py::arg("workerAffinity") = -1) + .def(py::init(), py::arg("taskId"), py::arg("taskfc"), py::arg("workerAffinity") = -1) + .def("getTaskId", &Task::getTaskId) + .def("setTaskId", &Task::setTaskId) .def("getWorkerAffinity", &Task::getWorkerAffinity) .def("setWorkerAffinity", &Task::setWorkerAffinity) .def("addDependency", &Task::addDependency) @@ -91,8 +91,8 @@ PYBIND11_MODULE(taskr, m) py::call_guard(), "cv waitFor with condition") .def("waitFor", py::overload_cast(&ConditionVariable::waitFor), py::call_guard(), "cv waitFor") - .def("notifyOne", &ConditionVariable::notifyOne) // Not sure if I need to release the GIL here as this function also uses the intern mutex lock - .def("notifyAll", &ConditionVariable::notifyAll) // Same here + .def("notifyOne", &ConditionVariable::notifyOne) + .def("notifyAll", &ConditionVariable::notifyAll) .def("getWaitingTaskCount", &ConditionVariable::getWaitingTaskCount); } diff --git a/include/taskr/common.hpp b/include/taskr/common.hpp index 
diff --git a/include/taskr/common.hpp b/include/taskr/common.hpp
index 144f290..6784568 100644
--- a/include/taskr/common.hpp
+++ b/include/taskr/common.hpp
@@ -49,9 +49,9 @@ namespace taskr
 {
 
 /**
- * A unique identifier (label) for an object
+ * A unique identifier for an object (task)
 */
-typedef HiCR::tasking::uniqueId_t label_t;
+typedef HiCR::tasking::uniqueId_t taskId_t;
 
 /**
  * Type for a locally-unique worker identifier
diff --git a/include/taskr/runtime.hpp b/include/taskr/runtime.hpp
index 5415c6e..a92a521 100644
--- a/include/taskr/runtime.hpp
+++ b/include/taskr/runtime.hpp
@@ -409,10 +409,10 @@
   }
 
   /**
-   * This function informs TaskR that a certain task (with a given unique label) has finished
-   * If this task the last remaining dependency for a given task, now the task may be scheduled for execution.
+   * This function informs TaskR that a certain task (with a given unique ID) has finished.
+   * If this task was the last remaining dependency of another task, that task may now be scheduled for execution.
    *
-   * @param[in] task Label of the task to report as finished
+   * @param[in] task The task to report as finished
    */
   __INLINE__ void setFinishedTask(taskr::Task *const task)
   {
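setFinishedTask is the internal hook behind task dependencies: once a task is reported finished and it was the last unmet dependency of another task, that other task becomes schedulable. From user code this is driven entirely through addDependency; a small pytaskr sketch of the resulting ordering guarantee:

```python
import taskr

t = taskr.create("threading")
t.initialize()

# Keep the Function objects alive in variables while the tasks exist
fcA = taskr.Function(lambda task: print("first"))
fcB = taskr.Function(lambda task: print("second"))
first = taskr.Task(0, fcA)
second = taskr.Task(1, fcB)

# 'second' stays pending until 'first' is reported finished
second.addDependency(first)

t.addTask(first)
t.addTask(second)
t.run()   # always prints "first" before "second"
t.wait()
t.finalize()
```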
diff --git a/include/taskr/task.hpp b/include/taskr/task.hpp
index 929b960..e13da94 100644
--- a/include/taskr/task.hpp
+++ b/include/taskr/task.hpp
@@ -70,11 +70,11 @@ class Task : public HiCR::tasking::Task
    * Constructor for the TaskR task class. It requires a user-defined function to execute
    * The task is considered finished when the function runs to completion.
    *
-   * @param[in] label The unique label to assign to this task
+   * @param[in] taskId The unique identifier to assign to this task
    * @param[in] fc Specifies the TaskR-formatted function to use
    * @param[in] workerAffinity The worker affinity to set from the start. Default -1 indicates no affinity.
    */
-  Task(const label_t label, Function *fc, const workerId_t workerAffinity = -1);
+  Task(const taskId_t taskId, Function *fc, const workerId_t workerAffinity = -1);
 
   /**
    * Returns the task/worker affinity
@@ -91,18 +91,18 @@ class Task : public HiCR::tasking::Task
   __INLINE__ void setWorkerAffinity(const workerId_t workerAffinity) { _workerAffinity = workerAffinity; };
 
   /**
-   * Function to obtain the task's label
+   * Function to obtain the task's ID
    *
-   * @return The task's label
+   * @return The task's ID
    */
-  __INLINE__ label_t getLabel() const { return _label; }
+  __INLINE__ taskId_t getTaskId() const { return _taskId; }
 
   /**
-   * Function to set the task's label
+   * Function to set the task's ID
    *
-   * @param[in] label The label to set
+   * @param[in] taskId The taskId to set
    */
-  __INLINE__ void setLabel(const label_t label) { _label = label; }
+  __INLINE__ void setTaskId(const taskId_t taskId) { _taskId = taskId; }
 
   /**
    * Adds one pending operation on the current task
@@ -169,7 +169,7 @@ class Task : public HiCR::tasking::Task
   /**
    * Unique identifier for the task
    */
-  label_t _label;
+  taskId_t _taskId;
 
   /**
    * Represents the affinity to a given worker, if specified. -1 if not specified.
diff --git a/include/taskr/taskImpl.hpp b/include/taskr/taskImpl.hpp
index a53b8c7..40174b7 100644
--- a/include/taskr/taskImpl.hpp
+++ b/include/taskr/taskImpl.hpp
@@ -42,9 +42,9 @@ __INLINE__ Task::Task(Function *fc, const workerId_t workerAffinity)
 * Constructor for the TaskR task class. It requires a user-defined function to execute
 * The task is considered finished when the function runs to completion.
 */
-__INLINE__ Task::Task(const label_t label, Function *fc, const workerId_t workerAffinity)
+__INLINE__ Task::Task(const taskId_t taskId, Function *fc, const workerId_t workerAffinity)
   : HiCR::tasking::Task(fc->getExecutionUnit(), nullptr),
-    _label(label),
+    _taskId(taskId),
     _workerAffinity(workerAffinity)
 {}
diff --git a/meson.build b/meson.build
index bfd5495..ede90f3 100644
--- a/meson.build
+++ b/meson.build
@@ -50,7 +50,6 @@ if meson.is_subproject() == false
   HiCRProject = subproject('hicr', required: true, default_options: [ 'backends=' + ','.join(HiCRBackends), 'frontends=tasking' ])
   HiCRBuildDep = HiCRProject.get_variable('hicrBuildDep')
   taskrDependencies += HiCRBuildDep
-
 endif
 
 ####### Creating TaskR dependency
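Taken together with the pybind11 changes earlier in the diff, Task now offers two constructors, with and without an explicit taskId, plus the getTaskId/setTaskId accessors. A brief usage sketch against the Python binding:

```python
import taskr

t = taskr.create("threading")
t.initialize()

fc = taskr.Function(lambda task: print(f"running task {task.getTaskId()}"))

# Constructor with an explicit ID
a = taskr.Task(42, fc)
assert a.getTaskId() == 42

# Constructor without an ID; assign one explicitly before relying on it
b = taskr.Task(fc)
b.setTaskId(43)

t.addTask(a)
t.addTask(b)
t.run()
t.wait()
t.finalize()
```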