Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion examples/memcpy/local/source/pthreads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "include/telephoneGame.hpp"
#include <hicr/backends/hwloc/memoryManager.hpp>
#include <hicr/backends/pthreads/communicationManager.hpp>
#include <hicr/backends/pthreads/sharedMemoryFactory.hpp>
#include <hicr/backends/hwloc/topologyManager.hpp>

int main(int argc, char **argv)
Expand All @@ -33,8 +34,12 @@ int main(int argc, char **argv)
// Instantiating host (CPU) memory manager
HiCR::backend::hwloc::MemoryManager m(&topology);

// Create shared memory
auto sharedMemoryFactory = HiCR::backend::pthreads::SharedMemoryFactory();
auto &sharedMemory = sharedMemoryFactory.get(0, 1);

// Instantiating host (CPU) communication manager
HiCR::backend::pthreads::CommunicationManager c;
HiCR::backend::pthreads::CommunicationManager c(sharedMemory);

// Asking backend to check the available devices
const auto t = dm.queryTopology();
Expand Down
7 changes: 6 additions & 1 deletion examples/neuralNetwork/source/pthreads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <hicr/backends/hwloc/topologyManager.hpp>
#include <hicr/backends/pthreads/communicationManager.hpp>
#include <hicr/backends/pthreads/computeManager.hpp>
#include <hicr/backends/pthreads/sharedMemoryFactory.hpp>
#include <hicr/core/exceptions.hpp>

#include "./include/network.hpp"
Expand All @@ -27,10 +28,14 @@ int main(int argc, char **argv)
hwloc_topology_t hwlocTopology;
hwloc_topology_init(&hwlocTopology);

// Create shared memory
auto sharedMemoryFactory = HiCR::backend::pthreads::SharedMemoryFactory();
auto &sharedMemory = sharedMemoryFactory.get(0, 1);

// Instantiating HWLoc-based host (CPU) topology manager
HiCR::backend::hwloc::TopologyManager topologyManager(&hwlocTopology);
HiCR::backend::hwloc::MemoryManager memoryManager(&hwlocTopology);
HiCR::backend::pthreads::CommunicationManager communicationManager;
HiCR::backend::pthreads::CommunicationManager communicationManager(sharedMemory);
HiCR::backend::pthreads::ComputeManager computeManager;

// Asking backend to check the available devices
Expand Down
7 changes: 6 additions & 1 deletion examples/objectStore/singleInstance/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <hicr/backends/hwloc/topologyManager.hpp>
#include <hicr/backends/hwloc/memoryManager.hpp>
#include <hicr/backends/pthreads/communicationManager.hpp>
#include <hicr/backends/pthreads/sharedMemoryFactory.hpp>

#include <hicr/frontends/objectStore/objectStore.hpp>

Expand All @@ -32,8 +33,12 @@ int main(int argc, char **argv)
// Reserving memory for hwloc
hwloc_topology_init(&topology);

// Create shared memory
auto sharedMemoryFactory = HiCR::backend::pthreads::SharedMemoryFactory();
auto &sharedMemory = sharedMemoryFactory.get(0, 1);

// Using default instance, communication and memory manager for single instance
auto communicationManager = std::make_unique<HiCR::backend::pthreads::CommunicationManager>();
auto communicationManager = std::make_unique<HiCR::backend::pthreads::CommunicationManager>(sharedMemory);
auto memoryManager = std::make_unique<HiCR::backend::hwloc::MemoryManager>(&topology);

// Using HWLoc as topology managers
Expand Down
6 changes: 3 additions & 3 deletions include/hicr/backends/mpi/communicationManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,14 @@ class CommunicationManager : public HiCR::CommunicationManager
{
globalSlotPointers[i] = nullptr;
globalSourceSlots[i] = nullptr;
globalMemorySpaces[i] = nullptr;
globalMemorySpaces[i] = nullptr;
}
else
{
const auto memorySlot = memorySlots[localPointerPos++].second;
globalSlotPointers[i] = &memorySlot->getPointer();
globalSourceSlots[i] = memorySlot;
globalMemorySpaces[i] = memorySlot->getMemorySpace();
globalMemorySpaces[i] = memorySlot->getMemorySpace();
}
}

Expand Down Expand Up @@ -201,7 +201,7 @@ class CommunicationManager : public HiCR::CommunicationManager

// Freeing up memory of the old local memory slot
// Commented out: it is the user who must free up this local memory slot
// MPI_Free_mem(*(globalSlotPointers[i]));
// MPI_Free_mem(*(globalSlotPointers[i]));

// Swapping pointers
*(globalSlotPointers[i]) = ptr;
Expand Down
109 changes: 25 additions & 84 deletions include/hicr/backends/pthreads/communicationManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
#pragma once

#include <cstring>
#include "pthread.h"
#include <pthread.h>

#include <hicr/core/communicationManager.hpp>
#include <hicr/core/localMemorySlot.hpp>
#include <hicr/backends/hwloc/globalMemorySlot.hpp>

#include "sharedMemory.hpp"

namespace HiCR::backend::pthreads
{

Expand All @@ -42,43 +45,24 @@ class CommunicationManager final : public HiCR::CommunicationManager
public:

/**
* Constructor for the memory manager class for the Pthreads backend
* Constructor for the communication manager class for the pthreads backend
*
* \param[in] fenceCount Specifies how many times a fence has to be called for it to release callers
* \param[in] sharedMemory the shared memory used to exchange global slots among other threads
*/
CommunicationManager(const size_t fenceCount = 1)
: HiCR::CommunicationManager()
{
// Initializing barrier for fence operation
pthread_barrier_init(&_barrier, nullptr, fenceCount);

// Initializing mutex object
pthread_mutex_init(&_mutex, nullptr);
}
CommunicationManager(SharedMemory &sharedMemory)
: HiCR::CommunicationManager(),
_sharedMemory(sharedMemory)
{}

/**
* The destructor deletes all created barrier/mutex locks
*/
~CommunicationManager() override
{
// Freeing barrier memory
pthread_barrier_destroy(&_barrier);

// Freeing mutex memory
pthread_mutex_destroy(&_mutex);
}
~CommunicationManager() = default;

__INLINE__ std::shared_ptr<HiCR::GlobalMemorySlot> getGlobalMemorySlotImpl(const HiCR::backend::hwloc::GlobalMemorySlot::tag_t tag,
const HiCR::backend::hwloc::GlobalMemorySlot::globalKey_t globalKey) override
{
if (_shadowMap.find(tag) != _shadowMap.end())
{
if (_shadowMap[tag].find(globalKey) != _shadowMap[tag].end())
return _shadowMap[tag][globalKey];
else
return nullptr;
}
else { return nullptr; }
return _sharedMemory.get(tag, globalKey);
}

/**
Expand Down Expand Up @@ -110,21 +94,8 @@ class CommunicationManager final : public HiCR::CommunicationManager

private:

/**
* Stores a barrier object to check on a barrier operation
*/
pthread_barrier_t _barrier{};

/**
* A mutex to make sure threads do not bother each other during certain operations
*/
pthread_mutex_t _mutex{};

__INLINE__ void exchangeGlobalMemorySlotsImpl(const HiCR::GlobalMemorySlot::tag_t tag, const std::vector<globalKeyMemorySlotPair_t> &memorySlots) override
{
// Synchronize all intervening threads in this call
barrier();

// Simply adding local memory slots to the global map
for (const auto &entry : memorySlots)
{
Expand All @@ -139,29 +110,23 @@ class CommunicationManager final : public HiCR::CommunicationManager

// Registering memory slot
registerGlobalMemorySlot(globalMemorySlot);
_shadowMap[tag][globalKey] = globalMemorySlot;
}

// Do not allow any thread to continue until the exchange is made
barrier();
// Push it to shared memory
_sharedMemory.insert(tag, globalKey, globalMemorySlot);
}
}

__INLINE__ void queryMemorySlotUpdatesImpl(std::shared_ptr<HiCR::LocalMemorySlot> memorySlot) override
{
// This function should check and update the abstract class for completed memcpy operations
}

/**
* A barrier implementation that synchronizes all threads in the HiCR instance
*/
__INLINE__ void barrier() { pthread_barrier_wait(&_barrier); }

/**
* Implementation of the fence operation for the pthreads backend. In this case, nothing needs to be done, as
* the system's memcpy operation is synchronous. This means that it's mere execution (whether immediate or deferred)
* ensures its completion.
*/
__INLINE__ void fenceImpl(const HiCR::GlobalMemorySlot::tag_t tag) override { barrier(); }
__INLINE__ void fenceImpl(const HiCR::GlobalMemorySlot::tag_t tag) override { _sharedMemory.barrier(); }

__INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::LocalMemorySlot> &destination,
const size_t dst_offset,
Expand Down Expand Up @@ -193,7 +158,7 @@ class CommunicationManager final : public HiCR::CommunicationManager
*/
__INLINE__ void destroyGlobalMemorySlotImpl(std::shared_ptr<HiCR::GlobalMemorySlot> memorySlot) override
{
// Nothing to do here
_sharedMemory.remove(memorySlot->getGlobalTag(), memorySlot->getGlobalKey());
}

__INLINE__ void memcpyImpl(const std::shared_ptr<HiCR::GlobalMemorySlot> &destination,
Expand All @@ -202,17 +167,11 @@ class CommunicationManager final : public HiCR::CommunicationManager
const size_t src_offset,
const size_t size) override
{
// Getting up-casted pointer for the execution unit
auto dst = dynamic_pointer_cast<HiCR::GlobalMemorySlot>(destination);

// Checking whether the execution unit passed is compatible with this backend
if (dst == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not supported by this backend\n");

// Checking whether the memory slot is local. This backend only supports local data transfers
if (dst->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not local (required by this backend)\n");
if (destination->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed destination memory slot is not local (required by this backend)\n");

// Executing actual memcpy
memcpy(dst->getSourceLocalMemorySlot(), dst_offset, source, src_offset, size);
memcpy(destination->getSourceLocalMemorySlot(), dst_offset, source, src_offset, size);

// Increasing message received/sent counters for both memory slots
increaseMessageRecvCounter(*destination->getSourceLocalMemorySlot());
Expand All @@ -225,17 +184,11 @@ class CommunicationManager final : public HiCR::CommunicationManager
const size_t src_offset,
const size_t size) override
{
// Getting up-casted pointer for the execution unit
auto src = dynamic_pointer_cast<HiCR::GlobalMemorySlot>(source);

// Checking whether the memory slot is compatible with this backend
if (src == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not supported by this backend\n");

// Checking whether the memory slot is local. This backend only supports local data transfers
if (src->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not local (required by this backend)\n");
if (source->getSourceLocalMemorySlot() == nullptr) HICR_THROW_LOGIC("The passed source memory slot is not local (required by this backend)\n");

// Executing actual memcpy
memcpy(destination, dst_offset, src->getSourceLocalMemorySlot(), src_offset, size);
memcpy(destination, dst_offset, source->getSourceLocalMemorySlot(), src_offset, size);

// Increasing message received/sent counters for both memory slots
increaseMessageRecvCounter(*destination);
Expand Down Expand Up @@ -266,24 +219,12 @@ class CommunicationManager final : public HiCR::CommunicationManager
m->unlock();
}

__INLINE__ void lock()
{
// Locking the pthread mutex
pthread_mutex_lock(&_mutex);
}

__INLINE__ void unlock()
{
// Locking the pthread mutex
pthread_mutex_unlock(&_mutex);
}

private:

// this map shadows the core HiCR map _globalMemorySlotTagKeyMap
// to support getGlobalMemorySlot implementation
// for this backend
globalMemorySlotTagKeyMap_t _shadowMap;
/**
* Shared Memory to exchange slots
*/
SharedMemory &_sharedMemory;
};

} // namespace HiCR::backend::pthreads
Loading