diff --git a/cmd/system-probe/config/adjust_npm.go b/cmd/system-probe/config/adjust_npm.go index 58cfe1c3ac8a3e..ab0fc468bf5533 100644 --- a/cmd/system-probe/config/adjust_npm.go +++ b/cmd/system-probe/config/adjust_npm.go @@ -25,6 +25,8 @@ const ( ) func adjustNetwork(cfg config.Config) { + ebpflessEnabled := cfg.GetBool(netNS("enable_ebpfless")) + limitMaxInt(cfg, spNS("max_conns_per_message"), maxConnsMessageBatchSize) if cfg.GetBool(spNS("disable_tcp")) { @@ -99,4 +101,24 @@ func adjustNetwork(cfg config.Config) { log.Warn("disabling NPM connection rollups since USM connection rollups are not enabled") cfg.Set(netNS("enable_connection_rollup"), false, model.SourceAgentRuntime) } + + // disable features that are not supported on certain + // configs/platforms + var disableConfigs []struct { + key, reason string + } + if ebpflessEnabled { + const notSupportedEbpfless = "not supported when ebpf-less is enabled" + disableConfigs = append(disableConfigs, []struct{ key, reason string }{ + {netNS("enable_protocol_classification"), notSupportedEbpfless}, + {evNS("network_process", "enabled"), notSupportedEbpfless}}..., + ) + } + + for _, c := range disableConfigs { + if cfg.GetBool(c.key) { + log.Warnf("disabling %s: %s", c.key, c.reason) + cfg.Set(c.key, false, model.SourceAgentRuntime) + } + } } diff --git a/cmd/system-probe/modules/network_tracer_linux.go b/cmd/system-probe/modules/network_tracer_linux.go index c3274c39df631d..9481d9d9208e5a 100644 --- a/cmd/system-probe/modules/network_tracer_linux.go +++ b/cmd/system-probe/modules/network_tracer_linux.go @@ -10,6 +10,7 @@ package modules import ( "github.com/DataDog/datadog-agent/cmd/system-probe/api/module" "github.com/DataDog/datadog-agent/cmd/system-probe/config" + "github.com/DataDog/datadog-agent/pkg/network/tracer" ) // NetworkTracer is a factory for NPM's tracer @@ -17,7 +18,5 @@ var NetworkTracer = module.Factory{ Name: config.NetworkTracerModule, ConfigNamespaces: networkTracerModuleConfigNamespaces, Fn: 
createNetworkTracerModule, - NeedsEBPF: func() bool { - return true - }, + NeedsEBPF: tracer.NeedsEBPF, } diff --git a/go.mod b/go.mod index fc6c85fe356d4e..3135c86490b5a7 100644 --- a/go.mod +++ b/go.mod @@ -1075,3 +1075,5 @@ replace ( // Prevent a false-positive detection by the Google and Ikarus security vendors on VirusTotal exclude go.opentelemetry.io/proto/otlp v1.1.0 + +replace github.com/google/gopacket v1.1.19 => github.com/DataDog/gopacket v0.0.0-20240626205202-4ac4cee31f14 diff --git a/go.sum b/go.sum index 019d7521a3dcb9..b3bf2e8247cdeb 100644 --- a/go.sum +++ b/go.sum @@ -718,6 +718,8 @@ github.com/DataDog/go-tuf v1.1.0-0.5.2 h1:4CagiIekonLSfL8GMHRHcHudo1fQnxELS9g4ti github.com/DataDog/go-tuf v1.1.0-0.5.2/go.mod h1:zBcq6f654iVqmkk8n2Cx81E1JnNTMOAx1UEO/wZR+P0= github.com/DataDog/gohai v0.0.0-20230524154621-4316413895ee h1:tXibLZk3G6HncIFJKaNItsdzcrk4YqILNDZlXPTNt4k= github.com/DataDog/gohai v0.0.0-20230524154621-4316413895ee/go.mod h1:nTot/Iy0kW16bXgXr6blEc8gFeAS7vTqYlhAxh+dbc0= +github.com/DataDog/gopacket v0.0.0-20240626205202-4ac4cee31f14 h1:t34NfJA77KgFZsh8kcNFW57LZLa0kW2YSUs4MvLKRxU= +github.com/DataDog/gopacket v0.0.0-20240626205202-4ac4cee31f14/go.mod h1:riddUzxTSBpJXk3qBHtYr4qOhFhT6k/1c0E3qkQjQpA= github.com/DataDog/gopsutil v1.2.2 h1:8lmthwyyCXa1NKiYcHlrtl9AAFdfbNI2gPcioCJcBPU= github.com/DataDog/gopsutil v1.2.2/go.mod h1:glkxNt/qRu9lnpmUEQwOIAXW+COWDTBOTEAHqbgBPts= github.com/DataDog/gostackparse v0.7.0 h1:i7dLkXHvYzHV308hnkvVGDL3BR4FWl7IsXNPz/IGQh4= @@ -1507,8 +1509,6 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= -github.com/google/gopacket v1.1.19/go.mod 
h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/licenseclassifier/v2 v2.0.0 h1:1Y57HHILNf4m0ABuMVb6xk4vAJYEUO0gDxNpog0pyeA= github.com/google/licenseclassifier/v2 v2.0.0/go.mod h1:cOjbdH0kyC9R22sdQbYsFkto4NGCAc+ZSwbeThazEtM= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= @@ -2567,6 +2567,7 @@ github.com/vbatts/tar-split v0.11.5/go.mod h1:yZbwRsSeGjusneWgA781EKej9HF8vme8ok github.com/vibrantbyte/go-antpath v1.1.1 h1:SWDIMx4pSjyo7QoAsgTkpNU7QD0X9O0JAgr5O3TsYKk= github.com/vibrantbyte/go-antpath v1.1.1/go.mod h1:ZqMGIk+no3BL2o6OdEZ3ZDiWfIteuastNSaTFv7kgUY= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M= diff --git a/pkg/config/setup/system_probe.go b/pkg/config/setup/system_probe.go index e7754f72c842b7..d5fb6983e0a3a7 100644 --- a/pkg/config/setup/system_probe.go +++ b/pkg/config/setup/system_probe.go @@ -298,6 +298,8 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) { // connection aggregation with port rollups cfg.BindEnvAndSetDefault(join(netNS, "enable_connection_rollup"), false) + cfg.BindEnvAndSetDefault(join(netNS, "enable_ebpfless"), false) + // windows config cfg.BindEnvAndSetDefault(join(spNS, "windows.enable_monotonic_count"), false) diff --git a/pkg/ebpf/ebpftest/buildmode.go b/pkg/ebpf/ebpftest/buildmode.go index 6fcc977f80af43..76e774f68cbbb4 100644 --- a/pkg/ebpf/ebpftest/buildmode.go +++ b/pkg/ebpf/ebpftest/buildmode.go @@ -16,6 +16,7 @@ var ( RuntimeCompiled BuildMode CORE BuildMode Fentry BuildMode + Ebpfless BuildMode ) func init() { @@ -23,6 
+24,7 @@ func init() { RuntimeCompiled = runtimeCompiled{} CORE = core{} Fentry = fentry{} + Ebpfless = ebpfless{} } // BuildMode is an eBPF build mode @@ -95,6 +97,23 @@ func (f fentry) Env() map[string]string { } } +type ebpfless struct{} + +func (e ebpfless) String() string { + return "eBPFless" +} + +func (e ebpfless) Env() map[string]string { + return map[string]string{ + "NETWORK_TRACER_FENTRY_TESTS": "false", + "DD_ENABLE_RUNTIME_COMPILER": "false", + "DD_ENABLE_CO_RE": "false", + "DD_ALLOW_RUNTIME_COMPILED_FALLBACK": "false", + "DD_ALLOW_PRECOMPILED_FALLBACK": "false", + "DD_NETWORK_CONFIG_ENABLE_EBPFLESS": "true", + } +} + // GetBuildMode returns which build mode the current environment matches, if any func GetBuildMode() BuildMode { for _, mode := range []BuildMode{Prebuilt, RuntimeCompiled, CORE, Fentry} { diff --git a/pkg/ebpf/ebpftest/buildmode_linux.go b/pkg/ebpf/ebpftest/buildmode_linux.go index a16d7bbc8dc2d1..0e88f0f528271a 100644 --- a/pkg/ebpf/ebpftest/buildmode_linux.go +++ b/pkg/ebpf/ebpftest/buildmode_linux.go @@ -30,6 +30,10 @@ func SupportedBuildModes() []BuildMode { (runtime.GOARCH == "amd64" && (hostPlatform == "amazon" || hostPlatform == "amzn") && kv.Major() == 5 && kv.Minor() == 10) { modes = append(modes, Fentry) } + if os.Getenv("TEST_EBPFLESS_OVERRIDE") == "true" { + modes = append(modes, Ebpfless) + } + return modes } diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index 026842a9c289f2..5ef2ce8677ad97 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -302,6 +302,8 @@ type Config struct { // buffers (>=5.8) will result in forcing the use of Perf Maps instead. EnableUSMRingBuffers bool + EnableEbpfless bool + // EnableUSMEventStream enables USM to use the event stream instead // of netlink for receiving process events. 
EnableUSMEventStream bool @@ -406,6 +408,8 @@ func New() *Config { EnableNPMConnectionRollup: cfg.GetBool(join(netNS, "enable_connection_rollup")), + EnableEbpfless: cfg.GetBool(join(netNS, "enable_ebpfless")), + // Service Monitoring EnableJavaTLSSupport: cfg.GetBool(join(smjtNS, "enabled")), JavaAgentDebug: cfg.GetBool(join(smjtNS, "debug")), diff --git a/pkg/network/dns/driver_windows.go b/pkg/network/dns/driver_windows.go index 82e0cb3e885294..3056d38c25cdd0 100644 --- a/pkg/network/dns/driver_windows.go +++ b/pkg/network/dns/driver_windows.go @@ -23,6 +23,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/telemetry" "github.com/DataDog/datadog-agent/pkg/network/driver" + "github.com/DataDog/datadog-agent/pkg/network/filter" ) const ( @@ -99,7 +100,7 @@ func (d *dnsDriver) SetDataFilters(filters []driver.FilterDefinition) error { } // ReadDNSPacket visits a raw DNS packet if one is available. -func (d *dnsDriver) ReadDNSPacket(visit func([]byte, time.Time) error) (didRead bool, err error) { +func (d *dnsDriver) ReadDNSPacket(visit func(data []byte, info filter.PacketInfo, t time.Time) error) (didRead bool, err error) { var bytesRead uint32 var key uintptr // returned by GetQueuedCompletionStatus, then ignored var ol *windows.Overlapped @@ -125,7 +126,7 @@ func (d *dnsDriver) ReadDNSPacket(visit func([]byte, time.Time) error) (didRead start := driver.FilterPacketHeaderSize - if err := visit(buf.data[start:], captureTime); err != nil { + if err := visit(buf.data[start:], nil, captureTime); err != nil { return false, err } diff --git a/pkg/network/dns/monitor_linux.go b/pkg/network/dns/monitor_linux.go index af87db20f2ee72..bd8c606415f0ff 100644 --- a/pkg/network/dns/monitor_linux.go +++ b/pkg/network/dns/monitor_linux.go @@ -11,17 +11,14 @@ import ( "fmt" "math" - "golang.org/x/net/bpf" - - "github.com/vishvananda/netns" - manager "github.com/DataDog/ebpf-manager" + "github.com/vishvananda/netns" "github.com/DataDog/datadog-agent/comp/core/telemetry" 
ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" - filterpkg "github.com/DataDog/datadog-agent/pkg/network/filter" + "github.com/DataDog/datadog-agent/pkg/network/filter" "github.com/DataDog/datadog-agent/pkg/util/kernel" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -33,6 +30,26 @@ type dnsMonitor struct { // NewReverseDNS starts snooping on DNS traffic to allow IP -> domain reverse resolution func NewReverseDNS(cfg *config.Config, _ telemetry.Component) (ReverseDNS, error) { + // Create the RAW_SOCKET inside the root network namespace + var ( + packetSrc *filter.AFPacketSource + srcErr error + ns netns.NsHandle + ) + ns, err := cfg.GetRootNetNs() + if err != nil { + return nil, err + } + defer ns.Close() + + err = kernel.WithNS(ns, func() error { + packetSrc, srcErr = filter.NewAFPacketSource(4 << 20) // 4 MB total + return srcErr + }) + if err != nil { + return nil, err + } + currKernelVersion, err := kernel.HostVersion() if err != nil { // if the platform couldn't be determined, treat it as new kernel case @@ -42,12 +59,11 @@ func NewReverseDNS(cfg *config.Config, _ telemetry.Component) (ReverseDNS, error pre410Kernel := currKernelVersion < kernel.VersionCode(4, 1, 0) var p *ebpfProgram - var filter *manager.Probe - var bpfFilter []bpf.RawInstruction - if pre410Kernel { - bpfFilter, err = generateBPFFilter(cfg) - if err != nil { + if pre410Kernel || cfg.EnableEbpfless { + if bpfFilter, err := generateBPFFilter(cfg); err != nil { return nil, fmt.Errorf("error creating bpf classic filter: %w", err) + } else if err = packetSrc.SetBPF(bpfFilter); err != nil { + return nil, fmt.Errorf("could not set BPF filter on packet source: %w", err) } } else { p, err = newEBPFProgram(cfg) @@ -59,35 +75,21 @@ func NewReverseDNS(cfg *config.Config, _ telemetry.Component) (ReverseDNS, error return nil, fmt.Errorf("error initializing ebpf programs: %w", err) } - 
filter, _ = p.GetProbe(manager.ProbeIdentificationPair{EBPFFuncName: probes.SocketDNSFilter, UID: probeUID}) + filter, _ := p.GetProbe(manager.ProbeIdentificationPair{EBPFFuncName: probes.SocketDNSFilter, UID: probeUID}) if filter == nil { return nil, fmt.Errorf("error retrieving socket filter") } - } - // Create the RAW_SOCKET inside the root network namespace - var ( - packetSrc *filterpkg.AFPacketSource - srcErr error - ns netns.NsHandle - ) - if ns, err = cfg.GetRootNetNs(); err != nil { - return nil, err - } - defer ns.Close() - - err = kernel.WithNS(ns, func() error { - packetSrc, srcErr = filterpkg.NewPacketSource(filter, bpfFilter) - return srcErr - }) - if err != nil { - return nil, err + if err = packetSrc.SetEbpf(filter); err != nil { + return nil, fmt.Errorf("could not set file descriptor for eBPF program: %w", err) + } } snoop, err := newSocketFilterSnooper(cfg, packetSrc) if err != nil { return nil, err } + return &dnsMonitor{ snoop, p, diff --git a/pkg/network/dns/packet_source_windows.go b/pkg/network/dns/packet_source_windows.go index 5a00280591eda9..52ae79ffa4cef4 100644 --- a/pkg/network/dns/packet_source_windows.go +++ b/pkg/network/dns/packet_source_windows.go @@ -14,16 +14,17 @@ import ( "github.com/google/gopacket/layers" "github.com/DataDog/datadog-agent/comp/core/telemetry" + "github.com/DataDog/datadog-agent/pkg/network/filter" ) -var _ packetSource = &windowsPacketSource{} +var _ filter.PacketSource = &windowsPacketSource{} type windowsPacketSource struct { di *dnsDriver } // newWindowsPacketSource constructs a new packet source -func newWindowsPacketSource(telemetrycomp telemetry.Component) (packetSource, error) { +func newWindowsPacketSource(telemetrycomp telemetry.Component) (filter.PacketSource, error) { di, err := newDriver(telemetrycomp) if err != nil { return nil, err @@ -31,7 +32,7 @@ func newWindowsPacketSource(telemetrycomp telemetry.Component) (packetSource, er return &windowsPacketSource{di: di}, nil } -func (p 
*windowsPacketSource) VisitPackets(exit <-chan struct{}, visit func([]byte, time.Time) error) error { +func (p *windowsPacketSource) VisitPackets(exit <-chan struct{}, visit func([]byte, filter.PacketInfo, time.Time) error) error { for { didReadPacket, err := p.di.ReadDNSPacket(visit) if err != nil { @@ -50,7 +51,7 @@ func (p *windowsPacketSource) VisitPackets(exit <-chan struct{}, visit func([]by } } -func (p *windowsPacketSource) PacketType() gopacket.LayerType { +func (p *windowsPacketSource) LayerType() gopacket.LayerType { return layers.LayerTypeIPv4 } diff --git a/pkg/network/dns/snooper.go b/pkg/network/dns/snooper.go index e392a96bca5503..c1bc700a33b3cd 100644 --- a/pkg/network/dns/snooper.go +++ b/pkg/network/dns/snooper.go @@ -11,9 +11,8 @@ import ( "sync" "time" - "github.com/google/gopacket" - "github.com/DataDog/datadog-agent/pkg/network/config" + "github.com/DataDog/datadog-agent/pkg/network/filter" "github.com/DataDog/datadog-agent/pkg/process/util" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -45,7 +44,7 @@ var _ ReverseDNS = &socketFilterSnooper{} // socketFilterSnooper is a DNS traffic snooper built on top of an eBPF SOCKET_FILTER type socketFilterSnooper struct { - source packetSource + source filter.PacketSource parser *dnsParser cache *reverseDNSCache statKeeper *dnsStatKeeper @@ -62,24 +61,8 @@ func (s *socketFilterSnooper) WaitForDomain(domain string) error { return s.statKeeper.WaitForDomain(domain) } -// packetSource reads raw packet data -type packetSource interface { - // VisitPackets reads all new raw packets that are available, invoking the given callback for each packet. - // If no packet is available, VisitPacket returns immediately. - // The format of the packet is dependent on the implementation of packetSource -- i.e. it may be an ethernet frame, or a IP frame. - // The data buffer is reused between invocations of VisitPacket and thus should not be pointed to. 
- // If the cancel channel is closed, VisitPackets will stop reading. - VisitPackets(cancel <-chan struct{}, visitor func(data []byte, timestamp time.Time) error) error - - // PacketType returns the type of packet this source reads - PacketType() gopacket.LayerType - - // Close closes the packet source - Close() -} - // newSocketFilterSnooper returns a new socketFilterSnooper -func newSocketFilterSnooper(cfg *config.Config, source packetSource) (*socketFilterSnooper, error) { +func newSocketFilterSnooper(cfg *config.Config, source filter.PacketSource) (*socketFilterSnooper, error) { cache := newReverseDNSCache(dnsCacheSize, dnsCacheExpirationPeriod) var statKeeper *dnsStatKeeper if cfg.CollectDNSStats { @@ -93,7 +76,7 @@ func newSocketFilterSnooper(cfg *config.Config, source packetSource) (*socketFil } snooper := &socketFilterSnooper{ source: source, - parser: newDNSParser(source.PacketType(), cfg), + parser: newDNSParser(source.LayerType(), cfg), cache: cache, statKeeper: statKeeper, translation: new(translation), @@ -154,7 +137,7 @@ func (s *socketFilterSnooper) Close() { // The *translation is recycled and re-used in subsequent calls and it should not be accessed concurrently. // The second parameter `ts` is the time when the packet was captured off the wire. This is used for latency calculation // and much more reliable than calling time.Now() at the user layer. 
-func (s *socketFilterSnooper) processPacket(data []byte, ts time.Time) error { +func (s *socketFilterSnooper) processPacket(data []byte, _ filter.PacketInfo, ts time.Time) error { t := s.getCachedTranslation() pktInfo := dnsPacketInfo{} diff --git a/pkg/network/dns/snooper_test.go b/pkg/network/dns/snooper_test.go index 3c79c25bb7e59a..3a0d334c80488e 100644 --- a/pkg/network/dns/snooper_test.go +++ b/pkg/network/dns/snooper_test.go @@ -390,7 +390,7 @@ func TestParsingError(t *testing.T) { reverseDNS := rdns.(*dnsMonitor) // Pass a byte array of size 1 which should result in parsing error - err = reverseDNS.processPacket(make([]byte, 1), time.Now()) + err = reverseDNS.processPacket(make([]byte, 1), 0, time.Now()) require.NoError(t, err) assert.True(t, cacheTelemetry.length.Load() == 0) assert.True(t, snooperTelemetry.decodingErrors.Load() == 1) diff --git a/pkg/network/event_common.go b/pkg/network/event_common.go index 7ec7a5390e34af..3c5c4f9769741f 100644 --- a/pkg/network/event_common.go +++ b/pkg/network/event_common.go @@ -30,6 +30,9 @@ const ( maxByteCountChange uint64 = 375 << 30 // use typical small MTU size, 1300, to get max packet count maxPacketCountChange uint64 = maxByteCountChange / 1300 + + // ConnectionByteKeyMaxLen represents the maximum size in bytes of a connection byte key + ConnectionByteKeyMaxLen = 41 ) // ConnectionType will be either TCP or UDP @@ -332,6 +335,15 @@ func (c ConnectionStats) ByteKeyNAT(buf []byte) []byte { return generateConnectionKey(c, buf, true) } +// IsValid returns `true` if the connection has a valid source and dest +// ports and IPs +func (c ConnectionStats) IsValid() bool { + return c.Source.IsValid() && + c.Dest.IsValid() && + c.SPort > 0 && + c.DPort > 0 +} + const keyFmt = "p:%d|src:%s:%d|dst:%s:%d|f:%d|t:%d" // BeautifyKey returns a human readable byte key (used for debugging purposes) diff --git a/pkg/network/filter/packet_source.go b/pkg/network/filter/packet_source.go new file mode 100644 index 
00000000000000..c75ad1222e9bfb --- /dev/null +++ b/pkg/network/filter/packet_source.go @@ -0,0 +1,33 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +// Package filter exposes interfaces and implementations for packet capture +package filter + +import ( + "time" + + "github.com/google/gopacket" +) + +// PacketInfo holds OS dependent packet information +// about a packet +type PacketInfo interface{} + +// PacketSource reads raw packet data +type PacketSource interface { + // VisitPackets reads all new raw packets that are available, invoking the given callback for each packet. + // If no packet is available, VisitPacket returns immediately. + // The format of the packet is dependent on the implementation of PacketSource -- i.e. it may be an ethernet frame, or a IP frame. + // The data buffer is reused between invocations of VisitPacket and thus should not be pointed to. + // If the cancel channel is closed, VisitPackets will stop reading. + VisitPackets(cancel <-chan struct{}, visitor func(data []byte, info PacketInfo, timestamp time.Time) error) error + + // LayerType returns the type of packet this source reads + LayerType() gopacket.LayerType + + // Close closes the packet source + Close() +} diff --git a/pkg/network/filter/packet_source_linux.go b/pkg/network/filter/packet_source_linux.go index 8e21c04e7d5f2a..65fbbe9f9270f5 100644 --- a/pkg/network/filter/packet_source_linux.go +++ b/pkg/network/filter/packet_source_linux.go @@ -3,13 +3,14 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. 
-//go:build linux_bpf +//go:build linux -//nolint:revive // TODO(NET) Fix revive linter +// Package filter exposes interfaces and implementations for packet capture package filter import ( "fmt" + "os" "reflect" "syscall" "time" @@ -19,12 +20,16 @@ import ( "github.com/google/gopacket/afpacket" "github.com/google/gopacket/layers" "golang.org/x/net/bpf" + "golang.org/x/sys/unix" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" ) -const telemetryModuleName = "network_tracer__dns" +const ( + telemetryModuleName = "network_tracer__filter" + defaultSnapLen = 4096 +) // Telemetry var packetSourceTelemetry = struct { @@ -42,49 +47,91 @@ var packetSourceTelemetry = struct { // AFPacketSource provides a RAW_SOCKET attached to an eBPF SOCKET_FILTER type AFPacketSource struct { *afpacket.TPacket - socketFilter *manager.Probe exit chan struct{} } -// NewPacketSource creates an AFPacketSource using the provided BPF filter -func NewPacketSource(filter *manager.Probe, bpfFilter []bpf.RawInstruction) (*AFPacketSource, error) { +// AFPacketInfo holds information about a packet +type AFPacketInfo struct { + // PktType corresponds to sll_pkttype in the + // sockaddr_ll struct; see packet(7) + // https://man7.org/linux/man-pages/man7/packet.7.html + PktType uint8 +} + +// OptSnapLen specifies the maximum length of the packet to read +// +// Defaults to 4096 bytes +type OptSnapLen int + +// NewAFPacketSource creates an AFPacketSource using the provided BPF filter +func NewAFPacketSource(size int, opts ...interface{}) (*AFPacketSource, error) { + snapLen := defaultSnapLen + for _, opt := range opts { + switch o := opt.(type) { + case OptSnapLen: + snapLen = int(o) + if snapLen <= 0 || snapLen > 65536 { + return nil, fmt.Errorf("snap len should be between 0 and 65536") + } + default: + return nil, fmt.Errorf("unknown option %+v", opt) + } + } + + frameSize, blockSize, numBlocks, err := afpacketComputeSize(size, snapLen, os.Getpagesize()) + 
if err != nil { + return nil, fmt.Errorf("error computing mmap'ed buffer parameters: %w", err) + } + + log.Debugf("creating tpacket source with frame_size=%d block_size=%d num_blocks=%d", frameSize, blockSize, numBlocks) rawSocket, err := afpacket.NewTPacket( - afpacket.OptPollTimeout(1*time.Second), - // This setup will require ~4Mb that is mmap'd into the process virtual space - // More information here: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt - afpacket.OptFrameSize(4096), - afpacket.OptBlockSize(4096*128), - afpacket.OptNumBlocks(8), + afpacket.OptPollTimeout(time.Second), + afpacket.OptFrameSize(frameSize), + afpacket.OptBlockSize(blockSize), + afpacket.OptNumBlocks(numBlocks), + afpacket.OptAddPktType(true), ) + if err != nil { return nil, fmt.Errorf("error creating raw socket: %s", err) } - if filter != nil { - // The underlying socket file descriptor is private, hence the use of reflection - // Point socket filter program to the RAW_SOCKET file descriptor - // Note the filter attachment itself is triggered by the ebpf.Manager - filter.SocketFD = int(reflect.ValueOf(rawSocket).Elem().FieldByName("fd").Int()) - } else { - err = rawSocket.SetBPF(bpfFilter) - if err != nil { - return nil, fmt.Errorf("error setting classic bpf filter: %w", err) - } - } - ps := &AFPacketSource{ - TPacket: rawSocket, - socketFilter: filter, - exit: make(chan struct{}), + TPacket: rawSocket, + exit: make(chan struct{}), } go ps.pollStats() return ps, nil } +// SetEbpf attaches an eBPF socket filter to the AFPacketSource +func (p *AFPacketSource) SetEbpf(filter *manager.Probe) error { + // The underlying socket file descriptor is private, hence the use of reflection + // Point socket filter program to the RAW_SOCKET file descriptor + // Note the filter attachment itself is triggered by the ebpf.Manager + f := reflect.ValueOf(p.TPacket).Elem().FieldByName("fd") + if !f.IsValid() { + return fmt.Errorf("could not find fd field in TPacket object") + } + + if 
!f.CanInt() { + return fmt.Errorf("fd TPacket field is not an int") + } + + filter.SocketFD = int(f.Int()) + return nil +} + +// SetBPF attaches a (classic) BPF socket filter to the AFPacketSource +func (p *AFPacketSource) SetBPF(filter []bpf.RawInstruction) error { + return p.TPacket.SetBPF(filter) +} + // VisitPackets starts reading packets from the source -func (p *AFPacketSource) VisitPackets(exit <-chan struct{}, visit func([]byte, time.Time) error) error { +func (p *AFPacketSource) VisitPackets(exit <-chan struct{}, visit func(data []byte, info PacketInfo, t time.Time) error) error { + pktInfo := &AFPacketInfo{} for { // allow the read loop to be prematurely interrupted select { @@ -108,14 +155,15 @@ func (p *AFPacketSource) VisitPackets(exit <-chan struct{}, visit func([]byte, t return err } - if err := visit(data, stats.Timestamp); err != nil { + pktInfo.PktType = stats.AncillaryData[0].(afpacket.AncillaryPktType).Type + if err := visit(data, pktInfo, stats.Timestamp); err != nil { return err } } } -// PacketType is the gopacket.LayerType for this source -func (p *AFPacketSource) PacketType() gopacket.LayerType { +// LayerType is the gopacket.LayerType for this source +func (p *AFPacketSource) LayerType() gopacket.LayerType { return layers.LayerTypeEthernet } @@ -160,3 +208,57 @@ func (p *AFPacketSource) pollStats() { } } } + +// afpacketComputeSize computes the block_size and the num_blocks in such a way that the +// allocated mmap buffer is close to but smaller than target_size_mb. +// The restriction is that the block_size must be divisible by both the +// frame size and page size. 
+// +// See https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt +func afpacketComputeSize(targetSize, snaplen, pageSize int) (frameSize, blockSize, numBlocks int, err error) { + frameSize = tpacketAlign(unix.TPACKET_HDRLEN) + tpacketAlign(snaplen) + if frameSize <= pageSize { + frameSize = int(nextPowerOf2(int64(frameSize))) + if frameSize <= pageSize { + blockSize = pageSize + } + } else { + // align frameSize to pageSize + frameSize = (frameSize + pageSize - 1) & ^(pageSize - 1) + blockSize = frameSize + } + + numBlocks = targetSize / blockSize + if numBlocks == 0 { + return 0, 0, 0, fmt.Errorf("buffer size is too small") + } + + blockSizeInc := blockSize + for numBlocks > afpacket.DefaultNumBlocks { + blockSize += blockSizeInc + numBlocks = targetSize / blockSize + } + + return frameSize, blockSize, numBlocks, nil +} + +func tpacketAlign(x int) int { + return (x + unix.TPACKET_ALIGNMENT - 1) & ^(unix.TPACKET_ALIGNMENT - 1) +} + +// nextPowerOf2 rounds up `v` to the next power of 2 +// +// Taken from Hacker's Delight by Henry S. 
Warren, Jr., +// https://en.wikipedia.org/wiki/Hacker%27s_Delight +func nextPowerOf2(v int64) int64 { + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v |= v >> 32 + v++ + + return v +} diff --git a/pkg/network/filter/socket_filter.go b/pkg/network/filter/socket_filter.go index 467cb3931d29f1..5c29b2d15cea26 100644 --- a/pkg/network/filter/socket_filter.go +++ b/pkg/network/filter/socket_filter.go @@ -5,15 +5,15 @@ //go:build linux_bpf +// Package filter exposes interfaces and implementations for packet capture package filter import ( "encoding/binary" "runtime" - "golang.org/x/sys/unix" - manager "github.com/DataDog/ebpf-manager" + "golang.org/x/sys/unix" "github.com/DataDog/datadog-agent/pkg/network/config" "github.com/DataDog/datadog-agent/pkg/util/kernel" diff --git a/pkg/network/netlink/conntracker.go b/pkg/network/netlink/conntracker.go index af4314b800cbd1..9f066d2f8cb979 100644 --- a/pkg/network/netlink/conntracker.go +++ b/pkg/network/netlink/conntracker.go @@ -10,6 +10,7 @@ package netlink import ( "container/list" "context" + "errors" "fmt" "net" "net/netip" @@ -17,6 +18,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/syndtr/gocapability/capability" "golang.org/x/sys/unix" "github.com/cihub/seelog" @@ -39,6 +41,9 @@ const ( var defaultBuckets = []float64{10, 25, 50, 75, 100, 250, 500, 1000, 10000} +// ErrNotPermitted is the error returned when the current process does not have the required permissions for netlink conntracker +var ErrNotPermitted = errors.New("netlink conntracker requires NET_ADMIN capability") + // Conntracker is a wrapper around go-conntracker that keeps a record of all connections in user space type Conntracker interface { // Describe returns all descriptions of the collector @@ -115,6 +120,18 @@ func NewConntracker(config *config.Config, telemetrycomp telemetryComp.Component conntracker Conntracker ) + // check if we have the right capabilities for the netlink 
NewConntracker + // NET_ADMIN is required + if caps, err := capability.NewPid2(0); err == nil { + if err = caps.Load(); err != nil { + return nil, fmt.Errorf("could not load process capabilities: %w", err) + } + + if !caps.Get(capability.EFFECTIVE, capability.CAP_NET_ADMIN) { + return nil, ErrNotPermitted + } + } + done := make(chan struct{}) go func() { diff --git a/pkg/network/port.go b/pkg/network/port.go index 6a1ed3f5605558..cce191d642382d 100644 --- a/pkg/network/port.go +++ b/pkg/network/port.go @@ -30,8 +30,8 @@ var statusMap = map[ConnectionType]int64{ UDP: tcpClose, } -// ReadInitialState reads the /proc filesystem and determines which ports are being listened on -func ReadInitialState(procRoot string, protocol ConnectionType, collectIPv6 bool) (map[PortMapping]uint32, error) { +// ReadListeningPorts reads the /proc filesystem and determines which ports are being listened on +func ReadListeningPorts(procRoot string, protocol ConnectionType, collectIPv6 bool) (map[PortMapping]uint32, error) { start := time.Now() defer func() { log.Debugf("Read initial %s pid->port mapping in %s", protocol.String(), time.Since(start)) diff --git a/pkg/network/port_test.go b/pkg/network/port_test.go index e52c80829cecb8..995a4e8ef0a2f9 100644 --- a/pkg/network/port_test.go +++ b/pkg/network/port_test.go @@ -89,16 +89,16 @@ func runServerProcess(t *testing.T, proto string, port uint16, ns netns.NsHandle return port, proc } -func TestReadInitialState(t *testing.T) { +func TestReadListeningPorts(t *testing.T) { t.Run("TCP", func(t *testing.T) { - testReadInitialState(t, "tcp") + testReadListeningPorts(t, "tcp") }) t.Run("UDP", func(t *testing.T) { - testReadInitialState(t, "udp") + testReadListeningPorts(t, "udp") }) } -func testReadInitialState(t *testing.T, proto string) { +func testReadListeningPorts(t *testing.T, proto string) { var ns, rootNs netns.NsHandle var err error nsName := netlinktestutil.AddNS(t) @@ -159,7 +159,7 @@ func testReadInitialState(t *testing.T, proto 
string) { connType, otherConnType = otherConnType, connType } - initialPorts, err := ReadInitialState("/proc", connType, true) + initialPorts, err := ReadListeningPorts("/proc", connType, true) if !assert.NoError(t, err) { return } diff --git a/pkg/network/state.go b/pkg/network/state.go index 708e535adeec5e..7bd2684022837b 100644 --- a/pkg/network/state.go +++ b/pkg/network/state.go @@ -74,9 +74,6 @@ const ( // constant is not worth the increased memory cost. DNSResponseCodeNoError = 0 - // ConnectionByteKeyMaxLen represents the maximum size in bytes of a connection byte key - ConnectionByteKeyMaxLen = 41 - stateModuleName = "network_tracer__state" shortLivedConnectionThreshold = 2 * time.Minute diff --git a/pkg/network/tracer/connection/cookie.go b/pkg/network/tracer/connection/cookie.go new file mode 100644 index 00000000000000..bc32836d81ff43 --- /dev/null +++ b/pkg/network/tracer/connection/cookie.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. 
+ +//go:build linux && npm + +package connection + +import ( + "encoding/binary" + "hash" + + "github.com/twmb/murmur3" + + "github.com/DataDog/datadog-agent/pkg/network" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +type cookieHasher struct { + hash hash.Hash64 + buf []byte +} + +func newCookieHasher() *cookieHasher { + return &cookieHasher{ + hash: murmur3.New64(), + buf: make([]byte, network.ConnectionByteKeyMaxLen), + } +} + +func (h *cookieHasher) Hash(stats *network.ConnectionStats) { + h.hash.Reset() + if err := binary.Write(h.hash, binary.BigEndian, stats.Cookie); err != nil { + log.Errorf("error writing cookie to hash: %s", err) + return + } + key := stats.ByteKey(h.buf) + if _, err := h.hash.Write(key); err != nil { + log.Errorf("error writing byte key to hash: %s", err) + return + } + stats.Cookie = h.hash.Sum64() +} diff --git a/pkg/network/tracer/connection/ebpf_tracer.go b/pkg/network/tracer/connection/ebpf_tracer.go new file mode 100644 index 00000000000000..558dc2a83218a0 --- /dev/null +++ b/pkg/network/tracer/connection/ebpf_tracer.go @@ -0,0 +1,733 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. 
+
+//go:build linux_bpf
+
+package connection
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"math"
+	"sync"
+	"time"
+
+	manager "github.com/DataDog/ebpf-manager"
+	"github.com/DataDog/ebpf-manager/tracefs"
+	"github.com/cihub/seelog"
+	"github.com/cilium/ebpf"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/atomic"
+	"golang.org/x/sys/unix"
+
+	telemetryComponent "github.com/DataDog/datadog-agent/comp/core/telemetry"
+	ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
+	"github.com/DataDog/datadog-agent/pkg/ebpf/maps"
+	ebpftelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry"
+	"github.com/DataDog/datadog-agent/pkg/network"
+	"github.com/DataDog/datadog-agent/pkg/network/config"
+	netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf"
+	"github.com/DataDog/datadog-agent/pkg/network/ebpf/probes"
+	"github.com/DataDog/datadog-agent/pkg/network/protocols"
+	"github.com/DataDog/datadog-agent/pkg/network/tracer/connection/failure"
+	"github.com/DataDog/datadog-agent/pkg/network/tracer/connection/fentry"
+	"github.com/DataDog/datadog-agent/pkg/network/tracer/connection/kprobe"
+	"github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util"
+	"github.com/DataDog/datadog-agent/pkg/telemetry"
+	"github.com/DataDog/datadog-agent/pkg/util/log"
+)
+
+const (
+	defaultClosedChannelSize = 500
+	defaultFailedChannelSize = 500
+	connTracerModuleName     = "network_tracer__ebpf"
+)
+
+// EbpfTracerTelemetry holds the telemetry of the eBPF connection tracer.
+// The *prometheus.Desc fields describe counters read from the eBPF telemetry
+// map; the matching last* fields store the previously read value so that
+// Collect can report the difference between two consecutive reads.
+//
+//nolint:revive // TODO(NET) Fix revive linter
+var EbpfTracerTelemetry = struct {
+	connections       telemetry.Gauge
+	tcpFailedConnects *prometheus.Desc
+	//nolint:revive // TODO(NET) Fix revive linter
+	TcpSentMiscounts *prometheus.Desc
+	//nolint:revive // TODO(NET) Fix revive linter
+	unbatchedTcpClose *prometheus.Desc
+	//nolint:revive // TODO(NET) Fix revive linter
+	unbatchedUdpClose *prometheus.Desc
+	//nolint:revive // TODO(NET) Fix revive linter
+	UdpSendsProcessed *prometheus.Desc
+	//nolint:revive // TODO(NET) Fix revive linter
+	UdpSendsMissed *prometheus.Desc
+	//nolint:revive // TODO(NET) Fix revive linter
+	UdpDroppedConns *prometheus.Desc
+	// doubleFlushAttemptsClose is a counter measuring the number of attempts to flush a closed connection twice from tcp_close
+	doubleFlushAttemptsClose *prometheus.Desc
+	// doubleFlushAttemptsDone is a counter measuring the number of attempts to flush a closed connection twice from tcp_done
+	doubleFlushAttemptsDone *prometheus.Desc
+	// unsupportedTcpFailures is a counter measuring the number of attempts to flush a TCP failure that is not supported
+	unsupportedTcpFailures *prometheus.Desc
+	// tcpDonePidMismatch is a counter measuring the number of TCP connections with a PID mismatch between tcp_connect and tcp_done
+	tcpDonePidMismatch *prometheus.Desc
+	PidCollisions      *telemetry.StatCounterWrapper
+	iterationDups      telemetry.Counter
+	iterationAborts    telemetry.Counter
+
+	//nolint:revive // TODO(NET) Fix revive linter
+	lastTcpFailedConnects *atomic.Int64
+	//nolint:revive // TODO(NET) Fix revive linter
+	LastTcpSentMiscounts *atomic.Int64
+	//nolint:revive // TODO(NET) Fix revive linter
+	lastUnbatchedTcpClose *atomic.Int64
+	//nolint:revive // TODO(NET) Fix revive linter
+	lastUnbatchedUdpClose *atomic.Int64
+	//nolint:revive // TODO(NET) Fix revive linter
+	lastUdpSendsProcessed *atomic.Int64
+	//nolint:revive // TODO(NET) Fix revive linter
+	lastUdpSendsMissed *atomic.Int64
+	//nolint:revive // TODO(NET) Fix revive linter
+	lastUdpDroppedConns *atomic.Int64
+	// lastDoubleFlushAttemptsClose is a counter measuring the diff between the last two values of doubleFlushAttemptsClose
+	lastDoubleFlushAttemptsClose *atomic.Int64
+	// lastDoubleFlushAttemptsDone is a counter measuring the diff between the last two values of doubleFlushAttemptsDone
+	lastDoubleFlushAttemptsDone *atomic.Int64
+	// lastUnsupportedTcpFailures is a counter measuring the diff between the last two values of unsupportedTcpFailures
+	lastUnsupportedTcpFailures *atomic.Int64
+	// lastTcpDonePidMismatch is a counter measuring the diff between the last two values of tcpDonePidMismatch
+	lastTcpDonePidMismatch *atomic.Int64
+}{
+	telemetry.NewGauge(connTracerModuleName, "connections", []string{"ip_proto", "family"}, "Gauge measuring the number of active connections in the EBPF map"),
+	prometheus.NewDesc(connTracerModuleName+"__tcp_failed_connects", "Counter measuring the number of failed TCP connections in the EBPF map", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__tcp_sent_miscounts", "Counter measuring the number of miscounted tcp sends in the EBPF map", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__unbatched_tcp_close", "Counter measuring the number of missed TCP close events in the EBPF map", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__unbatched_udp_close", "Counter measuring the number of missed UDP close events in the EBPF map", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__udp_sends_processed", "Counter measuring the number of processed UDP sends in EBPF", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__udp_sends_missed", "Counter measuring failures to process UDP sends in EBPF", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__udp_dropped_conns", "Counter measuring the number of dropped UDP connections in the EBPF map", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__double_flush_attempts_close", "Counter measuring the number of attempts to flush a closed connection twice from tcp_close", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__double_flush_attempts_done", "Counter measuring the number of attempts to flush a closed connection twice from tcp_done", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__unsupported_tcp_failures", "Counter measuring the number of attempts to flush a TCP failure that is not supported", nil, nil),
+	prometheus.NewDesc(connTracerModuleName+"__tcp_done_pid_mismatch", "Counter measuring the number of TCP connections with a PID mismatch between tcp_connect and tcp_done", nil, nil),
+	telemetry.NewStatCounterWrapper(connTracerModuleName, "pid_collisions", []string{}, "Counter measuring number of process collisions"),
+	telemetry.NewCounter(connTracerModuleName, "iteration_dups", []string{}, "Counter measuring the number of connections iterated more than once"),
+	telemetry.NewCounter(connTracerModuleName, "iteration_aborts", []string{}, "Counter measuring how many times ebpf iteration of connection map was aborted"),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+	atomic.NewInt64(0),
+}
+
+// ebpfTracer is the connection tracer returned by newEbpfTracer. It wraps an
+// eBPF manager together with the connection, TCP stats and TCP retransmits
+// maps, and the consumers of close and failed-connection events.
+type ebpfTracer struct {
+	m *manager.Manager
+
+	conns          *maps.GenericMap[netebpf.ConnTuple, netebpf.ConnStats]
+	tcpStats       *maps.GenericMap[netebpf.ConnTuple, netebpf.TCPStats]
+	tcpRetransmits *maps.GenericMap[netebpf.ConnTuple, uint32]
+	config         *config.Config
+
+	// tcp_close events
+	closeConsumer *tcpCloseConsumer
+	// tcp failure events; nil when failed connections are not supported
+	failedConnConsumer *failure.TCPFailedConnConsumer
+
+	// removeTuple is a scratch tuple reused by Remove
+	removeTuple *netebpf.ConnTuple
+
+	closeTracer func()
+	stopOnce    sync.Once
+
+	ebpfTracerType TracerType
+
+	exitTelemetry chan struct{}
+
+	ch *cookieHasher
+}
+
+// newEbpfTracer creates a new eBPF based tracer. It first tries to load the
+// fentry tracer and falls back to the kprobe tracer when fentry reports
+// fentry.ErrorNotSupported. An error is returned when tracefs is unavailable,
+// when no tracer can be loaded, or when one of the required maps cannot be
+// retrieved.
+func newEbpfTracer(config *config.Config, _ telemetryComponent.Component) (Tracer, error) {
+	if _, err := tracefs.Root(); err != nil {
+		return nil, fmt.Errorf("eBPF based network tracer unsupported: %s", err)
+	}
+
+	mgrOptions := manager.Options{
+		// Extend RLIMIT_MEMLOCK (8) size
+		// On some systems, the default for RLIMIT_MEMLOCK may be as low as 64 bytes.
+		// This will result in an EPERM (Operation not permitted) error, when trying to create an eBPF map
+		// using bpf(2) with BPF_MAP_CREATE.
+		//
+		// We are setting the limit to infinity until we have a better handle on the true requirements.
+		RLimit: &unix.Rlimit{
+			Cur: math.MaxUint64,
+			Max: math.MaxUint64,
+		},
+		MapSpecEditors: map[string]manager.MapSpecEditor{
+			probes.ConnMap:                           {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries},
+			probes.TCPStatsMap:                       {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries},
+			probes.TCPRetransmitsMap:                 {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries},
+			probes.PortBindingsMap:                   {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries},
+			probes.UDPPortBindingsMap:                {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries},
+			probes.ConnectionProtocolMap:             {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries},
+			probes.ConnectionTupleToSocketSKBConnMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries},
+		},
+		ConstantEditors: []manager.ConstantEditor{
+			boolConst("tcpv6_enabled", config.CollectTCPv6Conns),
+			boolConst("udpv6_enabled", config.CollectUDPv6Conns),
+		},
+		DefaultKProbeMaxActive: maxActive,
+		BypassEnabled:          config.BypassEnabled,
+	}
+
+	begin, end := network.EphemeralRange()
+	mgrOptions.ConstantEditors = append(mgrOptions.ConstantEditors,
+		manager.ConstantEditor{Name: "ephemeral_range_begin", Value: uint64(begin)},
+		manager.ConstantEditor{Name: "ephemeral_range_end", Value: uint64(end)})
+
+	closedChannelSize := defaultClosedChannelSize
+	if config.ClosedChannelSize > 0 {
+		closedChannelSize = config.ClosedChannelSize
+	}
+	var connCloseEventHandler ddebpf.EventHandler
+	var failedConnsHandler ddebpf.EventHandler
+	if config.RingBufferSupportedNPM() {
+		connCloseEventHandler = ddebpf.NewRingBufferHandler(closedChannelSize)
+		failedConnsHandler = ddebpf.NewRingBufferHandler(defaultFailedChannelSize)
+	} else {
+		connCloseEventHandler = ddebpf.NewPerfHandler(closedChannelSize)
+		failedConnsHandler = ddebpf.NewPerfHandler(defaultFailedChannelSize)
+	}
+
+	var m *manager.Manager
+	//nolint:revive // TODO(NET) Fix revive linter
+	var tracerType TracerType = TracerTypeFentry
+	var closeTracerFn func()
+	m, closeTracerFn, err := fentry.LoadTracer(config, mgrOptions, connCloseEventHandler)
+	if err != nil && !errors.Is(err, fentry.ErrorNotSupported) {
+		// failed to load fentry tracer
+		return nil, err
+	}
+
+	if err != nil {
+		// load the kprobe tracer
+		log.Info("fentry tracer not supported, falling back to kprobe tracer")
+		var kprobeTracerType kprobe.TracerType
+		m, closeTracerFn, kprobeTracerType, err = kprobe.LoadTracer(config, mgrOptions, connCloseEventHandler, failedConnsHandler)
+		if err != nil {
+			return nil, err
+		}
+		tracerType = TracerType(kprobeTracerType)
+	}
+	m.DumpHandler = dumpMapsHandler
+	ddebpf.AddNameMappings(m, "npm_tracer")
+
+	// NOTE(review): the two error returns below leave the loaded manager and
+	// closeTracerFn untouched — confirm whether they need to be released here.
+	numCPUs, err := ebpf.PossibleCPU()
+	if err != nil {
+		return nil, fmt.Errorf("could not determine number of CPUs: %w", err)
+	}
+	extractor := newBatchExtractor(numCPUs)
+	batchMgr, err := newConnBatchManager(m, extractor)
+	if err != nil {
+		return nil, fmt.Errorf("could not create connection batch manager: %w", err)
+	}
+
+	closeConsumer := newTCPCloseConsumer(connCloseEventHandler, batchMgr)
+
+	var failedConnConsumer *failure.TCPFailedConnConsumer
+	// Failed connections are not supported on prebuilt
+	if tracerType == TracerTypeKProbePrebuilt {
+		config.TCPFailedConnectionsEnabled = false
+	}
+	if config.FailedConnectionsSupported() {
+		failedConnConsumer = failure.NewFailedConnConsumer(failedConnsHandler, m, config.MaxFailedConnectionsBuffered)
+	}
+
+	tr := &ebpfTracer{
+		m:                  m,
+		config:             config,
+		closeConsumer:      closeConsumer,
+		failedConnConsumer: failedConnConsumer,
+		removeTuple:        &netebpf.ConnTuple{},
+		closeTracer:        closeTracerFn,
+		ebpfTracerType:     tracerType,
+		exitTelemetry:      make(chan struct{}),
+		ch:                 newCookieHasher(),
+	}
+
+	tr.conns, err = maps.GetMap[netebpf.ConnTuple, netebpf.ConnStats](m, probes.ConnMap)
+	if err != nil {
+		tr.Stop()
+		return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.ConnMap, err)
+	}
+
+	tr.tcpStats, err = maps.GetMap[netebpf.ConnTuple, netebpf.TCPStats](m, probes.TCPStatsMap)
+	if err != nil {
+		tr.Stop()
+		return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.TCPStatsMap, err)
+	}
+
+	if tr.tcpRetransmits, err = maps.GetMap[netebpf.ConnTuple, uint32](m, probes.TCPRetransmitsMap); err != nil {
+		tr.Stop()
+		return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.TCPRetransmitsMap, err)
+	}
+
+	return tr, nil
+}
+
+// boolConst returns a ConstantEditor called name whose value is 1 when value
+// is true and 0 otherwise.
+func boolConst(name string, value bool) manager.ConstantEditor {
+	c := manager.ConstantEditor{
+		Name:  name,
+		Value: uint64(1),
+	}
+	if !value {
+		c.Value = uint64(0)
+	}
+
+	return c
+}
+
+// Start seeds the port binding maps, starts the eBPF manager and then starts
+// the close and failed-connection consumers. callback is handed to the close
+// consumer. If any step fails the tracer is stopped before the error is
+// returned.
+func (t *ebpfTracer) Start(callback func(*network.ConnectionStats)) (err error) {
+	defer func() {
+		if err != nil {
+			t.Stop()
+		}
+	}()
+
+	err = t.initializePortBindingMaps()
+	if err != nil {
+		return fmt.Errorf("error initializing port binding maps: %s", err)
+	}
+
+	if err := t.m.Start(); err != nil {
+		return fmt.Errorf("could not start ebpf manager: %s", err)
+	}
+
+	t.closeConsumer.Start(callback)
+	// the failed connection consumer is only created when failed
+	// connections are supported, so it may be nil here
+	if t.failedConnConsumer != nil {
+		t.failedConnConsumer.Start()
+	}
+	return nil
+}
+
+// Pause pauses the underlying eBPF manager.
+func (t *ebpfTracer) Pause() error {
+	// add small delay for socket filters to properly detach
+	time.Sleep(1 * time.Millisecond)
+	return t.m.Pause()
+}
+
+// Resume resumes the underlying eBPF manager.
+func (t *ebpfTracer) Resume() error {
+	err := t.m.Resume()
+	// add small delay for socket filters to properly attach
+	time.Sleep(1 * time.Millisecond)
+	return err
+}
+
+// FlushPending asks the close consumer to flush its pending connections.
+func (t *ebpfTracer) FlushPending() {
+	t.closeConsumer.FlushPending()
+}
+
+// GetFailedConnections returns the failed connections gathered by the failed
+// connection consumer, or nil when there is no such consumer.
+func (t *ebpfTracer) GetFailedConnections() *failure.FailedConns {
+	if t.failedConnConsumer == nil {
+		return nil
+	}
+	return t.failedConnConsumer.FailedConns
+}
+
+// Stop shuts the tracer down: it stops the manager and the consumers and
+// calls the closeTracer function, if any. Only the first call has an effect.
+func (t *ebpfTracer) Stop() {
+	t.stopOnce.Do(func() {
+		close(t.exitTelemetry)
+		ddebpf.RemoveNameMappings(t.m)
+		ebpftelemetry.UnregisterTelemetry(t.m)
+		_ = t.m.Stop(manager.CleanAll)
+		t.closeConsumer.Stop()
+		// may be nil when failed connections are not supported
+		if t.failedConnConsumer != nil {
+			t.failedConnConsumer.Stop()
+		}
+		if t.closeTracer != nil {
+			t.closeTracer()
+		}
+	})
+}
+
+// GetMap returns the eBPF map called name. Only the connection protocol map
+// is exposed; nil is returned for any other name.
+func (t *ebpfTracer) GetMap(name string) *ebpf.Map {
+	switch name {
+	case probes.ConnectionProtocolMap:
+	default:
+		return nil
+	}
+	m, _, _ := t.m.GetMap(name)
+	return m
+}
+
+// GetConnections iterates over the eBPF connection map and appends to buffer
+// every connection accepted by filter (all of them when filter is nil),
+// enriched with TCP stats and retransmits. It also refreshes the connections
+// gauge. An aborted iteration is logged and counted rather than returned as
+// an error.
+func (t *ebpfTracer) GetConnections(buffer *network.ConnectionBuffer, filter func(*network.ConnectionStats) bool) error {
+	// Iterate through all key-value pairs in map
+	key, stats := &netebpf.ConnTuple{}, &netebpf.ConnStats{}
+	seen := make(map[netebpf.ConnTuple]struct{})
+	// connsByTuple is used to detect whether we are iterating over
+	// a connection we have previously seen. This can happen when
+	// ebpf maps are being iterated over and deleted at the same time.
+	// The iteration can reset when that happens.
+	// See https://justin.azoff.dev/blog/bpf_map_get_next_key-pitfalls/
+	connsByTuple := make(map[netebpf.ConnTuple]uint32)
+
+	// Cached objects
+	conn := new(network.ConnectionStats)
+	tcp := new(netebpf.TCPStats)
+
+	var tcp4, tcp6, udp4, udp6 float64
+	entries := t.conns.Iterate()
+	for entries.Next(key, stats) {
+		if cookie, exists := connsByTuple[*key]; exists && cookie == stats.Cookie {
+			// already seen the connection in current batch processing,
+			// due to race between the iterator and bpf_map_delete
+			EbpfTracerTelemetry.iterationDups.Inc()
+			continue
+		}
+
+		populateConnStats(conn, key, stats, t.ch)
+		connsByTuple[*key] = stats.Cookie
+
+		isTCP := conn.Type == network.TCP
+		switch conn.Family {
+		case network.AFINET6:
+			if isTCP {
+				tcp6++
+			} else {
+				udp6++
+			}
+		case network.AFINET:
+			if isTCP {
+				tcp4++
+			} else {
+				udp4++
+			}
+		}
+
+		if filter != nil && !filter(conn) {
+			continue
+		}
+
+		if t.getTCPStats(tcp, key) {
+			updateTCPStats(conn, tcp, 0)
+		}
+		if retrans, ok := t.getTCPRetransmits(key, seen); ok {
+			updateTCPStats(conn, nil, retrans)
+		}
+
+		*buffer.Next() = *conn
+	}
+
+	if err := entries.Err(); err != nil {
+		if !errors.Is(err, ebpf.ErrIterationAborted) {
+			return fmt.Errorf("unable to iterate connection map: %w", err)
+		}
+
+		log.Warn("eBPF conn_stats map iteration aborted. Some connections may not be reported")
+		EbpfTracerTelemetry.iterationAborts.Inc()
+	}
+
+	updateTelemetry(tcp4, tcp6, udp4, udp6)
+
+	return nil
+}
+
+// updateTelemetry sets the connections gauge for each protocol/family pair.
+func updateTelemetry(tcp4 float64, tcp6 float64, udp4 float64, udp6 float64) {
+	EbpfTracerTelemetry.connections.Set(tcp4, "tcp", "v4")
+	EbpfTracerTelemetry.connections.Set(tcp6, "tcp", "v6")
+	EbpfTracerTelemetry.connections.Set(udp4, "udp", "v4")
+	EbpfTracerTelemetry.connections.Set(udp6, "udp", "v6")
+}
+
+// removeConnectionFromTelemetry decrements the connections gauge matching
+// the protocol and family of conn.
+func removeConnectionFromTelemetry(conn *network.ConnectionStats) {
+	isTCP := conn.Type == network.TCP
+	switch conn.Family {
+	case network.AFINET6:
+		if isTCP {
+			EbpfTracerTelemetry.connections.Dec("tcp", "v6")
+		} else {
+			EbpfTracerTelemetry.connections.Dec("udp", "v6")
+		}
+	case network.AFINET:
+		if isTCP {
+			EbpfTracerTelemetry.connections.Dec("tcp", "v4")
+		} else {
+			EbpfTracerTelemetry.connections.Dec("udp", "v4")
+		}
+	}
+}
+
+// Remove deletes conn from the connection map and, for TCP connections, from
+// the TCP stats and retransmits maps as well. The error of the connection map
+// deletion is returned as is; in that case nothing else is touched.
+func (t *ebpfTracer) Remove(conn *network.ConnectionStats) error {
+	util.ConnStatsToTuple(conn, t.removeTuple)
+
+	err := t.conns.Delete(t.removeTuple)
+	if err != nil {
+		// If this entry no longer exists in the eBPF map it means `tcp_close` has executed
+		// during this function call. In that case state.StoreClosedConnection() was already called for this connection,
+		// and we can't delete the corresponding client state, or we'll likely over-report the metric values.
+		// By skipping to the next iteration and not calling state.RemoveConnections() we'll let
+		// this connection expire "naturally" when either next connection check runs or the client itself expires.
+		return err
+	}
+
+	removeConnectionFromTelemetry(conn)
+
+	if conn.Type == network.TCP {
+		// We can ignore the error for this map since it will not always contain the entry
+		_ = t.tcpStats.Delete(t.removeTuple)
+		// We remove the PID from the tuple as it is not used in the retransmits map
+		pid := t.removeTuple.Pid
+		t.removeTuple.Pid = 0
+		_ = t.tcpRetransmits.Delete(t.removeTuple)
+		t.removeTuple.Pid = pid
+	}
+	return nil
+}
+
+// getEBPFTelemetry reads the telemetry struct stored under key 0 of the
+// telemetry map. It returns nil when the map or the entry cannot be read.
+func (t *ebpfTracer) getEBPFTelemetry() *netebpf.Telemetry {
+	var zero uint32
+	mp, err := maps.GetMap[uint32, netebpf.Telemetry](t.m, probes.TelemetryMap)
+	if err != nil {
+		log.Warnf("error retrieving telemetry map: %s", err)
+		return nil
+	}
+
+	tm := &netebpf.Telemetry{}
+	if err := mp.Lookup(&zero, tm); err != nil {
+		// This can happen if we haven't initialized the telemetry object yet
+		// so let's just use a trace log
+		if log.ShouldLog(seelog.TraceLvl) {
+			log.Tracef("error retrieving the telemetry struct: %s", err)
+		}
+		return nil
+	}
+	return tm
+}
+
+// Describe returns all descriptions of the collector
+func (t *ebpfTracer) Describe(ch chan<- *prometheus.Desc) {
+	ch <- EbpfTracerTelemetry.tcpFailedConnects
+	ch <- EbpfTracerTelemetry.TcpSentMiscounts
+	ch <- EbpfTracerTelemetry.unbatchedTcpClose
+	ch <- EbpfTracerTelemetry.unbatchedUdpClose
+	ch <- EbpfTracerTelemetry.UdpSendsProcessed
+	ch <- EbpfTracerTelemetry.UdpSendsMissed
+	ch <- EbpfTracerTelemetry.UdpDroppedConns
+	ch <- EbpfTracerTelemetry.doubleFlushAttemptsClose
+	ch <- EbpfTracerTelemetry.doubleFlushAttemptsDone
+	ch <- EbpfTracerTelemetry.unsupportedTcpFailures
+	ch <- EbpfTracerTelemetry.tcpDonePidMismatch
+}
+
+// Collect returns the current state of all metrics of the collector. For
+// every eBPF counter it reports the difference with the value seen by the
+// previous call and remembers the new value. Nothing is emitted when the
+// eBPF telemetry cannot be read.
+func (t *ebpfTracer) Collect(ch chan<- prometheus.Metric) {
+	ebpfTelemetry := t.getEBPFTelemetry()
+	if ebpfTelemetry == nil {
+		return
+	}
+
+	// emitDelta reports current minus the last stored value for desc and
+	// then stores current as the new last value.
+	emitDelta := func(desc *prometheus.Desc, last *atomic.Int64, current int64) {
+		delta := current - last.Load()
+		last.Store(current)
+		ch <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, float64(delta))
+	}
+
+	emitDelta(EbpfTracerTelemetry.tcpFailedConnects, EbpfTracerTelemetry.lastTcpFailedConnects, int64(ebpfTelemetry.Tcp_failed_connect))
+	emitDelta(EbpfTracerTelemetry.TcpSentMiscounts, EbpfTracerTelemetry.LastTcpSentMiscounts, int64(ebpfTelemetry.Tcp_sent_miscounts))
+	emitDelta(EbpfTracerTelemetry.unbatchedTcpClose, EbpfTracerTelemetry.lastUnbatchedTcpClose, int64(ebpfTelemetry.Unbatched_tcp_close))
+	emitDelta(EbpfTracerTelemetry.unbatchedUdpClose, EbpfTracerTelemetry.lastUnbatchedUdpClose, int64(ebpfTelemetry.Unbatched_udp_close))
+	emitDelta(EbpfTracerTelemetry.UdpSendsProcessed, EbpfTracerTelemetry.lastUdpSendsProcessed, int64(ebpfTelemetry.Udp_sends_processed))
+	emitDelta(EbpfTracerTelemetry.UdpSendsMissed, EbpfTracerTelemetry.lastUdpSendsMissed, int64(ebpfTelemetry.Udp_sends_missed))
+	emitDelta(EbpfTracerTelemetry.UdpDroppedConns, EbpfTracerTelemetry.lastUdpDroppedConns, int64(ebpfTelemetry.Udp_dropped_conns))
+	emitDelta(EbpfTracerTelemetry.doubleFlushAttemptsClose, EbpfTracerTelemetry.lastDoubleFlushAttemptsClose, int64(ebpfTelemetry.Double_flush_attempts_close))
+	emitDelta(EbpfTracerTelemetry.doubleFlushAttemptsDone, EbpfTracerTelemetry.lastDoubleFlushAttemptsDone, int64(ebpfTelemetry.Double_flush_attempts_done))
+	emitDelta(EbpfTracerTelemetry.unsupportedTcpFailures, EbpfTracerTelemetry.lastUnsupportedTcpFailures, int64(ebpfTelemetry.Unsupported_tcp_failures))
+	emitDelta(EbpfTracerTelemetry.tcpDonePidMismatch, EbpfTracerTelemetry.lastTcpDonePidMismatch, int64(ebpfTelemetry.Tcp_done_pid_mismatch))
+}
+
+// DumpMaps (for debugging purpose) returns all maps content by default or selected maps from maps parameter.
+func (t *ebpfTracer) DumpMaps(w io.Writer, maps ...string) error {
+	return t.m.DumpMaps(w, maps...)
+}
+
+// Type returns the type of the underlying ebpf tracer that is currently loaded
+func (t *ebpfTracer) Type() TracerType {
+	return t.ebpfTracerType
+}
+
+// initializePortBindingMaps seeds the TCP and UDP port binding maps with the
+// ports returned by network.ReadListeningPorts. Entries that already exist in
+// a map are left as they are; UDP ports in the ephemeral range are skipped.
+func (t *ebpfTracer) initializePortBindingMaps() error {
+	tcpPorts, err := network.ReadListeningPorts(t.config.ProcRoot, network.TCP, t.config.CollectTCPv6Conns)
+	if err != nil {
+		return fmt.Errorf("failed to read initial TCP pid->port mapping: %s", err)
+	}
+
+	tcpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](t.m, probes.PortBindingsMap)
+	if err != nil {
+		return fmt.Errorf("failed to get TCP port binding map: %w", err)
+	}
+	for p, count := range tcpPorts {
+		log.Debugf("adding initial TCP port binding: netns: %d port: %d", p.Ino, p.Port)
+		pb := netebpf.PortBinding{Netns: p.Ino, Port: p.Port}
+		err = tcpPortMap.Update(&pb, &count, ebpf.UpdateNoExist)
+		if err != nil && !errors.Is(err, ebpf.ErrKeyExist) {
+			return fmt.Errorf("failed to update TCP port binding map: %w", err)
+		}
+	}
+
+	udpPorts, err := network.ReadListeningPorts(t.config.ProcRoot, network.UDP, t.config.CollectUDPv6Conns)
+	if err != nil {
+		return fmt.Errorf("failed to read initial UDP pid->port mapping: %s", err)
+	}
+
+	udpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](t.m, probes.UDPPortBindingsMap)
+	if err != nil {
+		return fmt.Errorf("failed to get UDP port binding map: %w", err)
+	}
+	for p, count := range udpPorts {
+		// ignore ephemeral port binds as they are more likely to be from
+		// clients calling bind with port 0
+		if network.IsPortInEphemeralRange(network.AFINET, network.UDP, p.Port) == network.EphemeralTrue {
+			// log the port number itself, not the whole mapping struct
+			log.Debugf("ignoring initial ephemeral UDP port bind to %d", p.Port)
+			continue
+		}
+
+		log.Debugf("adding initial UDP port binding: netns: %d port: %d", p.Ino, p.Port)
+		pb := netebpf.PortBinding{Netns: p.Ino, Port: p.Port}
+		err = udpPortMap.Update(&pb, &count, ebpf.UpdateNoExist)
+		if err != nil && !errors.Is(err, ebpf.ErrKeyExist) {
+			return fmt.Errorf("failed to update UDP port binding map: %w", err)
+		}
+	}
+	return nil
+}
+
+// getTCPRetransmits looks up the retransmit count for tuple, ignoring its
+// PID. The second return value is false only for non-TCP tuples; a failed
+// lookup yields (0, true). When the PID-less tuple is already present in
+// seen, a PID collision is counted and 0 is returned for it.
+func (t *ebpfTracer) getTCPRetransmits(tuple *netebpf.ConnTuple, seen map[netebpf.ConnTuple]struct{}) (uint32, bool) {
+	if tuple.Type() != netebpf.TCP {
+		return 0, false
+	}
+
+	// The PID isn't used as a key in the stats map, we will temporarily set it to 0 here and reset it when we're done
+	pid := tuple.Pid
+	tuple.Pid = 0
+
+	var retransmits uint32
+	if err := t.tcpRetransmits.Lookup(tuple, &retransmits); err == nil {
+		// This is required to avoid (over)reporting retransmits for connections sharing the same socket.
+		if _, reported := seen[*tuple]; reported {
+			EbpfTracerTelemetry.PidCollisions.Inc()
+			retransmits = 0
+		} else {
+			seen[*tuple] = struct{}{}
+		}
+	}
+
+	tuple.Pid = pid
+	return retransmits, true
+}
+
+// getTCPStats reads tcp related stats for the given ConnTuple
+func (t *ebpfTracer) getTCPStats(stats *netebpf.TCPStats, tuple *netebpf.ConnTuple) bool {
+	if tuple.Type() != netebpf.TCP {
+		return false
+	}
+
+	return t.tcpStats.Lookup(tuple, stats) == nil
+}
+
+// populateConnStats overwrites stats with the connection described by the
+// eBPF tuple t and its stats s. When ch is not nil the cookie is re-hashed
+// through it as the last step.
+func populateConnStats(stats *network.ConnectionStats, t *netebpf.ConnTuple, s *netebpf.ConnStats, ch *cookieHasher) {
+	*stats = network.ConnectionStats{
+		Pid:    t.Pid,
+		NetNS:  t.Netns,
+		Source: t.SourceAddress(),
+		Dest:   t.DestAddress(),
+		SPort:  t.Sport,
+		DPort:  t.Dport,
+		Monotonic: network.StatCounters{
+			SentBytes:   s.Sent_bytes,
+			RecvBytes:   s.Recv_bytes,
+			SentPackets: uint64(s.Sent_packets),
+			RecvPackets: uint64(s.Recv_packets),
+		},
+		LastUpdateEpoch: s.Timestamp,
+		IsAssured:       s.IsAssured(),
+		Cookie:          network.StatCookie(s.Cookie),
+	}
+
+	// durations that do not fit in an int64 are left at zero
+	if s.Duration <= uint64(math.MaxInt64) {
+		stats.Duration = time.Duration(s.Duration) * time.Nanosecond
+	}
+
+	stats.ProtocolStack = protocols.Stack{
+		API:         protocols.API(s.Protocol_stack.Api),
+		Application: protocols.Application(s.Protocol_stack.Application),
+		Encryption:  protocols.Encryption(s.Protocol_stack.Encryption),
+	}
+
+	if t.Type() == netebpf.TCP {
+		stats.Type = network.TCP
+	} else {
+		stats.Type = network.UDP
+	}
+
+	switch t.Family() {
+	case netebpf.IPv4:
+		stats.Family = network.AFINET
+	case netebpf.IPv6:
+		stats.Family = network.AFINET6
+	}
+
+	stats.SPortIsEphemeral = network.IsPortInEphemeralRange(stats.Family, stats.Type, t.Sport)
+
+	switch s.ConnectionDirection() {
+	case netebpf.Incoming:
+		stats.Direction = network.INCOMING
+	case netebpf.Outgoing:
+		stats.Direction = network.OUTGOING
+	default:
+		stats.Direction = network.OUTGOING
+	}
+
+	if ch != nil {
+		ch.Hash(stats)
+	}
+}
+
+// updateTCPStats sets the retransmit count of a TCP connection and, when
+// tcpStats is not nil, its established/closed flags and RTT values. It does
+// nothing for non-TCP connections.
+func updateTCPStats(conn *network.ConnectionStats, tcpStats *netebpf.TCPStats, retransmits uint32) {
+	if conn.Type != network.TCP {
+		return
+	}
+
+	conn.Monotonic.Retransmits = retransmits
+	if tcpStats != nil {
+		conn.Monotonic.TCPEstablished = uint32(tcpStats.State_transitions >> netebpf.Established & 1)
+		conn.Monotonic.TCPClosed = uint32(tcpStats.State_transitions >> netebpf.Close & 1)
+		conn.RTT = tcpStats.Rtt
+		conn.RTTVar = tcpStats.Rtt_var
+	}
+}
diff --git a/pkg/network/tracer/connection/ebpfless/payload.go b/pkg/network/tracer/connection/ebpfless/payload.go
new file mode 100644
index 00000000000000..50d9e1d190b531
--- /dev/null
+++ b/pkg/network/tracer/connection/ebpfless/payload.go
@@ -0,0 +1,102 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2024-present Datadog, Inc.
+
+//go:build linux
+
+// Package ebpfless contains supporting code for the ebpfless tracer
+package ebpfless
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+
+	"github.com/DataDog/datadog-agent/pkg/network"
+)
+
+var errZeroLengthUDPPacket = errors.New("UDP packet with length 0")
+var errZeroLengthIPPacket = errors.New("IP packet with length 0")
+
+// UDPPayloadLen returns the UDP payload length from a layers.UDP object.
+// It returns errZeroLengthUDPPacket when the length field is 0 and an error
+// when the length field is smaller than the UDP header itself.
+func UDPPayloadLen(udp *layers.UDP) (uint16, error) {
+	// size in bytes of the UDP header, which the length field includes
+	const headerLen = 8
+
+	if udp.Length == 0 {
+		return 0, errZeroLengthUDPPacket
+	}
+
+	// guard the unsigned subtraction below against wrapping around
+	if udp.Length < headerLen {
+		return 0, fmt.Errorf("UDP packet length %d is smaller than the UDP header", udp.Length)
+	}
+
+	// Length includes the header (8 bytes),
+	// so we need to exclude that here
+	return udp.Length - headerLen, nil
+}
+
+// TCPPayloadLen returns the TCP payload length from a layers.TCP object.
+// family selects which of ip4/ip6 is used to compute the IP payload length.
+// An error is returned for an unknown family, when the IP payload length
+// cannot be determined or is 0, and when the TCP header is larger than the
+// IP payload.
+func TCPPayloadLen(family network.ConnectionFamily, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) (uint16, error) {
+	var ipl uint16
+	var err error
+	switch family {
+	case network.AFINET:
+		ipl, err = ipv4PayloadLen(ip4)
+	case network.AFINET6:
+		ipl, err = ipv6PayloadLen(ip6)
+	default:
+		return 0, fmt.Errorf("unknown family %s", family)
+	}
+
+	if err != nil {
+		// propagate the failure instead of reporting an empty payload
+		return 0, err
+	}
+
+	if ipl == 0 {
+		return 0, errZeroLengthIPPacket
+	}
+
+	// the data offset field in the TCP header specifies
+	// the length of the TCP header in 32 bit words, so
+	// subtracting that here to get the payload size
+	//
+	// see https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure
+	tcpHeaderLen := uint16(tcp.DataOffset) * 4
+	if ipl < tcpHeaderLen {
+		return 0, fmt.Errorf("TCP header length %d exceeds IP payload length %d", tcpHeaderLen, ipl)
+	}
+	return ipl - tcpHeaderLen, nil
+}
+
+// ipv4PayloadLen returns the size of the IPv4 payload, i.e. the total length
+// minus the header length. A total length of 0 yields (0, nil); a non-zero
+// total length smaller than the header yields an error.
+func ipv4PayloadLen(ip4 *layers.IPv4) (uint16, error) {
+	if ip4.Length == 0 {
+		// nothing to subtract from; TCPPayloadLen turns a zero
+		// payload length into errZeroLengthIPPacket
+		return 0, nil
+	}
+
+	// the IHL field specifies the size of the IP
+	// header in 32 bit words, so subtracting that here
+	// to get the payload size
+	//
+	// see https://en.wikipedia.org/wiki/IPv4#Header
+	headerLen := uint16(ip4.IHL) * 4
+	if ip4.Length < headerLen {
+		return 0, fmt.Errorf("IPv4 total length %d is smaller than header length %d", ip4.Length, headerLen)
+	}
+	return ip4.Length - headerLen, nil
+}
+
+// ipv6PayloadLen returns the size of the IPv6 payload without extension
+// headers. When the next header is TCP or UDP that is the length field
+// itself; otherwise extension headers are skipped one by one and their size
+// is subtracted until a TCP or UDP header (or the end of the payload) is
+// reached.
+func ipv6PayloadLen(ip6 *layers.IPv6) (uint16, error) {
+	if ip6.NextHeader == layers.IPProtocolUDP || ip6.NextHeader == layers.IPProtocolTCP {
+		return ip6.Length, nil
+	}
+
+	var ipExt layers.IPv6ExtensionSkipper
+	parser := gopacket.NewDecodingLayerParser(gopacket.LayerTypePayload, &ipExt)
+	decoded := make([]gopacket.LayerType, 0, 1)
+	l := ip6.Length
+	payload := ip6.Payload
+	for len(payload) > 0 {
+		err := parser.DecodeLayers(payload, &decoded)
+		if err != nil {
+			return 0, fmt.Errorf("error decoding with ipv6 extension skipper: %w", err)
+		}
+
+		if len(decoded) == 0 {
+			return l, nil
+		}
+
+		// guard the unsigned subtraction below against wrapping around
+		extLen := len(ipExt.Contents)
+		if int(l) < extLen {
+			return 0, fmt.Errorf("IPv6 extension header length %d exceeds remaining payload length %d", extLen, l)
+		}
+		l -= uint16(extLen)
+		if ipExt.NextHeader == layers.IPProtocolTCP || ipExt.NextHeader == layers.IPProtocolUDP {
+			break
+		}
+
+		payload = ipExt.Payload
+	}
+
+	return l, nil
+}
diff --git a/pkg/network/tracer/connection/ebpfless/ports.go b/pkg/network/tracer/connection/ebpfless/ports.go
new file mode 100644
index 00000000000000..00ebebf273c643
--- /dev/null
+++ b/pkg/network/tracer/connection/ebpfless/ports.go
@@ -0,0 +1,130 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2024-present Datadog, Inc.
+ +//go:build linux + +// Package ebpfless contains supporting code for the ebpfless tracer +package ebpfless + +import ( + "fmt" + "sync" + "time" + + "github.com/DataDog/datadog-agent/pkg/network" + "github.com/DataDog/datadog-agent/pkg/network/config" + "github.com/DataDog/datadog-agent/pkg/util/kernel" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +type boundPortsKey struct { + proto network.ConnectionType + port uint16 +} + +// BoundPorts is a collection of bound ports on the host +// that is periodically updated from procfs +type BoundPorts struct { + mu sync.RWMutex + + config *config.Config + ports map[boundPortsKey]struct{} + + stop chan struct{} + ino uint32 +} + +// NewBoundPorts returns a new BoundPorts instance +func NewBoundPorts(cfg *config.Config) *BoundPorts { + ino, _ := kernel.GetCurrentIno() + return &BoundPorts{ + config: cfg, + ports: map[boundPortsKey]struct{}{}, + stop: make(chan struct{}), + ino: ino, + } +} + +// Start starts a BoundPorts instance +func (b *BoundPorts) Start() error { + if err := b.update(); err != nil { + return err + } + + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-b.stop: + return + case <-ticker.C: + if err := b.update(); err != nil { + log.Errorf("error updating bound ports, exiting loop: %s", err) + return + } + } + } + }() + + return nil +} + +// Stop stops a BoundPorts instance +func (b *BoundPorts) Stop() { + close(b.stop) +} + +func (b *BoundPorts) update() error { + b.mu.Lock() + defer b.mu.Unlock() + + tcpPorts, err := network.ReadListeningPorts(b.config.ProcRoot, network.TCP, b.config.CollectTCPv6Conns) + if err != nil { + return fmt.Errorf("failed to read initial TCP pid->port mapping: %s", err) + } + + for p := range tcpPorts { + if p.Ino != b.ino { + continue + } + log.Debugf("adding initial TCP port binding: netns: %d port: %d", p.Ino, p.Port) + b.ports[boundPortsKey{network.TCP, p.Port}] = struct{}{} + } + + udpPorts, err := 
network.ReadListeningPorts(b.config.ProcRoot, network.UDP, b.config.CollectUDPv6Conns) + if err != nil { + return fmt.Errorf("failed to read initial UDP pid->port mapping: %s", err) + } + + for p := range udpPorts { + // ignore ephemeral port binds as they are more likely to be from + // clients calling bind with port 0 + if network.IsPortInEphemeralRange(network.AFINET, network.UDP, p.Port) == network.EphemeralTrue { + log.Debugf("ignoring initial ephemeral UDP port bind to %d", p) + continue + } + + if p.Ino != b.ino { + continue + } + + log.Debugf("adding initial UDP port binding: netns: %d port: %d", p.Ino, p.Port) + b.ports[boundPortsKey{network.UDP, p.Port}] = struct{}{} + } + + return nil + +} + +// Find returns `true` if the given `(proto, port)` exists in +// the BoundPorts collection +func (b *BoundPorts) Find(proto network.ConnectionType, port uint16) bool { + b.mu.RLock() + defer b.mu.RUnlock() + + _, ok := b.ports[boundPortsKey{proto, port}] + return ok +} diff --git a/pkg/network/tracer/connection/ebpfless_tracer.go b/pkg/network/tracer/connection/ebpfless_tracer.go new file mode 100644 index 00000000000000..fd57ec1b3d0ba8 --- /dev/null +++ b/pkg/network/tracer/connection/ebpfless_tracer.go @@ -0,0 +1,417 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +//go:build linux && npm + +package connection + +import ( + "fmt" + "io" + "sync" + "time" + + "github.com/cilium/ebpf" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/prometheus/client_golang/prometheus" + "github.com/vishvananda/netns" + "golang.org/x/sys/unix" + + "github.com/DataDog/datadog-agent/pkg/network" + "github.com/DataDog/datadog-agent/pkg/network/config" + "github.com/DataDog/datadog-agent/pkg/network/filter" + "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/ebpfless" + "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/failure" + "github.com/DataDog/datadog-agent/pkg/process/util" + "github.com/DataDog/datadog-agent/pkg/telemetry" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +const ( + // the segment length to read + // mac header (with vlan) + ip header + tcp header + segmentLen = 18 + 60 + 60 + + ebpfLessTelemetryPrefix = "network_tracer__ebpfless" +) + +var ( + ebpfLessTracerTelemetry = struct { + skippedPackets telemetry.Counter + }{ + telemetry.NewCounter(ebpfLessTelemetryPrefix, "skipped_packets", []string{"reason"}, "Counter measuring skipped packets"), + } +) + +type ebpfLessTracer struct { + m sync.Mutex + + config *config.Config + + packetSrc *filter.AFPacketSource + exit chan struct{} + keyBuf []byte + scratchConn *network.ConnectionStats + + udp *udpProcessor + tcp *tcpProcessor + + // connection maps + conns map[string]*network.ConnectionStats + boundPorts *ebpfless.BoundPorts + cookieHasher *cookieHasher + + ns netns.NsHandle +} + +// newEbpfLessTracer creates a new ebpfLessTracer instance +func newEbpfLessTracer(cfg *config.Config) (*ebpfLessTracer, error) { + packetSrc, err := filter.NewAFPacketSource( + 8<<20, // 8 MB total space + filter.OptSnapLen(segmentLen)) + if err != nil { + return nil, fmt.Errorf("error creating packet source: %w", err) + } + + tr := &ebpfLessTracer{ + config: cfg, + packetSrc: packetSrc, + exit: make(chan struct{}), + keyBuf: 
make([]byte, network.ConnectionByteKeyMaxLen), + scratchConn: &network.ConnectionStats{}, + udp: &udpProcessor{}, + tcp: newTCPProcessor(), + conns: make(map[string]*network.ConnectionStats, cfg.MaxTrackedConnections), + boundPorts: ebpfless.NewBoundPorts(cfg), + cookieHasher: newCookieHasher(), + } + + tr.ns, err = netns.Get() + if err != nil { + return nil, fmt.Errorf("error getting current net ns: %w", err) + } + + return tr, nil +} + +// Start begins collecting network connection data. +func (t *ebpfLessTracer) Start(func(*network.ConnectionStats)) error { + if err := t.boundPorts.Start(); err != nil { + return fmt.Errorf("could not update bound ports: %w", err) + } + + go func() { + var eth layers.Ethernet + var ip4 layers.IPv4 + var ip6 layers.IPv6 + var tcp layers.TCP + var udp layers.UDP + decoded := make([]gopacket.LayerType, 0, 5) + parser := gopacket.NewDecodingLayerParser(layers.LayerTypeEthernet, ð, &ip4, &ip6, &tcp, &udp) + parser.IgnoreUnsupported = true + for { + err := t.packetSrc.VisitPackets(t.exit, func(b []byte, info filter.PacketInfo, ts time.Time) error { + if err := parser.DecodeLayers(b, &decoded); err != nil { + return fmt.Errorf("error decoding packet layers: %w", err) + } + + pktType := info.(*filter.AFPacketInfo).PktType + // only process PACKET_HOST and PACK_OUTGOING packets + if pktType != unix.PACKET_HOST && pktType != unix.PACKET_OUTGOING { + ebpfLessTracerTelemetry.skippedPackets.Inc("unsupported_packet_type") + return nil + } + + if err := t.processConnection(pktType, &ip4, &ip6, &udp, &tcp, decoded); err != nil { + log.Warnf("could not process packet: %s", err) + } + + return nil + }) + + if err != nil { + log.Errorf("exiting packet loop: %s", err) + return + } + } + }() + + return nil +} + +func (t *ebpfLessTracer) processConnection( + pktType uint8, + ip4 *layers.IPv4, + ip6 *layers.IPv6, + udp *layers.UDP, + tcp *layers.TCP, + decoded []gopacket.LayerType, +) error { + t.scratchConn.Source, t.scratchConn.Dest = util.Address{}, 
util.Address{} + t.scratchConn.SPort, t.scratchConn.DPort = 0, 0 + var udpPresent, tcpPresent bool + for _, layerType := range decoded { + switch layerType { + case layers.LayerTypeIPv4: + t.scratchConn.Source = util.AddressFromNetIP(ip4.SrcIP) + t.scratchConn.Dest = util.AddressFromNetIP(ip4.DstIP) + t.scratchConn.Family = network.AFINET + case layers.LayerTypeIPv6: + t.scratchConn.Source = util.AddressFromNetIP(ip6.SrcIP) + t.scratchConn.Dest = util.AddressFromNetIP(ip6.DstIP) + t.scratchConn.Family = network.AFINET6 + case layers.LayerTypeTCP: + t.scratchConn.SPort = uint16(tcp.SrcPort) + t.scratchConn.DPort = uint16(tcp.DstPort) + t.scratchConn.Type = network.TCP + tcpPresent = true + case layers.LayerTypeUDP: + t.scratchConn.SPort = uint16(udp.SrcPort) + t.scratchConn.DPort = uint16(udp.DstPort) + t.scratchConn.Type = network.UDP + udpPresent = true + } + } + + // check if have all the basic pieces + if !udpPresent && !tcpPresent { + log.Debugf("ignoring packet since its not udp or tcp") + ebpfLessTracerTelemetry.skippedPackets.Inc("not_tcp_udp") + return nil + } + + flipSourceDest(t.scratchConn, pktType) + t.determineConnectionDirection(t.scratchConn, pktType) + + t.m.Lock() + defer t.m.Unlock() + + key := string(t.scratchConn.ByteKey(t.keyBuf)) + conn := t.conns[key] + if conn == nil { + conn = &network.ConnectionStats{} + *conn = *t.scratchConn + t.cookieHasher.Hash(conn) + conn.Duration = time.Duration(time.Now().UnixNano()) + } + + var err error + switch conn.Type { + case network.UDP: + err = t.udp.process(conn, pktType, udp) + case network.TCP: + err = t.tcp.process(conn, pktType, ip4, ip6, tcp) + default: + err = fmt.Errorf("unsupported connection type %d", conn.Type) + } + + if err != nil { + return fmt.Errorf("error processing connection: %w", err) + } + + if conn.Type == network.UDP || conn.Monotonic.TCPEstablished > 0 { + conn.LastUpdateEpoch = uint64(time.Now().UnixNano()) + t.conns[key] = conn + } + + log.TraceFunc(func() string { + return 
fmt.Sprintf("connection: %s", conn) + }) + return nil +} + +func flipSourceDest(conn *network.ConnectionStats, pktType uint8) { + if pktType == unix.PACKET_HOST { + conn.Dest, conn.Source = conn.Source, conn.Dest + conn.DPort, conn.SPort = conn.SPort, conn.DPort + } +} + +func (t *ebpfLessTracer) determineConnectionDirection(conn *network.ConnectionStats, pktType uint8) { + t.m.Lock() + defer t.m.Unlock() + + ok := t.boundPorts.Find(conn.Type, conn.SPort) + if ok { + // incoming connection + conn.Direction = network.INCOMING + return + } + + if conn.Type == network.TCP { + switch pktType { + case unix.PACKET_HOST: + conn.Direction = network.INCOMING + case unix.PACKET_OUTGOING: + conn.Direction = network.OUTGOING + } + } +} + +// Stop halts all network data collection. +func (t *ebpfLessTracer) Stop() { + if t == nil { + return + } + + close(t.exit) + t.ns.Close() + t.boundPorts.Stop() +} + +// GetConnections returns the list of currently active connections, using the buffer provided. +// The optional filter function is used to prevent unwanted connections from being returned and consuming resources. +func (t *ebpfLessTracer) GetConnections(buffer *network.ConnectionBuffer, filter func(*network.ConnectionStats) bool) error { + t.m.Lock() + defer t.m.Unlock() + + if len(t.conns) == 0 { + return nil + } + + log.Trace(t.conns) + conns := make([]network.ConnectionStats, 0, len(t.conns)) + for _, c := range t.conns { + if filter != nil && !filter(c) { + continue + } + + conns = append(conns, *c) + } + + buffer.Append(conns) + return nil +} + +// FlushPending forces any closed connections waiting for batching to be processed immediately. +func (t *ebpfLessTracer) FlushPending() {} + +// Remove deletes the connection from tracking state. +// It does not prevent the connection from re-appearing later, if additional traffic occurs. 
+func (t *ebpfLessTracer) Remove(conn *network.ConnectionStats) error { + t.m.Lock() + defer t.m.Unlock() + + delete(t.conns, string(conn.ByteKey(t.keyBuf))) + return nil +} + +// GetMap returns the underlying named map. This is useful if any maps are shared with other eBPF components. +// An individual ebpfLessTracer implementation may choose which maps to expose via this function. +func (t *ebpfLessTracer) GetMap(string) *ebpf.Map { return nil } + +// DumpMaps (for debugging purpose) returns all maps content by default or selected maps from maps parameter. +func (t *ebpfLessTracer) DumpMaps(_ io.Writer, _ ...string) error { + return fmt.Errorf("not implemented") +} + +// Type returns the type of the underlying ebpf ebpfLessTracer that is currently loaded +func (t *ebpfLessTracer) Type() TracerType { + return TracerTypeEbpfless +} + +func (t *ebpfLessTracer) Pause() error { + return fmt.Errorf("not implemented") +} + +func (t *ebpfLessTracer) Resume() error { + return fmt.Errorf("not implemented") +} + +// Describe returns all descriptions of the collector +func (t *ebpfLessTracer) Describe(_ chan<- *prometheus.Desc) {} + +// Collect returns the current state of all metrics of the collector +func (t *ebpfLessTracer) Collect(_ chan<- prometheus.Metric) {} + +// GetFailedConnections returns the underlying map used to store failed connections +func (t *ebpfLessTracer) GetFailedConnections() *failure.FailedConns { return nil } + +var _ Tracer = &ebpfLessTracer{} + +type udpProcessor struct { +} + +func (u *udpProcessor) process(conn *network.ConnectionStats, pktType uint8, udp *layers.UDP) error { + payloadLen, err := ebpfless.UDPPayloadLen(udp) + if err != nil { + return err + } + + switch pktType { + case unix.PACKET_OUTGOING: + conn.Monotonic.SentPackets++ + conn.Monotonic.SentBytes += uint64(payloadLen) + case unix.PACKET_HOST: + conn.Monotonic.RecvPackets++ + conn.Monotonic.RecvBytes += uint64(payloadLen) + } + + return nil +} + +type tcpProcessor struct { + buf 
// process accounts one TCP packet against conn and updates the per-connection
// established/closed state kept in t.conns.
//
// The packet and its payload size are added to the sent or received counters
// depending on pktType. A FIN or RST marks the connection closed and drops
// the tracked state; the first packet without SYN marks it established.
// The error from computing the payload length is returned as is.
func (t *tcpProcessor) process(conn *network.ConnectionStats, pktType uint8, ip4 *layers.IPv4, ip6 *layers.IPv6, tcp *layers.TCP) error {
	payloadLen, err := ebpfless.TCPPayloadLen(conn.Family, ip4, ip6, tcp)
	if err != nil {
		return err
	}

	log.TraceFunc(func() string {
		return fmt.Sprintf("tcp processor: pktType=%+v seq=%+v ack=%+v fin=%+v rst=%+v syn=%+v ack=%+v", pktType, tcp.Seq, tcp.Ack, tcp.FIN, tcp.RST, tcp.SYN, tcp.ACK)
	})
	// c is the zero value (not established, not closed) when the
	// connection has no tracked state yet
	key := string(conn.ByteKey(t.buf))
	c := t.conns[key]
	log.TraceFunc(func() string {
		return fmt.Sprintf("pre ack_seq=%+v", c)
	})
	switch pktType {
	case unix.PACKET_OUTGOING:
		conn.Monotonic.SentPackets++
		conn.Monotonic.SentBytes += uint64(payloadLen)
	case unix.PACKET_HOST:
		conn.Monotonic.RecvPackets++
		conn.Monotonic.RecvBytes += uint64(payloadLen)
	}

	if tcp.FIN || tcp.RST {
		if !c.closed {
			c.closed = true
			conn.Monotonic.TCPClosed++
			// conn.Duration presumably holds the creation timestamp in
			// nanoseconds at this point, turning this into the elapsed
			// time — TODO confirm against the caller
			conn.Duration = time.Duration(time.Now().UnixNano() - int64(conn.Duration))
		}
		// NOTE(review): the state is deleted right after being marked
		// closed, so c.closed is never stored. A later packet for the
		// same key starts from the zero state again: a non-SYN packet
		// would increment TCPEstablished and another FIN/RST would
		// increment TCPClosed and recompute Duration. Confirm whether
		// the callers guarantee this cannot happen.
		delete(t.conns, key)
		return nil
	}

	// the first packet that does not carry SYN is taken as the sign that
	// the handshake went through
	if !tcp.SYN && !c.established {
		c.established = true
		conn.Monotonic.TCPEstablished++
	}

	log.TraceFunc(func() string {
		return fmt.Sprintf("ack_seq=%+v", c)
	})
	t.conns[key] = c
	return nil
}
-//go:build linux_bpf +//go:build linux && npm package connection import ( - "encoding/binary" - "errors" - "fmt" - "hash" "io" - "math" - "sync" - "time" - "github.com/cihub/seelog" "github.com/cilium/ebpf" "github.com/prometheus/client_golang/prometheus" - "github.com/twmb/murmur3" - "go.uber.org/atomic" - "golang.org/x/sys/unix" - manager "github.com/DataDog/ebpf-manager" - - telemetryComponent "github.com/DataDog/datadog-agent/comp/core/telemetry" - ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf" - "github.com/DataDog/datadog-agent/pkg/ebpf/maps" - ebpftelemetry "github.com/DataDog/datadog-agent/pkg/ebpf/telemetry" + "github.com/DataDog/datadog-agent/comp/core/telemetry" "github.com/DataDog/datadog-agent/pkg/network" "github.com/DataDog/datadog-agent/pkg/network/config" - netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf" - "github.com/DataDog/datadog-agent/pkg/network/ebpf/probes" - "github.com/DataDog/datadog-agent/pkg/network/protocols" "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/failure" - "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/fentry" - "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/kprobe" - "github.com/DataDog/datadog-agent/pkg/network/tracer/connection/util" - "github.com/DataDog/datadog-agent/pkg/telemetry" - "github.com/DataDog/datadog-agent/pkg/util/log" ) // TracerType is the type of the underlying tracer @@ -55,6 +31,8 @@ const ( TracerTypeKProbeCORE //nolint:revive // TODO(NET) Fix revive linter TracerTypeFentry + //nolint:revive // TODO(NET) Fix revive linter + TracerTypeEbpfless ) const ( @@ -96,716 +74,11 @@ type Tracer interface { Collect(metrics chan<- prometheus.Metric) } -const ( - defaultClosedChannelSize = 500 - defaultFailedChannelSize = 500 - connTracerModuleName = "network_tracer__ebpf" -) - -//nolint:revive // TODO(NET) Fix revive linter -var ConnTracerTelemetry = struct { - connections telemetry.Gauge - tcpFailedConnects *prometheus.Desc - //nolint:revive // 
TODO(NET) Fix revive linter - TcpSentMiscounts *prometheus.Desc - //nolint:revive // TODO(NET) Fix revive linter - unbatchedTcpClose *prometheus.Desc - //nolint:revive // TODO(NET) Fix revive linter - unbatchedUdpClose *prometheus.Desc - //nolint:revive // TODO(NET) Fix revive linter - UdpSendsProcessed *prometheus.Desc - //nolint:revive // TODO(NET) Fix revive linter - UdpSendsMissed *prometheus.Desc - //nolint:revive // TODO(NET) Fix revive linter - UdpDroppedConns *prometheus.Desc - // doubleFlushAttemptsClose is a counter measuring the number of attempts to flush a closed connection twice from tcp_close - doubleFlushAttemptsClose *prometheus.Desc - // doubleFlushAttemptsDone is a counter measuring the number of attempts to flush a closed connection twice from tcp_done - doubleFlushAttemptsDone *prometheus.Desc - // unsupportedTcpFailures is a counter measuring the number of attempts to flush a TCP failure that is not supported - unsupportedTcpFailures *prometheus.Desc - // tcpDonePidMismatch is a counter measuring the number of TCP connections with a PID mismatch between tcp_connect and tcp_done - tcpDonePidMismatch *prometheus.Desc - PidCollisions *telemetry.StatCounterWrapper - iterationDups telemetry.Counter - iterationAborts telemetry.Counter - - //nolint:revive // TODO(NET) Fix revive linter - lastTcpFailedConnects *atomic.Int64 - //nolint:revive // TODO(NET) Fix revive linter - LastTcpSentMiscounts *atomic.Int64 - //nolint:revive // TODO(NET) Fix revive linter - lastUnbatchedTcpClose *atomic.Int64 - //nolint:revive // TODO(NET) Fix revive linter - lastUnbatchedUdpClose *atomic.Int64 - //nolint:revive // TODO(NET) Fix revive linter - lastUdpSendsProcessed *atomic.Int64 - //nolint:revive // TODO(NET) Fix revive linter - lastUdpSendsMissed *atomic.Int64 - //nolint:revive // TODO(NET) Fix revive linter - lastUdpDroppedConns *atomic.Int64 - // lastDoubleFlushAttemptsClose is a counter measuring the diff between the last two values of doubleFlushAttemptsClose - 
lastDoubleFlushAttemptsClose *atomic.Int64 - // lastDoubleFlushAttemptsDone is a counter measuring the diff between the last two values of doubleFlushAttemptsDone - lastDoubleFlushAttemptsDone *atomic.Int64 - // lastUnsupportedTcpFailures is a counter measuring the diff between the last two values of unsupportedTcpFailures - lastUnsupportedTcpFailures *atomic.Int64 - // lastTcpDonePidMismatch is a counter measuring the diff between the last two values of tcpDonePidMismatch - lastTcpDonePidMismatch *atomic.Int64 -}{ - telemetry.NewGauge(connTracerModuleName, "connections", []string{"ip_proto", "family"}, "Gauge measuring the number of active connections in the EBPF map"), - prometheus.NewDesc(connTracerModuleName+"__tcp_failed_connects", "Counter measuring the number of failed TCP connections in the EBPF map", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__tcp_sent_miscounts", "Counter measuring the number of miscounted tcp sends in the EBPF map", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__unbatched_tcp_close", "Counter measuring the number of missed TCP close events in the EBPF map", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__unbatched_udp_close", "Counter measuring the number of missed UDP close events in the EBPF map", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__udp_sends_processed", "Counter measuring the number of processed UDP sends in EBPF", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__udp_sends_missed", "Counter measuring failures to process UDP sends in EBPF", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__udp_dropped_conns", "Counter measuring the number of dropped UDP connections in the EBPF map", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__double_flush_attempts_close", "Counter measuring the number of attempts to flush a closed connection twice from tcp_close", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__double_flush_attempts_done", "Counter measuring the number of 
attempts to flush a closed connection twice from tcp_done", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__unsupported_tcp_failures", "Counter measuring the number of attempts to flush a TCP failure that is not supported", nil, nil), - prometheus.NewDesc(connTracerModuleName+"__tcp_done_pid_mismatch", "Counter measuring the number of TCP connections with a PID mismatch between tcp_connect and tcp_done", nil, nil), - telemetry.NewStatCounterWrapper(connTracerModuleName, "pid_collisions", []string{}, "Counter measuring number of process collisions"), - telemetry.NewCounter(connTracerModuleName, "iteration_dups", []string{}, "Counter measuring the number of connections iterated more than once"), - telemetry.NewCounter(connTracerModuleName, "iteration_aborts", []string{}, "Counter measuring how many times ebpf iteration of connection map was aborted"), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), - atomic.NewInt64(0), -} - -type tracer struct { - m *manager.Manager - - conns *maps.GenericMap[netebpf.ConnTuple, netebpf.ConnStats] - tcpStats *maps.GenericMap[netebpf.ConnTuple, netebpf.TCPStats] - tcpRetransmits *maps.GenericMap[netebpf.ConnTuple, uint32] - config *config.Config - - // tcp_close events - closeConsumer *tcpCloseConsumer - // tcp failure events - failedConnConsumer *failure.TCPFailedConnConsumer - - removeTuple *netebpf.ConnTuple - - closeTracer func() - stopOnce sync.Once - - ebpfTracerType TracerType - - exitTelemetry chan struct{} - - ch *cookieHasher -} - -// NewTracer creates a new tracer -func NewTracer(config *config.Config, _ telemetryComponent.Component) (Tracer, error) { - mgrOptions := manager.Options{ - // Extend RLIMIT_MEMLOCK (8) size - // On some systems, the default for RLIMIT_MEMLOCK may be as low as 64 bytes. 
- // This will result in an EPERM (Operation not permitted) error, when trying to create an eBPF map - // using bpf(2) with BPF_MAP_CREATE. - // - // We are setting the limit to infinity until we have a better handle on the true requirements. - RLimit: &unix.Rlimit{ - Cur: math.MaxUint64, - Max: math.MaxUint64, - }, - MapSpecEditors: map[string]manager.MapSpecEditor{ - probes.ConnMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries}, - probes.TCPStatsMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries}, - probes.TCPRetransmitsMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries}, - probes.PortBindingsMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries}, - probes.UDPPortBindingsMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries}, - probes.ConnectionProtocolMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries}, - probes.ConnectionTupleToSocketSKBConnMap: {MaxEntries: config.MaxTrackedConnections, EditorFlag: manager.EditMaxEntries}, - }, - ConstantEditors: []manager.ConstantEditor{ - boolConst("tcpv6_enabled", config.CollectTCPv6Conns), - boolConst("udpv6_enabled", config.CollectUDPv6Conns), - }, - DefaultKProbeMaxActive: maxActive, - BypassEnabled: config.BypassEnabled, +// NewTracer returns a new Tracer +func NewTracer(cfg *config.Config, telemetryComp telemetry.Component) (Tracer, error) { + if cfg.EnableEbpfless { + return newEbpfLessTracer(cfg) } - begin, end := network.EphemeralRange() - mgrOptions.ConstantEditors = append(mgrOptions.ConstantEditors, - manager.ConstantEditor{Name: "ephemeral_range_begin", Value: uint64(begin)}, - manager.ConstantEditor{Name: "ephemeral_range_end", Value: uint64(end)}) - - closedChannelSize := defaultClosedChannelSize - if config.ClosedChannelSize > 0 { - closedChannelSize = config.ClosedChannelSize - } - var connCloseEventHandler 
ddebpf.EventHandler - var failedConnsHandler ddebpf.EventHandler - if config.RingBufferSupportedNPM() { - connCloseEventHandler = ddebpf.NewRingBufferHandler(closedChannelSize) - failedConnsHandler = ddebpf.NewRingBufferHandler(defaultFailedChannelSize) - } else { - connCloseEventHandler = ddebpf.NewPerfHandler(closedChannelSize) - failedConnsHandler = ddebpf.NewPerfHandler(defaultFailedChannelSize) - } - - var m *manager.Manager - //nolint:revive // TODO(NET) Fix revive linter - var tracerType TracerType = TracerTypeFentry - var closeTracerFn func() - m, closeTracerFn, err := fentry.LoadTracer(config, mgrOptions, connCloseEventHandler) - if err != nil && !errors.Is(err, fentry.ErrorNotSupported) { - // failed to load fentry tracer - return nil, err - } - - if err != nil { - // load the kprobe tracer - log.Info("fentry tracer not supported, falling back to kprobe tracer") - var kprobeTracerType kprobe.TracerType - m, closeTracerFn, kprobeTracerType, err = kprobe.LoadTracer(config, mgrOptions, connCloseEventHandler, failedConnsHandler) - if err != nil { - return nil, err - } - tracerType = TracerType(kprobeTracerType) - } - m.DumpHandler = dumpMapsHandler - ddebpf.AddNameMappings(m, "npm_tracer") - - numCPUs, err := ebpf.PossibleCPU() - if err != nil { - return nil, fmt.Errorf("could not determine number of CPUs: %w", err) - } - extractor := newBatchExtractor(numCPUs) - batchMgr, err := newConnBatchManager(m, extractor) - if err != nil { - return nil, fmt.Errorf("could not create connection batch manager: %w", err) - } - - closeConsumer := newTCPCloseConsumer(connCloseEventHandler, batchMgr) - - var failedConnConsumer *failure.TCPFailedConnConsumer - // Failed connections are not supported on prebuilt - if tracerType == TracerTypeKProbePrebuilt { - config.TCPFailedConnectionsEnabled = false - } - if config.FailedConnectionsSupported() { - failedConnConsumer = failure.NewFailedConnConsumer(failedConnsHandler, m, config.MaxFailedConnectionsBuffered) - } - - tr := 
&tracer{ - m: m, - config: config, - closeConsumer: closeConsumer, - failedConnConsumer: failedConnConsumer, - removeTuple: &netebpf.ConnTuple{}, - closeTracer: closeTracerFn, - ebpfTracerType: tracerType, - exitTelemetry: make(chan struct{}), - ch: newCookieHasher(), - } - - tr.conns, err = maps.GetMap[netebpf.ConnTuple, netebpf.ConnStats](m, probes.ConnMap) - if err != nil { - tr.Stop() - return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.ConnMap, err) - } - - tr.tcpStats, err = maps.GetMap[netebpf.ConnTuple, netebpf.TCPStats](m, probes.TCPStatsMap) - if err != nil { - tr.Stop() - return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.TCPStatsMap, err) - } - - if tr.tcpRetransmits, err = maps.GetMap[netebpf.ConnTuple, uint32](m, probes.TCPRetransmitsMap); err != nil { - tr.Stop() - return nil, fmt.Errorf("error retrieving the bpf %s map: %s", probes.TCPRetransmitsMap, err) - } - - return tr, nil -} - -func boolConst(name string, value bool) manager.ConstantEditor { - c := manager.ConstantEditor{ - Name: name, - Value: uint64(1), - } - if !value { - c.Value = uint64(0) - } - - return c -} - -func (t *tracer) Start(callback func(*network.ConnectionStats)) (err error) { - defer func() { - if err != nil { - t.Stop() - } - }() - - err = initializePortBindingMaps(t.config, t.m) - if err != nil { - return fmt.Errorf("error initializing port binding maps: %s", err) - } - - if err := t.m.Start(); err != nil { - return fmt.Errorf("could not start ebpf manager: %s", err) - } - - t.closeConsumer.Start(callback) - t.failedConnConsumer.Start() - return nil -} - -func (t *tracer) Pause() error { - // add small delay for socket filters to properly detach - time.Sleep(1 * time.Millisecond) - return t.m.Pause() -} - -func (t *tracer) Resume() error { - err := t.m.Resume() - // add small delay for socket filters to properly attach - time.Sleep(1 * time.Millisecond) - return err -} - -func (t *tracer) FlushPending() { - t.closeConsumer.FlushPending() 
-} - -func (t *tracer) GetFailedConnections() *failure.FailedConns { - if t.failedConnConsumer == nil { - return nil - } - return t.failedConnConsumer.FailedConns -} - -func (t *tracer) Stop() { - t.stopOnce.Do(func() { - close(t.exitTelemetry) - ddebpf.RemoveNameMappings(t.m) - ebpftelemetry.UnregisterTelemetry(t.m) - _ = t.m.Stop(manager.CleanAll) - t.closeConsumer.Stop() - t.failedConnConsumer.Stop() - if t.closeTracer != nil { - t.closeTracer() - } - }) -} - -func (t *tracer) GetMap(name string) *ebpf.Map { - switch name { - case probes.ConnectionProtocolMap: - default: - return nil - } - m, _, _ := t.m.GetMap(name) - return m -} - -func (t *tracer) GetConnections(buffer *network.ConnectionBuffer, filter func(*network.ConnectionStats) bool) error { - // Iterate through all key-value pairs in map - key, stats := &netebpf.ConnTuple{}, &netebpf.ConnStats{} - seen := make(map[netebpf.ConnTuple]struct{}) - // connsByTuple is used to detect whether we are iterating over - // a connection we have previously seen. This can happen when - // ebpf maps are being iterated over and deleted at the same time. - // The iteration can reset when that happens. 
- // See https://justin.azoff.dev/blog/bpf_map_get_next_key-pitfalls/ - connsByTuple := make(map[netebpf.ConnTuple]uint32) - - // Cached objects - conn := new(network.ConnectionStats) - tcp := new(netebpf.TCPStats) - - var tcp4, tcp6, udp4, udp6 float64 - entries := t.conns.Iterate() - for entries.Next(key, stats) { - if cookie, exists := connsByTuple[*key]; exists && cookie == stats.Cookie { - // already seen the connection in current batch processing, - // due to race between the iterator and bpf_map_delete - ConnTracerTelemetry.iterationDups.Inc() - continue - } - - populateConnStats(conn, key, stats, t.ch) - connsByTuple[*key] = stats.Cookie - - isTCP := conn.Type == network.TCP - switch conn.Family { - case network.AFINET6: - if isTCP { - tcp6++ - } else { - udp6++ - } - case network.AFINET: - if isTCP { - tcp4++ - } else { - udp4++ - } - } - - if filter != nil && !filter(conn) { - continue - } - - if t.getTCPStats(tcp, key) { - updateTCPStats(conn, tcp, 0) - } - if retrans, ok := t.getTCPRetransmits(key, seen); ok { - updateTCPStats(conn, nil, retrans) - } - - *buffer.Next() = *conn - } - - if err := entries.Err(); err != nil { - if !errors.Is(err, ebpf.ErrIterationAborted) { - return fmt.Errorf("unable to iterate connection map: %w", err) - } - - log.Warn("eBPF conn_stats map iteration aborted. 
Some connections may not be reported") - ConnTracerTelemetry.iterationAborts.Inc() - } - - updateTelemetry(tcp4, tcp6, udp4, udp6) - - return nil -} - -func updateTelemetry(tcp4 float64, tcp6 float64, udp4 float64, udp6 float64) { - ConnTracerTelemetry.connections.Set(tcp4, "tcp", "v4") - ConnTracerTelemetry.connections.Set(tcp6, "tcp", "v6") - ConnTracerTelemetry.connections.Set(udp4, "udp", "v4") - ConnTracerTelemetry.connections.Set(udp6, "udp", "v6") -} - -func removeConnectionFromTelemetry(conn *network.ConnectionStats) { - isTCP := conn.Type == network.TCP - switch conn.Family { - case network.AFINET6: - if isTCP { - ConnTracerTelemetry.connections.Dec("tcp", "v6") - } else { - ConnTracerTelemetry.connections.Dec("udp", "v6") - } - case network.AFINET: - if isTCP { - ConnTracerTelemetry.connections.Dec("tcp", "v4") - } else { - ConnTracerTelemetry.connections.Dec("udp", "v4") - } - } -} - -func (t *tracer) Remove(conn *network.ConnectionStats) error { - util.ConnStatsToTuple(conn, t.removeTuple) - - err := t.conns.Delete(t.removeTuple) - if err != nil { - // If this entry no longer exists in the eBPF map it means `tcp_close` has executed - // during this function call. In that case state.StoreClosedConnection() was already called for this connection, - // and we can't delete the corresponding client state, or we'll likely over-report the metric values. - // By skipping to the next iteration and not calling state.RemoveConnections() we'll let - // this connection expire "naturally" when either next connection check runs or the client itself expires. 
- return err - } - - removeConnectionFromTelemetry(conn) - - if conn.Type == network.TCP { - // We can ignore the error for this map since it will not always contain the entry - _ = t.tcpStats.Delete(t.removeTuple) - // We remove the PID from the tuple as it is not used in the retransmits map - pid := t.removeTuple.Pid - t.removeTuple.Pid = 0 - _ = t.tcpRetransmits.Delete(t.removeTuple) - t.removeTuple.Pid = pid - } - return nil -} - -func (t *tracer) getEBPFTelemetry() *netebpf.Telemetry { - var zero uint32 - mp, err := maps.GetMap[uint32, netebpf.Telemetry](t.m, probes.TelemetryMap) - if err != nil { - log.Warnf("error retrieving telemetry map: %s", err) - return nil - } - - tm := &netebpf.Telemetry{} - if err := mp.Lookup(&zero, tm); err != nil { - // This can happen if we haven't initialized the telemetry object yet - // so let's just use a trace log - if log.ShouldLog(seelog.TraceLvl) { - log.Tracef("error retrieving the telemetry struct: %s", err) - } - return nil - } - return tm -} - -// Describe returns all descriptions of the collector -func (t *tracer) Describe(ch chan<- *prometheus.Desc) { - ch <- ConnTracerTelemetry.tcpFailedConnects - ch <- ConnTracerTelemetry.TcpSentMiscounts - ch <- ConnTracerTelemetry.unbatchedTcpClose - ch <- ConnTracerTelemetry.unbatchedUdpClose - ch <- ConnTracerTelemetry.UdpSendsProcessed - ch <- ConnTracerTelemetry.UdpSendsMissed - ch <- ConnTracerTelemetry.UdpDroppedConns - ch <- ConnTracerTelemetry.doubleFlushAttemptsClose - ch <- ConnTracerTelemetry.doubleFlushAttemptsDone - ch <- ConnTracerTelemetry.unsupportedTcpFailures - ch <- ConnTracerTelemetry.tcpDonePidMismatch -} - -// Collect returns the current state of all metrics of the collector -func (t *tracer) Collect(ch chan<- prometheus.Metric) { - ebpfTelemetry := t.getEBPFTelemetry() - if ebpfTelemetry == nil { - return - } - delta := int64(ebpfTelemetry.Tcp_failed_connect) - ConnTracerTelemetry.lastTcpFailedConnects.Load() - 
ConnTracerTelemetry.lastTcpFailedConnects.Store(int64(ebpfTelemetry.Tcp_failed_connect)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.tcpFailedConnects, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Tcp_sent_miscounts) - ConnTracerTelemetry.LastTcpSentMiscounts.Load() - ConnTracerTelemetry.LastTcpSentMiscounts.Store(int64(ebpfTelemetry.Tcp_sent_miscounts)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.TcpSentMiscounts, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Unbatched_tcp_close) - ConnTracerTelemetry.lastUnbatchedTcpClose.Load() - ConnTracerTelemetry.lastUnbatchedTcpClose.Store(int64(ebpfTelemetry.Unbatched_tcp_close)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.unbatchedTcpClose, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Unbatched_udp_close) - ConnTracerTelemetry.lastUnbatchedUdpClose.Load() - ConnTracerTelemetry.lastUnbatchedUdpClose.Store(int64(ebpfTelemetry.Unbatched_udp_close)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.unbatchedUdpClose, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Udp_sends_processed) - ConnTracerTelemetry.lastUdpSendsProcessed.Load() - ConnTracerTelemetry.lastUdpSendsProcessed.Store(int64(ebpfTelemetry.Udp_sends_processed)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.UdpSendsProcessed, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Udp_sends_missed) - ConnTracerTelemetry.lastUdpSendsMissed.Load() - ConnTracerTelemetry.lastUdpSendsMissed.Store(int64(ebpfTelemetry.Udp_sends_missed)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.UdpSendsMissed, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Udp_dropped_conns) - ConnTracerTelemetry.lastUdpDroppedConns.Load() - ConnTracerTelemetry.lastUdpDroppedConns.Store(int64(ebpfTelemetry.Udp_dropped_conns)) - ch <- 
prometheus.MustNewConstMetric(ConnTracerTelemetry.UdpDroppedConns, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Double_flush_attempts_close) - ConnTracerTelemetry.lastDoubleFlushAttemptsClose.Load() - ConnTracerTelemetry.lastDoubleFlushAttemptsClose.Store(int64(ebpfTelemetry.Double_flush_attempts_close)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.doubleFlushAttemptsClose, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Double_flush_attempts_done) - ConnTracerTelemetry.lastDoubleFlushAttemptsDone.Load() - ConnTracerTelemetry.lastDoubleFlushAttemptsDone.Store(int64(ebpfTelemetry.Double_flush_attempts_done)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.doubleFlushAttemptsDone, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Unsupported_tcp_failures) - ConnTracerTelemetry.lastUnsupportedTcpFailures.Load() - ConnTracerTelemetry.lastUnsupportedTcpFailures.Store(int64(ebpfTelemetry.Unsupported_tcp_failures)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.unsupportedTcpFailures, prometheus.CounterValue, float64(delta)) - - delta = int64(ebpfTelemetry.Tcp_done_pid_mismatch) - ConnTracerTelemetry.lastTcpDonePidMismatch.Load() - ConnTracerTelemetry.lastTcpDonePidMismatch.Store(int64(ebpfTelemetry.Tcp_done_pid_mismatch)) - ch <- prometheus.MustNewConstMetric(ConnTracerTelemetry.tcpDonePidMismatch, prometheus.CounterValue, float64(delta)) - -} - -// DumpMaps (for debugging purpose) returns all maps content by default or selected maps from maps parameter. -func (t *tracer) DumpMaps(w io.Writer, maps ...string) error { - return t.m.DumpMaps(w, maps...) 
-} - -// Type returns the type of the underlying ebpf tracer that is currently loaded -func (t *tracer) Type() TracerType { - return t.ebpfTracerType -} - -func initializePortBindingMaps(config *config.Config, m *manager.Manager) error { - tcpPorts, err := network.ReadInitialState(config.ProcRoot, network.TCP, config.CollectTCPv6Conns) - if err != nil { - return fmt.Errorf("failed to read initial TCP pid->port mapping: %s", err) - } - - tcpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](m, probes.PortBindingsMap) - if err != nil { - return fmt.Errorf("failed to get TCP port binding map: %w", err) - } - for p, count := range tcpPorts { - log.Debugf("adding initial TCP port binding: netns: %d port: %d", p.Ino, p.Port) - pb := netebpf.PortBinding{Netns: p.Ino, Port: p.Port} - err = tcpPortMap.Update(&pb, &count, ebpf.UpdateNoExist) - if err != nil && !errors.Is(err, ebpf.ErrKeyExist) { - return fmt.Errorf("failed to update TCP port binding map: %w", err) - } - } - - udpPorts, err := network.ReadInitialState(config.ProcRoot, network.UDP, config.CollectUDPv6Conns) - if err != nil { - return fmt.Errorf("failed to read initial UDP pid->port mapping: %s", err) - } - - udpPortMap, err := maps.GetMap[netebpf.PortBinding, uint32](m, probes.UDPPortBindingsMap) - if err != nil { - return fmt.Errorf("failed to get UDP port binding map: %w", err) - } - for p, count := range udpPorts { - // ignore ephemeral port binds as they are more likely to be from - // clients calling bind with port 0 - if network.IsPortInEphemeralRange(network.AFINET, network.UDP, p.Port) == network.EphemeralTrue { - log.Debugf("ignoring initial ephemeral UDP port bind to %d", p) - continue - } - - log.Debugf("adding initial UDP port binding: netns: %d port: %d", p.Ino, p.Port) - pb := netebpf.PortBinding{Netns: p.Ino, Port: p.Port} - err = udpPortMap.Update(&pb, &count, ebpf.UpdateNoExist) - if err != nil && !errors.Is(err, ebpf.ErrKeyExist) { - return fmt.Errorf("failed to update UDP port binding 
map: %w", err) - } - } - return nil -} - -func (t *tracer) getTCPRetransmits(tuple *netebpf.ConnTuple, seen map[netebpf.ConnTuple]struct{}) (uint32, bool) { - if tuple.Type() != netebpf.TCP { - return 0, false - } - - // The PID isn't used as a key in the stats map, we will temporarily set it to 0 here and reset it when we're done - pid := tuple.Pid - tuple.Pid = 0 - - var retransmits uint32 - if err := t.tcpRetransmits.Lookup(tuple, &retransmits); err == nil { - // This is required to avoid (over)reporting retransmits for connections sharing the same socket. - if _, reported := seen[*tuple]; reported { - ConnTracerTelemetry.PidCollisions.Inc() - retransmits = 0 - } else { - seen[*tuple] = struct{}{} - } - } - - tuple.Pid = pid - return retransmits, true -} - -// getTCPStats reads tcp related stats for the given ConnTuple -func (t *tracer) getTCPStats(stats *netebpf.TCPStats, tuple *netebpf.ConnTuple) bool { - if tuple.Type() != netebpf.TCP { - return false - } - - return t.tcpStats.Lookup(tuple, stats) == nil -} - -func populateConnStats(stats *network.ConnectionStats, t *netebpf.ConnTuple, s *netebpf.ConnStats, ch *cookieHasher) { - *stats = network.ConnectionStats{ - Pid: t.Pid, - NetNS: t.Netns, - Source: t.SourceAddress(), - Dest: t.DestAddress(), - SPort: t.Sport, - DPort: t.Dport, - Monotonic: network.StatCounters{ - SentBytes: s.Sent_bytes, - RecvBytes: s.Recv_bytes, - SentPackets: uint64(s.Sent_packets), - RecvPackets: uint64(s.Recv_packets), - }, - LastUpdateEpoch: s.Timestamp, - IsAssured: s.IsAssured(), - Cookie: network.StatCookie(s.Cookie), - } - - if s.Duration <= uint64(math.MaxInt64) { - stats.Duration = time.Duration(s.Duration) * time.Nanosecond - } - - stats.ProtocolStack = protocols.Stack{ - API: protocols.API(s.Protocol_stack.Api), - Application: protocols.Application(s.Protocol_stack.Application), - Encryption: protocols.Encryption(s.Protocol_stack.Encryption), - } - - if t.Type() == netebpf.TCP { - stats.Type = network.TCP - } else { - 
stats.Type = network.UDP - } - - switch t.Family() { - case netebpf.IPv4: - stats.Family = network.AFINET - case netebpf.IPv6: - stats.Family = network.AFINET6 - } - - stats.SPortIsEphemeral = network.IsPortInEphemeralRange(stats.Family, stats.Type, t.Sport) - - switch s.ConnectionDirection() { - case netebpf.Incoming: - stats.Direction = network.INCOMING - case netebpf.Outgoing: - stats.Direction = network.OUTGOING - default: - stats.Direction = network.OUTGOING - } - - if ch != nil { - ch.Hash(stats) - } -} - -func updateTCPStats(conn *network.ConnectionStats, tcpStats *netebpf.TCPStats, retransmits uint32) { - if conn.Type != network.TCP { - return - } - - conn.Monotonic.Retransmits = retransmits - if tcpStats != nil { - conn.Monotonic.TCPEstablished = uint32(tcpStats.State_transitions >> netebpf.Established & 1) - conn.Monotonic.TCPClosed = uint32(tcpStats.State_transitions >> netebpf.Close & 1) - conn.RTT = tcpStats.Rtt - conn.RTTVar = tcpStats.Rtt_var - } -} - -type cookieHasher struct { - hash hash.Hash64 - buf []byte -} - -func newCookieHasher() *cookieHasher { - return &cookieHasher{ - hash: murmur3.New64(), - buf: make([]byte, network.ConnectionByteKeyMaxLen), - } -} - -func (h *cookieHasher) Hash(stats *network.ConnectionStats) { - h.hash.Reset() - if err := binary.Write(h.hash, binary.BigEndian, stats.Cookie); err != nil { - log.Errorf("error writing cookie to hash: %s", err) - return - } - key := stats.ByteKey(h.buf) - if _, err := h.hash.Write(key); err != nil { - log.Errorf("error writing byte key to hash: %s", err) - return - } - stats.Cookie = h.hash.Sum64() + return newEbpfTracer(cfg, telemetryComp) } diff --git a/pkg/network/tracer/tracer.go b/pkg/network/tracer/tracer.go index db5f0bc95a1a93..55faac7817df16 100644 --- a/pkg/network/tracer/tracer.go +++ b/pkg/network/tracer/tracer.go @@ -15,7 +15,6 @@ import ( "sync" "time" - "github.com/DataDog/ebpf-manager/tracefs" "github.com/cihub/seelog" "github.com/cilium/ebpf" "go.uber.org/atomic" @@ 
-121,10 +120,6 @@ func NewTracer(config *config.Config, telemetryComponent telemetryComponent.Comp // newTracer is an internal function used by tests primarily // (and NewTracer above) func newTracer(cfg *config.Config, telemetryComponent telemetryComponent.Component) (_ *Tracer, reterr error) { - if _, err := tracefs.Root(); err != nil { - return nil, fmt.Errorf("system-probe unsupported: %s", err) - } - // check if current platform is using old kernel API because it affects what kprobe are we going to enable currKernelVersion, err := kernel.HostVersion() if err != nil { @@ -139,12 +134,16 @@ func newTracer(cfg *config.Config, telemetryComponent telemetryComponent.Compone } if cfg.ServiceMonitoringEnabled { - if !usmconfig.IsUSMSupported() { - errStr := fmt.Sprintf("Universal Service Monitoring (USM) requires a Linux kernel version of %s or higher. We detected %s", usmconfig.MinimumKernelVersion, currKernelVersion) + if err := usmconfig.CheckUSMSupported(cfg); err != nil { + // this is the case where USM is enabled and NPM is not enabled + // in config; we implicitly enable the network tracer module + // in system-probe if USM is enabled if !cfg.NPMEnabled { - return nil, fmt.Errorf(errStr) + return nil, err } - log.Warnf("%s. 
NPM is explicitly enabled, so system-probe will continue with only NPM features enabled.", errStr) + + log.Warn(err) + log.Warnf("NPM is explicitly enabled, so system-probe will continue with only NPM features enabled") } } @@ -244,30 +243,33 @@ func newConntracker(cfg *config.Config, telemetryComponent telemetryComponent.Co var c netlink.Conntracker var err error - ns, err := cfg.GetRootNetNs() - if err != nil { - log.Warnf("error fetching root net namespace, will not attempt to load nf_conntrack_netlink module: %s", err) - } else { - defer ns.Close() - if err = netlink.LoadNfConntrackKernelModule(ns); err != nil { - log.Warnf("failed to load conntrack kernel module, though it may already be loaded: %s", err) + if !cfg.EnableEbpfless { + ns, err := cfg.GetRootNetNs() + if err != nil { + log.Warnf("error fetching root net namespace, will not attempt to load nf_conntrack_netlink module: %s", err) + } else { + defer ns.Close() + if err = netlink.LoadNfConntrackKernelModule(ns); err != nil { + log.Warnf("failed to load conntrack kernel module, though it may already be loaded: %s", err) + } } - } - if cfg.EnableEbpfConntracker { - if c, err = NewEBPFConntracker(cfg, telemetryComponent); err == nil { - return c, nil + if cfg.EnableEbpfConntracker { + if c, err = NewEBPFConntracker(cfg, telemetryComponent); err == nil { + return c, nil + } + log.Warnf("error initializing ebpf conntracker: %s", err) + } else { + log.Info("ebpf conntracker disabled") } - log.Warnf("error initializing ebpf conntracker: %s", err) - } else { - log.Info("ebpf conntracker disabled") + + log.Info("falling back to netlink conntracker") } - log.Info("falling back to netlink conntracker") if c, err = netlink.NewConntracker(cfg, telemetryComponent); err == nil { return c, nil } - if cfg.IgnoreConntrackInitFailure { + if errors.Is(err, netlink.ErrNotPermitted) || cfg.IgnoreConntrackInitFailure { log.Warnf("could not initialize conntrack, tracer will continue without NAT tracking: %s", err) return 
netlink.NewNoOpConntracker(), nil } diff --git a/pkg/network/tracer/tracer_linux_test.go b/pkg/network/tracer/tracer_linux_test.go index f86ccbb749ff88..090a105bd3fdf6 100644 --- a/pkg/network/tracer/tracer_linux_test.go +++ b/pkg/network/tracer/tracer_linux_test.go @@ -241,7 +241,7 @@ func (s *TracerSuite) TestTCPRetransmitSharedSocket() { // Test if telemetry measuring PID collisions is correct // >= because there can be other connections going on during CI that increase pidCollisions - assert.GreaterOrEqual(t, connection.ConnTracerTelemetry.PidCollisions.Load(), int64(numProcesses-1)) + assert.GreaterOrEqual(t, connection.EbpfTracerTelemetry.PidCollisions.Load(), int64(numProcesses-1)) } func (s *TracerSuite) TestTCPRTT() { @@ -333,7 +333,7 @@ func (s *TracerSuite) TestTCPMiscount() { assert.False(t, uint64(len(x)) == conn.Monotonic.SentBytes) } - assert.NotZero(t, connection.ConnTracerTelemetry.LastTcpSentMiscounts.Load()) + assert.NotZero(t, connection.EbpfTracerTelemetry.LastTcpSentMiscounts.Load()) } func (s *TracerSuite) TestConnectionExpirationRegression() { @@ -387,6 +387,9 @@ func (s *TracerSuite) TestConnectionExpirationRegression() { func (s *TracerSuite) TestConntrackExpiration() { t := s.T() ebpftest.LogLevel(t, "trace") + + cfg := testConfig() + skipOnEbpflessNotSupported(t, cfg) netlinktestutil.SetupDNAT(t) tr := setupTracer(t, testConfig()) @@ -861,6 +864,7 @@ func (s *TracerSuite) TestGatewayLookupCrossNamespace() { }) t.Run("client in other namespace", func(t *testing.T) { + skipOnEbpflessNotSupported(t, cfg) // try connecting to server in test1 namespace test2Ns, err := vnetns.GetFromName(ns2) require.NoError(t, err) @@ -925,6 +929,8 @@ func (s *TracerSuite) TestGatewayLookupCrossNamespace() { func (s *TracerSuite) TestConnectionAssured() { t := s.T() cfg := testConfig() + skipOnEbpflessNotSupported(t, cfg) + tr := setupTracer(t, cfg) server := &UDPServer{ network: "udp4", @@ -1012,9 +1018,11 @@ func (s *TracerSuite) TestUDPConnExpiryTimeout() 
{ func (s *TracerSuite) TestDNATIntraHostIntegration() { t := s.T() + cfg := testConfig() + skipEbpflessTodo(t, cfg) netlinktestutil.SetupDNAT(t) - tr := setupTracer(t, testConfig()) + tr := setupTracer(t, cfg) var serverAddr struct { local, remote net.Addr @@ -1385,12 +1393,13 @@ func testUDPReusePort(t *testing.T, udpnet string, ip string) { func (s *TracerSuite) TestDNSStatsWithNAT() { t := s.T() + cfg := testConfig() + skipEbpflessTodo(t, cfg) testutil.IptablesSave(t) // Setup a NAT rule to translate 2.2.2.2 to 8.8.8.8 and issue a DNS request to 2.2.2.2 cmds := []string{"iptables -t nat -A OUTPUT -d 2.2.2.2 -j DNAT --to-destination 8.8.8.8"} testutil.RunCommands(t, cmds, false) - cfg := testConfig() cfg.CollectDNSStats = true cfg.DNSTimeout = 1 * time.Second tr := setupTracer(t, cfg) @@ -1713,6 +1722,7 @@ func (s *TracerSuite) TestShortWrite() { func (s *TracerSuite) TestKprobeAttachWithKprobeEvents() { t := s.T() cfg := config.New() + skipOnEbpflessNotSupported(t, cfg) cfg.AttachKprobesWithKprobeEventsABI = true tr := setupTracer(t, cfg) @@ -1851,7 +1861,9 @@ func (s *TracerSuite) TestPreexistingConnectionDirection() { m := outgoing.Monotonic assert.Equal(t, clientMessageSize, int(m.SentBytes)) assert.Equal(t, serverMessageSize, int(m.RecvBytes)) - assert.Equal(t, os.Getpid(), int(outgoing.Pid)) + if !tr.config.EnableEbpfless { + assert.Equal(t, os.Getpid(), int(outgoing.Pid)) + } assert.Equal(t, addrPort(server.Address()), int(outgoing.DPort)) assert.Equal(t, c.LocalAddr().(*net.TCPAddr).Port, int(outgoing.SPort)) assert.Equal(t, network.OUTGOING, outgoing.Direction) @@ -1859,7 +1871,9 @@ func (s *TracerSuite) TestPreexistingConnectionDirection() { m = incoming.Monotonic assert.Equal(t, clientMessageSize, int(m.RecvBytes)) assert.Equal(t, serverMessageSize, int(m.SentBytes)) - assert.Equal(t, os.Getpid(), int(incoming.Pid)) + if !tr.config.EnableEbpfless { + assert.Equal(t, os.Getpid(), int(incoming.Pid)) + } assert.Equal(t, addrPort(server.Address()), 
int(incoming.SPort)) assert.Equal(t, c.LocalAddr().(*net.TCPAddr).Port, int(incoming.DPort)) assert.Equal(t, network.INCOMING, incoming.Direction) @@ -1872,6 +1886,7 @@ func (s *TracerSuite) TestPreexistingEmptyIncomingConnectionDirection() { t.Skip("skipping test as ringbuffers are not supported on this kernel") } c := testConfig() + skipOnEbpflessNotSupported(t, c) c.NPMRingbuffersEnabled = true testPreexistingEmptyIncomingConnectionDirection(t, c) }) @@ -1890,6 +1905,7 @@ func testPreexistingEmptyIncomingConnectionDirection(t *testing.T, config *confi server := tracertestutil.NewTCPServer(func(c net.Conn) { <-ch c.Close() + close(ch) }) require.NoError(t, server.Run()) t.Cleanup(server.Shutdown) @@ -1901,7 +1917,8 @@ func testPreexistingEmptyIncomingConnectionDirection(t *testing.T, config *confi tr := setupTracer(t, config) // close the server connection so the tracer picks it up - close(ch) + ch <- struct{}{} + <-ch var conn *network.ConnectionStats require.Eventually(t, func() bool { @@ -2337,3 +2354,15 @@ func setupDropTrafficRule(tb testing.TB) (ns string) { testutil.RunCommands(tb, cmds, false) return } + +func skipOnEbpflessNotSupported(t *testing.T, cfg *config.Config) { + if cfg.EnableEbpfless { + t.Skip("not supported on ebpf-less") + } +} + +func skipEbpflessTodo(t *testing.T, cfg *config.Config) { + if cfg.EnableEbpfless { + t.Skip("TODO: ebpf-less") + } +} diff --git a/pkg/network/tracer/tracer_test.go b/pkg/network/tracer/tracer_test.go index 7011923c9329d2..4bc9d6ce8f0e69 100644 --- a/pkg/network/tracer/tracer_test.go +++ b/pkg/network/tracer/tracer_test.go @@ -156,7 +156,8 @@ func (s *TracerSuite) TestGetStats() { func (s *TracerSuite) TestTCPSendAndReceive() { t := s.T() - tr := setupTracer(t, testConfig()) + cfg := testConfig() + tr := setupTracer(t, cfg) // Create TCP Server which, for every line, sends back a message with size=serverMessageSize server := testutil.NewTCPServer(func(c net.Conn) { @@ -208,10 +209,11 @@ func (s *TracerSuite) 
TestTCPSendAndReceive() { m := conn.Monotonic assert.Equal(t, 10*clientMessageSize, int(m.SentBytes)) assert.Equal(t, 10*serverMessageSize, int(m.RecvBytes)) - assert.Equal(t, os.Getpid(), int(conn.Pid)) + if !cfg.EnableEbpfless { + assert.Equal(t, os.Getpid(), int(conn.Pid)) + } assert.Equal(t, addrPort(server.Address()), int(conn.DPort)) assert.Equal(t, network.OUTGOING, conn.Direction) - assert.True(t, conn.IntraHost) } func (s *TracerSuite) TestTCPShortLived() { @@ -557,14 +559,16 @@ func (s *TracerSuite) TestLocalDNSCollectionEnabled() { _, err = cn.Write([]byte("test")) assert.NoError(t, err) - found := false - // Iterate through active connections making sure theres at least one connection - for _, c := range getConnections(t, tr).Conns { - found = found || isLocalDNS(c) - } + require.Eventually(t, func() bool { + for _, c := range getConnections(t, tr).Conns { + if isLocalDNS(c) { + return true + } + } - assert.True(t, found) + return false + }, 3*time.Second, 100*time.Millisecond, "could not find connection") } func isLocalDNS(c network.ConnectionStats) bool { @@ -993,8 +997,10 @@ func testDNSStats(t *testing.T, tr *Tracer, domain string, success, failure, tim if !assert.Equal(c, queryMsg.Len(), int(conn.Monotonic.SentBytes)) { return } - if !assert.Equal(c, os.Getpid(), int(conn.Pid)) { - return + if !tr.config.EnableEbpfless { + if !assert.Equal(c, os.Getpid(), int(conn.Pid)) { + return + } } if !assert.Equal(c, dnsServerAddr.Port, int(conn.DPort)) { return @@ -1161,10 +1167,15 @@ func (s *TracerSuite) TestConnectedUDPSendIPv6() { bytesSent, err := conn.Write(message) require.NoError(t, err) - connections := getConnections(t, tr) - outgoing := network.FilterConnections(connections, func(cs network.ConnectionStats) bool { - return cs.DPort == uint16(remotePort) - }) + var outgoing []network.ConnectionStats + require.Eventually(t, func() bool { + connections := getConnections(t, tr) + outgoing = network.FilterConnections(connections, func(cs 
network.ConnectionStats) bool { + return cs.DPort == uint16(remotePort) + }) + + return len(outgoing) == 1 + }, 3*time.Second, 100*time.Millisecond, "failed to find connection") require.Len(t, outgoing, 1) assert.Equal(t, remoteAddr.IP.String(), outgoing[0].Dest.String()) diff --git a/pkg/network/tracer/utils_linux.go b/pkg/network/tracer/utils_linux.go index 5315b10785a005..d072906eec6879 100644 --- a/pkg/network/tracer/utils_linux.go +++ b/pkg/network/tracer/utils_linux.go @@ -3,8 +3,6 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. -//go:build linux_bpf - package tracer import ( @@ -16,10 +14,16 @@ import ( "github.com/cilium/ebpf/asm" "github.com/cilium/ebpf/features" + coreconfig "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/util/kernel" "github.com/DataDog/datadog-agent/pkg/util/log" ) +// NeedsEBPF returns `true` if the network-tracer requires eBPF +func NeedsEBPF() bool { + return !coreconfig.SystemProbe().GetBool("network_config.enable_ebpfless") +} + // IsTracerSupportedByOS returns whether the current kernel version supports tracer functionality // along with some context on why it's not supported func IsTracerSupportedByOS(exclusionList []string) (bool, error) { @@ -59,6 +63,10 @@ func verifyOSVersion(kernelCode kernel.Version, platform string, exclusionList [ return false, fmt.Errorf("Known bug for kernel %s on platform %s, see: \n- https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1763454", kernelCode, platform) } + if !NeedsEBPF() { + return true, nil + } + var requiredFuncs = []asm.BuiltinFunc{ asm.FnMapLookupElem, asm.FnMapUpdateElem, diff --git a/pkg/network/tracer/utils_unsupported.go b/pkg/network/tracer/utils_unsupported.go index cf8f70835e1a8b..5678e22e579e12 100644 --- a/pkg/network/tracer/utils_unsupported.go +++ b/pkg/network/tracer/utils_unsupported.go @@ -3,7 +3,7 @@ // This product includes software developed at 
Datadog (https://www.datadoghq.com/). // Copyright 2016-present Datadog, Inc. -//go:build !linux_bpf && !windows +//go:build !windows && !linux package tracer diff --git a/pkg/network/usm/config/config.go b/pkg/network/usm/config/config.go index 915e68b15098be..00a4cc9764c5bf 100644 --- a/pkg/network/usm/config/config.go +++ b/pkg/network/usm/config/config.go @@ -9,6 +9,8 @@ package config import ( + "errors" + "fmt" "runtime" "strings" @@ -20,6 +22,9 @@ import ( // MinimumKernelVersion indicates the minimum kernel version required for HTTP monitoring var MinimumKernelVersion kernel.Version +// ErrNotSupported is the error returned if USM is not supported on this platform +var ErrNotSupported = errors.New("Universal Service Monitoring (USM) is not supported") + func init() { MinimumKernelVersion = kernel.VersionCode(4, 14, 0) } @@ -44,21 +49,31 @@ func TLSSupported(c *config.Config) bool { return kversion >= MinimumKernelVersion } -// IsUSMSupported We only support http with kernel >= 4.14.0. -func IsUSMSupported() bool { +// CheckUSMSupported returns an error if USM is not supported +// on this platform. Callers can check `errors.Is(err, ErrNotSupported)` +// to verify if USM is supported +func CheckUSMSupported(cfg *config.Config) error { + // TODO: remove this once USM is supported on ebpf-less + if cfg.EnableEbpfless { + return fmt.Errorf("%w: eBPF-less is not supported", ErrNotSupported) + } + kversion, err := kernel.HostVersion() if err != nil { - log.Warn("could not determine the current kernel version. 
USM disabled.") - return false + return fmt.Errorf("%w: could not determine the current kernel version: %w", ErrNotSupported, err) } - return kversion >= MinimumKernelVersion + if kversion < MinimumKernelVersion { + return fmt.Errorf("%w: a Linux kernel version of %s or higher is required; we detected %s", ErrNotSupported, MinimumKernelVersion, kversion) + } + + return nil } // IsUSMSupportedAndEnabled returns true if USM is supported and enabled func IsUSMSupportedAndEnabled(config *config.Config) bool { // http.Supported is misleading, it should be named usm.Supported. - return config.ServiceMonitoringEnabled && IsUSMSupported() + return config.ServiceMonitoringEnabled && CheckUSMSupported(config) == nil } // NeedProcessMonitor returns true if the process monitor is needed for the given configuration