diff --git a/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp b/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp index 74bb6830..2b4dc020 100644 --- a/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp +++ b/examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp @@ -129,7 +129,7 @@ void producerFc(HiCR::MemoryManager &coordinationMemoryManager, // is the message sizes buffer full? // Note: it might be necessary sometimes to also check the payload buffers // as they might overflow independently of the message size buffers - while (producer.isFull()) { producer.updateDepth(); } + while (producer.isFull(nextElemSize)) { producer.updateDepth(); } producer.push(sendSlot); Printer::printBytes(prefix, elements[i].first, payloadCapacity, 0, nextElemSize); diff --git a/examples/channels/variableSize/spsc/include/consumer.hpp b/examples/channels/variableSize/spsc/include/consumer.hpp index 9bd079fe..8b4ee68f 100644 --- a/examples/channels/variableSize/spsc/include/consumer.hpp +++ b/examples/channels/variableSize/spsc/include/consumer.hpp @@ -36,7 +36,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager, auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize); // Allocating payload buffer as a local memory slot - auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY); + auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY * 2); // Getting required buffer size auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); @@ -86,7 +86,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager, channelCapacity); // Getting a single value from the channel - while (consumer.getDepth() != 1) consumer.updateDepth(); + while (consumer.getCoordinationDepth() != 1) consumer.updateDepth(); // Getting internal pointer of the token buffer slot auto payloadBufferPtr = (ELEMENT_TYPE *)payloadBufferSlot->getPointer(); diff --git a/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp b/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp index 31b2c291..4af63b9e 100644 --- a/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp +++ b/include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp @@ -104,7 +104,7 @@ class Consumer * immediately upon creation of the SPSC channel. Therefore we do not reset * _depths to zero, and check for "early" received messages */ - _depths.push_back(consumerPtr->getDepth()); + _depths.push_back(consumerPtr->getCoordinationDepth()); for (size_t j = 0; j < _depths.back(); j++) { _channelPushes.push(i); } } } @@ -222,7 +222,7 @@ class Consumer for (size_t i = 0; i < _spscList.size(); i++) { _spscList[i]->updateDepth(); - newDepths[i] = _spscList[i]->getDepth(); + newDepths[i] = _spscList[i]->getCoordinationDepth(); } for (size_t i = 0; i < _spscList.size(); i++) diff --git a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp index d04b20ea..457e7948 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/consumer.hpp @@ -49,9 +49,9 @@ class Consumer final : public variableSize::Base * * \param[in] coordinationCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer coordination buffers * \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers - * \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push new tokens - * into this buffer, while there is enough space (in bytes). This buffer should be big enough to hold at least the - * largest message of the variable-sized messages to be pushed. + * \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push messages into this + * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by payloadCapacity argument. + * Half of the buffer is used as excess buffer to avoid internal fragmentation of messages * \param[in] tokenBuffer The memory slot pertaining to the token buffer. This buffer is only used to exchange internal metadata * about the sizes of the individual messages being sent. * \param[in] internalCoordinationBufferForCounts This is a small buffer to hold the internal (local) state of the @@ -62,7 +62,7 @@ class Consumer final : public variableSize::Base * buffer for message counts, used for remote updates on pop() * \param[in] producerCoordinationBufferForPayloads A global reference to the producer channel's internal coordination * buffer for payload sizes (in bytes), used for remote updates on pop() - * \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages + * \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages. * \param[in] capacity The maximum number of tokens that will be held by this channel * @note: The token size in var-size channels is used only internally, and is passed as having a type size_t (with size sizeof(size_t)) */ @@ -243,23 +243,28 @@ class Consumer final : public variableSize::Base * receiving the message counts (phase 2), returning this depth should guarantee * we already have received the payloads * - * \note This is not a thread-safe call * * This is a getter function that should complete in \f$ \Theta(1) \f$ time. * * @return The number of elements in variable-size consumer channel + * + * \note This is not a thread-safe call + * \note Even though there might be space for additional tokens in the coordination buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the payload buffer */ - size_t getDepth() { return getCircularBufferForCounts()->getDepth(); } + size_t getCoordinationDepth() { return getCircularBufferForCounts()->getDepth(); } /** * Returns the current depth of the channel holding the payloads * - * \note This is not a thread-safe call * * This is a getter function that should complete in \f$ \Theta(1) \f$ time. * * @returns The number of total bytes in the payloads channel * + * \note This is not a thread-safe call + * \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the coordination buffer */ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); } @@ -271,7 +276,28 @@ class Consumer final : public variableSize::Base * \returns true, if both message count and payload buffers are empty * \returns false, if one of the buffers is not empty */ - bool isEmpty() { return (getDepth() == 0); } + bool isEmpty() { return getCoordinationDepth() == 0; } + + /** + * This funciton can be used to quickly check whether the channel is becoming full when trying + * to push an element of a given size. First thing, we are checking if we can still + * push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the + * payload buffer. If the current depth of the payload and the requiredBufferSize to push + * exceed the channel capacity, the channel is considered full. + * + * \param[in] requiredBufferSize size of the token to push into the channel + * + * \return true if there is enough space to push the token, false otherwise + */ + bool isFull(size_t requiredBufferSize) + { + auto coordinationCircularBuffer = getCircularBufferForCounts(); + if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) return true; + auto payloadCircularBuffer = getCircularBufferForPayloads(); + if (payloadCircularBuffer->getDepth() + requiredBufferSize > payloadCircularBuffer->getCapacity()) return true; + + return false; + } /** * Retrieves the pointer to the channel's payload buffer diff --git a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp index 40236950..ea07817f 100644 --- a/include/hicr/frontends/channel/variableSize/spsc/producer.hpp +++ b/include/hicr/frontends/channel/variableSize/spsc/producer.hpp @@ -48,7 +48,8 @@ class Producer : public variableSize::Base * \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers * \param[in] sizeInfoBuffer The local memory slot used to hold the information about the next message size * \param[in] payloadBuffer The global memory slot pertaining to the payload of all messages. The producer will push messages into this - * buffer, while there is enough space. This buffer should be large enough to hold at least the largest of the variable-size messages. + * buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by payloadCapacity argument. + * Half of the buffer is used as excess buffer to avoid internal fragmentation of messages * \param[in] tokenBuffer The memory slot pertaining to the token buffer, which is used to hold message size data. * The producer will push message sizes into this buffer, while there is enough space. This buffer should be large enough to * hold at least one message size. @@ -56,8 +57,8 @@ class Producer : public variableSize::Base * \param[in] internalCoordinationBufferForPayloads This is a small buffer to hold the internal (local) state of the channel's payload sizes (in bytes) * \param[in] consumerCoordinationBufferForCounts A global reference of the consumer's own coordination buffer to check for updates on message counts * \param[in] consumerCoordinationBufferForPayloads A global reference of the consumer's own coordination buffer to check for updates on payload sizes (in bytes) - * \param[in] payloadCapacity capacity in bytes of the buffer for message payloads - * \param[in] payloadSize size in bytes of the datatype used for variable-sized messages + * \param[in] payloadCapacity capacity in bytes of the buffer for message payloads. + * \param[in] payloadSize size in bytes of the datatype used for variable-sized messages. * \param[in] capacity The maximum number of tokens that will be held by this channel */ Producer(CommunicationManager &coordinationCommunicationManager, @@ -88,46 +89,6 @@ class Producer : public variableSize::Base ~Producer() = default; - /** - * Identical to Producer::updateDepth(), but this coordination buffer - * is larger and contains payload information as well as token metadata - */ - __INLINE__ void updateDepth() {} - - /** - * get payload buffer head position - * @return payload buffer head position (in bytes) - */ - [[nodiscard]] __INLINE__ size_t getPayloadHeadPosition() const noexcept { return getCircularBufferForPayloads()->getHeadPosition(); } - - /** - * get the datatype size used for payload buffer - * @return datatype size (in bytes) for payload buffer - */ - __INLINE__ size_t getPayloadSize() { return _payloadSize; } - - /** - * get payload buffer depth - * @return payload buffer depth (in bytes) - */ - __INLINE__ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); } - - /** - * get payload buffer capacity - * @return payload buffer capacity (in bytes) - */ - __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); } - - /** - * Given a proposed message, indicate whether there is enough payload space to push it - */ - __INLINE__ bool hasEnoughPayloadSpace(const size_t msgSize) - { - auto currentPayloadDepth = getCircularBufferForPayloads()->getDepth(); - if (msgSize + currentPayloadDepth > getPayloadCapacity()) return false; - return true; - } - /** * Puts new variable-sized messages unto the channel. * The implementation consists of two phases. In phase 1, we copy the @@ -155,65 +116,64 @@ class Producer : public variableSize::Base { if (n != 1) HICR_THROW_RUNTIME("HiCR currently has no implementation for n != 1 with push(sourceSlot, n) for variable size version."); - // Make sure source slot is beg enough to satisfy the operation - size_t requiredBufferSize = sourceSlot->getSize(); - size_t providedBufferCapacity = getPayloadCapacity(); - // Updating depth of token (message sizes) and payload buffers updateDepth(); - auto currentPayloadDepth = getCircularBufferForPayloads()->getDepth(); - auto currentDepth = getDepth(); - - /* - * Part 1: Copy the payload data - */ - if (hasEnoughPayloadSpace(requiredBufferSize) == false) - HICR_THROW_RUNTIME("Attempting to push (%lu) bytes while the channel currently has payload depth (%lu). This would exceed capacity (%lu).\n", - requiredBufferSize, - currentPayloadDepth, - providedBufferCapacity); - // Get communication managers - auto payloadCommunicationManager = getPayloadCommunicationManager(); - - /* - * Payload copy: - * - We have checked (requiredBufferSize <= depth) - * that the payload fits into available circular buffer, - * but it is possible it spills over the end into the - * beginning. Cover this corner case below - */ - if (requiredBufferSize + getPayloadHeadPosition() > getPayloadCapacity()) - { - size_t first_chunk = getPayloadCapacity() - getPayloadHeadPosition(); - size_t second_chunk = requiredBufferSize - first_chunk; - - // copy first part to end of buffer - payloadCommunicationManager->memcpy(_payloadBuffer, /* destination */ - getPayloadHeadPosition(), /* dst_offset */ - sourceSlot, /* source */ - 0, /* src_offset */ - first_chunk); /* size */ - // copy second part to beginning of buffer - payloadCommunicationManager->memcpy(_payloadBuffer, /* destination */ - 0, /* dst_offset */ - sourceSlot, /* source */ - first_chunk, /* src_offset */ - second_chunk); /* size */ - payloadCommunicationManager->fence(sourceSlot, 2, 0); - } - else + /** + * Payload copy. + * + * We partition the payload buffer in 2 parts: + * - payload buffer: it is the logical size of the buffer, and the channel will work + * as if a buffer of that size was passed as input + * - excess buffer: extra space needed for the example described below. This can not + * be directly used from the outside, but is used by the channel + * to avoid fragmentation: we currently require the token to be pushed + * and peeked as a contiguous memory region + * + * 2 possible scenarios after the push: + * + * 1) we have space between the head and the end of the buffer + * 2) we do not have enough space ahead after the push, but we have it at the beginning of the buffer + * and their sum (i.e., channel capacity - channel depth) allows us to push the token. + * + * If 2), we push to the excess buffer to avoid breaking the token in 2 chunks. + * The head is always pointing to a valid position in the buffer, meaning that when a push into the + * excess buffer is made, the logical position of the head behaves as if there were no excess buffer (See below). + * An excess buffer with the same size as the payload buffer guarantees that all kind of + * token size will succeed. + * + * 0 buffer capacity End of excess buffer + * TAIL HEAD1 + * |-------|--------|-------|-------------|----------| In this case HEAD1 indicates where the the token + * HEAD ends in the implementation, while HEAD is its logical position + * and observable value + * */ + + // Get bytes required to push the token + size_t requiredPayloadBufferSize = sourceSlot->getSize(); + + // Throw exception if the token can not be pushed + if (isFull(requiredPayloadBufferSize) == true) { - payloadCommunicationManager->memcpy(_payloadBuffer, getPayloadHeadPosition(), sourceSlot, 0, requiredBufferSize); - payloadCommunicationManager->fence(sourceSlot, 1, 0); + HICR_THROW_RUNTIME("Attempting to push a token while the channel is full.\nChannel depth: %lu capacity: %lu\nPayload depth: %lu capacity: %lu", + getCircularBufferForCounts()->getDepth(), + getCircularBufferForCounts()->getCapacity(), + getCircularBufferForPayloads()->getDepth(), + getCircularBufferForPayloads()->getCapacity()); } - getCircularBufferForPayloads()->advanceHead(requiredBufferSize); - + // Get communication managers + auto payloadCommunicationManager = getPayloadCommunicationManager(); auto coordinationCommunicationManager = getCoordinationCommunicationManager(); - // update the consumer coordination buffers (consumer does not update - // its own coordination head positions) + // Payload copy. Just push to the channel, we know there is enough space + payloadCommunicationManager->memcpy(_payloadBuffer, getPayloadHeadPosition(), sourceSlot, 0, requiredPayloadBufferSize); + payloadCommunicationManager->fence(sourceSlot, 1, 0); + + // Advance the head in the payload buffer + getCircularBufferForPayloads()->advanceHead(requiredPayloadBufferSize); + + // update the consumer coordination buffers (consumer does not update its own coordination head positions) coordinationCommunicationManager->memcpy(_consumerCoordinationBufferForPayloads, _HICR_CHANNEL_HEAD_ADVANCE_COUNT_IDX * sizeof(size_t), getCoordinationBufferForPayloads(), @@ -224,16 +184,8 @@ class Producer : public variableSize::Base /* * Part 2: Copy the message size */ - auto *sizeInfoBufferPtr = static_cast(_sizeInfoBuffer->getPointer()); - sizeInfoBufferPtr[0] = requiredBufferSize; - - // If the exchange buffer does not have n free slots, reject the operation - if (currentDepth + 1 > getCircularBufferForCounts()->getCapacity()) - HICR_THROW_RUNTIME("Attempting to push with (%lu) tokens while the channel has (%lu) tokens and this would exceed capacity (%lu).\n", - 1, - getDepth(), - getCircularBufferForCounts()->getCapacity()); + sizeInfoBufferPtr[0] = requiredPayloadBufferSize; coordinationCommunicationManager->memcpy(_tokenBuffer, /* destination */ getTokenSize() * getCircularBufferForCounts()->getHeadPosition(), /* dst_offset */ @@ -252,14 +204,50 @@ class Producer : public variableSize::Base } /** - * Get depth of variable-size producer. Since we have 2 buffers - one - * for counts, and one for payloads, we need to be careful. + * Identical to Producer::updateDepth(), but this coordination buffer + * is larger and contains payload information as well as token metadata + */ + __INLINE__ void updateDepth() {} + + /** + * get payload buffer head position + * @return payload buffer head position (in bytes) + */ + [[nodiscard]] __INLINE__ size_t getPayloadHeadPosition() const noexcept { return getCircularBufferForPayloads()->getHeadPosition(); } + + /** + * get the datatype size used for payload buffer + * @return datatype size (in bytes) for payload buffer + */ + __INLINE__ size_t getPayloadSize() { return _payloadSize; } + + /** + * Get payload buffer depth + * + * \return payload buffer depth (in bytes). It is the occupancy of the buffers + * \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the coordination buffer + */ + __INLINE__ size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); } + + /** + * get payload buffer capacity + * @return payload buffer capacity (in bytes) + */ + __INLINE__ size_t getPayloadCapacity() { return getCircularBufferForPayloads()->getCapacity(); } + + /** + * Get depth of the coordination buffer of variable-size producer. * Because the current implementation first receives the payload (phase 1) before - // receiving the message counts (phase 2), returning this depth should guarantee - // we already have received the payloads + * receiving the message counts (phase 2), returning this depth should guarantee + * we already have received the payloads + * * @return The number of elements in the variable-size producer channel + * + * \note Even though there might be space for additional tokens in the coordination buffer, it is not guaranteed that + * the push() will succeed due to insufficient space in the payload buffer */ - size_t getDepth() { return getCircularBufferForCounts()->getDepth(); } + size_t getCoordinationDepth() { return getCircularBufferForCounts()->getDepth(); } /** * This function can be used to quickly check whether the channel is empty. @@ -268,8 +256,33 @@ class Producer : public variableSize::Base * * \returns true, if both message count and payload buffers are empty * \returns false, if one of the buffers is not empty + * */ - bool isEmpty() { return getDepth() == 0; } + bool isEmpty() { return getCoordinationDepth() == 0; } + + /** + * This funciton can be used to quickly check whether the channel is becoming full when trying + * to push an element of a given size. First thing, we are checking if we can still + * push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the + * payload buffer. If the current depth of the payload and the requiredBufferSize to push + * exceed the channel capacity, the channel is considered full. + * + * \param[in] requiredBufferSize size of the token to push into the channel + * + * \return true if there is enough space to push the token, false otherwise + */ + bool isFull(size_t requiredBufferSize) + { + // Check if we can push one more token + auto coordinationCircularBuffer = getCircularBufferForCounts(); + if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) { return true; } + + // Check if there is enough space in the payload buffer. If + auto payloadCircularBuffer = getCircularBufferForPayloads(); + if (payloadCircularBuffer->getDepth() + requiredBufferSize > payloadCircularBuffer->getCapacity()) { return true; } + + return false; + } private: diff --git a/tests/frontends/channel/meson.build b/tests/frontends/channel/meson.build index ff48683b..2b3a2aa0 100644 --- a/tests/frontends/channel/meson.build +++ b/tests/frontends/channel/meson.build @@ -1,2 +1,3 @@ subdir('circularBuffer') subdir('fixedSize') +subdir('variableSize') diff --git a/tests/frontends/channel/variableSize/meson.build b/tests/frontends/channel/variableSize/meson.build new file mode 100644 index 00000000..27e4d367 --- /dev/null +++ b/tests/frontends/channel/variableSize/meson.build @@ -0,0 +1 @@ +subdir('spsc') diff --git a/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp new file mode 100644 index 00000000..39490226 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/include/channelFixture.hpp @@ -0,0 +1,218 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "common.hpp" + +class ChannelFixture : public ::testing::Test +{ + public: + + std::unique_ptr createProducer(HiCR::MemoryManager &coordinationMemoryManager, + HiCR::MemoryManager &payloadMemoryManager, + HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + std::shared_ptr coordinationMemorySpace, + std::shared_ptr payloadMemorySpace, + const size_t channelCapacity) + { + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); + + // Allocating sizes buffer as a local memory slot + auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + auto sizeInfoBuffer = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizeof(size_t)); + + // Initializing coordination buffers for message sizes and payloads (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); + + // Exchanging local memory slots to become global for them to be used by the remote end + coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, /* global tag */ + {{PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, /* key-slot pairs */ + {PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); + + payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {}); + + // Synchronizing so that all actors have finished registering their global memory slots + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Obtaining the globally exchanged memory slots + auto sizesBuffer = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); + auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); + + _localSlots.insert(coordinationBufferForCounts); + _localSlots.insert(coordinationBufferForPayloads); + _localSlots.insert(sizeInfoBuffer); + + _globalSlotsToDestroy.insert(sizesBuffer); + _globalSlotsToDestroy.insert(producerCoordinationBufferForCounts); + _globalSlotsToDestroy.insert(producerCoordinationBufferForPayloads); + + _globalSlots.insert(sizesBuffer); + _globalSlots.insert(producerCoordinationBufferForCounts); + _globalSlots.insert(producerCoordinationBufferForPayloads); + _globalSlots.insert(consumerCoordinationBufferForCounts); + _globalSlots.insert(consumerCoordinationBufferForPayloads); + _globalSlots.insert(payloadBuffer); + + // Creating producer and consumer channels + return std::make_unique(coordinationCommunicationManager, + payloadCommunicationManager, + sizeInfoBuffer, + payloadBuffer, + sizesBuffer, + coordinationBufferForCounts, + coordinationBufferForPayloads, + consumerCoordinationBufferForCounts, + consumerCoordinationBufferForPayloads, + CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), + sizeof(ELEMENT_TYPE), + channelCapacity); + } + + std::unique_ptr createConsumer(HiCR::MemoryManager &coordinationMemoryManager, + HiCR::MemoryManager &payloadMemoryManager, + HiCR::CommunicationManager &coordinationCommunicationManager, + HiCR::CommunicationManager &payloadCommunicationManager, + std::shared_ptr coordinationMemorySpace, + std::shared_ptr payloadMemorySpace, + const size_t channelCapacity) + { + // Getting required buffer sizes + auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), channelCapacity); + + // Allocating sizes buffer as a local memory slot + auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize); + + // Allocating payload buffer as a local memory slot + auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, 2 * CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); + + // Allocating coordination buffer for internal message size metadata + auto coordinationBufferForCounts = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + // Allocating coordination buffer for internal payload metadata + auto coordinationBufferForPayloads = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, coordinationBufferSize); + + // Initializing coordination buffer (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForCounts); + + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(coordinationBufferForPayloads); + + // Exchanging local memory slots to become global for them to be used by the remote end + coordinationCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, + {{SIZES_BUFFER_KEY, sizesBufferSlot}, + {CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, coordinationBufferForCounts}, + {CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, coordinationBufferForPayloads}}); + + payloadCommunicationManager.exchangeGlobalMemorySlots(CHANNEL_TAG, {{CONSUMER_PAYLOAD_KEY, payloadBufferSlot}}); + + // Synchronizing so that all actors have finished registering their global memory slots + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Obtaining the globally exchanged memory slots + std::shared_ptr globalSizesBufferSlot = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, SIZES_BUFFER_KEY); + + auto producerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto producerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto consumerCoordinationBufferForCounts = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY); + auto consumerCoordinationBufferForPayloads = coordinationCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY); + auto payloadBuffer = payloadCommunicationManager.getGlobalMemorySlot(CHANNEL_TAG, CONSUMER_PAYLOAD_KEY); + + _localSlots.insert(sizesBufferSlot); + _localSlots.insert(payloadBufferSlot); + _localSlots.insert(coordinationBufferForCounts); + _localSlots.insert(coordinationBufferForPayloads); + + _globalSlotsToDestroy.insert(consumerCoordinationBufferForCounts); + _globalSlotsToDestroy.insert(consumerCoordinationBufferForPayloads); + _globalSlotsToDestroy.insert(payloadBuffer); + + _globalSlots.insert(globalSizesBufferSlot); + _globalSlots.insert(producerCoordinationBufferForCounts); + _globalSlots.insert(producerCoordinationBufferForPayloads); + _globalSlots.insert(consumerCoordinationBufferForCounts); + _globalSlots.insert(consumerCoordinationBufferForPayloads); + + // Creating producer and consumer channels + return std::make_unique(coordinationCommunicationManager, + payloadCommunicationManager, + payloadBuffer /*payload buffer */, + globalSizesBufferSlot, + coordinationBufferForCounts, + coordinationBufferForPayloads, + producerCoordinationBufferForCounts, + producerCoordinationBufferForPayloads, + CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE), + channelCapacity); + } + + std::unique_ptr communicationManager; + std::unique_ptr instanceManager; + std::unique_ptr memoryManager; + std::unique_ptr topologyManager; + std::unique_ptr computeManager; + + std::unique_ptr consumer; + std::unique_ptr producer; + + std::shared_ptr memorySpace; + + protected: + + void SetUp() override + { + instanceManager = std::make_unique(MPI_COMM_WORLD); + + // Sanity Check + if (instanceManager->getInstances().size() != 2) + { + if (instanceManager->getCurrentInstance()->isRootInstance()) fprintf(stderr, "Launch error: MPI process count must be equal to 2\n"); + MPI_Finalize(); + } + + communicationManager = std::make_unique(MPI_COMM_WORLD); + memoryManager = std::make_unique(); + computeManager = std::make_unique(); + topologyManager = HiCR::backend::hwloc::TopologyManager::createDefault(); + + _topology = topologyManager->queryTopology(); + memorySpace = _topology.getDevices().begin().operator*()->getMemorySpaceList().begin().operator*(); + } + + void TearDown() override + { + for (auto &g : _globalSlots) { communicationManager->deregisterGlobalMemorySlot(g); } + for (auto &g : _globalSlotsToDestroy) { communicationManager->destroyGlobalMemorySlot(g); } + communicationManager->fence(CHANNEL_TAG); + for (auto &l : _localSlots) { memoryManager->freeLocalMemorySlot(l); } + } + + private: + + HiCR::Topology _topology; + + std::unordered_set> _globalSlots; + std::unordered_set> _globalSlotsToDestroy; + std::unordered_set> _localSlots; +}; \ No newline at end of file diff --git a/tests/frontends/channel/variableSize/spsc/include/common.hpp b/tests/frontends/channel/variableSize/spsc/include/common.hpp new file mode 100644 index 00000000..c0d8dcce --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/include/common.hpp @@ -0,0 +1,68 @@ +/* + * Copyright 2025 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#define CHANNEL_TAG 0 +#define SIZES_BUFFER_KEY 0 +#define PRODUCER_COORDINATION_BUFFER_FOR_SIZES_KEY 1 +#define PRODUCER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 2 +#define CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY 3 +#define CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 4 +#define CONSUMER_PAYLOAD_KEY 5 +#define ELEMENT_TYPE unsigned int +#define CHANNEL_CAPACITY 5 + +using namespace std; +template +class Printer +{ + public: + + static void printBytes(const string prepend, void *buffer, size_t channelCapacity, size_t startIndex, size_t bytes) + { + if (bytes > channelCapacity) + { + std::cout << "Bytes larger than channel capacity, will not print\n"; + return; + } + + cout << "=====\n"; + cout << prepend; + if (buffer == NULL) + { + cerr << "buffer is NULL in print routine" << endl; + abort(); + } + if (startIndex + bytes <= channelCapacity) + { + vector v(static_cast(buffer) + startIndex / sizeof(T), static_cast(buffer) + (startIndex + bytes) / sizeof(ELEMENT_TYPE)); + copy(v.begin(), v.end(), ostream_iterator(cout, ",")); + } + else + { + vector v1(static_cast(buffer) + startIndex / sizeof(T), static_cast(buffer) + channelCapacity / sizeof(ELEMENT_TYPE)); + vector v2(static_cast(buffer), static_cast(buffer) + (bytes + startIndex - channelCapacity) / sizeof(ELEMENT_TYPE)); + copy(v1.begin(), v1.end(), ostream_iterator(cout, ",")); + copy(v2.begin(), v2.end(), ostream_iterator(cout, ",")); + } + cout << "\n=====\n"; + } +}; diff --git a/tests/frontends/channel/variableSize/spsc/mainMPI.cpp b/tests/frontends/channel/variableSize/spsc/mainMPI.cpp new file mode 100644 index 00000000..ac693531 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/mainMPI.cpp @@ -0,0 +1,31 @@ +#include +#include + +#include +#include + +class MPITestEnvironment : public ::testing::Environment +{ + void TearDown() override + { + int result = ::testing::UnitTest::GetInstance()->failed_test_count(); + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + if (result > 0) + { + std::this_thread::sleep_for(std::chrono::seconds(1700)); + fprintf(stderr, "[Rank %d] Test failed, aborting MPI.\n", rank); + MPI_Abort(MPI_COMM_WORLD, 1); + } + } +}; + +int main(int argc, char **argv) +{ + MPI_Init(&argc, &argv); + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(new MPITestEnvironment); + int result = RUN_ALL_TESTS(); + MPI_Finalize(); + return result; +} diff --git a/tests/frontends/channel/variableSize/spsc/meson.build b/tests/frontends/channel/variableSize/spsc/meson.build new file mode 100644 index 00000000..ed2b47b8 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/meson.build @@ -0,0 +1,9 @@ +channelsTestIncludeDirs = include_directories(['include']) + +channelsTestMPIDep = declare_dependency( + sources: ['mainMPI.cpp'], + include_directories: channelsTestIncludeDirs, + dependencies: [HiCRTestBuildDependencies], +) + +subdir('source') \ No newline at end of file diff --git a/tests/frontends/channel/variableSize/spsc/source/excessBuffer.cpp b/tests/frontends/channel/variableSize/spsc/source/excessBuffer.cpp new file mode 100644 index 00000000..be717675 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/excessBuffer.cpp @@ -0,0 +1,186 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + ASSERT_EQ(producer.getPayloadCapacity(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 0); + ASSERT_EQ(producer.getPayloadDepth(), 0); + ASSERT_TRUE(producer.isEmpty()); + ASSERT_FALSE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Send a buffer big as the buffer channel + ELEMENT_TYPE sendBuffer[CHANNEL_CAPACITY - 1] = {0, 1, 2, 3}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Push the slot + EXPECT_NO_THROW(producer.push(sendSlot)); + + // Check that the channel can accept one more element + ASSERT_FALSE(producer.isFull(sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(producer.isFull(2 * sizeof(ELEMENT_TYPE))); + ASSERT_FALSE(producer.isEmpty()); + + // Check there is only one token, and the payload depth is equal to the capacity of the buffer minus 1 element + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 1); + ASSERT_EQ(producer.getPayloadDepth(), (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + + // Check that trying to push another element throws exception since the channel does not have enough space + EXPECT_THROW(producer.push(sendSlot), HiCR::RuntimeException); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the consumer pop + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Now that consumer has popped it should succeed, and push to the excess buffer + EXPECT_NO_THROW(producer.push(sendSlot)); + + // Wait for the consumer 4 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the consumer do its part of the test + + // Wait for the consumer 5 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the producer do its part of the test + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // After the push, check there is one token and payload buffer is full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 1); + ASSERT_EQ(consumer.getPayloadDepth(), (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + ASSERT_FALSE(consumer.isEmpty()); + // Check that we still have space to push 1 token + ASSERT_FALSE(consumer.isFull(sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(2 * sizeof(ELEMENT_TYPE))); + + // Peek and check the token data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], 0); + ASSERT_EQ(res[1], (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + + // Check the vectory elements corresponds to the ground truth + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } + + // Pop and check that the channel is empty, the depth are updated + consumer.pop(); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the producer push again + + // Wait for the producer 4 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Wait for the producer 5 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // After the push, check there is one token + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 1); + ASSERT_EQ(consumer.getPayloadDepth(), (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + ASSERT_FALSE(consumer.isEmpty()); + // Check that we still have space to push 1 token + ASSERT_FALSE(consumer.isFull(sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(2 * sizeof(ELEMENT_TYPE))); + + // Peek and check the token data are correct + res = consumer.peek(); + ASSERT_EQ(res[0], (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + ASSERT_EQ(res[1], (CHANNEL_CAPACITY - 1) * sizeof(ELEMENT_TYPE)); + + // Check the vectory elements corresponds to the ground truth + tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + tokenPtr = &tokenBuffer[res[0]]; + for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } + + // Pop and check that the channel is empty, the depth are updated + consumer.pop(); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); +} + +TEST_F(ChannelFixture, UseExcessBuffer) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillBufferMultipleTokens.cpp b/tests/frontends/channel/variableSize/spsc/source/fillBufferMultipleTokens.cpp new file mode 100644 index 00000000..6334b8b8 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/fillBufferMultipleTokens.cpp @@ -0,0 +1,146 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Send token one by one + for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) + { + // Prepare slot to send + ELEMENT_TYPE sendBuffer[1] = {static_cast(i)}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Push and check depths are increased + ASSERT_EQ(producer.getCoordinationDepth(), i); + ASSERT_EQ(producer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + ASSERT_NO_THROW(producer.push(sendSlot)); + + // Fence to synchronize with the consumer + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(producer.getCoordinationDepth(), i + 1); + ASSERT_EQ(producer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // Check the channel is full + ASSERT_TRUE(producer.isFull(1)); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The consumer is popping, check the depths are updated + for (size_t i = CHANNEL_CAPACITY; i > 0; --i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + ASSERT_EQ(producer.getCoordinationDepth(), i - 1); + ASSERT_EQ(producer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + } + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The producer is pushing, check depths are increasing + for (size_t i = 0; i < CHANNEL_CAPACITY; ++i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i + 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // Check channel is full + ASSERT_TRUE(consumer.isFull(1)); + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Pop token one by one + auto peekIndex = 0; + for (size_t i = CHANNEL_CAPACITY; i > 0; --i) + { + // Check depths are decreasing + ASSERT_EQ(consumer.getCoordinationDepth(), i); + ASSERT_EQ(consumer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + + // Peek and check data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], peekIndex * sizeof(ELEMENT_TYPE)); + ASSERT_EQ(res[1], sizeof(ELEMENT_TYPE)); + + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + auto token = static_cast(tokenPtr)[0]; + // In this specific test, the value pushed coincides with the index in the channel + ASSERT_EQ(peekIndex, token); + + // Pop + consumer.pop(); + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i - 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + + // Update peek index + peekIndex++; + } + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +TEST_F(ChannelFixture, FillBufferWithMultipleTokens) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillBufferOneToken.cpp b/tests/frontends/channel/variableSize/spsc/source/fillBufferOneToken.cpp new file mode 100644 index 00000000..e830a4f1 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/fillBufferOneToken.cpp @@ -0,0 +1,137 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + ASSERT_EQ(producer.getPayloadCapacity(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 0); + ASSERT_EQ(producer.getPayloadDepth(), 0); + ASSERT_TRUE(producer.isEmpty()); + ASSERT_FALSE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(producer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Send a buffer big as the buffer channel + ELEMENT_TYPE sendBuffer[CHANNEL_CAPACITY] = {0, 1, 2, 3, 4}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Push the slot + EXPECT_NO_THROW(producer.push(sendSlot)); + + // Check that the channel is full + ASSERT_TRUE(producer.isFull(1)); + ASSERT_FALSE(producer.isEmpty()); + + // Check there is only one token, but the payload depth is equal to the capacity of the buffer + producer.updateDepth(); + ASSERT_EQ(producer.getCoordinationDepth(), 1); + ASSERT_EQ(producer.getPayloadDepth(), producer.getPayloadCapacity()); + + // Check that trying to push another element throws exception + ELEMENT_TYPE sendBuffer2[1] = {5}; + auto sendBufferPtr2 = &sendBuffer2; + auto sendSlot2 = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr2, sizeof(sendBuffer2)); + EXPECT_THROW(producer.push(sendSlot2), HiCR::RuntimeException); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the consumer do its part of the test + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Check payload capacity, that buffer is empty, an thus not full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_TRUE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE) + 1)); + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Let the producer do its part of the test + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // After the push, check there is one token and payload buffer is full + consumer.updateDepth(); + ASSERT_EQ(consumer.getCoordinationDepth(), 1); + ASSERT_EQ(consumer.getPayloadDepth(), CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + ASSERT_FALSE(consumer.isEmpty()); + ASSERT_TRUE(consumer.isFull(1)); + + // Peek and check the token data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], 0); + ASSERT_EQ(res[1], CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE)); + + // Check the vectory elements corresponds to the ground truth + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + for (ELEMENT_TYPE i = 0; i < (res[1] / sizeof(ELEMENT_TYPE)); ++i) { ASSERT_EQ(i, static_cast(tokenPtr)[i]); } + + // Pop and check that the channel is empty, the depth are updated + consumer.pop(); + ASSERT_TRUE(consumer.isEmpty()); + ASSERT_FALSE(consumer.isFull(CHANNEL_CAPACITY * sizeof(ELEMENT_TYPE))); + ASSERT_EQ(consumer.getCoordinationDepth(), 0); + ASSERT_EQ(consumer.getPayloadDepth(), 0); + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +TEST_F(ChannelFixture, FillBufferWithOneToken) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/fillCoordination.cpp b/tests/frontends/channel/variableSize/spsc/source/fillCoordination.cpp new file mode 100644 index 00000000..1c85bf9a --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/fillCoordination.cpp @@ -0,0 +1,158 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "../include/channelFixture.hpp" + +void producerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.producer = fixture.createProducer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY - 1); + + auto &producer = *fixture.producer; + auto &payloadMemoryManager = *fixture.memoryManager; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + auto payloadMemorySpace = fixture.memorySpace; + + ////////////////////// Test begin + + // Here the channel has been created with a smaller capacity. Hence we should fail the last push, + // even though the payload buffer has enough space to hold one more token + + // Wait for the consumer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Send token one by one + for (size_t i = 0; i < CHANNEL_CAPACITY - 1; ++i) + { + // Prepare slot to send + ELEMENT_TYPE sendBuffer[1] = {static_cast(i)}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Push and check depths are increased + ASSERT_EQ(producer.getCoordinationDepth(), i); + ASSERT_EQ(producer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + ASSERT_NO_THROW(producer.push(sendSlot)); + + // Fence to synchronize with the consumer + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(producer.getCoordinationDepth(), i + 1); + ASSERT_EQ(producer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // This is the last push, and should fail + // Prepare slot to send + ELEMENT_TYPE sendBuffer[1] = {static_cast(10)}; + auto sendBufferPtr = &sendBuffer; + auto sendSlot = payloadMemoryManager.registerLocalMemorySlot(payloadMemorySpace, sendBufferPtr, sizeof(sendBuffer)); + + // Push and check it fails + ASSERT_THROW(producer.push(sendSlot), HiCR::RuntimeException); + + // Check the channel is full + ASSERT_TRUE(producer.isFull(1)); + + // Wait for the consumer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The consumer is popping, check the depths are updated + for (size_t i = CHANNEL_CAPACITY - 1; i > 0; --i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + ASSERT_EQ(producer.getCoordinationDepth(), i - 1); + ASSERT_EQ(producer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + } + + // Wait for the consumer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +void consumerFc(ChannelFixture &fixture) +{ + // Create producer and pick managers from the fixture + fixture.consumer = fixture.createConsumer( + *fixture.memoryManager, *fixture.memoryManager, *fixture.communicationManager, *fixture.communicationManager, fixture.memorySpace, fixture.memorySpace, CHANNEL_CAPACITY - 1); + + auto &consumer = *fixture.consumer; + auto &coordinationCommunicationManager = *fixture.communicationManager; + auto &payloadCommunicationManager = *fixture.communicationManager; + + ////////////////////// Test begin + + // Wait for producer 1 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // The producer is pushing, check depths are increasing + for (size_t i = 0; i < CHANNEL_CAPACITY - 1; ++i) + { + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i + 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i + 1) * sizeof(ELEMENT_TYPE)); + } + + // Check channel is full + ASSERT_TRUE(consumer.isFull(1)); + + // Wait for the producer 2 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + // Pop token one by one + auto peekIndex = 0; + for (size_t i = CHANNEL_CAPACITY - 1; i > 0; --i) + { + // Check depths are decreasing + ASSERT_EQ(consumer.getCoordinationDepth(), i); + ASSERT_EQ(consumer.getPayloadDepth(), i * sizeof(ELEMENT_TYPE)); + + // Peek and check data are correct + auto res = consumer.peek(); + ASSERT_EQ(res[0], peekIndex * sizeof(ELEMENT_TYPE)); + ASSERT_EQ(res[1], sizeof(ELEMENT_TYPE)); + + auto tokenBuffer = (uint8_t *)consumer.getPayloadBufferMemorySlot()->getSourceLocalMemorySlot()->getPointer(); + void *tokenPtr = &tokenBuffer[res[0]]; + auto token = static_cast(tokenPtr)[0]; + // In this specific test, the value pushed coincides with the index in the channel + ASSERT_EQ(peekIndex, token); + + // Pop + consumer.pop(); + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); + + ASSERT_EQ(consumer.getCoordinationDepth(), i - 1); + ASSERT_EQ(consumer.getPayloadDepth(), (i - 1) * sizeof(ELEMENT_TYPE)); + + // Update peek index + peekIndex++; + } + + // Wait for the producer 3 + coordinationCommunicationManager.fence(CHANNEL_TAG); + payloadCommunicationManager.fence(CHANNEL_TAG); +} + +TEST_F(ChannelFixture, FillCoordinationBuffer) +{ + // Rank 0 is producer, Rank 1 is consumer + if (instanceManager->getCurrentInstance()->isRootInstance()) { producerFc(*this); } + else { consumerFc(*this); } +} diff --git a/tests/frontends/channel/variableSize/spsc/source/meson.build b/tests/frontends/channel/variableSize/spsc/source/meson.build new file mode 100644 index 00000000..e4166982 --- /dev/null +++ b/tests/frontends/channel/variableSize/spsc/source/meson.build @@ -0,0 +1,30 @@ +testSuite = ['tests', 'channels', 'variableSize', 'spsc', 'distributed'] +test_timeout = 60 + +sourceList = [ + 'fillBufferOneToken', + 'fillBufferMultipleTokens', + 'fillCoordination', + 'excessBuffer', +] + +foreach source : sourceList + if 'mpi' in enabledBackends and 'hwloc' in enabledBackends + mpi = executable( + source, + [source + '.cpp'], + dependencies: channelsTestMPIDep, + include_directories: [exampleBuildIncludes], + ) + + if get_option('buildTests') + test( + source, + mpirunExecutable, + args: ['-np', '2', '--oversubscribe', mpi.full_path()], + timeout: test_timeout, + suite: testSuite, + ) + endif + endif +endforeach \ No newline at end of file