@@ -3,6 +3,7 @@ package containers
33import (
44 "github.com/coroot/coroot-node-agent/cgroup"
55 "github.com/coroot/coroot-node-agent/common"
6+ "github.com/coroot/coroot-node-agent/ebpftracer"
67 "github.com/coroot/coroot-node-agent/flags"
78 "github.com/coroot/coroot-node-agent/logs"
89 "github.com/coroot/coroot-node-agent/node"
@@ -14,6 +15,7 @@ import (
1415 "inet.af/netaddr"
1516 "k8s.io/klog/v2"
1617 "os"
18+ "strconv"
1719 "strings"
1820 "sync"
1921 "time"
@@ -57,6 +59,17 @@ type AddrPair struct {
5759 dst netaddr.IPPort
5860}
5961
62+ type ActiveConnection struct {
63+ ActualDest netaddr.IPPort
64+ Pid uint32
65+ Fd uint64
66+ }
67+
68+ type L7Stats struct {
69+ Requests * prometheus.CounterVec
70+ Latency prometheus.Histogram
71+ }
72+
6073type Container struct {
6174 cgroup * cgroup.Cgroup
6275 metadata * ContainerMetadata
@@ -73,11 +86,13 @@ type Container struct {
7386
7487 listens map [netaddr.IPPort ]map [uint32 ]time.Time // listen addr -> pid -> close time
7588
76- connectsSuccessful map [AddrPair ]int // dst:actual_dst -> count
77- connectsFailed map [netaddr.IPPort ]int // dst -> count
89+ connectsSuccessful map [AddrPair ]int64 // dst:actual_dst -> count
90+ connectsFailed map [netaddr.IPPort ]int64 // dst -> count
7891 connectLastAttempt map [netaddr.IPPort ]time.Time // dst -> time
79- connectionsActive map [AddrPair ]netaddr.IPPort // src:dst -> actual_dst
80- retransmits map [AddrPair ]int // dst:actual_dst -> count
92+ connectionsActive map [AddrPair ]ActiveConnection
93+ retransmits map [AddrPair ]int64 // dst:actual_dst -> count
94+
95+ l7Stats map [ebpftracer.L7Protocol ]map [AddrPair ]* L7Stats // protocol -> dst:actual_dst -> stats
8196
8297 oomKills int
8398
@@ -101,11 +116,12 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
101116
102117 listens : map [netaddr.IPPort ]map [uint32 ]time.Time {},
103118
104- connectsSuccessful : map [AddrPair ]int {},
105- connectsFailed : map [netaddr.IPPort ]int {},
119+ connectsSuccessful : map [AddrPair ]int64 {},
120+ connectsFailed : map [netaddr.IPPort ]int64 {},
106121 connectLastAttempt : map [netaddr.IPPort ]time.Time {},
107- connectionsActive : map [AddrPair ]netaddr.IPPort {},
108- retransmits : map [AddrPair ]int {},
122+ connectionsActive : map [AddrPair ]ActiveConnection {},
123+ retransmits : map [AddrPair ]int64 {},
124+ l7Stats : map [ebpftracer.L7Protocol ]map [AddrPair ]* L7Stats {},
109125
110126 mountIds : map [string ]struct {}{},
111127
@@ -147,6 +163,12 @@ func (c *Container) Describe(ch chan<- *prometheus.Desc) {
147163 for _ , m := range metricsList {
148164 ch <- m
149165 }
166+ for _ , protoStats := range c .l7Stats {
167+ for _ , s := range protoStats {
168+ s .Requests .Describe (ch )
169+ s .Latency .Describe (ch )
170+ }
171+ }
150172}
151173
152174func (c * Container ) Collect (ch chan <- prometheus.Metric ) {
@@ -242,8 +264,8 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
242264 }
243265
244266 connections := map [AddrPair ]int {}
245- for c , actualDst := range c .connectionsActive {
246- connections [AddrPair {src : c .dst , dst : actualDst }]++
267+ for c , conn := range c .connectionsActive {
268+ connections [AddrPair {src : c .dst , dst : conn . ActualDest }]++
247269 }
248270 for d , count := range connections {
249271 ch <- gauge (metrics .NetConnectionsActive , float64 (count ), d .src .String (), d .dst .String ())
@@ -271,7 +293,14 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
271293 ch <- gauge (metrics .ApplicationType , 1 , appType )
272294 }
273295
274- if ! * flags .NoPingUpstreams {
296+ for _ , protoStats := range c .l7Stats {
297+ for _ , s := range protoStats {
298+ s .Requests .Collect (ch )
299+ s .Latency .Collect (ch )
300+ }
301+ }
302+
303+ if ! * flags .DisablePinger {
275304 for ip , rtt := range c .ping (netNs ) {
276305 ch <- gauge (metrics .NetLatency , rtt , ip .String ())
277306 }
@@ -316,7 +345,7 @@ func (c *Container) onProcessExit(pid uint32, oomKill bool) {
316345 }
317346}
318347
319- func (c * Container ) onFileOpen (pid uint32 , fd uint32 ) {
348+ func (c * Container ) onFileOpen (pid uint32 , fd uint64 ) {
320349 mntId , logPath := resolveFd (pid , fd )
321350 c .lock .Lock ()
322351 defer c .lock .Unlock ()
@@ -349,7 +378,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
349378 }
350379}
351380
352- func (c * Container ) onConnectionOpen (pid uint32 , src , dst netaddr.IPPort , failed bool ) {
381+ func (c * Container ) onConnectionOpen (pid uint32 , fd uint64 , src , dst netaddr.IPPort , failed bool ) {
353382 if dst .IP ().IsLoopback () {
354383 netNs , err := proc .GetNetNs (pid )
355384 isHostNs := err == nil && hostNetNsId == netNs .UniqueId ()
@@ -376,7 +405,11 @@ func (c *Container) onConnectionOpen(pid uint32, src, dst netaddr.IPPort, failed
376405 } else {
377406 actualDst := ConntrackGetActualDestination (src , dst )
378407 c .connectsSuccessful [AddrPair {src : dst , dst : actualDst }]++
379- c .connectionsActive [AddrPair {src : src , dst : dst }] = actualDst
408+ c .connectionsActive [AddrPair {src : src , dst : dst }] = ActiveConnection {
409+ ActualDest : actualDst ,
410+ Pid : pid ,
411+ Fd : fd ,
412+ }
380413 }
381414 c .connectLastAttempt [dst ] = time .Now ()
382415}
@@ -391,14 +424,65 @@ func (c *Container) onConnectionClose(srcDst AddrPair) bool {
391424 return true
392425}
393426
427+ func (c * Container ) onL7Request (pid uint32 , fd uint64 , r * ebpftracer.L7Request ) {
428+ for dest , conn := range c .connectionsActive {
429+ if conn .Pid == pid && conn .Fd == fd {
430+ key := AddrPair {src : dest .dst , dst : conn .ActualDest }
431+ stats := c .l7Stats [r .Protocol ]
432+ if stats == nil {
433+ stats = map [AddrPair ]* L7Stats {}
434+ c .l7Stats [r .Protocol ] = stats
435+ }
436+ s := stats [key ]
437+ if s == nil {
438+ constLabels := map [string ]string {"destination" : key .src .String (), "actual_destination" : key .dst .String ()}
439+ cOpts , ok := L7Requests [r .Protocol ]
440+ if ! ok {
441+ klog .Warningln ("cannot find metric description for L7 protocol: %s" , r .Protocol .String ())
442+ return
443+ }
444+ hOpts , ok := L7Latency [r .Protocol ]
445+ if ! ok {
446+ klog .Warningln ("cannot find metric description for L7 protocol: %s" , r .Protocol .String ())
447+ return
448+ }
449+ s = & L7Stats {
450+ Requests : prometheus .NewCounterVec (
451+ prometheus.CounterOpts {Name : cOpts .Name , Help : cOpts .Help , ConstLabels : constLabels },
452+ []string {"status" },
453+ ),
454+ Latency : prometheus .NewHistogram (
455+ prometheus.HistogramOpts {Name : hOpts .Name , Help : hOpts .Help , ConstLabels : constLabels },
456+ ),
457+ }
458+ stats [key ] = s
459+ }
460+ status := ""
461+ switch r .Protocol {
462+ case ebpftracer .L7ProtocolHTTP :
463+ status = strconv .Itoa (r .Status )
464+ default :
465+ if r .Status == 500 {
466+ status = "failed"
467+ } else {
468+ status = "ok"
469+ }
470+ }
471+ s .Requests .WithLabelValues (status ).Inc ()
472+ s .Latency .Observe (r .Duration .Seconds ())
473+ return
474+ }
475+ }
476+ }
477+
394478func (c * Container ) onRetransmit (srcDst AddrPair ) bool {
395479 c .lock .Lock ()
396480 defer c .lock .Unlock ()
397- actualDst , ok := c .connectionsActive [srcDst ]
481+ conn , ok := c .connectionsActive [srcDst ]
398482 if ! ok {
399483 return false
400484 }
401- c .retransmits [AddrPair {src : srcDst .dst , dst : actualDst }]++
485+ c .retransmits [AddrPair {src : srcDst .dst , dst : conn . ActualDest }]++
402486 return true
403487}
404488
@@ -566,7 +650,7 @@ func (c *Container) ping(netNs netns.NsHandle) map[netaddr.IP]float64 {
566650}
567651
568652func (c * Container ) runLogParser (logPath string ) {
569- if * flags .NoParseLogs {
653+ if * flags .DisableLogParsing {
570654 return
571655 }
572656
@@ -667,6 +751,13 @@ func (c *Container) gc(now time.Time) {
667751 delete (c .retransmits , d )
668752 }
669753 }
754+ for _ , protoStats := range c .l7Stats {
755+ for d := range protoStats {
756+ if d .src == dst {
757+ delete (protoStats , d )
758+ }
759+ }
760+ }
670761 }
671762 }
672763}
@@ -738,7 +829,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
738829 }
739830}
740831
741- func resolveFd (pid uint32 , fd uint32 ) (mntId string , logPath string ) {
832+ func resolveFd (pid uint32 , fd uint64 ) (mntId string , logPath string ) {
742833 info := proc .GetFdInfo (pid , fd )
743834 if info == nil {
744835 return
0 commit comments