Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void producerFc(HiCR::MemoryManager &coordinationMemoryManager,
// is the message sizes buffer full?
// Note: it might be necessary sometimes to also check the payload buffers
// as they might overflow independently of the message size buffers
while (producer.isFull()) { producer.updateDepth(); }
while (producer.isFull(nextElemSize)) { producer.updateDepth(); }

producer.push(sendSlot);
Printer<ELEMENT_TYPE>::printBytes(prefix, elements[i].first, payloadCapacity, 0, nextElemSize);
Expand Down
4 changes: 2 additions & 2 deletions examples/channels/variableSize/spsc/include/consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager,
auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize);

// Allocating payload buffer as a local memory slot
auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY);
auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY * 2);

// Getting required buffer size
auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize();
Expand Down Expand Up @@ -86,7 +86,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager,
channelCapacity);

// Getting a single value from the channel
while (consumer.getDepth() != 1) consumer.updateDepth();
while (consumer.getCoordinationDepth() != 1) consumer.updateDepth();

// Getting internal pointer of the token buffer slot
auto payloadBufferPtr = (ELEMENT_TYPE *)payloadBufferSlot->getPointer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class Consumer
* immediately upon creation of the SPSC channel. Therefore we do not reset
* _depths to zero, and check for "early" received messages
*/
_depths.push_back(consumerPtr->getDepth());
_depths.push_back(consumerPtr->getCoordinationDepth());
for (size_t j = 0; j < _depths.back(); j++) { _channelPushes.push(i); }
}
}
Expand Down Expand Up @@ -222,7 +222,7 @@ class Consumer
for (size_t i = 0; i < _spscList.size(); i++)
{
_spscList[i]->updateDepth();
newDepths[i] = _spscList[i]->getDepth();
newDepths[i] = _spscList[i]->getCoordinationDepth();
}

for (size_t i = 0; i < _spscList.size(); i++)
Expand Down
42 changes: 34 additions & 8 deletions include/hicr/frontends/channel/variableSize/spsc/consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ class Consumer final : public variableSize::Base
*
* \param[in] coordinationCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer coordination buffers
* \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers
* \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push new tokens
* into this buffer, while there is enough space (in bytes). This buffer should be big enough to hold at least the
* largest message of the variable-sized messages to be pushed.
* \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push messages into this
* buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by payloadCapacity argument.
* Half of the buffer is used as excess buffer to avoid internal fragmentation of messages
* \param[in] tokenBuffer The memory slot pertaining to the token buffer. This buffer is only used to exchange internal metadata
* about the sizes of the individual messages being sent.
* \param[in] internalCoordinationBufferForCounts This is a small buffer to hold the internal (local) state of the
Expand All @@ -62,7 +62,7 @@ class Consumer final : public variableSize::Base
* buffer for message counts, used for remote updates on pop()
* \param[in] producerCoordinationBufferForPayloads A global reference to the producer channel's internal coordination
* buffer for payload sizes (in bytes), used for remote updates on pop()
* \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages
* \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages.
* \param[in] capacity The maximum number of tokens that will be held by this channel
* @note: The token size in var-size channels is used only internally, and is passed as having a type size_t (with size sizeof(size_t))
*/
Expand Down Expand Up @@ -243,23 +243,28 @@ class Consumer final : public variableSize::Base
* receiving the message counts (phase 2), returning this depth should guarantee
* we already have received the payloads
*
* \note This is not a thread-safe call
*
* This is a getter function that should complete in \f$ \Theta(1) \f$ time.
*
* @return The number of elements in variable-size consumer channel
*
* \note This is not a thread-safe call
* \note Even though there might be space for additional tokens in the coordination buffer, it is not guaranteed that
* the push() will succeed due to insufficient space in the payload buffer
*/
size_t getDepth() { return getCircularBufferForCounts()->getDepth(); }
size_t getCoordinationDepth() { return getCircularBufferForCounts()->getDepth(); }

/**
* Returns the current depth of the channel holding the payloads
*
* \note This is not a thread-safe call
*
* This is a getter function that should complete in \f$ \Theta(1) \f$ time.
*
* @returns The number of total bytes in the payloads channel
*
* \note This is not a thread-safe call
* \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that
* the push() will succeed due to insufficient space in the coordination buffer
*/
size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); }

Expand All @@ -271,7 +276,28 @@ class Consumer final : public variableSize::Base
* \returns true, if both message count and payload buffers are empty
* \returns false, if one of the buffers is not empty
*/
bool isEmpty() { return (getDepth() == 0); }
bool isEmpty() { return getCoordinationDepth() == 0; }

/**
* This funciton can be used to quickly check whether the channel is becoming full when trying
* to push an element of a given size. First thing, we are checking if we can still
* push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the
* payload buffer. If the current depth of the payload and the requiredBufferSize to push
* exceed the channel capacity, the channel is considered full.
*
* \param[in] requiredBufferSize size of the token to push into the channel
*
* \return true if there is enough space to push the token, false otherwise
*/
bool isFull(size_t requiredBufferSize)
{
auto coordinationCircularBuffer = getCircularBufferForCounts();
if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) return true;
auto payloadCircularBuffer = getCircularBufferForPayloads();
if (payloadCircularBuffer->getDepth() + requiredBufferSize > payloadCircularBuffer->getCapacity()) return true;

return false;
}

/**
* Retrieves the pointer to the channel's payload buffer
Expand Down
Loading