Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2bdea6c
this should fix the installation of pytest issue in the CI
noabauma Jul 8, 2025
8fa40f1
adding sleepsort pytaskr example
noabauma Jul 23, 2025
220d844
Label -> TaskId
noabauma Jul 23, 2025
9c1ceb9
style fix
noabauma Jul 23, 2025
b51bd59
updated jacobi with lpf (not yet working)
noabauma Jul 29, 2025
5d2537d
minor correction in some pytaskr examples
noabauma Jul 29, 2025
2879652
making nosv also with lpf and mpi (even if they don't work now)
noabauma Jul 29, 2025
d196f67
working on nosv mpi example
noabauma Jul 30, 2025
ef44522
more prints for debugging
noabauma Jul 31, 2025
471c00b
removing prints as well as minor improvement in nosv example
noabauma Aug 4, 2025
1d7e130
commenting out this print as well
noabauma Aug 4, 2025
9806409
more prints for debugging
Aug 6, 2025
9fb58b2
a better way in allocating the resources
noabauma Aug 6, 2025
bea828f
minor more stuff
Aug 8, 2025
98c8ce4
newer version of the no_mutex hicr branch
noabauma Aug 15, 2025
ec3178f
adding minor corrections to allow compute resource choose available N…
Aug 18, 2025
859e8d8
fixing nosv
noabauma Aug 18, 2025
cba6cab
adding finally working pthreads and nosv example
noabauma Aug 20, 2025
f69c6ca
cleanup some stuff
noabauma Aug 22, 2025
bd75493
code formatting
noabauma Aug 22, 2025
b3e9753
update hicr version
noabauma Aug 22, 2025
95b6733
code cleanup of jacobi example
noabauma Aug 26, 2025
5441e3e
fixing minor bug and updating the meson.build of jacobi3d
noabauma Aug 26, 2025
425cb5b
fixing style
noabauma Aug 26, 2025
e355c77
fixing style again
noabauma Aug 26, 2025
82261ad
fixing style again
noabauma Aug 26, 2025
2fe5d5a
fixing style
noabauma Aug 26, 2025
9865feb
Merge branch 'main' into jacobi3d_node_level
noabauma Aug 26, 2025
b318645
fixing style and merge main
noabauma Aug 26, 2025
9e7654a
now, correct core mapping
noabauma Aug 27, 2025
8e20aca
update hicr and meson.build
noabauma Aug 29, 2025
b537ae6
fix: lpf jacobi runs, and update git submodules
lterrac Aug 29, 2025
19292f0
adding old version of getting compute resources
noabauma Aug 29, 2025
ff37476
style: format files
lterrac Aug 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/master-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ jobs:

- name: Build Pybind11
run: |
/usr/bin/python3 -m pip install --upgrade pip pytest
sudo apt-get update
sudo apt-get install -y python3-pytest

cd extern/pybind11
mkdir -p build
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/pr-development-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ jobs:

- name: Build Pybind11
run: |
/usr/bin/python3 -m pip install --upgrade pip pytest
sudo apt-get update
sudo apt-get install -y python3-pytest

cd extern/pybind11
mkdir -p build
Expand Down
14 changes: 7 additions & 7 deletions examples/abcTasks/cpp/abcTasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ void abcTasks(taskr::Runtime &taskr)
taskr.setTaskCallbackHandler(HiCR::tasking::Task::callback_t::onTaskFinish, [&taskr](taskr::Task *task) { delete task; });

// Creating the execution units (functions that the tasks will run)
auto taskAfc = taskr::Function([](taskr::Task *task) { printf("Task A %ld\n", task->getLabel()); });
auto taskBfc = taskr::Function([](taskr::Task *task) { printf("Task B %ld\n", task->getLabel()); });
auto taskCfc = taskr::Function([](taskr::Task *task) { printf("Task C %ld\n", task->getLabel()); });
auto taskAfc = taskr::Function([](taskr::Task *task) { printf("Task A %ld\n", task->getTaskId()); });
auto taskBfc = taskr::Function([](taskr::Task *task) { printf("Task B %ld\n", task->getTaskId()); });
auto taskCfc = taskr::Function([](taskr::Task *task) { printf("Task C %ld\n", task->getTaskId()); });

// Initializing taskr
taskr.initialize();
Expand All @@ -38,17 +38,17 @@ void abcTasks(taskr::Runtime &taskr)
for (size_t r = 0; r < REPETITIONS; r++)
{
// Calculating the base task id for this repetition
auto repetitionLabel = r * ITERATIONS * 3;
auto repetitionTaskId = r * ITERATIONS * 3;

// Our connection with the previous iteration is the last task C, null in the first iteration
taskr::Task *prevTaskC = nullptr;

// Each run consists of several iterations of ABC
for (size_t i = 0; i < ITERATIONS; i++)
{
auto taskA = new taskr::Task(repetitionLabel + i * 3 + 0, &taskAfc);
auto taskB = new taskr::Task(repetitionLabel + i * 3 + 1, &taskBfc);
auto taskC = new taskr::Task(repetitionLabel + i * 3 + 2, &taskCfc);
auto taskA = new taskr::Task(repetitionTaskId + i * 3 + 0, &taskAfc);
auto taskB = new taskr::Task(repetitionTaskId + i * 3 + 1, &taskBfc);
auto taskC = new taskr::Task(repetitionTaskId + i * 3 + 2, &taskCfc);

// Creating dependencies
if (i > 0) taskA->addDependency(prevTaskC);
Expand Down
14 changes: 7 additions & 7 deletions examples/abcTasks/python/abcTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
def abcTasks(runtime):

# Create the taskr Tasks
taskAfc = taskr.Function(lambda task : print(f"Task A {task.getLabel()}"))
taskBfc = taskr.Function(lambda task : print(f"Task B {task.getLabel()}"))
taskCfc = taskr.Function(lambda task : print(f"Task C {task.getLabel()}"))
taskAfc = taskr.Function(lambda task : print(f"Task A {task.getTaskId()}"))
taskBfc = taskr.Function(lambda task : print(f"Task B {task.getTaskId()}"))
taskCfc = taskr.Function(lambda task : print(f"Task C {task.getTaskId()}"))

# Initializing taskr
runtime.initialize()
Expand All @@ -19,13 +19,13 @@ def abcTasks(runtime):
# Creating the execution units (functions that the tasks will run)
for r in range(REPETITIONS):
# Calculating the base task id for this repetition
repetitionLabel = r * ITERATIONS * 3
repetitionTaskId = r * ITERATIONS * 3

for i in range(ITERATIONS):

taskA = taskr.Task(repetitionLabel + i * 3 + 0, taskAfc)
taskB = taskr.Task(repetitionLabel + i * 3 + 1, taskBfc)
taskC = taskr.Task(repetitionLabel + i * 3 + 2, taskCfc)
taskA = taskr.Task(repetitionTaskId + i * 3 + 0, taskAfc)
taskB = taskr.Task(repetitionTaskId + i * 3 + 1, taskBfc)
taskC = taskr.Task(repetitionTaskId + i * 3 + 2, taskCfc)

# Creating dependencies
if i > 0: taskA.addDependency(prevTaskC)
Expand Down
47 changes: 38 additions & 9 deletions examples/jacobi3d/meson.build
Original file line number Diff line number Diff line change
@@ -1,17 +1,46 @@
testSuite = [ 'examples', 'jacobi3d' ]

if 'boost' in get_option('executionStateType') and 'pthreads' in get_option('processingUnitType')
threading = executable('threading', [ 'source/pthreads.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] )
if distributedEngine == 'mpi'
TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_MPI'

if get_option('buildTests')
test('threading', threading, args : [ '-n', '64', '-i', '10' ], suite: testSuite, workdir: threading.path() + '.p' )
mpirunExecutable = HiCRProject.get_variable('mpirunExecutable')

if 'boost' in get_option('executionStateType') and 'pthreads' in get_option('processingUnitType')
threading = executable('threading', [ 'source/pthreads.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] )

if get_option('buildTests')
test('threading', mpirunExecutable, args : [ '-n', '2', '--oversubscribe', threading.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], suite: testSuite, workdir: threading.path() + '.p' )
endif
endif

# TODO: deadlocking (even for mpirun -n 1)
if 'nosv' in get_option('executionStateType') and 'nosv' in get_option('processingUnitType')
nosv = executable('nosv', [ 'source/nosv.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] )

if get_option('buildTests')
test('nosv', mpirunExecutable, args : [ '-n', '2', '--oversubscribe', nosv.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], is_parallel : false, suite: testSuite, workdir: nosv.path() + '.p' )
endif
endif

elif distributedEngine == 'lpf'
TaskRDistributedCppFlag = '-D_TASKR_DISTRIBUTED_ENGINE_LPF'

mpirunExecutable = find_program('mpirun', '/usr/bin/mpirun', '/usr/local/bin/mpirun', required : true)

if 'boost' in get_option('executionStateType') and 'pthreads' in get_option('processingUnitType')
threading = executable('threading', [ 'source/pthreads.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] )

if get_option('buildTests')
test('threading', mpirunExecutable, args : [ '-n', '2', '--oversubscribe', 'env', 'LPF_ENGINE=zero', threading.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], suite: testSuite, workdir: threading.path() + '.p' )
endif
endif
endif

if 'nosv' in get_option('executionStateType') and 'nosv' in get_option('processingUnitType')
nosv = executable('nosv', [ 'source/nosv.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] )

if 'nosv' in get_option('executionStateType') and 'nosv' in get_option('processingUnitType')
nosv = executable('nosv', [ 'source/nosv.cpp', 'source/grid.cpp' ], dependencies: [ TaskRBuildDep ], cpp_args: [ TaskRDistributedCppFlag ] )

if get_option('buildTests')
test('nosv', nosv, args : [ '-n', '64', '-i', '10' ], is_parallel : false, suite: testSuite, workdir: nosv.path() + '.p' )
if get_option('buildTests')
test('nosv', mpirunExecutable, args : [ '-n', '2', '--oversubscribe', 'env', 'LPF_ENGINE=zero', nosv.full_path(), '-px', '1', '-py', '1', '-pz', '2', '-lx', '1', '-ly', '2', '-lz', '2', '-n', '64', '-i', '10'], is_parallel : false, suite: testSuite, workdir: nosv.path() + '.p' )
endif
endif
endif
21 changes: 9 additions & 12 deletions examples/jacobi3d/mpi/Makefile
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
CFLAGS = -O3 -g -Wfatal-errors
LIBS =
MPICXX = mpicxx
CXX = g++
CXXFLAGS = -O3 -g -Wfatal-errors
INCLUDES = -I/usr/local/include
LIBS = -L/usr/local/lib -lmpi

.SECONDARY:
.SECONDARY:
BINARIES = jacobi

.PHONY: all stage
.PHONY: all clean

all: $(BINARIES)

jacobi: jacobi.cpp grid.cpp
$(MPICXX) $(CFLAGS) $(LIBS) -o $@ $^
$(CXX) $(CXXFLAGS) $(INCLUDES) $^ -o $@ $(LIBS)

.PHONY: clean
clean:
$(RM) $(BINARIES) *.o




$(RM) $(BINARIES) *.o
6 changes: 4 additions & 2 deletions examples/jacobi3d/source/grid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

#pragma once

#include <vector>
#include <cstdint>
#include <string>
Expand All @@ -28,8 +30,8 @@
#include <hicr/frontends/channel/fixedSize/mpsc/locking/consumer.hpp>

#define CHANNEL_DEPTH 10
const int BLOCKZ = 96;
const int BLOCKY = 64;
constexpr int BLOCKZ = 96;
constexpr int BLOCKY = 64;

extern std::unordered_map<size_t, size_t> taskid_hashmap;

Expand Down
204 changes: 204 additions & 0 deletions examples/jacobi3d/source/jacobi3d.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright 2025 Huawei Technologies Co., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <chrono>
#include <hicr/core/instanceManager.hpp>
#include <taskr/taskr.hpp>

#include "grid.hpp"
#include "task.hpp"

void jacobi3d(HiCR::InstanceManager *instanceManager,
taskr::Runtime &taskr,
Grid *g,
size_t gDepth = 1,
size_t N = 128,
ssize_t nIters = 100,
D3 pt = D3({.x = 1, .y = 1, .z = 1}),
D3 lt = D3({.x = 1, .y = 1, .z = 1}))
{
// Getting distributed instance information
const auto instanceCount = instanceManager->getInstances().size();
const auto myInstanceId = instanceManager->getCurrentInstance()->getId();
const auto rootInstanceId = instanceManager->getRootInstanceId();
const auto isRootInstance = myInstanceId == rootInstanceId;

// Initializing the Grid
bool success = g->initialize();
if (success == false) instanceManager->abort(-1);

// Creating grid processing functions
g->resetFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->reset(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k); });
g->computeFc =
std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->compute(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
g->receiveFc =
std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->receive(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
g->unpackFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->unpack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
g->packFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->pack(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
g->sendFc = std::make_unique<taskr::Function>([&g](taskr::Task *task) { g->send(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });
g->localResidualFc = std::make_unique<taskr::Function>(
[&g](taskr::Task *task) { g->calculateLocalResidual(task, ((Task *)task)->i, ((Task *)task)->j, ((Task *)task)->k, ((Task *)task)->iteration); });

// Task map
std::map<taskr::taskId_t, std::shared_ptr<taskr::Task>> _taskMap;

printf("Instance %lu: Executing...\n", myInstanceId);

// Creating tasks to reset the grid
for (ssize_t i = 0; i < lt.x; i++)
for (ssize_t j = 0; j < lt.y; j++)
for (ssize_t k = 0; k < lt.z; k++)
{
auto resetTask = new Task("Reset", i, j, k, 0, g->resetFc.get());
taskr.addTask(resetTask);
}

// Initializing TaskR
taskr.initialize();

// Running Taskr initially
taskr.run();

// Waiting for Taskr to finish
taskr.await();

// Creating and adding tasks (graph nodes)
for (ssize_t it = 0; it < nIters; it++)
for (ssize_t i = 0; i < lt.x; i++)
for (ssize_t j = 0; j < lt.y; j++)
for (ssize_t k = 0; k < lt.z; k++)
{
auto localId = g->localSubGridMapping[k][j][i];
auto &subGrid = g->subgrids[localId];

// create new specific tasks
auto computeTask = std::make_shared<Task>("Compute", i, j, k, it, g->computeFc.get());
auto packTask = std::make_shared<Task>("Pack", i, j, k, it, g->packFc.get());
auto sendTask = std::make_shared<Task>("Send", i, j, k, it, g->sendFc.get());
auto recvTask = std::make_shared<Task>("Receive", i, j, k, it, g->receiveFc.get());
auto unpackTask = std::make_shared<Task>("Unpack", i, j, k, it, g->unpackFc.get());

_taskMap[Task::encodeTaskName("Compute", i, j, k, it)] = computeTask;
_taskMap[Task::encodeTaskName("Pack", i, j, k, it)] = packTask;
_taskMap[Task::encodeTaskName("Send", i, j, k, it)] = sendTask;
_taskMap[Task::encodeTaskName("Receive", i, j, k, it)] = recvTask;
_taskMap[Task::encodeTaskName("Unpack", i, j, k, it)] = unpackTask;

// Creating and adding local compute task dependencies
if (it > 0)
if (subGrid.X0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i - 1, j + 0, k + 0, it - 1)].get());
if (it > 0)
if (subGrid.X1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 1, j + 0, k + 0, it - 1)].get());
if (it > 0)
if (subGrid.Y0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j - 1, k + 0, it - 1)].get());
if (it > 0)
if (subGrid.Y1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 1, k + 0, it - 1)].get());
if (it > 0)
if (subGrid.Z0.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k - 1, it - 1)].get());
if (it > 0)
if (subGrid.Z1.type == LOCAL) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 1, it - 1)].get());
if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i + 0, j + 0, k + 0, it - 1)].get());

// Adding communication-related dependencies
if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it - 1)].get());
if (it > 0) computeTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get());

// Creating and adding receive task dependencies, from iteration 1 onwards
if (it > 0) recvTask->addDependency(_taskMap[Task::encodeTaskName("Unpack", i, j, k, it - 1)].get());

// Creating and adding unpack task dependencies
unpackTask->addDependency(_taskMap[Task::encodeTaskName("Receive", i, j, k, it)].get());
unpackTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get());

// Creating and adding send task dependencies, from iteration 1 onwards
packTask->addDependency(_taskMap[Task::encodeTaskName("Compute", i, j, k, it)].get());
if (it > 0) packTask->addDependency(_taskMap[Task::encodeTaskName("Send", i, j, k, it - 1)].get());

// Creating and adding send task dependencies, from iteration 1 onwards
sendTask->addDependency(_taskMap[Task::encodeTaskName("Pack", i, j, k, it)].get());

// Adding tasks to taskr
taskr.addTask(computeTask.get());
if (it < nIters - 1) taskr.addTask(packTask.get());
if (it < nIters - 1) taskr.addTask(sendTask.get());
if (it < nIters - 1) taskr.addTask(recvTask.get());
if (it < nIters - 1) taskr.addTask(unpackTask.get());
}

// Setting start time as now
auto t0 = std::chrono::high_resolution_clock::now();

// Running Taskr
taskr.run();

// Waiting for Taskr to finish
taskr.await();

////// Calculating residual

// Reset local residual to zero
g->resetResidual();

// Calculating local residual
for (ssize_t i = 0; i < lt.x; i++)
for (ssize_t j = 0; j < lt.y; j++)
for (ssize_t k = 0; k < lt.z; k++)
{
auto residualTask = new Task("Residual", i, j, k, nIters, g->localResidualFc.get());
taskr.addTask(residualTask);
}

// Running Taskr
taskr.run();

// Waiting for Taskr to finish
taskr.await();

// Finalizing TaskR
taskr.finalize();

// If i'm not the root instance, simply send my locally calculated residual
if (isRootInstance == false)
{
*(double *)g->residualSendBuffer->getPointer() = g->_residual;
g->residualProducerChannel->push(g->residualSendBuffer, 1);
}
else
{
// Otherwise gather all the residuals and print the results
double globalRes = g->_residual;

for (size_t i = 0; i < instanceCount - 1; i++)
{
while (g->residualConsumerChannel->isEmpty());
double *residualPtr = (double *)g->residualConsumerChannel->getTokenBuffer()->getSourceLocalMemorySlot()->getPointer() + g->residualConsumerChannel->peek(0);
g->residualConsumerChannel->pop();
globalRes += *residualPtr;
}

// Setting final time now
auto tf = std::chrono::high_resolution_clock::now();
std::chrono::duration<float> dt = tf - t0;
float execTime = dt.count();

double residual = sqrt(globalRes / ((double)(N - 1) * (double)(N - 1) * (double)(N - 1)));
double gflops = nIters * (double)N * (double)N * (double)N * (2 + gDepth * 8) / (1.0e9);
printf("%.4fs, %.3f GFlop/s (L2 Norm: %.10g)\n", execTime, gflops / execTime, residual);
}

// Finalizing grid
g->finalize();
}
Loading
Loading