Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ type Container struct {

gpuStats map[string]*GpuUsage

oomKills int
pythonThreadLockWaitTime time.Duration
nodejsStats *ebpftracer.NodejsStats
oomKills int
nodejsStats *ebpftracer.NodejsStats
pythonStats *ebpftracer.PythonStats

mounts map[string]proc.MountInfo
seenMounts map[uint64]struct{}
Expand Down Expand Up @@ -399,8 +399,8 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
for appType := range appTypes {
ch <- gauge(metrics.ApplicationType, 1, appType)
}
if c.pythonThreadLockWaitTime > 0 {
ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
if c.pythonStats != nil {
ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonStats.ThreadLockWaitTime.Seconds())
}
if c.nodejsStats != nil {
ch <- counter(metrics.NodejsEventLoopBlockedTime, c.nodejsStats.EventLoopBlockedTime.Seconds())
Expand Down Expand Up @@ -850,6 +850,23 @@ func (c *Container) updateNodejsStats(s NodejsStatsUpdate) {
p.nodejsPrevStats = &s.Stats
}

func (c *Container) updatePythonStats(s PythonStatsUpdate) {
c.lock.Lock()
defer c.lock.Unlock()

p := c.processes[s.Pid]
if p == nil || p.pythonPrevStats == nil {
return
}
if delta := s.Stats.ThreadLockWaitTime - p.pythonPrevStats.ThreadLockWaitTime; delta > 0 {
if c.pythonStats == nil {
c.pythonStats = &ebpftracer.PythonStats{}
}
c.pythonStats.ThreadLockWaitTime += delta
}
p.pythonPrevStats = &s.Stats
}

func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
if len(c.mounts) == 0 {
return nil
Expand Down
2 changes: 2 additions & 0 deletions containers/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Process struct {
pythonGilChecked bool
nodejsChecked bool
nodejsPrevStats *ebpftracer.NodejsStats
pythonPrevStats *ebpftracer.PythonStats

gpuUsageSamples []gpu.ProcessUsageSample
}
Expand Down Expand Up @@ -113,6 +114,7 @@ func (p *Process) instrumentPython(cmdline []byte, tracer *ebpftracer.Tracer) {
if !pythonCmd.Match(cmd) {
return
}
p.pythonPrevStats = &ebpftracer.PythonStats{}
p.uprobes = append(p.uprobes, tracer.AttachPythonThreadLockProbes(p.Pid)...)
}

Expand Down
34 changes: 30 additions & 4 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Registry struct {
ebpfStatsLock sync.Mutex
trafficStatsUpdateCh chan *TrafficStatsUpdate
nodejsStatsUpdateCh chan *NodejsStatsUpdate
pythonStatsUpdateCh chan *PythonStatsUpdate

gpuProcessUsageSampleChan chan gpu.ProcessUsageSample
}
Expand Down Expand Up @@ -119,6 +120,7 @@ func NewRegistry(reg prometheus.Registerer, processInfoCh chan<- ProcessInfo, gp

trafficStatsUpdateCh: make(chan *TrafficStatsUpdate),
nodejsStatsUpdateCh: make(chan *NodejsStatsUpdate),
pythonStatsUpdateCh: make(chan *PythonStatsUpdate),

gpuProcessUsageSampleChan: gpuProcessUsageSampleChan,
}
Expand Down Expand Up @@ -220,6 +222,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
if c := r.containersByPid[u.Pid]; c != nil {
c.updateNodejsStats(*u)
}
case u := <-r.pythonStatsUpdateCh:
if u == nil {
continue
}
if c := r.containersByPid[u.Pid]; c != nil {
c.updatePythonStats(*u)
}
case sample := <-r.gpuProcessUsageSampleChan:
if c := r.containersByPid[sample.Pid]; c != nil {
if p := c.processes[sample.Pid]; p != nil {
Expand Down Expand Up @@ -306,10 +315,6 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}
r.ip2fqdnLock.Unlock()
}
case ebpftracer.EventTypePythonThreadLock:
if c := r.containersByPid[e.Pid]; c != nil {
c.pythonThreadLockWaitTime += e.Duration
}
}
}
}
Expand Down Expand Up @@ -410,6 +415,7 @@ func (r *Registry) updateStatsFromEbpfMapsIfNecessary() {

r.updateTrafficStats()
r.updateNodejsStats()
r.updatePythonStats()

r.ebpfStatsLastUpdated = time.Now()
}
Expand Down Expand Up @@ -447,6 +453,21 @@ func (r *Registry) updateNodejsStats() {
r.nodejsStatsUpdateCh <- nil
}

func (r *Registry) updatePythonStats() {
iter := r.tracer.PythonStatsIterator()
var pid uint64
stats := ebpftracer.PythonStats{}

for iter.Next(&pid, &stats) {
r.pythonStatsUpdateCh <- &PythonStatsUpdate{Pid: uint32(pid), Stats: stats}
}

if err := iter.Err(); err != nil {
klog.Warningln(err)
}
r.pythonStatsUpdateCh <- nil
}

func (r *Registry) getDomain(ip netaddr.IP) *common.Domain {
r.ip2fqdnLock.RLock()
defer r.ip2fqdnLock.RUnlock()
Expand Down Expand Up @@ -563,3 +584,8 @@ type NodejsStatsUpdate struct {
Pid uint32
Stats ebpftracer.NodejsStats
}

type PythonStatsUpdate struct {
Pid uint32
Stats ebpftracer.PythonStats
}
20 changes: 10 additions & 10 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion ebpftracer/ebpf/ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct trace_event_raw_sys_exit__stub {
};

#include "nodejs.c"
#include "python.c"
#include "proc.c"
#include "file.c"
#include "tcp/conntrack.c"
Expand All @@ -48,6 +49,5 @@ struct trace_event_raw_sys_exit__stub {
#include "l7/l7.c"
#include "l7/gotls.c"
#include "l7/openssl.c"
#include "python.c"

char _license[] SEC("license") = "GPL";
1 change: 1 addition & 0 deletions ebpftracer/ebpf/proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ int sched_process_exit(struct trace_event_raw_sched_process_template__stub *args
return 0;
}

bpf_map_delete_elem(&python_stats, &pid);
bpf_map_delete_elem(&nodejs_stats, &pid);
bpf_map_delete_elem(&nodejs_prev_event_loop_iter, &pid);
bpf_map_delete_elem(&nodejs_current_io_cb, &pid);
Expand Down
39 changes: 23 additions & 16 deletions ebpftracer/ebpf/python.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
struct python_proc_stats {
__u64 thread_lock_wait_time;
};

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(key_size, sizeof(int));
__uint(value_size, sizeof(int));
} python_thread_events SEC(".maps");
__uint(type, BPF_MAP_TYPE_HASH);
__uint(key_size, sizeof(__u64));
__uint(value_size, sizeof(struct python_proc_stats));
__uint(max_entries, 10240);
} python_stats SEC(".maps");


struct {
__uint(type, BPF_MAP_TYPE_HASH);
Expand All @@ -19,24 +25,25 @@ int pthread_cond_timedwait_enter(struct pt_regs *ctx) {
return 0;
}

struct python_thread_event {
__u32 type;
__u32 pid;
__u64 duration;
};

SEC("uprobe/pthread_cond_timedwait_exit")
int pthread_cond_timedwait_exit(struct pt_regs *ctx) {
__u64 pid_tgid = bpf_get_current_pid_tgid();
__u64 *timestamp = bpf_map_lookup_elem(&python_thread_locks, &pid_tgid);
if (!timestamp) {
return 0;
}
struct python_thread_event e = {
.type = EVENT_TYPE_PYTHON_THREAD_LOCK,
.pid = pid_tgid >> 32,
.duration = bpf_ktime_get_ns()-*timestamp,
};
bpf_perf_event_output(ctx, &python_thread_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
__u64 duration = bpf_ktime_get_ns() - *timestamp;
bpf_map_delete_elem(&python_thread_locks, &pid_tgid);
__u64 pid = pid_tgid >> 32;
struct python_proc_stats *stats = bpf_map_lookup_elem(&python_stats, &pid);
if (!stats) {
struct python_proc_stats s = {};
bpf_map_update_elem(&python_stats, &pid, &s, BPF_ANY);
stats = bpf_map_lookup_elem(&python_stats, &pid);
if (!stats) {
return 0;
}
}
__sync_fetch_and_add(&stats->thread_lock_wait_time, duration);
return 0;
}
56 changes: 22 additions & 34 deletions ebpftracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@ type EventType uint32
type EventReason uint32

const (
EventTypeProcessStart EventType = 1
EventTypeProcessExit EventType = 2
EventTypeConnectionOpen EventType = 3
EventTypeConnectionClose EventType = 4
EventTypeConnectionError EventType = 5
EventTypeListenOpen EventType = 6
EventTypeListenClose EventType = 7
EventTypeFileOpen EventType = 8
EventTypeTCPRetransmit EventType = 9
EventTypeL7Request EventType = 10
EventTypePythonThreadLock EventType = 11
EventTypeProcessStart EventType = 1
EventTypeProcessExit EventType = 2
EventTypeConnectionOpen EventType = 3
EventTypeConnectionClose EventType = 4
EventTypeConnectionError EventType = 5
EventTypeListenOpen EventType = 6
EventTypeListenClose EventType = 7
EventTypeFileOpen EventType = 8
EventTypeTCPRetransmit EventType = 9
EventTypeL7Request EventType = 10

EventReasonNone EventReason = 0
EventReasonOOMKill EventReason = 1
Expand Down Expand Up @@ -73,11 +72,10 @@ type Event struct {
type perfMapType uint8

const (
perfMapTypeProcEvents perfMapType = 1
perfMapTypeTCPEvents perfMapType = 2
perfMapTypeFileEvents perfMapType = 3
perfMapTypeL7Events perfMapType = 4
perfMapTypePythonThreadEvents perfMapType = 5
perfMapTypeProcEvents perfMapType = 1
perfMapTypeTCPEvents perfMapType = 2
perfMapTypeFileEvents perfMapType = 3
perfMapTypeL7Events perfMapType = 4
)

type Tracer struct {
Expand Down Expand Up @@ -139,10 +137,18 @@ func (t *Tracer) NodejsStatsIterator() *ebpf.MapIterator {
return t.collection.Maps["nodejs_stats"].Iterate()
}

func (t *Tracer) PythonStatsIterator() *ebpf.MapIterator {
return t.collection.Maps["python_stats"].Iterate()
}

type NodejsStats struct {
EventLoopBlockedTime time.Duration
}

type PythonStats struct {
ThreadLockWaitTime time.Duration
}

type ConnectionId struct {
FD uint64
PID uint32
Expand Down Expand Up @@ -230,7 +236,6 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8, readTimeout: 10 * time.Millisecond},
{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
{name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
}

if !t.disableL7Tracing {
Expand Down Expand Up @@ -365,12 +370,6 @@ type l7Event struct {
PayloadSize uint64
}

type pythonThreadEvent struct {
Type EventType
Pid uint32
Duration uint64
}

func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType, readTimeout time.Duration) {
if readTimeout == 0 {
readTimeout = 100 * time.Millisecond
Expand Down Expand Up @@ -450,17 +449,6 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
BytesReceived: v.BytesReceived,
}
}
case perfMapTypePythonThreadEvents:
v := &pythonThreadEvent{}
if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
klog.Warningln("failed to read msg:", err)
continue
}
event = Event{
Type: v.Type,
Pid: v.Pid,
Duration: time.Duration(v.Duration),
}
default:
continue
}
Expand Down
Loading