Skip to content

Commit acb3fac

Browse files
authored
Merge pull request #48 from Algebraic-Programming/feat/channelExcessBuffer
Feat: channel excess buffer Variable size SPSC channel: - the payload is not split into chunks. We require the user to provide a bigger buffer (ideally double the capacity of the channel), and we use part of it to store entire tokens. The channel logic remains unchanged and it allows you to push only MAX_TOKENS * TOKEN_SIZE - channels take into account both how many tokens are in the channel and the available space - extended the isFull function to take a size_t and compute whether a push with such size can be made. - add MPI tests for channel testing the following: - a single push that fills the payload buffer - a series of push that fills the payload buffer - a series of push that fills the coordination buffer - push-pop-push with a buffer big enough to force the second push to use the excess buffer
2 parents cb4bf02 + 50a4d4e commit acb3fac

File tree

16 files changed

+1146
-122
lines changed

16 files changed

+1146
-122
lines changed

examples/channels/variableSize/mpsc/nonlocking/include/producer.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void producerFc(HiCR::MemoryManager &coordinationMemoryManager,
129129
// is the message sizes buffer full?
130130
// Note: it might be necessary sometimes to also check the payload buffers
131131
// as they might overflow independently of the message size buffers
132-
while (producer.isFull()) { producer.updateDepth(); }
132+
while (producer.isFull(nextElemSize)) { producer.updateDepth(); }
133133

134134
producer.push(sendSlot);
135135
Printer<ELEMENT_TYPE>::printBytes(prefix, elements[i].first, payloadCapacity, 0, nextElemSize);

examples/channels/variableSize/spsc/include/consumer.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager,
3636
auto sizesBufferSlot = coordinationMemoryManager.allocateLocalMemorySlot(coordinationMemorySpace, sizesBufferSize);
3737

3838
// Allocating payload buffer as a local memory slot
39-
auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY);
39+
auto payloadBufferSlot = payloadMemoryManager.allocateLocalMemorySlot(payloadMemorySpace, PAYLOAD_CAPACITY * 2);
4040

4141
// Getting required buffer size
4242
auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize();
@@ -86,7 +86,7 @@ void consumerFc(HiCR::MemoryManager &coordinationMemoryManager,
8686
channelCapacity);
8787

8888
// Getting a single value from the channel
89-
while (consumer.getDepth() != 1) consumer.updateDepth();
89+
while (consumer.getCoordinationDepth() != 1) consumer.updateDepth();
9090

9191
// Getting internal pointer of the token buffer slot
9292
auto payloadBufferPtr = (ELEMENT_TYPE *)payloadBufferSlot->getPointer();

include/hicr/frontends/channel/variableSize/mpsc/nonlocking/consumer.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class Consumer
104104
* immediately upon creation of the SPSC channel. Therefore we do not reset
105105
* _depths to zero, and check for "early" received messages
106106
*/
107-
_depths.push_back(consumerPtr->getDepth());
107+
_depths.push_back(consumerPtr->getCoordinationDepth());
108108
for (size_t j = 0; j < _depths.back(); j++) { _channelPushes.push(i); }
109109
}
110110
}
@@ -222,7 +222,7 @@ class Consumer
222222
for (size_t i = 0; i < _spscList.size(); i++)
223223
{
224224
_spscList[i]->updateDepth();
225-
newDepths[i] = _spscList[i]->getDepth();
225+
newDepths[i] = _spscList[i]->getCoordinationDepth();
226226
}
227227

228228
for (size_t i = 0; i < _spscList.size(); i++)

include/hicr/frontends/channel/variableSize/spsc/consumer.hpp

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ class Consumer final : public variableSize::Base
4949
*
5050
* \param[in] coordinationCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer coordination buffers
5151
* \param[in] payloadCommunicationManager The backend's memory manager to facilitate communication between the producer and consumer payload buffers
52-
* \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push new tokens
53-
* into this buffer, while there is enough space (in bytes). This buffer should be big enough to hold at least the
54-
* largest message of the variable-sized messages to be pushed.
52+
* \param[in] payloadBuffer The memory slot pertaining to the payload buffer. The producer will push messages into this
53+
* buffer, while there is enough space. This buffer should be large enough to hold twice the capacity specified by payloadCapacity argument.
54+
* Half of the buffer is used as excess buffer to avoid internal fragmentation of messages
5555
* \param[in] tokenBuffer The memory slot pertaining to the token buffer. This buffer is only used to exchange internal metadata
5656
* about the sizes of the individual messages being sent.
5757
* \param[in] internalCoordinationBufferForCounts This is a small buffer to hold the internal (local) state of the
@@ -62,7 +62,7 @@ class Consumer final : public variableSize::Base
6262
* buffer for message counts, used for remote updates on pop()
6363
* \param[in] producerCoordinationBufferForPayloads A global reference to the producer channel's internal coordination
6464
* buffer for payload sizes (in bytes), used for remote updates on pop()
65-
* \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages
65+
* \param[in] payloadCapacity The capacity (in bytes) of the buffer for variable-sized messages.
6666
* \param[in] capacity The maximum number of tokens that will be held by this channel
6767
* @note: The token size in var-size channels is used only internally, and is passed as having a type size_t (with size sizeof(size_t))
6868
*/
@@ -243,23 +243,28 @@ class Consumer final : public variableSize::Base
243243
* receiving the message counts (phase 2), returning this depth should guarantee
244244
* we already have received the payloads
245245
*
246-
* \note This is not a thread-safe call
247246
*
248247
* This is a getter function that should complete in \f$ \Theta(1) \f$ time.
249248
*
250249
* @return The number of elements in variable-size consumer channel
250+
*
251+
* \note This is not a thread-safe call
252+
* \note Even though there might be space for additional tokens in the coordination buffer, it is not guaranteed that
253+
* the push() will succeed due to insufficient space in the payload buffer
251254
*/
252-
size_t getDepth() { return getCircularBufferForCounts()->getDepth(); }
255+
size_t getCoordinationDepth() { return getCircularBufferForCounts()->getDepth(); }
253256

254257
/**
255258
* Returns the current depth of the channel holding the payloads
256259
*
257-
* \note This is not a thread-safe call
258260
*
259261
* This is a getter function that should complete in \f$ \Theta(1) \f$ time.
260262
*
261263
* @returns The number of total bytes in the payloads channel
262264
*
265+
* \note This is not a thread-safe call
266+
* \note Even though there might be space for additional tokens in the payload buffer, it is not guaranteed that
267+
* the push() will succeed due to insufficient space in the coordination buffer
263268
*/
264269
size_t getPayloadDepth() { return getCircularBufferForPayloads()->getDepth(); }
265270

@@ -271,7 +276,28 @@ class Consumer final : public variableSize::Base
271276
* \returns true, if both message count and payload buffers are empty
272277
* \returns false, if one of the buffers is not empty
273278
*/
274-
bool isEmpty() { return (getDepth() == 0); }
279+
bool isEmpty() { return getCoordinationDepth() == 0; }
280+
281+
/**
282+
* This funciton can be used to quickly check whether the channel is becoming full when trying
283+
* to push an element of a given size. First thing, we are checking if we can still
284+
* push tokens (i.e., if the coordination buffer has space). Second thing, we are checking the
285+
* payload buffer. If the current depth of the payload and the requiredBufferSize to push
286+
* exceed the channel capacity, the channel is considered full.
287+
*
288+
* \param[in] requiredBufferSize size of the token to push into the channel
289+
*
290+
* \return true if there is enough space to push the token, false otherwise
291+
*/
292+
bool isFull(size_t requiredBufferSize)
293+
{
294+
auto coordinationCircularBuffer = getCircularBufferForCounts();
295+
if (coordinationCircularBuffer->getDepth() == coordinationCircularBuffer->getCapacity()) return true;
296+
auto payloadCircularBuffer = getCircularBufferForPayloads();
297+
if (payloadCircularBuffer->getDepth() + requiredBufferSize > payloadCircularBuffer->getCapacity()) return true;
298+
299+
return false;
300+
}
275301

276302
/**
277303
* Retrieves the pointer to the channel's payload buffer

0 commit comments

Comments
 (0)