Skip to content

Commit dde5462

Browse files
authored
[XPTI][INFRA] Fixes race issues with Emhash under contention (#19600)
- Under severe concurrent access, rehash in Emhash was being called during insert and this is documented as a problem in Emhash Readme. Rehash for the Emhash containers will not have to be called now as enough space for handling streams is reserved. - xptiCheckTraceEnabled() is now thread-safe - Emhash containers now have sizes reserved to accommodate the entries and all second level hash maps that were Emhash in the past have been replaced with parallel hashmap. --------- Signed-off-by: Vasanth Tovinkere <[email protected]>
1 parent 7bfef3f commit dde5462

File tree

1 file changed

+25
-13
lines changed

1 file changed

+25
-13
lines changed

xptifw/src/xpti_trace_framework.cpp

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,12 +1484,12 @@ class Notifications {
14841484
/// @brief Maps a trace type to its associated callback entries.
14851485
/// @details This unordered map uses a uint16_t as the key to represent the
14861486
/// trace point type, and cb_entries_t to store the associated callbacks.
1487-
using cb_t = emhash7::HashMap<uint16_t, cb_entries_t>;
1487+
using cb_t = phmap::flat_hash_map<uint16_t, cb_entries_t>;
14881488

14891489
/// @typedef stream_cb_t
14901490
/// @brief Maps a stream ID to its corresponding callbacks for different
14911491
/// trace types
1492-
/// @details This unordered map uses the xpti::strem_id_t as the key for the
1492+
/// @details This unordered map uses the xpti::stream_id_t as the key for the
14931493
/// stream ID, and cb_t to map the stream to registered callbacks for each
14941494
/// trace type
14951495
using stream_cb_t = emhash7::HashMap<xpti::stream_id_t, cb_t>;
@@ -1507,7 +1507,7 @@ class Notifications {
15071507
/// @details This unordered map uses a uint16_t as the key for the trace
15081508
/// type, and a boolean value to indicate whether callbacks are registered
15091509
/// for this trace type (e.g., registered or unregisterted/no callback).
1510-
using trace_flags_t = emhash7::HashMap<uint16_t, bool>;
1510+
using trace_flags_t = phmap::flat_hash_map<uint16_t, bool>;
15111511

15121512
/// @typedef stream_flags_t
15131513
/// @brief Maps a stream ID to its corresponding trace flags for different
@@ -1518,6 +1518,9 @@ class Notifications {
15181518
/// the given stream.
15191519
using stream_flags_t = emhash7::HashMap<xpti::stream_id_t, trace_flags_t>;
15201520

1521+
Notifications(size_t size = 512)
1522+
: MCallbacksByStream(size), MStreamFlags(size) {}
1523+
15211524
/// @brief Registers a callback function for a specific trace type and stream
15221525
/// ID.
15231526
///
@@ -1562,12 +1565,15 @@ class Notifications {
15621565
}
15631566
}
15641567
#endif
1568+
{
1569+
std::unique_lock<std::shared_mutex> Lock(MFlagsLock);
1570+
// Get the flags for the stream
1571+
auto &TraceFlags = MStreamFlags[StreamID];
1572+
TraceFlags[TraceType] = true; // Set the trace type flag to true
1573+
}
15651574
// If reader-writer locks were emplyed, this is where the writer lock can
15661575
// be used
15671576
std::unique_lock<std::shared_mutex> Lock(MCBsLock);
1568-
auto &TraceFlags = MStreamFlags[StreamID]; // Get the trace flags for the
1569-
// stream ID
1570-
TraceFlags[TraceType] = true; // Set the trace type flag to true
15711577

15721578
auto &StreamCBs =
15731579
MCallbacksByStream[StreamID]; // thread-safe
@@ -1637,14 +1643,16 @@ class Notifications {
16371643
if (!cbFunc)
16381644
return xpti::result_t::XPTI_RESULT_INVALIDARG;
16391645

1646+
{
1647+
std::unique_lock<std::shared_mutex> Lock(MFlagsLock);
1648+
auto &TraceFlags = MStreamFlags[StreamID]; // Get the trace flags for the
1649+
// stream ID
1650+
TraceFlags[TraceType] = false; // Set the trace type flag to false
1651+
}
16401652
// Since we do not remove the callback function when they are unregistered
16411653
// and only reset the flag, the writer lock is not held for very long; use
16421654
// writer lock here.
16431655
std::unique_lock<std::shared_mutex> Lock(MCBsLock);
1644-
auto &TraceFlags = MStreamFlags[StreamID]; // Get the trace flags for the
1645-
// stream ID
1646-
TraceFlags[TraceType] = false; // Set the trace type flag to false
1647-
16481656
auto &StreamCBs =
16491657
MCallbacksByStream[StreamID]; // thread-safe
16501658
// What we get is a concurrent_hash_map
@@ -1691,16 +1699,18 @@ class Notifications {
16911699
/// use reader-writer locks, a reader lock should be used where appropriate.
16921700

16931701
xpti::result_t unregisterStream(xpti::stream_id_t StreamID) {
1702+
{
1703+
std::unique_lock<std::shared_mutex> Lock(MFlagsLock);
1704+
// Get the trace flags for the stream
1705+
MStreamFlags.erase(StreamID);
1706+
}
16941707
// If there are no callbacks registered for the requested stream ID, we
16951708
// return not found; use reader lock here if the implementation moves to
16961709
// reader-writer locks.
16971710
std::unique_lock<std::shared_mutex> Lock(MCBsLock);
16981711
if (MCallbacksByStream.count(StreamID) == 0)
16991712
return xpti::result_t::XPTI_RESULT_NOTFOUND;
17001713

1701-
// Get the trace flags for the stream
1702-
MStreamFlags.erase(StreamID);
1703-
17041714
auto &StreamCBs = MCallbacksByStream[StreamID]; // thread-safe
17051715
// Disable all callbacks registered for the stream represented by StreamID
17061716
for (auto &Item : StreamCBs) {
@@ -1732,6 +1742,7 @@ class Notifications {
17321742
if (StreamID == 0)
17331743
return false;
17341744

1745+
std::shared_lock<std::shared_mutex> Lock(MFlagsLock);
17351746
// Instead of checking the MCallbacksByStream to see if there are
17361747
// registered callbacks for a given stream/trace type query, we check
17371748
// this against a shadow data structure that sets a boolean flag equals
@@ -1899,6 +1910,7 @@ class Notifications {
18991910
#endif
19001911
stream_cb_t MCallbacksByStream;
19011912
mutable std::shared_mutex MCBsLock;
1913+
mutable std::shared_mutex MFlagsLock;
19021914
std::mutex MStatsLock;
19031915
statistics_t MStats;
19041916
stream_flags_t MStreamFlags;

0 commit comments

Comments
 (0)