Skip to content

Commit c96452b

Browse files
committed
try to determine the actual connection destination using the conntrack table in the container net namespace
1 parent cc34909 commit c96452b

File tree

4 files changed

+88
-44
lines changed

4 files changed

+88
-44
lines changed

containers/conntrack.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,25 @@ package containers
33
import (
44
"github.com/coroot/coroot-node-agent/common"
55
"github.com/florianl/go-conntrack"
6+
"github.com/vishvananda/netns"
67
"inet.af/netaddr"
78
"k8s.io/klog/v2"
89
"syscall"
910
)
1011

11-
var (
12-
conntrackClient *conntrack.Nfct
13-
)
12+
type Conntrack struct {
13+
client *conntrack.Nfct
14+
}
1415

15-
func ConntrackInit() error {
16-
c, err := conntrack.Open(&conntrack.Config{})
16+
func NewConntrack(netNs netns.NsHandle) (*Conntrack, error) {
17+
c, err := conntrack.Open(&conntrack.Config{NetNS: int(netNs)})
1718
if err != nil {
18-
return err
19+
return nil, err
1920
}
20-
conntrackClient = c
21-
return nil
21+
return &Conntrack{client: c}, nil
2222
}
2323

24-
func ConntrackGetActualDestination(src, dst netaddr.IPPort) netaddr.IPPort {
25-
if conntrackClient == nil {
26-
return dst
27-
}
28-
24+
func (c *Conntrack) GetActualDestination(src, dst netaddr.IPPort) *netaddr.IPPort {
2925
tcp := uint8(syscall.IPPROTO_TCP)
3026
sip := src.IP().IPAddr().IP
3127
dip := dst.IP().IPAddr().IP
@@ -47,12 +43,12 @@ func ConntrackGetActualDestination(src, dst netaddr.IPPort) netaddr.IPPort {
4743
if dst.IP().Is6() {
4844
family = conntrack.IPv6
4945
}
50-
sessions, err := conntrackClient.Get(conntrack.Conntrack, family, req)
46+
sessions, err := c.client.Get(conntrack.Conntrack, family, req)
5147
if err != nil {
5248
if !common.IsNotExist(err) {
5349
klog.Errorf("failed to resolve actual destination for %s->%s: %s", src, dst, err)
5450
}
55-
return dst
51+
return nil
5652
}
5753
for _, s := range sessions {
5854
if !ipTupleValid(s.Origin) || !ipTupleValid(s.Reply) {
@@ -71,9 +67,14 @@ func ConntrackGetActualDestination(src, dst netaddr.IPPort) netaddr.IPPort {
7167
if !ok {
7268
continue
7369
}
74-
return netaddr.IPPortFrom(ip, *reply.Proto.SrcPort)
70+
res := netaddr.IPPortFrom(ip, *reply.Proto.SrcPort)
71+
return &res
7572
}
76-
return dst
73+
return nil
74+
}
75+
76+
func (c *Conntrack) Close() error {
77+
return c.client.Close()
7778
}
7879

7980
func ipTuplesEqual(a, b *conntrack.IPTuple) bool {

containers/container.go

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,21 @@ type Container struct {
102102

103103
logParsers map[string]*LogParser
104104

105+
isHostNs bool
106+
hostConntrack *Conntrack
107+
nsConntrack *Conntrack
108+
105109
lock sync.RWMutex
106110

107111
done chan struct{}
108112
}
109113

110-
func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
114+
func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
115+
netNs, err := proc.GetNetNs(pid)
116+
if err != nil {
117+
return nil, err
118+
}
119+
defer netNs.Close()
111120
c := &Container{
112121
cgroup: cg,
113122
metadata: md,
@@ -129,6 +138,9 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
129138

130139
logParsers: map[string]*LogParser{},
131140

141+
isHostNs: hostNetNsId == netNs.UniqueId(),
142+
hostConntrack: hostConntrack,
143+
132144
done: make(chan struct{}),
133145
}
134146

@@ -147,13 +159,16 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
147159
}
148160
}()
149161

150-
return c
162+
return c, nil
151163
}
152164

153165
func (c *Container) Close() {
154166
for _, p := range c.logParsers {
155167
p.Stop()
156168
}
169+
if c.nsConntrack != nil {
170+
_ = c.nsConntrack.Close()
171+
}
157172
close(c.done)
158173
}
159174

@@ -384,34 +399,52 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
384399
}
385400

386401
func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
387-
if dst.IP().IsLoopback() {
388-
netNs, err := proc.GetNetNs(pid)
389-
isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
390-
netNs.Close()
391-
if !isHostNs {
392-
return
393-
}
394-
} else {
395-
whitelisted := false
396-
for _, prefix := range flags.ExternalNetworksWhitelist {
397-
if prefix.Contains(dst.IP()) {
398-
whitelisted = true
399-
break
400-
}
401-
}
402-
if !whitelisted && !common.IsIpPrivate(dst.IP()) {
403-
return
402+
if dst.IP().IsLoopback() && !c.isHostNs {
403+
return
404+
}
405+
whitelisted := false
406+
for _, prefix := range flags.ExternalNetworksWhitelist {
407+
if prefix.Contains(dst.IP()) {
408+
whitelisted = true
409+
break
404410
}
405411
}
412+
if !whitelisted && !common.IsIpPrivate(dst.IP()) {
413+
return
414+
}
406415
c.lock.Lock()
407416
defer c.lock.Unlock()
408417
if failed {
409418
c.connectsFailed[dst]++
410419
} else {
411-
actualDst := ConntrackGetActualDestination(src, dst)
412-
c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
420+
actualDst := c.hostConntrack.GetActualDestination(src, dst)
421+
if actualDst == nil && !c.isHostNs {
422+
if c.nsConntrack == nil {
423+
netNs, err := proc.GetNetNs(pid)
424+
if err != nil {
425+
if !common.IsNotExist(err) {
426+
klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
427+
}
428+
return
429+
}
430+
defer netNs.Close()
431+
c.nsConntrack, err = NewConntrack(netNs)
432+
if err != nil {
433+
klog.Warningln(err)
434+
return
435+
}
436+
}
437+
actualDst = c.nsConntrack.GetActualDestination(src, dst)
438+
}
439+
switch {
440+
case actualDst == nil:
441+
actualDst = &dst
442+
case actualDst.IP().IsLoopback() && !c.isHostNs:
443+
return
444+
}
445+
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
413446
c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
414-
ActualDest: actualDst,
447+
ActualDest: *actualDst,
415448
Pid: pid,
416449
Fd: fd,
417450
Timestamp: timestamp,

containers/registry.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ type Registry struct {
2626
tracer *ebpftracer.Tracer
2727
events chan ebpftracer.Event
2828

29+
hostConntrack *Conntrack
30+
2931
containersById map[ContainerID]*Container
3032
containersByCgroupId map[string]*Container
3133
containersByPid map[uint32]*Container
@@ -48,9 +50,6 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, er
4850
if err := TaskstatsInit(); err != nil {
4951
return err
5052
}
51-
if err := ConntrackInit(); err != nil {
52-
return err
53-
}
5453
return nil
5554
})
5655
if err != nil {
@@ -68,11 +67,17 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, er
6867
if err := JournaldInit(); err != nil {
6968
klog.Warningln(err)
7069
}
70+
ct, err := NewConntrack(hostNetNs)
71+
if err != nil {
72+
return nil, err
73+
}
7174

7275
cs := &Registry{
7376
reg: reg,
7477
events: make(chan ebpftracer.Event, 10000),
7578

79+
hostConntrack: ct,
80+
7681
containersById: map[ContainerID]*Container{},
7782
containersByCgroupId: map[string]*Container{},
7883
containersByPid: map[uint32]*Container{},
@@ -258,7 +263,12 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
258263
r.containersByCgroupId[cg.Id] = c
259264
return c
260265
}
261-
c := NewContainer(cg, md)
266+
c, err := NewContainer(cg, md, r.hostConntrack, pid)
267+
if err != nil {
268+
klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
269+
return nil
270+
}
271+
262272
klog.InfoS("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
263273
if err := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Register(c); err != nil {
264274
klog.Warningln(err)

logs/tail_reader_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestTailReader(t *testing.T) {
2626
}
2727

2828
wait := func() {
29-
time.Sleep(3 * tailPollInterval)
29+
time.Sleep(time.Second)
3030
}
3131

3232
get := func(expected string) {

0 commit comments

Comments
 (0)