Skip to content

Commit 05e9a33

Browse files
authored
Merge pull request #46 from Algebraic-Programming/feat/channelDifferentCommManagers
Feat: channel different communication managers - channels now require separate communication manager for coordination buffers and payloads - fix pthreads communication manager to keep exchanged global memory slots up to date - add pthreads channels examples - fix object store example - add missing docs
2 parents 892b0a4 + 022c8dc commit 05e9a33

File tree

76 files changed

+2026
-750
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+2026
-750
lines changed

.github/workflows/master.yml

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -75,35 +75,37 @@ jobs:
7575
os: ubuntu-24.04-arm
7676
arch: arm64
7777

78-
build-acl-docker-amd64:
79-
needs: [ push-buildenv-manifest ]
80-
if: |
81-
always() &&
82-
(contains(needs.push-buildenv-manifest.result, 'success') || contains(needs.push-buildenv-manifest.result, 'skipped'))
83-
uses: Algebraic-Programming/HiCR/.github/workflows/master-build-acl-workflow.yml@master
84-
with:
85-
os: ubuntu-24.04
86-
arch: amd64
78+
# disabled because of broken docker image creation
79+
80+
# build-acl-docker-amd64:
81+
# needs: [ push-buildenv-manifest ]
82+
# if: |
83+
# always() &&
84+
# (contains(needs.push-buildenv-manifest.result, 'success') || contains(needs.push-buildenv-manifest.result, 'skipped'))
85+
# uses: Algebraic-Programming/HiCR/.github/workflows/master-build-acl-workflow.yml@master
86+
# with:
87+
# os: ubuntu-24.04
88+
# arch: amd64
8789

88-
push-buildenv-acl-manifest:
89-
runs-on: ubuntu-latest
90-
needs: [ build-acl-docker-amd64, build-acl-docker-arm64 ]
91-
if: |
92-
always() &&
93-
contains(needs.build-acl-docker-amd64.result, 'success') &&
94-
contains(needs.build-acl-docker-arm64.result, 'success')
95-
steps:
96-
- name: Set up Docker Buildx
97-
uses: docker/setup-buildx-action@v3
90+
# push-buildenv-acl-manifest:
91+
# runs-on: ubuntu-latest
92+
# needs: [ build-acl-docker-amd64, build-acl-docker-arm64 ]
93+
# if: |
94+
# always() &&
95+
# contains(needs.build-acl-docker-amd64.result, 'success') &&
96+
# contains(needs.build-acl-docker-arm64.result, 'success')
97+
# steps:
98+
# - name: Set up Docker Buildx
99+
# uses: docker/setup-buildx-action@v3
98100

99-
- name: Log in to the Container registry
100-
uses: docker/login-action@v3
101-
with:
102-
registry: ${{ env.REGISTRY }}
103-
username: ${{ github.repository_owner }}
104-
password: ${{ secrets.GITHUB_TOKEN }}
101+
# - name: Log in to the Container registry
102+
# uses: docker/login-action@v3
103+
# with:
104+
# registry: ${{ env.REGISTRY }}
105+
# username: ${{ github.repository_owner }}
106+
# password: ${{ secrets.GITHUB_TOKEN }}
105107

106-
- name: Create and push manifest images
107-
run:
108-
docker buildx imagetools create --tag ${{ env.DOCKERIMAGE }}-acl:latest ${{ env.DOCKERIMAGE }}-acl:amd64-latest ${{ env.DOCKERIMAGE }}-acl:arm64-latest
108+
# - name: Create and push manifest images
109+
# run:
110+
# docker buildx imagetools create --tag ${{ env.DOCKERIMAGE }}-acl:latest ${{ env.DOCKERIMAGE }}-acl:amd64-latest ${{ env.DOCKERIMAGE }}-acl:arm64-latest
109111

.github/workflows/style.yml

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,52 @@ jobs:
2020
run: sudo apt update
2121
- name: Installing clang-tormat
2222
run: |
23-
python3 -m pip install 'clang-format==18.1.0'
23+
python3 -m pip install --upgrade pip
24+
python3 -m pip install 'clang-format==18.1.0'
25+
python3 -m pip install sphinx
26+
python3 -m pip install sphinx_rtd_theme
27+
python3 -m pip install sphinxcontrib-bibtex
28+
python3 -m pip install doxysphinx
29+
python3 -m pip install sphinxcontrib.needs
30+
python3 -m pip install sphinxcontrib.plantuml
31+
python3 -m pip install autoapi
32+
python3 -m pip install sphinx-autoapi
33+
python3 -m pip install myst_parser
34+
python3 -m pip install sphinx_copybutton
35+
python3 -m pip install sphinxcontrib.doxylink
36+
python3 -m pip install sphinx_design
37+
python3 -m pip install Sphinx-Substitution-Extensions
38+
python3 -m pip install sphinx_toolbox
39+
python3 -m pip install sphinx-theme
40+
python3 -m pip install sphinx-book-theme
41+
sudo apt update --fix-missing
42+
sudo apt install -y build-essential \
43+
doxygen \
44+
graphviz \
45+
fonts-freefont-ttf \
46+
texlive \
47+
texlive-latex-extra \
48+
texlive-fonts-extra \
49+
libffi-dev \
50+
ghostscript \
51+
texlive-extra-utils \
52+
texlive-font-utils
53+
54+
python3 -m pip install 'sphinx==7.2.6'
55+
sphinx-build --version
2456
- name: Checking style
2557
run: |
2658
echo "Checking HiCR source and test formatting..."
2759
.build-tools/style/check-style.sh check include
2860
.build-tools/style/check-style.sh check tests
2961
.build-tools/style/check-style.sh check examples
62+
- name: Build documentation
63+
run: |
64+
echo "Building code documentation..."
65+
make -j1 -C docs
66+
mkdir public
67+
cp -r docs/build/html/* public
68+
- uses: actions/upload-artifact@v4
69+
with:
70+
name: docs
71+
path: docs/build/html

examples/channels/fixedSize/mpsc/locking/include/consumer.hpp

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,41 +22,45 @@
2222
#include <hicr/frontends/channel/fixedSize/mpsc/locking/consumer.hpp>
2323
#include "common.hpp"
2424

25-
void consumerFc(HiCR::MemoryManager &memoryManager,
26-
HiCR::CommunicationManager &communicationManager,
27-
std::shared_ptr<HiCR::MemorySpace> bufferMemorySpace,
25+
void consumerFc(HiCR::MemoryManager &coordinationMemoryManager,
26+
HiCR::MemoryManager &payloadMemoryManager,
27+
HiCR::CommunicationManager &coordinationCommunicationManager,
28+
HiCR::CommunicationManager &payloadCommunicationManager,
29+
std::shared_ptr<HiCR::MemorySpace> coordinationMemorySpace,
30+
std::shared_ptr<HiCR::MemorySpace> payloadMemorySpace,
2831
const size_t channelCapacity,
2932
const size_t producerCount)
3033
{
3134
// Getting required buffer sizes
3235
auto tokenBufferSize = HiCR::channel::fixedSize::Base::getTokenBufferSize(sizeof(ELEMENT_TYPE), channelCapacity);
3336

3437
// Registering token buffer as a local memory slot
35-
auto tokenBufferSlot = memoryManager.allocateLocalMemorySlot(bufferMemorySpace, tokenBufferSize);
38+
auto tokenBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, tokenBufferSize);
3639

3740
// Getting required buffer size
3841
auto coordinationBufferSize = HiCR::channel::fixedSize::Base::getCoordinationBufferSize();
3942

4043
// Registering token buffer as a local memory slot
41-
auto coordinationBuffer = memoryManager.allocateLocalMemorySlot(bufferMemorySpace, coordinationBufferSize);
44+
auto coordinationBuffer = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize);
4245

4346
// Initializing coordination buffer (sets to zero the counters)
4447
HiCR::channel::fixedSize::Base::initializeCoordinationBuffer(coordinationBuffer);
4548

4649
// Exchanging local memory slots to become global for them to be used by the remote end
47-
communicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, /* global tag */
48-
{{TOKEN_BUFFER_KEY, tokenBufferSlot}, /* key-slot pairs */
49-
{CONSUMER_COORDINATION_BUFFER_KEY, coordinationBuffer}});
50+
coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {{CONSUMER_COORDINATION_BUFFER_KEY, coordinationBuffer}});
51+
payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {{TOKEN_BUFFER_KEY, tokenBufferSlot}});
5052

5153
// Synchronizing so that all actors have finished registering their global memory slots
52-
communicationManager.fence(CHANNEL_TAG);
54+
coordinationCommunicationManager.fence(CHANNEL_TAG);
55+
payloadCommunicationManager.fence(CHANNEL_TAG);
5356

5457
// Obtaining the globally exchanged memory slots
55-
auto globalTokenBufferSlot = communicationManager.getGlobalMemorySlot(CHANNEL_TAG, TOKEN_BUFFER_KEY);
56-
auto consumerCoordinationBuffer = communicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_KEY);
58+
auto consumerCoordinationBuffer = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_KEY);
59+
auto globalTokenBufferSlot = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, TOKEN_BUFFER_KEY);
5760

5861
// Creating producer and consumer channels
59-
auto consumer = HiCR::channel::fixedSize::MPSC::locking::Consumer(communicationManager,
62+
auto consumer = HiCR::channel::fixedSize::MPSC::locking::Consumer(coordinationCommunicationManager,
63+
payloadCommunicationManager,
6064
globalTokenBufferSlot, /* tokenBuffer */
6165
coordinationBuffer, /* internalCoordinationBuffer */
6266
consumerCoordinationBuffer,
@@ -90,18 +94,20 @@ void consumerFc(HiCR::MemoryManager &memoryManager,
9094
}
9195

9296
// Synchronizing so that all actors have finished registering their global memory slots
93-
communicationManager.fence(CHANNEL_TAG);
97+
coordinationCommunicationManager.fence(CHANNEL_TAG);
98+
payloadCommunicationManager.fence(CHANNEL_TAG);
9499

95100
// De-registering global slots
96-
communicationManager.deregisterGlobalMemorySlot(globalTokenBufferSlot);
97-
communicationManager.deregisterGlobalMemorySlot(consumerCoordinationBuffer);
101+
payloadCommunicationManager.deregisterGlobalMemorySlot(globalTokenBufferSlot);
102+
coordinationCommunicationManager.deregisterGlobalMemorySlot(consumerCoordinationBuffer);
98103

99-
communicationManager.destroyGlobalMemorySlot(globalTokenBufferSlot);
100-
communicationManager.destroyGlobalMemorySlot(consumerCoordinationBuffer);
104+
payloadCommunicationManager.destroyGlobalMemorySlot(globalTokenBufferSlot);
105+
coordinationCommunicationManager.destroyGlobalMemorySlot(consumerCoordinationBuffer);
101106

102-
communicationManager.fence(CHANNEL_TAG);
107+
coordinationCommunicationManager.fence(CHANNEL_TAG);
108+
payloadCommunicationManager.fence(CHANNEL_TAG);
103109

104110
// Freeing up local memory
105-
memoryManager.freeLocalMemorySlot(tokenBufferSlot);
106-
memoryManager.freeLocalMemorySlot(coordinationBuffer);
111+
payloadMemoryManager.freeLocalMemorySlot(tokenBufferSlot);
112+
coordinationMemoryManager.freeLocalMemorySlot(coordinationBuffer);
107113
}

examples/channels/fixedSize/mpsc/locking/include/producer.hpp

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,33 +22,39 @@
2222
#include <hicr/frontends/channel/fixedSize/mpsc/locking/producer.hpp>
2323
#include "common.hpp"
2424

25-
void producerFc(HiCR::MemoryManager &memoryManager,
26-
HiCR::CommunicationManager &communicationManager,
27-
std::shared_ptr<HiCR::MemorySpace> bufferMemorySpace,
25+
void producerFc(HiCR::MemoryManager &coordinationMemoryManager,
26+
HiCR::MemoryManager &payloadMemoryManager,
27+
HiCR::CommunicationManager &coordinationCommunicationManager,
28+
HiCR::CommunicationManager &payloadCommunicationManager,
29+
std::shared_ptr<HiCR::MemorySpace> coordinationMemorySpace,
30+
std::shared_ptr<HiCR::MemorySpace> payloadMemorySpace,
2831
const size_t channelCapacity,
2932
const size_t producerId)
3033
{
3134
// Getting required buffer size
3235
auto coordinationBufferSize = HiCR::channel::fixedSize::Base::getCoordinationBufferSize();
3336

3437
// Registering token buffer as a local memory slot
35-
auto coordinationBuffer = memoryManager.allocateLocalMemorySlot(bufferMemorySpace, coordinationBufferSize);
38+
auto coordinationBuffer = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize);
3639

3740
// Initializing coordination buffer (sets to zero the counters)
3841
HiCR::channel::fixedSize::Base::initializeCoordinationBuffer(coordinationBuffer);
3942

4043
// Exchanging local memory slots to become global for them to be used by the remote end
41-
communicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {});
44+
coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {});
45+
payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {});
4246

4347
// Synchronizing so that all actors have finished registering their global memory slots
44-
communicationManager.fence(CHANNEL_TAG);
48+
coordinationCommunicationManager.fence(CHANNEL_TAG);
49+
payloadCommunicationManager.fence(CHANNEL_TAG);
4550

4651
// Obtaining the globally exchanged memory slots
47-
auto globalTokenBufferSlot = communicationManager.getGlobalMemorySlot(CHANNEL_TAG, TOKEN_BUFFER_KEY);
48-
auto consumerCoordinationBuffer = communicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_KEY);
52+
auto globalTokenBufferSlot = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, TOKEN_BUFFER_KEY);
53+
auto consumerCoordinationBuffer = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_KEY);
4954

5055
// Creating producer and consumer channels
51-
auto producer = HiCR::channel::fixedSize::MPSC::locking::Producer(communicationManager,
56+
auto producer = HiCR::channel::fixedSize::MPSC::locking::Producer(coordinationCommunicationManager,
57+
payloadCommunicationManager,
5258
globalTokenBufferSlot, /* tokenBuffer */
5359
coordinationBuffer, /* internalCoordinationBuffer */
5460
consumerCoordinationBuffer,
@@ -58,7 +64,7 @@ void producerFc(HiCR::MemoryManager &memoryManager,
5864
// Allocating a send slot to put the values we want to communicate
5965
ELEMENT_TYPE sendBuffer = 0;
6066
auto sendBufferPtr = &sendBuffer;
61-
auto sendSlot = memoryManager.registerLocalMemorySlot(bufferMemorySpace, sendBufferPtr, sizeof(ELEMENT_TYPE));
67+
auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(ELEMENT_TYPE));
6268

6369
// Pushing values to the channel, one by one, suspending when/if the channel is full
6470
for (size_t i = 0; i < MESSAGES_PER_PRODUCER; i++)
@@ -74,15 +80,17 @@ void producerFc(HiCR::MemoryManager &memoryManager,
7480
}
7581

7682
// Synchronizing so that all actors have finished registering their global memory slots
77-
communicationManager.fence(CHANNEL_TAG);
83+
coordinationCommunicationManager.fence(CHANNEL_TAG);
84+
payloadCommunicationManager.fence(CHANNEL_TAG);
7885

7986
// De-registering global slots
80-
communicationManager.deregisterGlobalMemorySlot(globalTokenBufferSlot);
81-
communicationManager.deregisterGlobalMemorySlot(consumerCoordinationBuffer);
87+
payloadCommunicationManager.deregisterGlobalMemorySlot(globalTokenBufferSlot);
88+
coordinationCommunicationManager.deregisterGlobalMemorySlot(consumerCoordinationBuffer);
8289

8390
// Destroying global slots
84-
communicationManager.fence(CHANNEL_TAG);
91+
coordinationCommunicationManager.fence(CHANNEL_TAG);
92+
payloadCommunicationManager.fence(CHANNEL_TAG);
8593

8694
// Freeing up local memory
87-
memoryManager.freeLocalMemorySlot(coordinationBuffer);
95+
coordinationMemoryManager.freeLocalMemorySlot(coordinationBuffer);
8896
}
Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,6 @@
1-
testSuite = [ 'examples', 'channels', 'fixedSize', 'mpsc', 'locking']
2-
test_timeout = 60
3-
41
exampleBuildIncludes = include_directories([
52
'include'
63
])
7-
8-
if 'mpi' in enabledBackends and 'hwloc' in enabledBackends
9-
mpi = executable('mpi', [ 'source/mpi.cpp' ], dependencies: hicrBuildDep, include_directories: [exampleBuildIncludes] )
10-
11-
if get_option('buildTests')
12-
test('mpi', mpirunExecutable, args : [ '-n', '4', '--oversubscribe', '--host', 'localhost:16', mpi.full_path(), '3' ], timeout: test_timeout, suite: testSuite )
13-
endif
14-
endif
154

16-
if 'lpf' in enabledBackends and 'hwloc' in enabledBackends
17-
lpf = executable('lpf', [ 'source/lpf.cpp' ], dependencies: hicrBuildDep, include_directories: [exampleBuildIncludes] )
18-
if get_option('buildTests')
19-
test('lpf', lpfrunExecutable, args : [ '-np', '4', '-engine', 'zero', lpf.full_path(), '3' ], timeout: test_timeout, suite: testSuite )
20-
endif
21-
endif
5+
subdir('source/distributed')
6+
subdir('source/local')

examples/channels/fixedSize/mpsc/locking/source/lpf.cpp renamed to examples/channels/fixedSize/mpsc/locking/source/distributed/lpf.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
#include <hicr/backends/lpf/memoryManager.hpp>
2121
#include <hicr/backends/lpf/communicationManager.hpp>
2222
#include <hicr/backends/hwloc/topologyManager.hpp>
23-
#include "include/consumer.hpp"
24-
#include "include/producer.hpp"
23+
#include "../include/consumer.hpp"
24+
#include "../include/producer.hpp"
2525

2626
// flag needed when using MPI to launch
2727
const int LPF_MPI_AUTO_INITIALIZE = 0;
@@ -81,8 +81,8 @@ void spmd(lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args)
8181
size_t rankId = pid;
8282

8383
// Rank 0 is consumer, the rest are producers
84-
if (rankId == 0) consumerFc(m, c, firstMemorySpace, channelCapacity, producerCount);
85-
if (rankId >= 1) producerFc(m, c, firstMemorySpace, channelCapacity, rankId);
84+
if (rankId == 0) consumerFc(m, m, c, c, firstMemorySpace, firstMemorySpace, channelCapacity, producerCount);
85+
if (rankId >= 1) producerFc(m, m, c, c, firstMemorySpace, firstMemorySpace, channelCapacity, rankId);
8686
}
8787

8888
int main(int argc, char **argv)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
testSuite = ['examples', 'channels', 'fixedSize', 'mpsc', 'locking', 'distributed']
2+
test_timeout = 60
3+
4+
if 'mpi' in enabledBackends and 'hwloc' in enabledBackends
5+
mpi = executable(
6+
'mpi',
7+
['mpi.cpp'],
8+
dependencies: hicrBuildDep,
9+
include_directories: [exampleBuildIncludes],
10+
)
11+
12+
if get_option('buildTests')
13+
test(
14+
'mpi',
15+
mpirunExecutable,
16+
args: [
17+
'-n', '4',
18+
'--oversubscribe',
19+
'--host', 'localhost:16',
20+
mpi.full_path(),
21+
'3',
22+
],
23+
timeout: test_timeout,
24+
suite: testSuite,
25+
)
26+
endif
27+
endif
28+
29+
if 'lpf' in enabledBackends and 'hwloc' in enabledBackends
30+
lpf = executable(
31+
'lpf',
32+
['lpf.cpp'],
33+
dependencies: hicrBuildDep,
34+
include_directories: [exampleBuildIncludes],
35+
)
36+
if get_option('buildTests')
37+
test(
38+
'lpf',
39+
lpfrunExecutable,
40+
args: ['-np', '4', '-engine', 'zero', lpf.full_path(), '3'],
41+
timeout: test_timeout,
42+
suite: testSuite,
43+
)
44+
endif
45+
endif

0 commit comments

Comments
 (0)