Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 82 additions & 44 deletions src/OpenTelemetry.AutoInstrumentation.Native/continuous_profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ constexpr auto kMaxCodesPerBuffer = 10 * 1000;
constexpr auto kSamplesBufferMaximumSize = 200 * 1024;

constexpr auto kSamplesBufferDefaultSize = 20 * 1024;
constexpr auto kSelectiveSamplingMaxSpans = 50;
constexpr auto kSelectiveSamplingMaxTraces = 50;
constexpr auto kSelectiveSamplingMaxAgeMinutes = 15;

// If you change these, change ThreadSampler.cs too
Expand Down Expand Up @@ -90,9 +90,9 @@ static std::mutex thread_span_context_lock;
static continuous_profiler::ThreadSpanContextMap thread_span_context_map;

// Selective-sampling shared state; all three are guarded by selective_sampling_lock.
// Keep timestamps as values to handle stalled spans
static std::mutex selective_sampling_lock;
// Traces currently selected for frequent sampling, mapped to the timestamp (long long,
// milliseconds) recorded when sampling was requested; stale entries are pruned elsewhere.
static std::unordered_map<continuous_profiler::trace_context, long long> selective_sampling_trace_map;
// Scratch set of thread ids belonging to the selected traces, rebuilt each sampling pass.
static std::unordered_set<ThreadID> selective_sampling_thread_buffer;

static std::mutex name_cache_lock = std::mutex();

Expand Down Expand Up @@ -172,9 +172,14 @@ static void AppendToSelectedThreadsSampleBuffer(int32_t appendLen, unsigned char
selective_sampling_buffer.insert(selective_sampling_buffer.end(), appendBuf, &appendBuf[appendLen]);
}

// Returns true when this trace context carries no trace association,
// i.e. both 64-bit halves of the 128-bit trace id are zero.
bool continuous_profiler::trace_context::IsDefault() const
{
return trace_id_high_ == 0 && trace_id_low_ == 0;
}

// Returns true when this context carries no tracing information at all:
// an all-zero trace id (delegated to trace_context::IsDefault) and a zero span id.
bool continuous_profiler::thread_span_context::IsDefault() const
{
    return trace_context_.IsDefault() && span_id_ == 0;
}

void AllocationSamplingAppendToBuffer(int32_t appendLen, unsigned char* appendBuf)
Expand Down Expand Up @@ -318,16 +323,21 @@ void ThreadSamplesBuffer::EndSelectedThreadsBatch() const
WriteByte(kSelectedThreadsEndBatch);
}

// Serializes a span context into the samples buffer: the 128-bit trace id
// (high half, then low half) followed by the 64-bit span id.
// NOTE(review): the write order is part of the buffer format consumed by the
// managed-side reader (see "change ThreadSampler.cs too" note above) — do not reorder.
void ThreadSamplesBuffer::WriteSpanContext(const thread_span_context& span_context) const
{
WriteUInt64(span_context.trace_context_.trace_id_high_);
WriteUInt64(span_context.trace_context_.trace_id_low_);
WriteUInt64(span_context.span_id_);
}

// Emits the header of a thread-sample record: record tag, thread name, and the
// active span context. The caller appends stack frames afterwards.
// The span context must be serialized exactly once per record; the stale
// per-field WriteUInt64 triplet that duplicated WriteSpanContext is removed.
void ThreadSamplesBuffer::StartSample(ThreadID id,
                                      const ThreadState* state,
                                      const thread_span_context& span_context) const
{
    CHECK_SAMPLES_BUFFER_LENGTH()
    WriteByte(kThreadSamplesStartSample);
    WriteString(state->thread_name_);
    WriteSpanContext(span_context);
    // Feature possibilities: (managed/native) thread priority, cpu/wait times, etc.
}

Expand All @@ -338,9 +348,7 @@ void ThreadSamplesBuffer::StartSampleForSelectedThread(const ThreadState*
WriteByte(kSelectedThreadSample);
WriteCurrentTimeMillis();
WriteString(state->thread_name_);
WriteUInt64(span_context.trace_id_high_);
WriteUInt64(span_context.trace_id_low_);
WriteUInt64(span_context.span_id_);
WriteSpanContext(span_context);
}

void ThreadSamplesBuffer::MarkSelectedForFrequentSampling(bool value) const
Expand All @@ -363,9 +371,7 @@ void ThreadSamplesBuffer::AllocationSample(uint64_t allocSize,
WriteUInt64(allocSize);
WriteString(allocType, allocTypeCharLen);
WriteString(state->thread_name_);
WriteUInt64(span_context.trace_id_high_);
WriteUInt64(span_context.trace_id_low_);
WriteUInt64(span_context.span_id_);
WriteSpanContext(span_context);
}

void ThreadSamplesBuffer::RecordFrame(const FunctionIdentifier& fid, const trace::WSTRING& frame)
Expand Down Expand Up @@ -476,6 +482,18 @@ void ThreadSpanContextMap::Remove(const thread_span_context& spanContext)
thread_span_context_map.erase(threadId);
}
span_context_thread_map.erase(spanContext);

const auto foundByTraceContext = trace_active_span_map.find(spanContext.trace_context_);
if (foundByTraceContext == trace_active_span_map.end())
{
return; // nothing to remove
}
auto& traceActiveSpans = foundByTraceContext->second;
traceActiveSpans.erase(spanContext);
if (traceActiveSpans.empty())
{
trace_active_span_map.erase(spanContext.trace_context_);
}
}

void ThreadSpanContextMap::Remove(ThreadID threadId)
Expand All @@ -485,13 +503,15 @@ void ThreadSpanContextMap::Remove(ThreadID threadId)
{
return; // nothing to remove
}

const auto spanContext = foundByThreadId->second;
const auto foundBySpanContext = span_context_thread_map.find(spanContext);
if (foundBySpanContext != span_context_thread_map.end())
{
auto& threadIds = foundBySpanContext->second;
threadIds.erase(threadId);
}

thread_span_context_map.erase(threadId);
}

Expand All @@ -506,24 +526,43 @@ void ThreadSpanContextMap::Put(ThreadID threadId, const thread_span_context& cur
span_context_thread_map[previousContext].erase(threadId);
}
}

thread_span_context_map[threadId] = currentSpanContext;
if (!currentSpanContext.IsDefault())
if (currentSpanContext.IsDefault())
{
span_context_thread_map[currentSpanContext].insert(threadId);
return;
}

span_context_thread_map[currentSpanContext].insert(threadId);

// Also note of possibly new activity, store it in trace map.
// This is a noop if already present.
trace_active_span_map[currentSpanContext.trace_context_].insert(currentSpanContext);
}

const std::unordered_set<ThreadID>* ThreadSpanContextMap::Get(const thread_span_context& spanContext)
void ThreadSpanContextMap::GetAllThreads(const trace_context traceContext, std::unordered_set<ThreadID>& buffer)
{
const auto iterator = span_context_thread_map.find(spanContext);
if (iterator == span_context_thread_map.end())
const auto traceSpans = trace_active_span_map.find(traceContext);
if (traceSpans == trace_active_span_map.end())
{
return;
}

const auto& spanContexts = traceSpans->second;
for (const auto& spanContext : spanContexts)
{
return nullptr;
const auto foundBySpanContext = span_context_thread_map.find(spanContext);
if (foundBySpanContext == span_context_thread_map.end())
{
continue;
}

const auto& threadIds = foundBySpanContext->second;
buffer.insert(threadIds.begin(), threadIds.end());
}
return &iterator->second;
}

std::optional<thread_span_context> ThreadSpanContextMap::Get(ThreadID threadId)
std::optional<thread_span_context> ThreadSpanContextMap::GetContext(ThreadID threadId)
{
const auto iterator = thread_span_context_map.find(threadId);
if (iterator == thread_span_context_map.end())
Expand Down Expand Up @@ -858,7 +897,7 @@ static ThreadState* GetThreadState(const std::unordered_map<ThreadID, ThreadStat

// Looks up the span context currently associated with the given thread,
// returning a default-constructed (all-zero) context when none is recorded.
static thread_span_context GetContext(ThreadID threadId)
{
    const auto retrievedContext = thread_span_context_map.GetContext(threadId);
    return retrievedContext.has_value() ? retrievedContext.value() : thread_span_context{};
}

Expand All @@ -883,7 +922,7 @@ static void ResolveSymbolsAndPublishBufferForAllThreads(
if (prof->selectedThreadsSamplingInterval.has_value())
{
const auto threadSelectedForFrequentSampling =
selective_sampling_trace_map.find(spanContext) != selective_sampling_trace_map.end();
selective_sampling_trace_map.find(spanContext.trace_context_) != selective_sampling_trace_map.end();
prof->cur_cpu_writer_->MarkSelectedForFrequentSampling(threadSelectedForFrequentSampling);
}

Expand Down Expand Up @@ -924,7 +963,7 @@ static void ResolveSymbolsAndPublishBufferForSelectedThreads(
AppendToSelectedThreadsSampleBuffer(static_cast<int32_t>(localBytes.size()), localBytes.data());
}

static void RemoveOutdatedEntries(std::unordered_map<thread_span_context, long long>& selectiveSamplingTraceSet)
static void RemoveOutdatedEntries(std::unordered_map<trace_context, long long>& selectiveSamplingTraceSet)
{
const auto now = std::chrono::steady_clock::now();

Expand All @@ -936,10 +975,10 @@ static void RemoveOutdatedEntries(std::unordered_map<thread_span_context, long l
auto deadline = enqueuedTime + std::chrono::minutes(kSelectiveSamplingMaxAgeMinutes);
if (now > deadline)
{
trace::Logger::Warn("SelectiveSampling: removing outdated entry for span {",
trace::Logger::Warn("SelectiveSampling: removing outdated entry for trace {",
"traceIdHigh: ", it->first.trace_id_high_, ", traceIdLow: ", it->first.trace_id_low_,
", spanId: ", it->first.span_id_, "} because it was enqueued more than ",
kSelectiveSamplingMaxAgeMinutes, " minutes ago");
"} because it was enqueued more than ", kSelectiveSamplingMaxAgeMinutes,
" minutes ago");
it = selectiveSamplingTraceSet.erase(it);
}
else
Expand Down Expand Up @@ -983,13 +1022,9 @@ static void PauseClrAndCaptureSamples(ContinuousProfiler*
}

selective_sampling_thread_buffer.clear();
for (const auto& spanContext : selective_sampling_trace_map)
for (const auto& [traceContext, _] : selective_sampling_trace_map)
{
const auto threadsForSpanContext = thread_span_context_map.Get(spanContext.first);
if (threadsForSpanContext != nullptr)
{
selective_sampling_thread_buffer.insert(threadsForSpanContext->begin(), threadsForSpanContext->end());
}
thread_span_context_map.GetAllThreads(traceContext, selective_sampling_thread_buffer);
}
if (selective_sampling_thread_buffer.empty())
{
Expand Down Expand Up @@ -1192,7 +1227,7 @@ void ContinuousProfiler::SetGlobalInfo12(ICorProfilerInfo12* cor_profiler_info12
// Pre-sizes the selective-sampling scratch buffers so the sampling path avoids
// reallocation: the sample byte buffer gets its default capacity and the
// thread-id set is sized for the maximum number of concurrently sampled traces.
// The stale duplicate reserve using the renamed kSelectiveSamplingMaxSpans
// constant is removed.
void ContinuousProfiler::InitSelectiveSamplingBuffer()
{
    selective_sampling_buffer.reserve(kSamplesBufferDefaultSize);
    selective_sampling_thread_buffer.reserve(kSelectiveSamplingMaxTraces);
}

void ContinuousProfiler::StartThreadSampling()
Expand Down Expand Up @@ -1544,6 +1579,9 @@ extern "C"
}
EXPORTTHIS void ContinuousProfilerSetNativeContext(uint64_t traceIdHigh, uint64_t traceIdLow, uint64_t spanId)
{
// This method is called anytime thread-span association changes, e.g. when activity is started/stopped or
// when suspension/resumption occurs.

ThreadID threadId;
const HRESULT hr = profiler_info->GetCurrentThreadID(&threadId);
if (FAILED(hr))
Expand All @@ -1554,28 +1592,28 @@ extern "C"
}
std::lock_guard<std::mutex> guard(thread_span_context_lock);

continuous_profiler::thread_span_context newSpanContext = {traceIdHigh, traceIdLow, spanId};
const continuous_profiler::thread_span_context newSpanContext = {traceIdHigh, traceIdLow, spanId};
thread_span_context_map.Put(threadId, newSpanContext);
}
EXPORTTHIS void SelectiveSamplingStart(uint64_t traceIdHigh, uint64_t traceIdLow, uint64_t spanId)
EXPORTTHIS void SelectiveSamplingStart(uint64_t traceIdHigh, uint64_t traceIdLow)
{
if (profiler_info == nullptr)
{
return;
}

continuous_profiler::thread_span_context context = {traceIdHigh, traceIdLow, spanId};
const continuous_profiler::trace_context context = {traceIdHigh, traceIdLow};
if (context.IsDefault())
{
return;
}
std::lock_guard<std::mutex> guard(selective_sampling_lock);
// Don't allow for too many samples at once
if (selective_sampling_trace_map.size() >= kSelectiveSamplingMaxSpans)
if (selective_sampling_trace_map.size() >= kSelectiveSamplingMaxTraces)
{
trace::Logger::Warn("SelectiveSamplingStart: ignoring request to start sampling for span {",
"traceIdHigh: ", traceIdHigh, ", traceIdLow: ", traceIdLow, ", spanId: ", spanId,
"} because maximum number of spans is already being sampled.");
trace::Logger::Warn("SelectiveSamplingStart: ignoring request to start sampling for trace {",
"traceIdHigh: ", traceIdHigh, ", traceIdLow: ", traceIdLow,
"} because maximum number of traces is already being sampled.");
return;
}
const auto now =
Expand All @@ -1584,14 +1622,14 @@ extern "C"
selective_sampling_trace_map[context] = now;
}

EXPORTTHIS void SelectiveSamplingStop(uint64_t traceIdHigh, uint64_t traceIdLow, uint64_t spanId)
EXPORTTHIS void SelectiveSamplingStop(uint64_t traceIdHigh, uint64_t traceIdLow)
{
if (profiler_info == nullptr)
{
return;
}

continuous_profiler::thread_span_context context = {traceIdHigh, traceIdLow, spanId};
const continuous_profiler::trace_context context = {traceIdHigh, traceIdLow};
if (context.IsDefault())
{
return;
Expand Down
Loading
Loading