Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions xptifw/src/xpti_trace_framework.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1484,12 +1484,12 @@ class Notifications {
/// @brief Maps a trace type to its associated callback entries.
/// @details This unordered map uses a uint16_t as the key to represent the
/// trace point type, and cb_entries_t to store the associated callbacks.
using cb_t = emhash7::HashMap<uint16_t, cb_entries_t>;
using cb_t = phmap::flat_hash_map<uint16_t, cb_entries_t>;

/// @typedef stream_cb_t
/// @brief Maps a stream ID to its corresponding callbacks for different
/// trace types
/// @details This unordered map uses the xpti::strem_id_t as the key for the
/// @details This unordered map uses the xpti::stream_id_t as the key for the
/// stream ID, and cb_t to map the stream to registered callbacks for each
/// trace type
using stream_cb_t = emhash7::HashMap<xpti::stream_id_t, cb_t>;
Expand All @@ -1507,7 +1507,7 @@ class Notifications {
/// @details This unordered map uses a uint16_t as the key for the trace
/// type, and a boolean value to indicate whether callbacks are registered
/// for this trace type (e.g., registered or unregisterted/no callback).
using trace_flags_t = emhash7::HashMap<uint16_t, bool>;
using trace_flags_t = phmap::flat_hash_map<uint16_t, bool>;

/// @typedef stream_flags_t
/// @brief Maps a stream ID to its corresponding trace flags for different
Expand All @@ -1518,6 +1518,9 @@ class Notifications {
/// the given stream.
using stream_flags_t = emhash7::HashMap<xpti::stream_id_t, trace_flags_t>;

Notifications(size_t size = 512)
: MCallbacksByStream(size), MStreamFlags(size) {}

/// @brief Registers a callback function for a specific trace type and stream
/// ID.
///
Expand Down Expand Up @@ -1562,12 +1565,15 @@ class Notifications {
}
}
#endif
{
std::unique_lock<std::shared_mutex> Lock(MFlagsLock);
// Get the flags for the stream
auto &TraceFlags = MStreamFlags[StreamID];
TraceFlags[TraceType] = true; // Set the trace type flag to true
}
// If reader-writer locks were emplyed, this is where the writer lock can
// be used
std::unique_lock<std::shared_mutex> Lock(MCBsLock);
auto &TraceFlags = MStreamFlags[StreamID]; // Get the trace flags for the
// stream ID
TraceFlags[TraceType] = true; // Set the trace type flag to true

auto &StreamCBs =
MCallbacksByStream[StreamID]; // thread-safe
Expand Down Expand Up @@ -1637,14 +1643,16 @@ class Notifications {
if (!cbFunc)
return xpti::result_t::XPTI_RESULT_INVALIDARG;

{
std::unique_lock<std::shared_mutex> Lock(MFlagsLock);
auto &TraceFlags = MStreamFlags[StreamID]; // Get the trace flags for the
// stream ID
TraceFlags[TraceType] = false; // Set the trace type flag to false
}
// Since we do not remove the callback function when they are unregistered
// and only reset the flag, the writer lock is not held for very long; use
// writer lock here.
std::unique_lock<std::shared_mutex> Lock(MCBsLock);
auto &TraceFlags = MStreamFlags[StreamID]; // Get the trace flags for the
// stream ID
TraceFlags[TraceType] = false; // Set the trace type flag to false

auto &StreamCBs =
MCallbacksByStream[StreamID]; // thread-safe
// What we get is a concurrent_hash_map
Expand Down Expand Up @@ -1691,16 +1699,18 @@ class Notifications {
/// use reader-writer locks, a reader lock should be used where appropriate.

xpti::result_t unregisterStream(xpti::stream_id_t StreamID) {
{
std::unique_lock<std::shared_mutex> Lock(MFlagsLock);
// Get the trace flags for the stream
MStreamFlags.erase(StreamID);
}
// If there are no callbacks registered for the requested stream ID, we
// return not found; use reader lock here if the implementation moves to
// reader-writer locks.
std::unique_lock<std::shared_mutex> Lock(MCBsLock);
if (MCallbacksByStream.count(StreamID) == 0)
return xpti::result_t::XPTI_RESULT_NOTFOUND;

// Get the trace flags for the stream
MStreamFlags.erase(StreamID);

auto &StreamCBs = MCallbacksByStream[StreamID]; // thread-safe
// Disable all callbacks registered for the stream represented by StreamID
for (auto &Item : StreamCBs) {
Expand Down Expand Up @@ -1732,6 +1742,7 @@ class Notifications {
if (StreamID == 0)
return false;

std::shared_lock<std::shared_mutex> Lock(MFlagsLock);
// Instead of checking the MCallbacksByStream to see if there are
// registered callbacks for a given stream/trace type query, we check
// this against a shadow data structure that sets a boolean flag equals
Expand Down Expand Up @@ -1899,6 +1910,7 @@ class Notifications {
#endif
stream_cb_t MCallbacksByStream;
mutable std::shared_mutex MCBsLock;
mutable std::shared_mutex MFlagsLock;
std::mutex MStatsLock;
statistics_t MStats;
stream_flags_t MStreamFlags;
Expand Down
Loading