Skip to content

Commit d9c1a26

Browse files
gnurizen and claude committed
Add correlation ID filter to track traced kernel launches
Implement a thread-safe correlation ID filter so that activity records are emitted only for kernel launches that were traced via callbacks. This prevents emitting activity records for kernels that were filtered out by the rate limiter or other filtering logic.

The filter is a C++ std::unordered_set protected by a mutex and exposed through a C API. When a callback fires for a kernel launch, its correlation ID is inserted into the filter. When the corresponding activity record arrives, its correlation ID is checked against the filter; only activities whose correlation IDs are in the filter fire the kernel_executed probe.

Changes:
- Add correlation_filter.cpp/h with a thread-safe insert/check_and_remove API
- Update cupti-prof.c to insert correlation IDs on callback and check them on activity
- Add debug output to trace the callback and activity matching flow
- Update CMakeLists.txt to build the correlation filter

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 1f208d4 commit d9c1a26

File tree

4 files changed

+181
-15
lines changed

4 files changed

+181
-15
lines changed

cupti/CMakeLists.txt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
cmake_minimum_required(VERSION 3.18)
2-
project(parcagpucupti C)
2+
project(parcagpucupti C CXX)
33

44
# CUDA root directory (can be overridden via -DCUDA_ROOT=...)
55
if(NOT DEFINED CUDA_ROOT)
@@ -20,18 +20,20 @@ set(CUDAToolkit_INCLUDE_DIRS
2020
# Add CUDA library directory to link directories
2121
link_directories(${CUDA_LIBDIR})
2222

23-
# Create shared library
24-
add_library(parcagpucupti SHARED cupti-prof.c)
23+
# Create shared library with both C and C++ sources
24+
add_library(parcagpucupti SHARED cupti-prof.c correlation_filter.cpp)
2525

2626
# Set properties
2727
set_target_properties(parcagpucupti PROPERTIES
2828
C_STANDARD 11
2929
C_STANDARD_REQUIRED ON
30+
CXX_STANDARD 17
31+
CXX_STANDARD_REQUIRED ON
3032
POSITION_INDEPENDENT_CODE ON
3133
)
3234

33-
# Add debug symbols
34-
target_compile_options(parcagpucupti PRIVATE -g)
35+
# Add debug symbols and disable C++ exceptions
36+
target_compile_options(parcagpucupti PRIVATE -g -fno-exceptions)
3537

3638
# Include directories
3739
target_include_directories(parcagpucupti PRIVATE

cupti/correlation_filter.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#include "correlation_filter.h"

#include <memory>
#include <mutex>
#include <new>
#include <unordered_set>
5+
6+
// Set of correlation IDs backed by std::unordered_set.
// Every public member function holds mutex_ for the duration of the call, so
// individual operations are safe to invoke from multiple threads.
class CorrelationFilter {
public:
    CorrelationFilter() = default;

    // Adds correlation_id to the set. Adding an ID that is already stored
    // leaves the set unchanged.
    void insert(uint32_t correlation_id) {
        const std::lock_guard<std::mutex> guard(mutex_);
        set_.insert(correlation_id);
    }

    // Erases correlation_id if it is stored; lookup and removal happen under
    // one lock acquisition.
    // Returns true when an entry was erased, false when the ID was absent.
    bool check_and_remove(uint32_t correlation_id) {
        const std::lock_guard<std::mutex> guard(mutex_);
        return set_.erase(correlation_id) != 0;
    }

    // Returns the number of IDs stored at the moment the lock is held.
    size_t size() const {
        const std::lock_guard<std::mutex> guard(mutex_);
        return set_.size();
    }

private:
    std::unordered_set<uint32_t> set_;
    mutable std::mutex mutex_;  // mutable so the const size() can lock it
};
43+
44+
// C API implementation
45+
extern "C" {
46+
47+
// Allocates a new, empty correlation filter and returns it as an opaque handle.
// Returns NULL if the allocation fails: the nothrow form of new is used so the
// failure is reported through the return value rather than by a std::bad_alloc
// exception, which must not escape an extern "C" function.
// Release the handle with correlation_filter_destroy().
CorrelationFilterHandle correlation_filter_create(void) {
    return new (std::nothrow) CorrelationFilter();
}
50+
51+
// Frees a filter obtained from correlation_filter_create().
// Passing NULL is allowed and does nothing.
void correlation_filter_destroy(CorrelationFilterHandle filter) {
    // A delete-expression on a null pointer is a no-op, so no explicit check.
    delete static_cast<CorrelationFilter*>(filter);
}
56+
57+
// Forwards correlation_id to CorrelationFilter::insert on the filter behind
// the handle. A NULL handle is ignored.
void correlation_filter_insert(CorrelationFilterHandle filter, uint32_t correlation_id) {
    auto* impl = static_cast<CorrelationFilter*>(filter);
    if (impl == nullptr) {
        return;
    }
    impl->insert(correlation_id);
}
62+
63+
// Forwards to CorrelationFilter::check_and_remove on the filter behind the
// handle and returns its result. A NULL handle yields false.
bool correlation_filter_check_and_remove(CorrelationFilterHandle filter, uint32_t correlation_id) {
    auto* impl = static_cast<CorrelationFilter*>(filter);
    // Short-circuit keeps the member call from running on a null handle.
    return impl != nullptr && impl->check_and_remove(correlation_id);
}
69+
70+
// Returns CorrelationFilter::size() for the filter behind the handle,
// or 0 when the handle is NULL.
size_t correlation_filter_size(CorrelationFilterHandle filter) {
    const auto* impl = static_cast<const CorrelationFilter*>(filter);
    return impl != nullptr ? impl->size() : 0;
}
76+
77+
} // extern "C"

cupti/correlation_filter.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include <stdint.h>
4+
#include <stdbool.h>
5+
#include <stddef.h>
6+
7+
#ifdef __cplusplus
8+
extern "C" {
9+
#endif
10+
11+
// Opaque handle to correlation filter
12+
typedef void* CorrelationFilterHandle;
13+
14+
// Create a new correlation filter
15+
CorrelationFilterHandle correlation_filter_create(void);
16+
17+
// Destroy the correlation filter
18+
void correlation_filter_destroy(CorrelationFilterHandle filter);
19+
20+
// Insert a correlation ID into the filter
21+
// Thread-safe: can be called from multiple threads concurrently
22+
void correlation_filter_insert(CorrelationFilterHandle filter, uint32_t correlation_id);
23+
24+
// Check if a correlation ID exists and remove it if found
25+
// Returns true if the correlation ID was found and removed, false otherwise
26+
// Thread-safe: safe to call concurrently with inserts
27+
bool correlation_filter_check_and_remove(CorrelationFilterHandle filter, uint32_t correlation_id);
28+
29+
// Get the current size of the filter (number of tracked correlation IDs)
30+
// Note: This is an approximate count in concurrent scenarios
31+
size_t correlation_filter_size(CorrelationFilterHandle filter);
32+
33+
#ifdef __cplusplus
34+
}
35+
#endif

cupti/cupti-prof.c

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
#include <cupti.h>
1313

14+
#include "correlation_filter.h"
15+
1416
// Debug logging control
1517
static bool debug_enabled = false;
1618

@@ -61,6 +63,9 @@ static bool rateLimiterTryAcquire(uint64_t nowNs) {
6163
return false;
6264
}
6365

66+
// Correlation ID filter
67+
static CorrelationFilterHandle correlationFilter = NULL;
68+
6469
static void init_debug(void) {
6570
static bool initialized = false;
6671
if (!initialized) {
@@ -200,6 +205,14 @@ int InitializeInjection(void) {
200205
DEBUG_PRINTF("[CUPTI] Enabled CONCURRENT_KERNEL activity\n");
201206
}
202207

208+
// Create correlation filter
209+
correlationFilter = correlation_filter_create();
210+
if (correlationFilter) {
211+
DEBUG_PRINTF("[CUPTI] Correlation filter created and enabled\n");
212+
} else {
213+
fprintf(stderr, "[CUPTI] Warning: Failed to create correlation filter\n");
214+
}
215+
203216
atexit(cleanup);
204217

205218
DEBUG_PRINTF("[CUPTI] Successfully initialized CUPTI callbacks with external "
@@ -236,13 +249,17 @@ static void parcagpuCuptiCallback(void *userdata, CUpti_CallbackDomain domain,
236249
if (domain == CUPTI_CB_DOMAIN_RUNTIME_API &&
237250
cbdata->callbackSite == CUPTI_API_ENTER) {
238251
runtimeEnterCorrelationId = correlationId;
252+
DEBUG_PRINTF("[CUPTI] Runtime API ENTER: correlationId=%u\n", correlationId);
239253
return;
240254
}
241255

242256
// We hook on EXIT because that makes our probe overhead not add to GPU
243257
// launch latency and hopefully covers some of the overhead in the shadow of
244258
// GPU async work.
245259
if (cbdata->callbackSite != CUPTI_API_EXIT) {
260+
if (cbdata->callbackSite == CUPTI_API_ENTER && domain == CUPTI_CB_DOMAIN_DRIVER_API) {
261+
DEBUG_PRINTF("[CUPTI] Driver API ENTER: correlationId=%u (will check on EXIT)\n", correlationId);
262+
}
246263
return;
247264
}
248265

@@ -253,22 +270,22 @@ static void parcagpuCuptiCallback(void *userdata, CUpti_CallbackDomain domain,
253270
if (domain == CUPTI_CB_DOMAIN_DRIVER_API) {
254271
// Skip if this driver call is under a runtime call (same correlation ID)
255272
if (correlationId == runtimeEnterCorrelationId) {
256-
DEBUG_PRINTF("[CUPTI] Skipping driver EXIT correlationId=%u - runtime "
273+
DEBUG_PRINTF("[CUPTI] Skipping driver EXIT correlationId=%u (runtimeEnter=%u) - runtime "
257274
"will handle\n",
258-
correlationId);
275+
correlationId, runtimeEnterCorrelationId);
259276
return;
260277
}
261278
// Pure driver call (no runtime wrapper) - use negative cbid
262279
signedCbid = -(int)cbid;
263280
DEBUG_PRINTF(
264-
"[CUPTI] Driver API callback: cbid=%d, correlationId=%u, func=%s\n",
265-
cbid, correlationId, name);
281+
"[CUPTI] Driver API EXIT callback: cbid=%d, correlationId=%u, runtimeEnter=%u, func=%s\n",
282+
cbid, correlationId, runtimeEnterCorrelationId, name);
266283
} else if (domain == CUPTI_CB_DOMAIN_RUNTIME_API) {
267284
signedCbid = (int)cbid;
268-
runtimeEnterCorrelationId = 0; // Clear after use
269285
DEBUG_PRINTF(
270-
"[CUPTI] Runtime API callback: cbid=%d, correlationId=%u, func=%s\n",
271-
cbid, correlationId, name);
286+
"[CUPTI] Runtime API EXIT callback: cbid=%d, correlationId=%u, runtimeEnter=%u, func=%s\n",
287+
cbid, correlationId, runtimeEnterCorrelationId, name);
288+
runtimeEnterCorrelationId = 0; // Clear after use
272289
} else {
273290
return;
274291
}
@@ -286,6 +303,14 @@ static void parcagpuCuptiCallback(void *userdata, CUpti_CallbackDomain domain,
286303

287304
outstandingEvents++;
288305
DTRACE_PROBE3(parcagpu, cuda_correlation, correlationId, signedCbid, name);
306+
307+
// Insert correlation ID into filter so we can match it later in activity buffer
308+
if (correlationFilter) {
309+
correlation_filter_insert(correlationFilter, correlationId);
310+
DEBUG_PRINTF("[CUPTI] Inserted correlationId=%u into filter (size=%zu)\n",
311+
correlationId, correlation_filter_size(correlationFilter));
312+
}
313+
289314
// If we let too many events pile up it overwhelms the perf_event buffers,
290315
// just another reason to explore just passing the activity buffer through to
291316
// eBPF.
@@ -342,9 +367,26 @@ static void parcagpuBufferCompleted(CUcontext ctx, uint32_t streamId,
342367
k->graphId, k->graphNodeId, k->name, k->correlationId,
343368
k->deviceId, k->streamId, k->start, k->end,
344369
k->end - k->start);
345-
DTRACE_PROBE8(parcagpu, kernel_executed, k->start, k->end,
346-
k->correlationId, k->deviceId, k->streamId, k->graphId,
347-
k->graphNodeId, k->name);
370+
371+
// Check if this correlation ID is in our filter
372+
bool should_fire = true;
373+
if (correlationFilter) {
374+
should_fire = correlation_filter_check_and_remove(correlationFilter, k->correlationId);
375+
if (!should_fire) {
376+
DEBUG_PRINTF("[CUPTI] Filtered out correlationId=%u (not tracked)\n",
377+
k->correlationId);
378+
} else {
379+
DEBUG_PRINTF("[CUPTI] Matched correlationId=%u - firing kernel_executed (filter size=%zu)\n",
380+
k->correlationId, correlation_filter_size(correlationFilter));
381+
}
382+
}
383+
384+
// Only fire probe if correlation ID was tracked (or filter disabled)
385+
if (should_fire) {
386+
DTRACE_PROBE8(parcagpu, kernel_executed, k->start, k->end,
387+
k->correlationId, k->deviceId, k->streamId, k->graphId,
388+
k->graphNodeId, k->name);
389+
}
348390
}
349391
}
350392

@@ -388,5 +430,15 @@ void cleanup(void) {
388430
subscriber = 0;
389431
}
390432

433+
// Destroy correlation filter
434+
if (correlationFilter) {
435+
size_t remaining = correlation_filter_size(correlationFilter);
436+
if (remaining > 0) {
437+
DEBUG_PRINTF("[CUPTI] Warning: %zu correlation IDs still in filter at cleanup\n", remaining);
438+
}
439+
correlation_filter_destroy(correlationFilter);
440+
correlationFilter = NULL;
441+
}
442+
391443
DEBUG_PRINTF("[CUPTI] Cleanup completed\n");
392444
}

0 commit comments

Comments
 (0)