Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

#include <rocprofiler-sdk/fwd.h>

#include <vector>

namespace rocprofiler
{
namespace context
Expand All @@ -47,7 +49,7 @@ get_correlation_id_map()
auto&
get_latest_correlation_id_impl()
{
    // Per-thread stack of live correlation IDs. std::vector replaces the
    // previous small_vector so the container can grow without a fixed inline
    // capacity (the matching hunk adds #include <vector> at the top of file).
    // NOTE(review): the old small_vector initializer line was left behind by
    // a bad diff paste and has been removed — only one definition of _v may exist.
    static thread_local auto _v = std::vector<correlation_id*>{};
    return _v;
}

Expand Down
436 changes: 273 additions & 163 deletions projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/hsa/details/fmt.hpp

Large diffs are not rendered by default.

237 changes: 190 additions & 47 deletions projects/rocprofiler-sdk/source/lib/rocprofiler-sdk/hsa/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
#include <hsa/hsa.h>
#include <hsa/hsa_ext_amd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

// static assert for rocprofiler_packet ABI compatibility
static_assert(sizeof(hsa_ext_amd_aql_pm4_packet_t) == sizeof(hsa_kernel_dispatch_packet_t),
Expand Down Expand Up @@ -96,20 +99,70 @@ context_filter(const context::context* ctx)
context_filter(ctx, ROCPROFILER_CALLBACK_TRACING_KERNEL_DISPATCH));
}

bool
AsyncSignalHandler(hsa_signal_value_t /*signal_v*/, void* data)
struct async_signal_handler_data
{
if(!data) return true;
std::shared_ptr<Queue::queue_info_session_t> session = {};
Queue* owner = nullptr;
std::atomic<bool> handled = false;
};

// if we have fully finalized, delete the data and return
if(registration::get_fini_status() > 0)
{
auto* _session = static_cast<Queue::queue_info_session_t**>(data);
delete _session;
return false;
}
auto&
get_async_signal_handler_data_map()
{
static auto _v = common::Synchronized<
std::unordered_map<void*, std::shared_ptr<async_signal_handler_data>>>{};
return _v;
}

void
register_async_signal_handler_data(const std::shared_ptr<async_signal_handler_data>& data)
{
get_async_signal_handler_data_map().wlock([&](auto& map) { map.emplace(data.get(), data); });
}

std::shared_ptr<async_signal_handler_data>
acquire_async_signal_handler_data(void* data)
{
return get_async_signal_handler_data_map().wlock([&](auto& map) {
auto itr = map.find(data);
if(itr == map.end()) return std::shared_ptr<async_signal_handler_data>{};

auto ret = itr->second;
if(!ret || ret->handled.exchange(true, std::memory_order_acq_rel))
{
return std::shared_ptr<async_signal_handler_data>{};
}

return ret;
});
}

void
drain_async_signal_handler_data(const Queue& queue)
{
    // Erase every entry that (a) belongs to this queue and (b) has already
    // been claimed/processed by the async handler. Unhandled entries stay —
    // their callback has not fired yet.
    get_async_signal_handler_data_map().wlock([&queue](auto& handler_map) {
        auto itr = handler_map.begin();
        while(itr != handler_map.end())
        {
            const auto& entry = itr->second;
            const bool  reclaimable = entry && entry->owner == &queue &&
                                     entry->handled.load(std::memory_order_acquire);
            if(reclaimable)
                itr = handler_map.erase(itr);
            else
                ++itr;
        }
    });
}

void
ProcessDispatchCompletion(std::shared_ptr<Queue::queue_info_session_t>& shared_ptr_info,
bool retire_signals)
{
if(!shared_ptr_info) return;

auto& shared_ptr_info = *static_cast<std::shared_ptr<Queue::queue_info_session_t>*>(data);
auto& queue_info_session = *shared_ptr_info;

auto dispatch_time = kernel_dispatch::get_dispatch_time(queue_info_session);
Expand Down Expand Up @@ -142,17 +195,24 @@ AsyncSignalHandler(hsa_signal_value_t /*signal_v*/, void* data)
if(queue_info_session.interrupt_signal.handle != 0u)
{
#if !defined(NDEBUG)
CHECK_NOTNULL(hsa::get_queue_controller())->_debug_signals.wlock([&](auto& signals) {
signals.erase(queue_info_session.interrupt_signal.handle);
});
if(retire_signals)
{
CHECK_NOTNULL(hsa::get_queue_controller())->_debug_signals.wlock([&](auto& signals) {
signals.erase(queue_info_session.interrupt_signal.handle);
});
}
#endif
hsa::get_core_table()->hsa_signal_store_screlease_fn(queue_info_session.interrupt_signal,
-1);
hsa::get_core_table()->hsa_signal_destroy_fn(queue_info_session.interrupt_signal);
if(retire_signals)
{
queue_info_session.queue.retire_signal(queue_info_session.interrupt_signal);
}
}
if(queue_info_session.kernel_pkt.ext_amd_aql_pm4.completion_signal.handle != 0u)
if(retire_signals &&
queue_info_session.kernel_pkt.ext_amd_aql_pm4.completion_signal.handle != 0u)
{
hsa::get_core_table()->hsa_signal_destroy_fn(
queue_info_session.queue.retire_signal(
queue_info_session.kernel_pkt.ext_amd_aql_pm4.completion_signal);
}

Expand All @@ -168,7 +228,25 @@ AsyncSignalHandler(hsa_signal_value_t /*signal_v*/, void* data)
}

queue_info_session.queue.async_complete();
delete &shared_ptr_info;
shared_ptr_info.reset();
}

bool
AsyncSignalHandler(hsa_signal_value_t /*signal_v*/, void* data)
{
    // No payload: ask ROCr to keep the handler registered.
    if(data == nullptr) return true;

    // Claim the retained entry; losing the claim (or a missing entry) means
    // another path already processed — or will never process — this dispatch.
    auto retained = acquire_async_signal_handler_data(data);
    if(retained == nullptr) return false;

    const bool finalized = registration::get_fini_status() > 0;
    if(finalized)
    {
        // Fully finalized: discard the retained session without processing.
        retained->session.reset();
    }
    else
    {
        ProcessDispatchCompletion(retained->session, true);
    }

    return false;
}
Expand Down Expand Up @@ -232,6 +310,23 @@ WriteInterceptor(const void* packets,
ROCP_FATAL_IF(data == nullptr) << "WriteInterceptor was not passed a pointer to the queue";

auto& queue = *static_cast<Queue*>(data);
// Graph replay can keep ROCr async-handler bookkeeping alive briefly after a dispatch
// completes. Retire completion signals in the callback and only reclaim them once the queue
// is fully idle.
if(queue.active_interceptors() == 0 && queue.active_async_packets() == 0)
{
drain_async_signal_handler_data(queue);
queue.drain_retired_signals();
}
queue.interceptor_started();
auto _interceptor_dtor =
common::scope_destructor{[&queue]() { queue.interceptor_complete(); }};

if(queue.get_state() != queue_state::normal)
{
writer(packets, pkt_count);
return;
}

// We have no packets or no one who needs to be notified, do nothing.
if(pkt_count == 0 ||
Expand Down Expand Up @@ -273,24 +368,18 @@ WriteInterceptor(const void* packets,
auto* corr_id = context::get_latest_correlation_id();
context::correlation_id* _corr_id_pop = nullptr;

if(!corr_id)
{
constexpr auto ref_count = 1;
corr_id = context::correlation_tracing_service::construct(ref_count);
_corr_id_pop = corr_id;
}

if(!corr_id)
// Graph replay and other queue writes can be intercepted on the HSA async event thread,
// outside any active host API scope. In that case we still want the dispatch trace, but
// synthesizing a fresh correlation ID per dispatch here adds allocator pressure on the
// async thread and is not required for kernel trace collection.
if(corr_id)
{
// During finalization - just write packet through without tracing
transformed_packets.emplace_back(packets_arr[i]);
continue;
// increase the reference count to denote that this correlation id is being used in a
// kernel
corr_id->add_ref_count();
corr_id->add_kern_count();
}

// increase the reference count to denote that this correlation id is being used in a kernel
corr_id->add_ref_count();
corr_id->add_kern_count();

auto thr_id = (corr_id) ? corr_id->thread_idx : common::get_tid();
auto user_data = rocprofiler_user_data_t{.value = 0};
auto internal_corr_id = (corr_id) ? corr_id->internal : 0;
Expand Down Expand Up @@ -321,10 +410,6 @@ WriteInterceptor(const void* packets,

// Copy kernel pkt, copy is to allow for signal to be modified
rocprofiler_packet kernel_pkt = packets_arr[i];
// create our own signal that we can get a callback on. if there is an original completion
// signal we will create a barrier packet, assign the original completion signal that that
// barrier packet, and add it right after the kernel packet
queue.create_signal(0, &kernel_pkt.kernel_dispatch.completion_signal);

// computes the "size" based on the offset of reserved_padding field
constexpr auto kernel_dispatch_info_rt_size =
Expand Down Expand Up @@ -417,6 +502,16 @@ WriteInterceptor(const void* packets,
}
}

const bool injected_end_pkt =
std::any_of(inst_pkt.begin(), inst_pkt.end(), [](const auto& pkt_injection) {
return !pkt_injection.first->after_krn_pkt.empty();
});

// create our own signal that we can get a callback on. if there is an original
// completion signal we will create a barrier packet, assign the original completion
// signal that that barrier packet, and add it right after the kernel packet
queue.create_signal(0, &kernel_pkt.kernel_dispatch.completion_signal);

#if ROCPROFILER_SDK_HSA_PC_SAMPLING > 0
if(pc_sampling::is_pc_sample_service_configured(queue.get_agent().get_rocp_agent()->id))
{
Expand All @@ -441,13 +536,11 @@ WriteInterceptor(const void* packets,
transformed_packets.emplace_back(barrier);
}

bool injected_end_pkt = false;
for(const auto& pkt_injection : inst_pkt)
{
for(const auto& pkt : pkt_injection.first->after_krn_pkt)
{
transformed_packets.emplace_back(pkt);
injected_end_pkt = true;
}
}

Expand Down Expand Up @@ -487,9 +580,12 @@ WriteInterceptor(const void* packets,
.is_serialized = bRequest_Serialize};

auto shared = std::make_shared<Queue::queue_info_session_t>(std::move(info_session));
auto async_handler_data = std::make_shared<async_signal_handler_data>();
async_handler_data->session = shared;
async_handler_data->owner = &queue;
register_async_signal_handler_data(async_handler_data);

queue.signal_async_handler(completion_signal,
new std::shared_ptr<Queue::queue_info_session_t>(shared));
queue.signal_async_handler(completion_signal, async_handler_data.get());

auto tracer_data = callback_record;
tracing::execute_phase_exit_callbacks(tracing_data_v.callback_contexts,
Expand Down Expand Up @@ -671,14 +767,61 @@ Queue::create_signal(uint32_t attribute, hsa_signal_t* signal) const
<< " :: " << hsa::get_hsa_status_string(status);
}

void
Queue::retire_signal(hsa_signal_t signal) const
{
    // Null handles are never parked for destruction.
    if(signal.handle == 0u) return;

    // Defer destruction: the signal is destroyed later by
    // drain_retired_signals() once the queue is idle.
    std::lock_guard<std::mutex> _lk{_retired_signals_mutex};
    _retired_signals.push_back(signal);
}

void
Queue::drain_retired_signals() const
{
    // Swap the parked list out under the lock, then destroy outside it so
    // hsa_signal_destroy never runs while _retired_signals_mutex is held.
    auto to_destroy = std::vector<hsa_signal_t>{};
    {
        std::lock_guard<std::mutex> _lk{_retired_signals_mutex};
        _retired_signals.swap(to_destroy);
    }

    for(const auto& sig : to_destroy)
        _core_api.hsa_signal_destroy_fn(sig);
}

void
Queue::sync() const
{
    // Nothing to wait on if the active-kernel signal was never created.
    // NOTE(review): this function contained leftover pre-change lines (the old
    // inverted guard and the old unconditional UINT64_MAX wait) interleaved
    // with the new polling loop; the residue has been removed.
    if(_active_kernels.handle == 0u) return;

    // Bounded wait (1 ms in ns) so the interceptor count can be re-checked
    // between polls instead of blocking forever on the kernel signal.
    constexpr auto wait_timeout =
        std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds{1})
            .count();

    // Spin until there are no in-flight interceptors and no outstanding kernels.
    while(true)
    {
        const auto interceptors = active_interceptors();
        const auto kernels      = _core_api.hsa_signal_load_scacquire_fn(_active_kernels);

        if(interceptors == 0 && kernels == 0) break;

        if(kernels != 0)
        {
            // Kernels outstanding: wait (up to the timeout) for the counter
            // to drop to zero, then loop to re-check both conditions.
            _core_api.hsa_signal_wait_relaxed_fn(_active_kernels,
                                                 HSA_SIGNAL_CONDITION_EQ,
                                                 0,
                                                 wait_timeout,
                                                 HSA_WAIT_STATE_ACTIVE);
        }
        else
        {
            // Only interceptors remain; yield so they can finish.
            std::this_thread::yield();
        }
    }

    // Queue is idle: reclaim completed async-handler entries and destroy the
    // signals parked by retire_signal().
    drain_async_signal_handler_data(*this);
    drain_retired_signals();
}

void
Expand All @@ -702,13 +845,13 @@ Queue::remove_callback(ClientID id)
queue_state
Queue::get_state() const
{
    // Acquire pairs with the release store in set_state() so state
    // transitions published by another thread are visible here.
    // NOTE(review): removed the stale pre-change `return _state;` line that
    // made this load unreachable (leftover diff residue).
    return _state.load(std::memory_order_acquire);
}

void
Queue::set_state(queue_state state) const
{
    // Release pairs with the acquire load in get_state().
    // NOTE(review): removed the duplicated non-const declarator line and the
    // plain assignment (leftover diff residue) — only the const, atomic-store
    // version remains.
    _state.store(state, std::memory_order_release);
}
} // namespace hsa
} // namespace rocprofiler
Loading
Loading