@@ -19,34 +19,49 @@ import (
1919 "context"
2020 "encoding/binary"
2121 "fmt"
22- "time"
22+ "reflect"
23+ "sync/atomic"
2324
2425 "huatuo-bamai/internal/bpf"
25- "huatuo-bamai/internal/log"
2626 "huatuo-bamai/internal/pod"
27+ "huatuo-bamai/pkg/metric"
28+ "huatuo-bamai/pkg/tracing"
2729)
2830
2931//go:generate $BPF_COMPILE $BPF_INCLUDE -s $BPF_DIR/runqlat_tracing.c -o $BPF_DIR/runqlat_tracing.o
3032
3133type latencyBpfData struct {
3234 NumVoluntarySwitch uint64
3335 NumInVoluntarySwitch uint64
34- NumLatency01 uint64
35- NumLatency02 uint64
36- NumLatency03 uint64
37- NumLatency04 uint64
36+ NumLatencyZone0 uint64
37+ NumLatencyZone1 uint64
38+ NumLatencyZone2 uint64
39+ NumLatencyZone3 uint64
3840}
3941
40- var (
41- globalRunqlat latencyBpfData
42- runqlatRunning bool
43- )
42+ type runqlatCollector struct {
43+ running atomic.Bool
44+ bpf bpf.BPF
45+ runqlatHost latencyBpfData
46+ }
47+
48+ func init () {
49+ tracing .RegisterEventTracing ("runqlat" , newRunqlatCollector )
50+ _ = pod .RegisterContainerLifeResources ("runqlat" , reflect .TypeOf (& latencyBpfData {}))
51+ }
52+
53+ func newRunqlatCollector () (* tracing.EventTracingAttr , error ) {
54+ return & tracing.EventTracingAttr {
55+ TracingData : & runqlatCollector {},
56+ Interval : 10 ,
57+ Flag : tracing .FlagTracing | tracing .FlagMetric ,
58+ }, nil
59+ }
4460
45- func startRunqlatTracerWork (ctx context.Context ) error {
46- // load bpf.
61+ func (c * runqlatCollector ) Start (ctx context.Context ) error {
4762 b , err := bpf .LoadBpf (bpf .ThisBpfOBJ (), nil )
4863 if err != nil {
49- return fmt . Errorf ( "load bpf: %w" , err )
64+ return err
5065 }
5166 defer b .Close ()
5267
@@ -59,67 +74,88 @@ func startRunqlatTracerWork(ctx context.Context) error {
5974
6075 b .WaitDetachByBreaker (childCtx , cancel )
6176
62- runqlatRunning = true
63-
64- for {
65- select {
66- case <- ctx .Done ():
67- return nil
68- default :
69- var css uint64
70-
71- containers , err := pod .NormalContainers ()
72- if err != nil {
73- return fmt .Errorf ("get normal containers: %w" , err )
74- }
75- cssToContainer := pod .BuildCssContainers (containers , pod .SubSysCPU )
76-
77- items , err := b .DumpMapByName ("cpu_tg_metric" )
78- if err != nil {
79- return fmt .Errorf ("failed to dump cpu_tg_metric: %w" , err )
80- }
81- for _ , v := range items {
82- buf := bytes .NewReader (v .Key )
83- if err = binary .Read (buf , binary .LittleEndian , & css ); err != nil {
84- return fmt .Errorf ("can't read cpu_tg_metric key: %w" , err )
85- }
86- container := cssToContainer [css ]
87- if container == nil {
88- continue
89- }
90-
91- buf = bytes .NewReader (v .Value )
92- if err = binary .Read (buf , binary .LittleEndian , container .LifeResouces ("runqlat" ).(* latencyBpfData )); err != nil {
93- return fmt .Errorf ("can't read cpu_tg_metric value: %w" , err )
94- }
95- }
96-
97- item , err := b .ReadMap (b .MapIDByName ("cpu_host_metric" ), []byte {0 , 0 , 0 , 0 })
98- if err != nil {
99- return fmt .Errorf ("failed to read cpu_host_metric: %w" , err )
100- }
101- buf := bytes .NewReader (item )
102- if err = binary .Read (buf , binary .LittleEndian , & globalRunqlat ); err != nil {
103- log .Errorf ("can't read cpu_host_metric: %v" , err )
104- return err
105- }
106-
107- time .Sleep (2 * time .Second )
77+ c .bpf = b
78+ c .running .Store (true )
79+
80+ // wait stop
81+ <- childCtx .Done ()
82+ c .running .Store (false )
83+ return nil
84+ }
85+
86+ func (c * runqlatCollector ) updateContainerDataCache (cssContainers map [uint64 ]* pod.Container ) error {
87+ items , err := c .bpf .DumpMapByName ("cpu_tg_metric" )
88+ if err != nil {
89+ return fmt .Errorf ("dump bpf map, %w" , err )
90+ }
91+
92+ var css uint64
93+
94+ for _ , v := range items {
95+ buf := bytes .NewReader (v .Key )
96+
97+ if err := binary .Read (buf , binary .LittleEndian , & css ); err != nil {
98+ return fmt .Errorf ("read cpu_tg_metric key: %w" , err )
99+ }
100+
101+ container , ok := cssContainers [css ]
102+ if ! ok {
103+ continue
104+ }
105+
106+ buf = bytes .NewReader (v .Value )
107+ if err := binary .Read (buf , binary .LittleEndian , container .LifeResouces ("runqlat" ).(* latencyBpfData )); err != nil {
108+ return fmt .Errorf ("read cpu_tg_metric value: %w" , err )
108109 }
109110 }
111+
112+ return nil
110113}
111114
112- // Start runqlat work, load bpf and wait data form perfevent
113- func (c * runqlatCollector ) Start (ctx context.Context ) error {
114- err := startRunqlatTracerWork (ctx )
115+ func (c * runqlatCollector ) fetchHostRunqlat () []* metric.Data {
116+ item , err := c .bpf .ReadMap (c .bpf .MapIDByName ("cpu_host_metric" ), []byte {0 , 0 , 0 , 0 })
117+ if err != nil {
118+ return nil
119+ }
115120
116- containers , _ := pod .ContainersByType (pod .ContainerTypeNormal )
117- for _ , container := range containers {
118- runqlatData := container .LifeResouces ("runqlat" ).(* latencyBpfData )
119- * runqlatData = latencyBpfData {}
121+ buf := bytes .NewReader (item )
122+ if err = binary .Read (buf , binary .LittleEndian , & c .runqlatHost ); err != nil {
123+ return nil
120124 }
121125
122- runqlatRunning = false
126+ return []* metric.Data {
127+ metric .NewGaugeData ("latency" , float64 (c .runqlatHost .NumLatencyZone0 ), "cpu run queue latency for the host" , map [string ]string {"zone" : "0" }),
128+ metric .NewGaugeData ("latency" , float64 (c .runqlatHost .NumLatencyZone1 ), "cpu run queue latency for the host" , map [string ]string {"zone" : "1" }),
129+ metric .NewGaugeData ("latency" , float64 (c .runqlatHost .NumLatencyZone2 ), "cpu run queue latency for the host" , map [string ]string {"zone" : "2" }),
130+ metric .NewGaugeData ("latency" , float64 (c .runqlatHost .NumLatencyZone3 ), "cpu run queue latency for the host" , map [string ]string {"zone" : "3" }),
131+ }
132+ }
133+
134+ func (c * runqlatCollector ) Update () ([]* metric.Data , error ) {
135+ if ! c .running .Load () {
136+ return nil , nil
137+ }
138+
139+ containers , err := pod .ContainersByType (pod .ContainerTypeNormal )
140+ if err != nil {
141+ return nil , err
142+ }
143+
144+ cssContainer := pod .BuildCssContainers (containers , pod .SubSysCPU )
145+
146+ // update all containers cache data
147+ _ = c .updateContainerDataCache (cssContainer )
148+
149+ data := []* metric.Data {}
150+ for _ , container := range containers {
151+ cache := container .LifeResouces ("runqlat" ).(* latencyBpfData )
152+
153+ data = append (data ,
154+ metric .NewContainerGaugeData (container , "latency" , float64 (cache .NumLatencyZone0 ), "cpu run queue latency for the containers" , map [string ]string {"zone" : "0" }),
155+ metric .NewContainerGaugeData (container , "latency" , float64 (cache .NumLatencyZone1 ), "cpu run queue latency for the containers" , map [string ]string {"zone" : "1" }),
156+ metric .NewContainerGaugeData (container , "latency" , float64 (cache .NumLatencyZone2 ), "cpu run queue latency for the containers" , map [string ]string {"zone" : "2" }),
157+ metric .NewContainerGaugeData (container , "latency" , float64 (cache .NumLatencyZone3 ), "cpu run queue latency for the containers" , map [string ]string {"zone" : "3" }))
158+ }
123159
124- return err
160+ return append ( data , c . fetchHostRunqlat () ... ), nil
125161}
0 commit comments