Skip to content

Commit 892b0a4

Browse files
authored
Refactor: pthreads shared memory (#45)
* feat: pthread communication manager supports exchange of memory slots
1 parent faeb300 commit 892b0a4

File tree

12 files changed

+392
-102
lines changed

12 files changed

+392
-102
lines changed

examples/memcpy/local/source/pthreads.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "include/telephoneGame.hpp"
1818
#include <hicr/backends/hwloc/memoryManager.hpp>
1919
#include <hicr/backends/pthreads/communicationManager.hpp>
20+
#include <hicr/backends/pthreads/sharedMemoryFactory.hpp>
2021
#include <hicr/backends/hwloc/topologyManager.hpp>
2122

2223
int main(int argc, char **argv)
@@ -33,8 +34,12 @@ int main(int argc, char **argv)
3334
// Instantiating host (CPU) memory manager
3435
HiCR::backend::hwloc::MemoryManager m(&topology);
3536

37+
// Create shared memory
38+
auto sharedMemoryFactory = HiCR::backend::pthreads::SharedMemoryFactory();
39+
auto &sharedMemory = sharedMemoryFactory.get(0, 1);
40+
3641
// Instantiating host (CPU) communication manager
37-
HiCR::backend::pthreads::CommunicationManager c;
42+
HiCR::backend::pthreads::CommunicationManager c(sharedMemory);
3843

3944
// Asking backend to check the available devices
4045
const auto t = dm.queryTopology();

examples/neuralNetwork/source/pthreads.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <hicr/backends/hwloc/topologyManager.hpp>
77
#include <hicr/backends/pthreads/communicationManager.hpp>
88
#include <hicr/backends/pthreads/computeManager.hpp>
9+
#include <hicr/backends/pthreads/sharedMemoryFactory.hpp>
910
#include <hicr/core/exceptions.hpp>
1011

1112
#include "./include/network.hpp"
@@ -27,10 +28,14 @@ int main(int argc, char **argv)
2728
hwloc_topology_t hwlocTopology;
2829
hwloc_topology_init(&hwlocTopology);
2930

31+
// Create shared memory
32+
auto sharedMemoryFactory = HiCR::backend::pthreads::SharedMemoryFactory();
33+
auto &sharedMemory = sharedMemoryFactory.get(0, 1);
34+
3035
// Instantiating HWLoc-based host (CPU) topology manager
3136
HiCR::backend::hwloc::TopologyManager topologyManager(&hwlocTopology);
3237
HiCR::backend::hwloc::MemoryManager memoryManager(&hwlocTopology);
33-
HiCR::backend::pthreads::CommunicationManager communicationManager;
38+
HiCR::backend::pthreads::CommunicationManager communicationManager(sharedMemory);
3439
HiCR::backend::pthreads::ComputeManager computeManager;
3540

3641
// Asking backend to check the available devices

examples/objectStore/singleInstance/source/main.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <hicr/backends/hwloc/topologyManager.hpp>
1818
#include <hicr/backends/hwloc/memoryManager.hpp>
1919
#include <hicr/backends/pthreads/communicationManager.hpp>
20+
#include <hicr/backends/pthreads/sharedMemoryFactory.hpp>
2021

2122
#include <hicr/frontends/objectStore/objectStore.hpp>
2223

@@ -32,8 +33,12 @@ int main(int argc, char **argv)
3233
// Reserving memory for hwloc
3334
hwloc_topology_init(&topology);
3435

36+
// Create shared memory
37+
auto sharedMemoryFactory = HiCR::backend::pthreads::SharedMemoryFactory();
38+
auto &sharedMemory = sharedMemoryFactory.get(0, 1);
39+
3540
// Using default instance, communication and memory manager for single instance
36-
auto communicationManager = std::make_unique<HiCR::backend::pthreads::CommunicationManager>();
41+
auto communicationManager = std::make_unique<HiCR::backend::pthreads::CommunicationManager>(sharedMemory);
3742
auto memoryManager = std::make_unique<HiCR::backend::hwloc::MemoryManager>(&topology);
3843

3944
// Using HWLoc as topology managers

include/hicr/backends/mpi/communicationManager.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,14 @@ class CommunicationManager : public HiCR::CommunicationManager
163163
{
164164
globalSlotPointers[i] = nullptr;
165165
globalSourceSlots[i] = nullptr;
166-
globalMemorySpaces[i] = nullptr;
166+
globalMemorySpaces[i] = nullptr;
167167
}
168168
else
169169
{
170170
const auto memorySlot = memorySlots[localPointerPos++].second;
171171
globalSlotPointers[i] = &memorySlot->getPointer();
172172
globalSourceSlots[i] = memorySlot;
173-
globalMemorySpaces[i] = memorySlot->getMemorySpace();
173+
globalMemorySpaces[i] = memorySlot->getMemorySpace();
174174
}
175175
}
176176

@@ -201,7 +201,7 @@ class CommunicationManager : public HiCR::CommunicationManager
201201

202202
// Freeing up memory of the old local memory slot
203203
// Commented out: it is the user who must free up this local memory slot
204-
// MPI_Free_mem(*(globalSlotPointers[i]));
204+
// MPI_Free_mem(*(globalSlotPointers[i]));
205205

206206
// Swapping pointers
207207
*(globalSlotPointers[i]) = ptr;

include/hicr/backends/pthreads/communicationManager.hpp

Lines changed: 25 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@
2424
#pragma once
2525

2626
#include <cstring>
27-
#include "pthread.h"
27+
#include <pthread.h>
28+
2829
#include <hicr/core/communicationManager.hpp>
2930
#include <hicr/core/localMemorySlot.hpp>
3031
#include <hicr/backends/hwloc/globalMemorySlot.hpp>
3132

33+
#include "sharedMemory.hpp"
34+
3235
namespace HiCR::backend::pthreads
3336
{
3437

@@ -42,43 +45,24 @@ class CommunicationManager final : public HiCR::CommunicationManager
4245
public:
4346

4447
/**
45-
* Constructor for the memory manager class for the Pthreads backend
48+
* Constructor for the communication manager class for the pthreads backend
4649
*
47-
* \param[in] fenceCount Specifies how many times a fence has to be called for it to release callers
50+
* \param[in] sharedMemory the shared memory used to exchange global slots among other threads
4851
*/
49-
CommunicationManager(const size_t fenceCount = 1)
50-
: HiCR::CommunicationManager()
51-
{
52-
// Initializing barrier for fence operation
53-
pthread_barrier_init(&_barrier, nullptr, fenceCount);
54-
55-
// Initializing mutex object
56-
pthread_mutex_init(&_mutex, nullptr);
57-
}
52+
CommunicationManager(SharedMemory &sharedMemory)
53+
: HiCR::CommunicationManager(),
54+
_sharedMemory(sharedMemory)
55+
{}
5856

5957
/**
6058
* Default destructor. The barrier and mutex are no longer members of this class, so there is nothing to destroy explicitly here
6159
*/
62-
~CommunicationManager() override
63-
{
64-
// Freeing barrier memory
65-
pthread_barrier_destroy(&_barrier);
66-
67-
// Freeing mutex memory
68-
pthread_mutex_destroy(&_mutex);
69-
}
60+
~CommunicationManager() = default;
7061

7162
__INLINE__ std::shared_ptr<HiCR::GlobalMemorySlot> getGlobalMemorySlotImpl(const HiCR::backend::hwloc::GlobalMemorySlot::tag_t tag,
7263
const HiCR::backend::hwloc::GlobalMemorySlot::globalKey_t globalKey) override
7364
{
74-
if (_shadowMap.find(tag) != _shadowMap.end())
75-
{
76-
if (_shadowMap[tag].find(globalKey) != _shadowMap[tag].end())
77-
return _shadowMap[tag][globalKey];
78-
else
79-
return nullptr;
80-
}
81-
else { return nullptr; }
65+
return _sharedMemory.get(tag, globalKey);
8266
}
8367

8468
/**
@@ -110,21 +94,8 @@ class CommunicationManager final : public HiCR::CommunicationManager
11094

11195
private:
11296

113-
/**
114-
* Stores a barrier object to check on a barrier operation
115-
*/
116-
pthread_barrier_t _barrier{};
117-
118-
/**
119-
* A mutex to make sure threads do not bother each other during certain operations
120-
*/
121-
pthread_mutex_t _mutex{};
122-
12397
__INLINE__ void exchangeGlobalMemorySlotsImpl(const HiCR::GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots) override
12498
{
125-
// Synchronize all intervening threads in this call
126-
barrier();
127-
12899
// Simply adding local memory slots to the global map
129100
for (const auto &entry : memorySlots)
130101
{
@@ -139,29 +110,23 @@ class CommunicationManager final : public HiCR::CommunicationManager
139110

140111
// Registering memory slot
141112
registerGlobalMemorySlot(globalMemorySlot);
142-
_shadowMap[tag][globalKey] = globalMemorySlot;
143-
}
144113

145-
// Do not allow any thread to continue until the exchange is made
146-
barrier();
114+
// Push it to shared memory
115+
_sharedMemory.insert(tag, globalKey, globalMemorySlot);
116+
}
147117
}
148118

149119
__INLINE__ void queryMemorySlotUpdatesImpl(std::shared_ptr<HiCR::LocalMemorySlot> memorySlot) override
150120
{
151121
// This function should check and update the abstract class for completed memcpy operations
152122
}
153123

154-
/**
155-
* A barrier implementation that synchronizes all threads in the HiCR instance
156-
*/
157-
__INLINE__ void barrier() { pthread_barrier_wait(&_barrier); }
158-
159124
/**
160125
* Implementation of the fence operation for the pthreads backend. In this case, nothing needs to be done, as
161126
* the system's memcpy operation is synchronous. This means that its mere execution (whether immediate or deferred)
162127
* ensures its completion.
163128
*/
164-
__INLINE__ void fenceImpl(const HiCR::GlobalMemorySlot::tag_t tag) override { barrier(); }
129+
__INLINE__ void fenceImpl(const HiCR::GlobalMemorySlot::tag_t tag) override { _sharedMemory.barrier(); }
165130

166131
__INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &destination,
167132
const size_t dst_offset,
@@ -193,7 +158,7 @@ class CommunicationManager final : public HiCR::CommunicationManager
193158
*/
194159
__INLINE__ void destroyGlobalMemorySlotImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
195160
{
196-
// Nothing to do here
161+
_sharedMemory.remove(memorySlot->getGlobalTag(), memorySlot->getGlobalKey());
197162
}
198163

199164
__INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &destination,
@@ -202,17 +167,11 @@ class CommunicationManager final : public HiCR::CommunicationManager
202167
const size_t src_offset,
203168
const size_t size) override
204169
{
205-
// Getting up-casted pointer for the execution unit
206-
auto dst = dynamic_pointer_cast<HiCR::GlobalMemorySlot>(destination);
207-
208-
// Checking whether the execution unit passed is compatible with this backend
209-
if (dst == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not supported by this backend\n");
210-
211170
// Checking whether the memory slot is local. This backend only supports local data transfers
212-
if (dst->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not local (required by this backend)\n");
171+
if (destination->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not local (required by this backend)\n");
213172

214173
// Executing actual memcpy
215-
memcpy(dst->getSourceLocalMemorySlot(), dst_offset, source, src_offset, size);
174+
memcpy(destination->getSourceLocalMemorySlot(), dst_offset, source, src_offset, size);
216175

217176
// Increasing message received/sent counters for both memory slots
218177
increaseMessageRecvCounter(*destination->getSourceLocalMemorySlot());
@@ -225,17 +184,11 @@ class CommunicationManager final : public HiCR::CommunicationManager
225184
const size_t src_offset,
226185
const size_t size) override
227186
{
228-
// Getting up-casted pointer for the execution unit
229-
auto src = dynamic_pointer_cast<HiCR::GlobalMemorySlot>(source);
230-
231-
// Checking whether the memory slot is compatible with this backend
232-
if (src == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not supported by this backend\n");
233-
234187
// Checking whether the memory slot is local. This backend only supports local data transfers
235-
if (src->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not local (required by this backend)\n");
188+
if (source->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not local (required by this backend)\n");
236189

237190
// Executing actual memcpy
238-
memcpy(destination, dst_offset, src->getSourceLocalMemorySlot(), src_offset, size);
191+
memcpy(destination, dst_offset, source->getSourceLocalMemorySlot(), src_offset, size);
239192

240193
// Increasing message received/sent counters for both memory slots
241194
increaseMessageRecvCounter(*destination);
@@ -266,24 +219,12 @@ class CommunicationManager final : public HiCR::CommunicationManager
266219
m->unlock();
267220
}
268221

269-
__INLINE__ void lock()
270-
{
271-
// Locking the pthread mutex
272-
pthread_mutex_lock(&_mutex);
273-
}
274-
275-
__INLINE__ void unlock()
276-
{
277-
// Locking the pthread mutex
278-
pthread_mutex_unlock(&_mutex);
279-
}
280-
281222
private:
282223

283-
// this map shadows the core HiCR map _globalMemorySlotTagKeyMap
284-
// to support getGlobalMemorySlot implementation
285-
// for this backend
286-
globalMemorySlotTagKeyMap_t _shadowMap;
224+
/**
225+
* Shared Memory to exchange slots
226+
*/
227+
SharedMemory &_sharedMemory;
287228
};
288229

289230
} // namespace HiCR::backend::pthreads

0 commit comments

Comments
 (0)