Skip to content

Allow AM receive callback to control data allocation/copy#479

Open
pentschev wants to merge 15 commits intorapidsai:branch-0.46from
pentschev:am-delayed-receive
Open

Allow AM receive callback to control data allocation/copy#479
pentschev wants to merge 15 commits intorapidsai:branch-0.46from
pentschev:am-delayed-receive

Conversation

@pentschev
Copy link
Copy Markdown
Member

@pentschev pentschev commented Jul 28, 2025

WIP

@pentschev pentschev self-assigned this Jul 28, 2025
@pentschev pentschev requested a review from a team as a code owner July 28, 2025 21:09
@pentschev pentschev added feature request New feature or request non-breaking Introduces a non-breaking change DO NOT MERGE Hold off on merging; see PR for details labels Jul 28, 2025
@pentschev pentschev mentioned this pull request Aug 20, 2025
8 tasks
Copy link
Copy Markdown
Contributor

@wence- wence- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks broadly good though I am slightly worried about the reinterpret_cast usage.

Comment on lines +61 to +62
* @param[in] receiverCallbackInfo receiver callback info to execute when request completes,
* including user header.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this docstring addition. AFAICT the receiverCallbackInfo is not a function, so how can it be executed?

Comment on lines +306 to +307
: _data(reinterpret_cast<const uint8_t*>(&value),
reinterpret_cast<const uint8_t*>(&value) + sizeof(T))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is UB due to aliasing I think, you need to std::memcpy. The compiler will turn it in to the obvious thing.

reinterpret_cast<const uint8_t*>(&value) + sizeof(T))
{
static_assert(std::is_trivially_copyable_v<T>, "Type must be trivially copyable");
static_assert(sizeof(T) > 0, "Type size must be greater than zero");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: No C++ type has zero size (using Empty = T[0] for some T is UB)

* @returns String representation of the data.
*/
AmData(void* data, size_t length) : data(data), length(length) {}
[[nodiscard]] std::string asString() const { return std::string(_data.begin(), _data.end()); }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copies the values, is that what you want or would a std::string_view do?

throw std::runtime_error("AmUserHeader size mismatch: expected " + std::to_string(sizeof(T)) +
" bytes, got " + std::to_string(_data.size()));
}
return *reinterpret_cast<const T*>(_data.data());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be std::memcpy as well, I think, due to aliasing issues.

Comment on lines +175 to +177
* This method is used for delayed receive operations where `delayReceive`` was enabled.
* It takes a user-provided buffer and internally calls `ucp_am_recv_data_nbx`` to receive
* the AM data that was stored when the message first arrived. Returns a `Request`` object
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* This method is used for delayed receive operations where `delayReceive`` was enabled.
* It takes a user-provided buffer and internally calls `ucp_am_recv_data_nbx`` to receive
* the AM data that was stored when the message first arrived. Returns a `Request`` object
* This method is used for delayed receive operations where `delayReceive` was enabled.
* It takes a user-provided buffer and internally calls `ucp_am_recv_data_nbx` to receive
* the AM data that was stored when the message first arrived. Returns a `Request` object

Start in markdown, finish in rst :)

/**
* @brief Receive delayed Active Message data into a user-provided buffer pointer.
*
* This method is used for delayed receive operations where `delayReceive`` was enabled.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* This method is used for delayed receive operations where `delayReceive`` was enabled.
* This method is used for delayed receive operations where `delayReceive` was enabled.

*/
class AmUserHeader {
private:
std::vector<uint8_t> _data;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: use std::byte (these aren't characters...).

Also, do you need this to be a resizeable thing, or is it sufficient for this object to manage a std::byte *?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

DO NOT MERGE Hold off on merging; see PR for details feature request New feature or request non-breaking Introduces a non-breaking change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants