Skip to content

Commit 8b7bc8d

Browse files
authored
Merge pull request #9 from parca-dev/correlation-filter
Add correlation ID filter to track traced kernel launches
2 parents 1f208d4 + 6adeb17 commit 8b7bc8d

File tree

5 files changed

+470
-18
lines changed

5 files changed

+470
-18
lines changed

Dockerfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ WORKDIR /build/cupti
3030
COPY --from=cuda12-headers /usr/local/cuda /usr/local/cuda
3131

3232
# Copy source code
33-
COPY cupti/cupti-prof.c cupti/CMakeLists.txt ./
33+
COPY cupti/cupti-prof.c cupti/correlation_filter.cpp cupti/correlation_filter.h cupti/CMakeLists.txt ./
3434

3535
# Build the library for CUDA 12
3636
ENV CUDA_ROOT=/usr/local/cuda
3737
RUN mkdir -p build && \
3838
cd build && \
39-
cmake -DCUDA_ROOT=${CUDA_ROOT} .. && \
39+
cmake -DCUDA_ROOT=${CUDA_ROOT} -DCMAKE_BUILD_TYPE=RelWithDebInfo .. && \
4040
make VERBOSE=1 && \
4141
mv libparcagpucupti.so libparcagpucupti.so.12
4242

@@ -58,7 +58,7 @@ WORKDIR /build/cupti
5858
COPY --from=cuda13-headers /usr/local/cuda /usr/local/cuda
5959

6060
# Copy source code
61-
COPY cupti/cupti-prof.c cupti/CMakeLists.txt ./
61+
COPY cupti/cupti-prof.c cupti/correlation_filter.cpp cupti/correlation_filter.h cupti/CMakeLists.txt ./
6262

6363
# Build the library for CUDA 13
6464
ENV CUDA_ROOT=/usr/local/cuda

cupti/CMakeLists.txt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
cmake_minimum_required(VERSION 3.18)
2-
project(parcagpucupti C)
2+
project(parcagpucupti C CXX)
33

44
# CUDA root directory (can be overridden via -DCUDA_ROOT=...)
55
if(NOT DEFINED CUDA_ROOT)
@@ -20,18 +20,20 @@ set(CUDAToolkit_INCLUDE_DIRS
2020
# Add CUDA library directory to link directories
2121
link_directories(${CUDA_LIBDIR})
2222

23-
# Create shared library
24-
add_library(parcagpucupti SHARED cupti-prof.c)
23+
# Create shared library with both C and C++ sources
24+
add_library(parcagpucupti SHARED cupti-prof.c correlation_filter.cpp)
2525

2626
# Set properties
2727
set_target_properties(parcagpucupti PROPERTIES
2828
C_STANDARD 11
2929
C_STANDARD_REQUIRED ON
30+
CXX_STANDARD 17
31+
CXX_STANDARD_REQUIRED ON
3032
POSITION_INDEPENDENT_CODE ON
3133
)
3234

33-
# Add debug symbols
34-
target_compile_options(parcagpucupti PRIVATE -g)
35+
# Add debug symbols and disable C++ exceptions
36+
target_compile_options(parcagpucupti PRIVATE -g -fno-exceptions)
3537

3638
# Include directories
3739
target_include_directories(parcagpucupti PRIVATE

cupti/correlation_filter.cpp

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
#include "correlation_filter.h"
2+
#include <unordered_set>
3+
#include <unordered_map>
4+
#include <mutex>
5+
#include <memory>
6+
7+
// CorrelationFilter implementation using std::unordered_set with mutex protection
8+
// This provides thread-safe access with minimal overhead for our use case
9+
// Mutex-guarded set of correlation IDs.
//
// Every public member function takes the internal mutex for the duration of
// the call, so a single instance may be shared between threads.
class CorrelationFilter {
public:
    CorrelationFilter() = default;

    // Records `correlation_id` in the filter. Inserting an ID that is
    // already present leaves the filter unchanged.
    void insert(uint32_t correlation_id) {
        const std::lock_guard<std::mutex> guard(mutex_);
        ids_.insert(correlation_id);
    }

    // Removes `correlation_id` if it is present.
    // Returns true when the ID was found (and has now been removed),
    // false when it was not in the filter. Lookup and removal happen
    // under one lock acquisition.
    bool check_and_remove(uint32_t correlation_id) {
        const std::lock_guard<std::mutex> guard(mutex_);
        // erase-by-key reports how many elements it removed (0 or 1 for a set).
        return ids_.erase(correlation_id) != 0;
    }

    // Number of IDs currently held by the filter.
    size_t size() const {
        const std::lock_guard<std::mutex> guard(mutex_);
        return ids_.size();
    }

private:
    std::unordered_set<uint32_t> ids_;
    mutable std::mutex mutex_;  // mutable so that size() can lock while const
};
44+
45+
// GraphCorrelationMap implementation for tracking graph launches across buffer cycles
46+
// Uses a 2-slot state machine per correlation ID to detect when graph launches are complete
47+
struct GraphCorrelationEntry {
48+
uint8_t state[2]; // State for alternating cycles
49+
bool ever_seen_kernel; // True once we've seen at least one kernel activity
50+
uint32_t insertion_cycle; // Buffer cycle when entry was created (for fallback cleanup)
51+
52+
GraphCorrelationEntry(uint32_t cycle)
53+
: state{GRAPH_STATE_UNINITIALIZED, GRAPH_STATE_UNINITIALIZED}
54+
, ever_seen_kernel(false)
55+
, insertion_cycle(cycle) {}
56+
};
57+
58+
class GraphCorrelationMap {
59+
public:
60+
GraphCorrelationMap() : current_cycle_(0) {}
61+
62+
// Insert a correlation ID (called when sampling a graph launch)
63+
// Thread-safe
64+
void insert(uint32_t correlation_id) {
65+
std::lock_guard<std::mutex> lock(mutex_);
66+
map_.emplace(correlation_id, GraphCorrelationEntry(current_cycle_));
67+
}
68+
69+
// Start a new processing cycle - clear the appropriate slot for all entries
70+
// Thread-safe
71+
void cycle_start(uint32_t cycle) {
72+
std::lock_guard<std::mutex> lock(mutex_);
73+
current_cycle_ = cycle;
74+
uint32_t slot = cycle % 2;
75+
for (auto& pair : map_) {
76+
pair.second.state[slot] = GRAPH_STATE_CYCLE_CLEARED;
77+
}
78+
}
79+
80+
// Check if correlation ID is tracked and mark as seen for this cycle
81+
// Returns true if tracked (should fire probe)
82+
// Thread-safe
83+
bool check_and_mark_seen(uint32_t correlation_id, uint32_t cycle) {
84+
std::lock_guard<std::mutex> lock(mutex_);
85+
auto it = map_.find(correlation_id);
86+
if (it != map_.end()) {
87+
uint32_t slot = cycle % 2;
88+
it->second.state[slot] = GRAPH_STATE_KERNEL_SEEN;
89+
it->second.ever_seen_kernel = true; // Mark that we've seen at least one kernel
90+
return true;
91+
}
92+
return false;
93+
}
94+
95+
// End processing cycle - remove entries based on two conditions:
96+
// 1. Primary: Both slots CYCLE_CLEARED AND we've seen at least one kernel (graph completed)
97+
// 2. Fallback: Both slots CYCLE_CLEARED AND never seen kernel AND age > 100 cycles
98+
// (handles GPU reset, failed launches, etc.)
99+
// Thread-safe
100+
void cycle_end() {
101+
std::lock_guard<std::mutex> lock(mutex_);
102+
size_t removed_normal = 0;
103+
size_t removed_fallback = 0;
104+
105+
for (auto it = map_.begin(); it != map_.end(); ) {
106+
bool should_remove = false;
107+
bool is_fallback = false;
108+
109+
if (it->second.state[0] == GRAPH_STATE_CYCLE_CLEARED &&
110+
it->second.state[1] == GRAPH_STATE_CYCLE_CLEARED) {
111+
112+
if (it->second.ever_seen_kernel) {
113+
// Primary: Graph completed normally (saw kernels, then stopped)
114+
should_remove = true;
115+
removed_normal++;
116+
} else if ((current_cycle_ - it->second.insertion_cycle) > 100) {
117+
// Fallback: Never saw kernels and entry is very old (>100 cycles)
118+
// Prevents leaking entries when GPU resets or launches fail
119+
should_remove = true;
120+
is_fallback = true;
121+
removed_fallback++;
122+
}
123+
}
124+
125+
if (should_remove) {
126+
it = map_.erase(it);
127+
} else {
128+
++it;
129+
}
130+
}
131+
}
132+
133+
// Get cleanup stats (for debugging)
134+
void get_stats(size_t& size, size_t& oldest_age) const {
135+
std::lock_guard<std::mutex> lock(mutex_);
136+
size = map_.size();
137+
oldest_age = 0;
138+
for (const auto& pair : map_) {
139+
uint32_t age = current_cycle_ - pair.second.insertion_cycle;
140+
if (age > oldest_age) {
141+
oldest_age = age;
142+
}
143+
}
144+
}
145+
146+
// Get current size
147+
// Thread-safe
148+
size_t size() const {
149+
std::lock_guard<std::mutex> lock(mutex_);
150+
return map_.size();
151+
}
152+
153+
private:
154+
std::unordered_map<uint32_t, GraphCorrelationEntry> map_;
155+
uint32_t current_cycle_;
156+
mutable std::mutex mutex_;
157+
};
158+
159+
// C API implementation
160+
extern "C" {
161+
162+
CorrelationFilterHandle correlation_filter_create(void) {
163+
return new CorrelationFilter();
164+
}
165+
166+
void correlation_filter_destroy(CorrelationFilterHandle filter) {
167+
if (filter) {
168+
delete static_cast<CorrelationFilter*>(filter);
169+
}
170+
}
171+
172+
void correlation_filter_insert(CorrelationFilterHandle filter, uint32_t correlation_id) {
173+
if (filter) {
174+
static_cast<CorrelationFilter*>(filter)->insert(correlation_id);
175+
}
176+
}
177+
178+
bool correlation_filter_check_and_remove(CorrelationFilterHandle filter, uint32_t correlation_id) {
179+
if (filter) {
180+
return static_cast<CorrelationFilter*>(filter)->check_and_remove(correlation_id);
181+
}
182+
return false;
183+
}
184+
185+
size_t correlation_filter_size(CorrelationFilterHandle filter) {
186+
if (filter) {
187+
return static_cast<CorrelationFilter*>(filter)->size();
188+
}
189+
return 0;
190+
}
191+
192+
GraphCorrelationMapHandle graph_correlation_map_create(void) {
193+
return new GraphCorrelationMap();
194+
}
195+
196+
void graph_correlation_map_destroy(GraphCorrelationMapHandle map) {
197+
if (map) {
198+
delete static_cast<GraphCorrelationMap*>(map);
199+
}
200+
}
201+
202+
void graph_correlation_map_insert(GraphCorrelationMapHandle map, uint32_t correlation_id) {
203+
if (map) {
204+
static_cast<GraphCorrelationMap*>(map)->insert(correlation_id);
205+
}
206+
}
207+
208+
void graph_correlation_map_cycle_start(GraphCorrelationMapHandle map, uint32_t cycle) {
209+
if (map) {
210+
static_cast<GraphCorrelationMap*>(map)->cycle_start(cycle);
211+
}
212+
}
213+
214+
bool graph_correlation_map_check_and_mark_seen(GraphCorrelationMapHandle map, uint32_t correlation_id, uint32_t cycle) {
215+
if (map) {
216+
return static_cast<GraphCorrelationMap*>(map)->check_and_mark_seen(correlation_id, cycle);
217+
}
218+
return false;
219+
}
220+
221+
void graph_correlation_map_cycle_end(GraphCorrelationMapHandle map) {
222+
if (map) {
223+
static_cast<GraphCorrelationMap*>(map)->cycle_end();
224+
}
225+
}
226+
227+
size_t graph_correlation_map_size(GraphCorrelationMapHandle map) {
228+
if (map) {
229+
return static_cast<GraphCorrelationMap*>(map)->size();
230+
}
231+
return 0;
232+
}
233+
234+
void graph_correlation_map_get_stats(GraphCorrelationMapHandle map, size_t* size, size_t* oldest_age) {
235+
if (map && size && oldest_age) {
236+
static_cast<GraphCorrelationMap*>(map)->get_stats(*size, *oldest_age);
237+
}
238+
}
239+
240+
} // extern "C"

cupti/correlation_filter.h

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#pragma once
2+
3+
#include <stdint.h>
4+
#include <stdbool.h>
5+
#include <stddef.h>
6+
7+
#ifdef __cplusplus
8+
extern "C" {
9+
#endif
10+
11+
// Opaque handle to correlation filter
12+
typedef void* CorrelationFilterHandle;
13+
14+
// Create a new correlation filter
15+
CorrelationFilterHandle correlation_filter_create(void);
16+
17+
// Destroy the correlation filter
18+
void correlation_filter_destroy(CorrelationFilterHandle filter);
19+
20+
// Insert a correlation ID into the filter
21+
// Thread-safe: can be called from multiple threads concurrently
22+
void correlation_filter_insert(CorrelationFilterHandle filter, uint32_t correlation_id);
23+
24+
// Check if a correlation ID exists and remove it if found
25+
// Returns true if the correlation ID was found and removed, false otherwise
26+
// Thread-safe: safe to call concurrently with inserts
27+
bool correlation_filter_check_and_remove(CorrelationFilterHandle filter, uint32_t correlation_id);
28+
29+
// Get the current size of the filter (number of tracked correlation IDs)
30+
// Note: This is an approximate count in concurrent scenarios
31+
size_t correlation_filter_size(CorrelationFilterHandle filter);
32+
33+
// Graph correlation state values
34+
// Value held in each of the two per-cycle state slots of a tracked graph
// correlation entry. The numeric values are explicit because this header is
// shared between C and C++ translation units.
enum GraphCorrelationState {
35+
GRAPH_STATE_UNINITIALIZED = 0, // Entry just created; this slot has not been reset by a cycle start yet
36+
GRAPH_STATE_CYCLE_CLEARED = 1, // Slot was reset at cycle start and no kernel has been marked in it since
37+
GRAPH_STATE_KERNEL_SEEN = 2 // At least one kernel was marked as seen in this slot's cycle
38+
};
39+
40+
// Opaque handle to graph correlation map
41+
typedef void* GraphCorrelationMapHandle;
42+
43+
// Create a new graph correlation map
44+
GraphCorrelationMapHandle graph_correlation_map_create(void);
45+
46+
// Destroy the graph correlation map
47+
void graph_correlation_map_destroy(GraphCorrelationMapHandle map);
48+
49+
// Insert a correlation ID into the map (called when sampling a graph launch)
50+
// Thread-safe: can be called from multiple threads concurrently
51+
void graph_correlation_map_insert(GraphCorrelationMapHandle map, uint32_t correlation_id);
52+
53+
// Start a new processing cycle - clears the appropriate slot for all entries
54+
// Thread-safe
55+
void graph_correlation_map_cycle_start(GraphCorrelationMapHandle map, uint32_t cycle);
56+
57+
// Check if correlation ID should fire probe and mark as seen for this cycle
58+
// Returns true if the correlation ID is tracked (should fire probe)
59+
// Thread-safe
60+
bool graph_correlation_map_check_and_mark_seen(GraphCorrelationMapHandle map, uint32_t correlation_id, uint32_t cycle);
61+
62+
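// End processing cycle - removes entries whose two cycle slots are both cleared (no kernel seen in the last 2 cycles), provided a kernel was seen at some point; entries that never saw a kernel are removed only once they are more than 100 cycles old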
63+
// Thread-safe
64+
void graph_correlation_map_cycle_end(GraphCorrelationMapHandle map);
65+
66+
// Get the current size of the map (number of tracked correlation IDs)
67+
// Note: This is an approximate count in concurrent scenarios
68+
size_t graph_correlation_map_size(GraphCorrelationMapHandle map);
69+
70+
// Get statistics about the map (for debugging)
71+
// Returns the current size and age of the oldest entry (in cycles)
72+
void graph_correlation_map_get_stats(GraphCorrelationMapHandle map, size_t* size, size_t* oldest_age);
73+
74+
#ifdef __cplusplus
75+
}
76+
#endif

0 commit comments

Comments
 (0)