diff --git a/flags/flags.go b/flags/flags.go index 251da7a797..1269b1a2a2 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -126,6 +126,9 @@ type Flags struct { OfflineMode FlagsOfflineMode `embed:"" prefix:"offline-mode-"` OffCPUThreshold uint `default:"0" help:"The per-mille probablity of off-CPU event being recorded."` + + EnableOOMProf bool `default:"false" help:"Enable OOMProf profiling integration."` + EnableOOMProfAllocs bool `default:"false" help:"Enable OOMProf alloc counts."` } type ExitCode int diff --git a/go.mod b/go.mod index afb6168a9a..9664ea3e55 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( buf.build/gen/go/parca-dev/parca/grpc/go v1.5.1-20250212095114-4db6f2d46517.2 buf.build/gen/go/parca-dev/parca/protocolbuffers/go v1.36.6-20250212095114-4db6f2d46517.1 buf.build/gen/go/prometheus/prometheus/protocolbuffers/go v1.36.6-20250320161912-af2aab87b1b3.1 - github.com/KimMachineGun/automemlimit v0.7.1 + github.com/KimMachineGun/automemlimit v0.7.3 github.com/alecthomas/kong v1.10.0 github.com/apache/arrow/go/v16 v16.1.0 github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 @@ -20,6 +20,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.1 github.com/klauspost/compress v1.18.0 + github.com/parca-dev/oomprof v0.1.5-0.20250731185212-152810a61833 github.com/prometheus/client_golang v1.22.0 github.com/prometheus/common v0.63.0 github.com/prometheus/prometheus v0.303.0 @@ -32,7 +33,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.60.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 - go.opentelemetry.io/ebpf-profiler v0.0.0-20250416113750-7ddc23ea135a + go.opentelemetry.io/ebpf-profiler v0.0.0-20250519164423-009a07f3803c go.opentelemetry.io/otel v1.37.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace 
v1.35.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 @@ -54,7 +55,7 @@ require ( require ( buf.build/gen/go/gogo/protobuf/protocolbuffers/go v1.36.6-20240617172848-e1dbca2775a7.1 // indirect github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 // indirect - github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20231105174938-2b5cbb29f3e2 // indirect + github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Microsoft/hcsshim v0.12.9 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -92,6 +93,7 @@ require ( github.com/google/gnostic-models v0.6.9 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a // indirect github.com/google/uuid v1.6.0 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect @@ -99,7 +101,7 @@ require ( github.com/josharian/native v1.1.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/mdlayher/kobject v0.0.0-20200520190114-19ca17470d7d // indirect github.com/mdlayher/netlink v1.7.2 // indirect @@ -145,9 +147,10 @@ require ( golang.org/x/time v0.11.0 // indirect golang.org/x/tools v0.34.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + gonum.org/v1/gonum v0.16.0 // indirect google.golang.org/genproto v0.0.0-20250414145226-207652e42e2e // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect + google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250425173222-7b384671a197 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect @@ -160,4 +163,4 @@ require ( sigs.k8s.io/yaml v1.4.0 // indirect ) -replace go.opentelemetry.io/ebpf-profiler => github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.0-20250723163006-6ef58876b286 +replace go.opentelemetry.io/ebpf-profiler => github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.0-20250731174540-1195e517db5c diff --git a/go.sum b/go.sum index acebb21e3e..09e4788248 100644 --- a/go.sum +++ b/go.sum @@ -11,13 +11,13 @@ dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= -github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20231105174938-2b5cbb29f3e2 h1:dIScnXFlF784X79oi7MzVT6GWqr/W1uUt0pB5CsDs9M= -github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20231105174938-2b5cbb29f3e2/go.mod h1:gCLVsLfv1egrcZu+GoJATN5ts75F2s62ih/457eWzOw= +github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 h1:59MxjQVfjXsBpLy+dbd2/ELV5ofnUkUZBvWSC85sheA= +github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0/go.mod h1:OahwfttHWG6eJ0clwcfBAHoDI6X/LV/15hx/wlMZSrU= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/KimMachineGun/automemlimit v0.7.1 h1:QcG/0iCOLChjfUweIMC3YL5Xy9C3VBeNmCZHrZfJMBw= -github.com/KimMachineGun/automemlimit v0.7.1/go.mod 
h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM= +github.com/KimMachineGun/automemlimit v0.7.3 h1:oPgMp0bsWez+4fvgSa11Rd9nUDrd8RLtDjBoT3ro+/A= +github.com/KimMachineGun/automemlimit v0.7.3/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/hcsshim v0.12.9 h1:2zJy5KA+l0loz1HzEGqyNnjd3fyZA31ZBCGKacp6lLg= @@ -67,6 +67,8 @@ github.com/containerd/ttrpc v1.2.7 h1:qIrroQvuOL9HQ1X6KHe2ohc7p+HP/0VE6XPU7elJRq github.com/containerd/ttrpc v1.2.7/go.mod h1:YCXHsb32f+Sq5/72xHubdiJRQY9inL4a4ZQrAbN1q9o= github.com/containerd/typeurl/v2 v2.2.3 h1:yNA/94zxWdvYACdYO8zofhrTVuQY73fFU1y++dYSw40= github.com/containerd/typeurl/v2 v2.2.3/go.mod h1:95ljDnPfD3bAbDJRugOiShd/DlAAsxGtUBhJxIn7SCk= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -118,6 +120,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= +github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod 
h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -155,8 +159,8 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= -github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a h1://KbezygeMJZCSHH+HgUZiTeSoiuFspbMg1ge+eFj18= +github.com/google/pprof v0.0.0-20250607225305-033d6d78b36a/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -187,8 +191,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= -github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= +github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 
h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -253,8 +257,10 @@ github.com/opencontainers/runtime-spec v1.2.1 h1:S4k4ryNgEpxW1dzyqffOmhI1BHYcjzU github.com/opencontainers/runtime-spec v1.2.1/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.12.0 h1:6n5JV4Cf+4y0KNXW48TLj5DwfXpvWlxXplUkdTrmPb8= github.com/opencontainers/selinux v1.12.0/go.mod h1:BTPX+bjVbWGXw7ZZWUbdENt8w0htPSrlgOOysQaU62U= -github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.0-20250723163006-6ef58876b286 h1:P/fRnXyyq/8AY+g3RN+KSMX7xQeRPpEkiDJ23WtEOIE= -github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.0-20250723163006-6ef58876b286/go.mod h1:ZKCiLxqP47dzRIJgvvcj/jj+gpzco2AA8imbDZ4+Tac= +github.com/parca-dev/oomprof v0.1.5-0.20250731185212-152810a61833 h1:YHzX1AuI9qThY6sR9rgnF5GD7OJAwEqhzSmIlR24pZs= +github.com/parca-dev/oomprof v0.1.5-0.20250731185212-152810a61833/go.mod h1:+vw0+rZYq/rTVouF6G+/+Uol758EOkUINi8/8RXPvX0= +github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.0-20250731174540-1195e517db5c h1:3K9ZevMtHUl+MBeGOhQ/XP1IKMcYl6WaEwg1+4YAv7A= +github.com/parca-dev/opentelemetry-ebpf-profiler v0.0.0-20250731174540-1195e517db5c/go.mod h1:h8C/CuCo73JCYzKS7mcHtoT925k0TJRQq99N1c3UPpo= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= @@ -438,8 +444,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= 
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= -gonum.org/v1/gonum v0.15.0 h1:2lYxjRbTYyxkJxlhC+LvJIx3SsANPdRybu1tGj9/OrQ= -gonum.org/v1/gonum v0.15.0/go.mod h1:xzZVBJBtS+Mz4q0Yl2LJTk+OxOg4jiXZ7qBoM0uISGo= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -449,8 +455,8 @@ google.golang.org/genproto v0.0.0-20250414145226-207652e42e2e h1:mYHFv3iX85YMwhG google.golang.org/genproto v0.0.0-20250414145226-207652e42e2e/go.mod h1:TQT1YpH/rlDCS5+EuFaqPIMqDfuNMFR1OI8EcZJGgAk= google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e h1:UdXH7Kzbj+Vzastr5nVfccbmFsmYNygVLSPk1pEfDoY= google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e/go.mod h1:085qFyf2+XaZlRdCgKNCIZ3afY2p4HHZdoIRpId8F4A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e h1:ztQaXfzEXTmCBvbtWYRhJxW+0iJcz2qXfd38/e9l7bA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 h1:29cjnHVylHwTzH66WfFZqgSQgnxzvWE+jvBwpZCLRxY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/main.go 
b/main.go index e9fe05503e..09ff3b316b 100644 --- a/main.go +++ b/main.go @@ -48,9 +48,11 @@ import ( "golang.org/x/sys/unix" "google.golang.org/grpc" + "github.com/parca-dev/oomprof/oomprof" "github.com/parca-dev/parca-agent/analytics" "github.com/parca-dev/parca-agent/config" "github.com/parca-dev/parca-agent/flags" + "github.com/parca-dev/parca-agent/oom" "github.com/parca-dev/parca-agent/reporter" "github.com/parca-dev/parca-agent/uploader" ) @@ -208,10 +210,53 @@ func mainWithExitCode() flags.ExitCode { cmd.Stdout = os.Stdout cmd.Stderr = io.MultiWriter(os.Stderr, buf) + if f.EnableOOMProf { + // Try to ensure oom killer leaves the parent alive by lowering our own oom adjustment, + // if the parent gets oom killed too its possible no oom reports will be sent. + // TODO: weigh this strategy's effectiveness against a canary/ballast approach. + if err := os.WriteFile("/proc/self/oom_score_adj", []byte("-100"), 0644); err != nil { + log.Errorf("Failed to set oom_score_adj: %v", err) + } + } + // Run garbage collector to minimize the amount of memory that the parent // telemetry process uses. runtime.GC() - err := cmd.Run() + + // Setup OOMProf integration if enabled. 
+ var oomProfState *oomprof.State + if f.EnableOOMProf { + state, err := oom.StartOOMProf(ctx, grpcConn, f.BPF.VerboseLogging, f.Node, f.Metadata.ExternalLabels, f.EnableOOMProfAllocs) + if err != nil { + return flags.Failure("Failed to start OOMProf: %v", err) + } + oomProfState = state + } + + // Start the subprocess instead of Run to get the PID + err := cmd.Start() + if err != nil { + log.Errorf("Failed to start subprocess: %v", err) + return flags.ExitFailure + } + + // Get the subprocess PID for oomprof monitoring + subprocessPID := uint32(cmd.Process.Pid) + log.Debugf("Started parca-agent subprocess with PID %d", subprocessPID) + + // If oomprof is enabled, tell it to watch our subprocess + if f.EnableOOMProf && oomProfState != nil { + err = oomProfState.WatchPid(subprocessPID) + if err != nil { + log.Errorf("Failed to watch subprocess PID %d: %v", subprocessPID, err) + } else { + log.Infof("OOMProf is now monitoring main parca-agent PID %d", subprocessPID) + } + } + + // Wait for the subprocess to complete + err = cmd.Wait() + log.Infof("Subprocess completed %v exit: %d", err, cmd.ProcessState.ExitCode()) if err != nil { log.Error("======================= unexpected error =======================") log.Error(buf.String()) @@ -263,7 +308,7 @@ func mainWithExitCode() flags.ExitCode { } if err = tracer.ProbeBPFSyscall(); err != nil { - return flags.Failure(fmt.Sprintf("Failed to probe eBPF syscall: %v", err)) + return flags.Failure("Failed to probe eBPF syscall: %v", err) } externalLabels := reporter.Labels{} @@ -356,6 +401,8 @@ func mainWithExitCode() flags.ExitCode { buildInfo.VcsRevision, reg, offlineModeConfig, + f.EnableOOMProf, + f.EnableOOMProfAllocs, ) if err != nil { return flags.Failure("Failed to start reporting: %v", err) diff --git a/oom/oom_test_handler.go b/oom/oom_test_handler.go new file mode 100644 index 0000000000..3500133a95 --- /dev/null +++ b/oom/oom_test_handler.go @@ -0,0 +1,75 @@ +//go:build oomtest +// +build oomtest + +package oom + 
+import ( + "os" + "os/signal" + "sync" + "syscall" + + log "github.com/sirupsen/logrus" +) + +// memoryBombs holds allocated memory to prevent GC from freeing it +var memoryBombs [][]byte + +// oomOnce ensures the OOM trigger only runs once +var oomOnce sync.Once + +// setupOOMTestHandler sets up a signal handler that triggers massive memory allocation +// when SIGUSR2 is received. +func setupOOMTestHandler() { + log.Warn("OOM testing mode enabled - SIGUSR2 will trigger memory allocation bomb") + + // Create a channel to receive SIGUSR2 signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGUSR2) + + go func() { + for range sigChan { + // do once to avoid multiple triggers; triggerOOM never returns, so this goroutine stays inside the first Do call + oomOnce.Do(func() { + log.Warn("SIGUSR2 received - triggering OOM by allocating memory aggressively") + // Start the memory allocation bomb + triggerOOM() + }) + } + }() +} + +// triggerOOM allocates memory without bound to force an OOM condition; it never returns +func triggerOOM() { + // Allocate in 1 MiB chunks + chunkSize := 1 * 1024 * 1024 + allocations := 0 + + log.Info("Starting memory allocation bomb...") + + // Allocate memory until we get killed + for { // one chunkSize (1 MiB) chunk per iteration + // Allocate a chunk + chunk := make([]byte, chunkSize) + + // Fill it with data to ensure it's actually allocated + for i := range chunk { + chunk[i] = byte(i % 256) + } + + // Keep reference to prevent GC + memoryBombs = append(memoryBombs, chunk) + allocations++ + + // Log progress every 10 allocations (i.e. every 10 MiB) + if allocations%10 == 0 { + totalMB := (allocations * chunkSize) / (1024 * 1024) + log.Infof("Allocated %d chunks, total: %d MB", allocations, totalMB) + } + } +} + +// init installs the SIGUSR2 handler at package initialization (this file is only built with the oomtest tag) +func init() { + setupOOMTestHandler() +} diff --git a/oom/oom_test_handler_stub.go b/oom/oom_test_handler_stub.go new file mode 100644 index 0000000000..c3d7ae4cf9 --- /dev/null +++ b/oom/oom_test_handler_stub.go @@ -0,0 +1,9 @@ +//go:build !oomtest +// +build !oomtest + 
+package oom + +// setupOOMTestHandler is a no-op when the oomtest build tag is not present +func setupOOMTestHandler() { + // No-op - OOM testing is not compiled in +} diff --git a/oom/oomprof.go b/oom/oomprof.go new file mode 100644 index 0000000000..ef8bf3c380 --- /dev/null +++ b/oom/oomprof.go @@ -0,0 +1,125 @@ +package oom + +import ( + "bytes" + "context" + "fmt" + + profilestoregrpc "buf.build/gen/go/parca-dev/parca/grpc/go/parca/profilestore/v1alpha1/profilestorev1alpha1grpc" + profilestorepb "buf.build/gen/go/parca-dev/parca/protocolbuffers/go/parca/profilestore/v1alpha1" + + "github.com/parca-dev/oomprof/oomprof" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func StartOOMProf(ctx context.Context, grpcConn *grpc.ClientConn, verbose bool, nodeName string, + externalLabels map[string]string, reportAllocs bool) (state *oomprof.State, err error) { + profChan := make(chan oomprof.ProfileData, 10) + cfg := oomprof.Config{ + ScanInterval: 0, + MemLimit: 32 * 1024 * 1024, // 32 MiB + LogTracePipe: false, // off: only one of oomprof and parca-agent should read the trace pipe, not both + Verbose: verbose, + Symbolize: false, + ReportAlloc: reportAllocs, // whether to report allocs in memory profiles + } + state, err = oomprof.Setup(ctx, &cfg, profChan) + if err != nil { + return nil, err + } + client := profilestoregrpc.NewProfileStoreServiceClient(grpcConn) + go handleOOMProfData(ctx, profChan, client, nodeName, externalLabels) + return state, nil +} + +// handleOOMProfData forwards each ProfileData received from oomprof to the ProfileStoreService until ctx is done; send failures are logged and the loop continues +func handleOOMProfData(ctx context.Context, profileCh <-chan oomprof.ProfileData, client profilestoregrpc.ProfileStoreServiceClient, nodeName string, externalLabels map[string]string) { + for { + select { + case <-ctx.Done(): + log.Info("OOMProf profile handler shutting down") + return + case profileData := <-profileCh: + log.Infof("Received OOMProf profile for PID %d, command %s", + profileData.PID, profileData.Command) 
+ + // Convert the profile to protobuf format and send directly to Parca + err := sendOOMProfile(ctx, client, profileData, nodeName, externalLabels) + if err != nil { + log.Errorf("Failed to send OOM profile: %v", err) + } + } + } +} + +// sendOOMProfile serializes profileData.Profile and writes it to Parca with a single WriteRaw call, labelled with externalLabels plus job/node/__name__/pid/comm; it returns a wrapped error if serialization or the RPC fails +func sendOOMProfile(ctx context.Context, client profilestoregrpc.ProfileStoreServiceClient, profileData oomprof.ProfileData, nodeName string, externalLabels map[string]string) error { + // Convert profile to raw bytes + var buf bytes.Buffer + err := profileData.Profile.Write(&buf) + if err != nil { + return fmt.Errorf("failed to marshal profile: %w", err) + } + + // Build labels for the profile; capacity covers the external labels plus the 5 OOM-specific labels appended below + labelSet := make([]*profilestorepb.Label, 0, len(externalLabels)+5) + + // Add external labels + for k, v := range externalLabels { + labelSet = append(labelSet, &profilestorepb.Label{ + Name: k, + Value: v, + }) + } + + // Add OOM-specific labels + // FIXME: subject these to relabeling? 
+ labelSet = append(labelSet, &profilestorepb.Label{ + Name: "job", + Value: "oomprof", + }) + labelSet = append(labelSet, &profilestorepb.Label{ + Name: "node", + Value: nodeName, + }) + labelSet = append(labelSet, &profilestorepb.Label{ + Name: "__name__", + Value: "memory", + }) + labelSet = append(labelSet, &profilestorepb.Label{ + Name: "pid", + Value: fmt.Sprintf("%d", profileData.PID), + }) + labelSet = append(labelSet, &profilestorepb.Label{ + Name: "comm", + Value: profileData.Command, + }) + + // Create the WriteRaw request + req := &profilestorepb.WriteRawRequest{ + Series: []*profilestorepb.RawProfileSeries{ + { + Labels: &profilestorepb.LabelSet{ + Labels: labelSet, + }, + Samples: []*profilestorepb.RawSample{ + { + RawProfile: buf.Bytes(), + }, + }, + }, + }, + } + + // Send the profile + _, err = client.WriteRaw(ctx, req) + if err != nil { + return fmt.Errorf("failed to write raw profile: %w", err) + } + + log.Infof("Successfully sent OOM profile for PID %d, command %s, size: %d bytes to Parca", + profileData.PID, profileData.Command, buf.Len()) + + return nil +} diff --git a/oom/test-oom.sh b/oom/test-oom.sh new file mode 100755 index 0000000000..f8b6fe29a6 --- /dev/null +++ b/oom/test-oom.sh @@ -0,0 +1,368 @@ +#!/bin/bash +# +# This is a test to ensure the parca-agent can detect and handle "self" OOMs, +# ie the parent process watches the child for OOM events and reports them. +# Currently not a full circle test and has to be verified manually by +# inspecting parca for the oom profile. 
+ +set -e + +# Configuration +MEMORY_LIMIT="1500M" +CGROUP_NAME="parca-oom-test" +SIGNAL_TO_SEND="USR2" +SLEEP_TIME=15 +PARCA_PORT="${PARCA_PORT:-7070}" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +log() { + echo -e "${GREEN}[$(date '+%H:%M:%S')]${NC} $1" +} + +warn() { + echo -e "${YELLOW}[$(date '+%H:%M:%S')] WARNING:${NC} $1" +} + +error() { + echo -e "${RED}[$(date '+%H:%M:%S')] ERROR:${NC} $1" +} + +# Function to cleanup cgroup +cleanup() { + log "Cleaning up..." + if [ -n "$PARCA_PID" ]; then + log "Killing parca-agent (PID: $PARCA_PID)" + kill -TERM "$PARCA_PID" 2>/dev/null || true + wait "$PARCA_PID" 2>/dev/null || true + fi + + # Remove from cgroup + if [ -f "/sys/fs/cgroup/${CGROUP_NAME}/cgroup.procs" ]; then + echo $$ > /sys/fs/cgroup/cgroup.procs 2>/dev/null || true + fi + + # Stop bpftrace if running + if [ -n "$BPFTRACE_PID" ]; then + log "Stopping bpftrace (PID: $BPFTRACE_PID)" + kill "$BPFTRACE_PID" 2>/dev/null || true + wait "$BPFTRACE_PID" 2>/dev/null || true + fi + + # Remove cgroup + if [ -d "/sys/fs/cgroup/${CGROUP_NAME}" ]; then + log "Removing cgroup: $CGROUP_NAME" + rmdir "/sys/fs/cgroup/${CGROUP_NAME}" 2>/dev/null || true + fi +} + +# Set up cleanup on exit +trap cleanup EXIT + +# Variable to store bpftrace PID +BPFTRACE_PID="" + +# Check if running as root or with sudo +if [ "$EUID" -ne 0 ]; then + error "This script needs to be run with sudo to manage cgroups" + exit 1 +fi + +# Check if cgroup v2 is available +if [ ! -f "/sys/fs/cgroup/cgroup.controllers" ]; then + error "cgroup v2 is required but not found" + exit 1 +fi + +# Function to get detailed memory breakdown for a process +get_process_memory_detail() { + local pid=$1 + local proc_name=$(ps -p "$pid" -o comm= 2>/dev/null || echo "unknown") + + if [[ ! 
-d "/proc/$pid" ]]; then + echo "Process $pid not found" + return + fi + + log "=== Memory Analysis for PID $pid ($proc_name) ===" + + # Basic memory stats from /proc/pid/status + if [[ -f "/proc/$pid/status" ]]; then + log "Process Status Memory:" + grep -E "^(VmSize|VmRSS|VmHWM|VmData|VmStk|VmExe|VmLib|VmPTE|VmSwap)" "/proc/$pid/status" | while read -r line; do + log " $line" + done + fi + + # Memory maps breakdown + if [[ -f "/proc/$pid/smaps" ]]; then + log "Memory Maps Summary:" + local heap_size=$(grep -A 20 "\[heap\]" "/proc/$pid/smaps" | grep "^Size:" | awk '{print $2}' | head -1) + local stack_size=$(grep -A 20 "\[stack\]" "/proc/$pid/smaps" | grep "^Size:" | awk '{print $2}' | head -1) + local anon_total=$(grep "^Size:" "/proc/$pid/smaps" | awk '{total+=$2} END {print total}') + local rss_total=$(grep "^Rss:" "/proc/$pid/smaps" | awk '{total+=$2} END {print total}') + + log " Heap: ${heap_size:-0} kB" + log " Stack: ${stack_size:-0} kB" + log " Total Size: ${anon_total:-0} kB" + log " Total RSS: ${rss_total:-0} kB" + + # Show top memory consumers + log " Top Memory Regions:" + awk '/^[0-9a-f]/ {addr=$1} /^Size:/ {size=$2} /^Rss:/ {rss=$2} /^[0-9a-f].*\[/ {name=$NF} END {if(rss>1024) print " " addr " " size "kB/" rss "kB " name}' "/proc/$pid/smaps" | sort -k3 -nr | head -10 + fi + + # Show file descriptors count + local fd_count=$(ls -1 "/proc/$pid/fd/" 2>/dev/null | wc -l) + log " File Descriptors: $fd_count" + + # Show threads + local thread_count=$(ls -1 "/proc/$pid/task/" 2>/dev/null | wc -l) + log " Threads: $thread_count" + + log "" +} + +# Function to compare parent and child memory usage +compare_parent_child_memory() { + local parent_pid=$1 + + # Get all child processes + local children=$(pgrep -P "$parent_pid" 2>/dev/null) + + get_process_memory_detail "$parent_pid" + + if [[ -n "$children" ]]; then + for child_pid in $children; do + get_process_memory_detail "$child_pid" + done + else + log "No child processes found for PID $parent_pid" + fi +} 
+ +# Build parca-agent with oomtest tag +log "Building parca-agent with oomtest tag..." +go build -tags oomtest -o parca-agent-oomtest . +if [ $? -ne 0 ]; then + error "Failed to build parca-agent with oomtest tag" + exit 1 +fi + +PARCA_BINARY="./parca-agent-oomtest" + +log "Setting up memory-limited cgroup for OOM testing" + +# Create cgroup +CGROUP_PATH="/sys/fs/cgroup/${CGROUP_NAME}" +log "Creating cgroup: $CGROUP_NAME with memory limit: $MEMORY_LIMIT" +mkdir -p "$CGROUP_PATH" + +# Enable memory controller +echo "+memory" > /sys/fs/cgroup/cgroup.subtree_control 2>/dev/null || true + +# Set memory limit +echo "$MEMORY_LIMIT" > "${CGROUP_PATH}/memory.max" +echo "$MEMORY_LIMIT" > "${CGROUP_PATH}/memory.high" + +# Start bpftrace for OOM monitoring +log "Starting bpftrace OOM monitoring..." +if command -v bpftrace >/dev/null 2>&1; then + bpftrace ./oom_trace.bt & + BPFTRACE_PID=$! + log "Started bpftrace with PID: $BPFTRACE_PID" + # Give bpftrace a moment to start up + sleep 2 +else + warn "bpftrace not found - OOM tracing will be disabled" +fi + +if ! lsof -i :"$PARCA_PORT" >/dev/null 2>&1; then + docker run -d --name parca -p "$PARCA_PORT:$PARCA_PORT" ghcr.io/parca-dev/parca:v0.24.0 +fi + +log "Starting parca-agent with OOM testing enabled..." + +# Start parca-agent in the background +"$PARCA_BINARY" \ + "--remote-store-address=localhost:$PARCA_PORT" \ + --remote-store-insecure \ + --enable-oom-prof=true & + +PARCA_PID=$! + +# Add the process to the cgroup +echo "$PARCA_PID" > "${CGROUP_PATH}/cgroup.procs" + +log "Started parca-agent with PID: $PARCA_PID" +log "Memory limit: $MEMORY_LIMIT" +log "Cgroup: $CGROUP_NAME" + +# Wait for parca-agent to start up +log "Waiting ${SLEEP_TIME}s for parca-agent to start up..." +sleep "$SLEEP_TIME" + +# Check if process is still running +if ! 
kill -0 "$PARCA_PID" 2>/dev/null; then + error "parca-agent process died during startup" + exit 1 +fi + +# Show current memory usage +log "Current memory usage:" +if [ -f "${CGROUP_PATH}/memory.current" ]; then + mem=$(cat "${CGROUP_PATH}/memory.current") + mem_mb=$((mem / 1024 / 1024)) + log " Current: ${mem_mb}MB" +fi + +log "=== PRE-OOM Memory Analysis ===" +compare_parent_child_memory "$PARCA_PID" + +# Get the child process PID +CHILD_PID=$(pgrep -P "$PARCA_PID" 2>/dev/null) +if [[ -n "$CHILD_PID" ]]; then + log "Sending $SIGNAL_TO_SEND signal to child process (PID: $CHILD_PID) to trigger OOM..." + kill -"$SIGNAL_TO_SEND" "$CHILD_PID" +else + log "No child process found, sending signal to parent process..." + kill -"$SIGNAL_TO_SEND" "$PARCA_PID" +fi + +log "Monitoring for OOM events..." + +# Function to get BPF memory usage for a process and its children +get_bpf_memory() { + local main_pid=$1 + local total_memlock=0 + local map_count=0 + local prog_count=0 + local detailed_info="" + + # Get all child PIDs + local all_pids=$(pgrep -P "$main_pid" 2>/dev/null | tr '\n' ' ') + all_pids="$main_pid $all_pids" + + log "Checking BPF usage for PIDs: $all_pids" + + # Get programs loaded by root (uid 0) - parca-agent runs as root + while IFS= read -r prog_json; do + if [[ -n "$prog_json" ]]; then + local prog_id=$(echo "$prog_json" | jq -r '.id') + local prog_name=$(echo "$prog_json" | jq -r '.name // "unnamed"') + local prog_memlock=$(echo "$prog_json" | jq -r '.bytes_memlock') + local prog_uid=$(echo "$prog_json" | jq -r '.uid') + + # Only count programs loaded by root (parca-agent) + if [[ "$prog_uid" == "0" ]]; then + prog_count=$((prog_count + 1)) + total_memlock=$((total_memlock + prog_memlock)) + local prog_mb=$((prog_memlock / 1024 / 1024)) + if [[ $prog_mb -gt 0 ]]; then + detailed_info="${detailed_info} PROG $prog_id: $prog_name (${prog_mb}MB)\n" + fi + fi + fi + done < <(bpftool prog show -j 2>/dev/null | jq -c '.[]?' 
2>/dev/null || echo "") + + # Get maps and their memory usage + while IFS= read -r map_json; do + if [[ -n "$map_json" ]]; then + local map_id=$(echo "$map_json" | jq -r '.id') + local map_name=$(echo "$map_json" | jq -r '.name // "unnamed"') + local map_memlock=$(echo "$map_json" | jq -r '.bytes_memlock') + local map_type=$(echo "$map_json" | jq -r '.type') + + # Only count maps > 1MB to focus on large consumers + local map_mb=$((map_memlock / 1024 / 1024)) + if [[ $map_mb -gt 1 ]]; then + map_count=$((map_count + 1)) + total_memlock=$((total_memlock + map_memlock)) + detailed_info="${detailed_info} MAP $map_id: $map_name [$map_type] (${map_mb}MB)\n" + fi + fi + done < <(bpftool map show -j 2>/dev/null | jq -c '.[]?' 2>/dev/null || echo "") + + local total_mb=$((total_memlock / 1024 / 1024)) + + # Show detailed info if there's significant memory usage + if [[ $total_mb -gt 10 ]]; then + log "BPF Memory Details:" + echo -e "$detailed_info" | head -20 + fi + + echo "${total_mb}MB (${map_count} large maps, ${prog_count} progs)" +} + +# Monitor the process and memory usage +for i in {1..60}; do + if ! 
kill -0 "$PARCA_PID" 2>/dev/null; then + log "Process has terminated (likely OOM killed)" + break + fi + + # Show memory usage every second + mem=$(cat "${CGROUP_PATH}/memory.current" 2>/dev/null || echo "0") + mem_mb=$((mem / 1024 / 1024)) + mem_limit_mb=$(echo "$MEMORY_LIMIT" | sed 's/M$//') + + # Get BPF memory usage + bpf_memory=$(get_bpf_memory "$PARCA_PID") + + log "Memory usage: ${mem_mb}MB / ${mem_limit_mb}MB | BPF: ${bpf_memory}" + + # Show detailed memory analysis every 10 seconds + if [[ $((i % 10)) -eq 0 ]]; then + log "=== Detailed Memory Analysis (iteration $i) ===" + compare_parent_child_memory "$PARCA_PID" + fi + + # Check for OOM events and show all memory.events contents + if [ -f "${CGROUP_PATH}/memory.events" ]; then + log "Current memory.events:" + cat "${CGROUP_PATH}/memory.events" | while read -r line; do + log " $line" + done + + oom_kill=$(awk '$1 == "oom_kill" {print $2}' "${CGROUP_PATH}/memory.events") + if [ "${oom_kill:-0}" -gt "0" ]; then + log "OOM kill detected! (count: $oom_kill)" + fi + fi + + sleep 1 +done + +# Final check +if kill -0 "$PARCA_PID" 2>/dev/null; then + warn "Process is still running after 60 seconds" +else + log "Process has terminated" + + # Collect exit status; "||" keeps "set -e" from aborting the script on a non-zero status such as 137 + EXIT_CODE=0 + wait "$PARCA_PID" || EXIT_CODE=$? + if [ $EXIT_CODE -eq 137 ]; then + log "Process was killed with SIGKILL (likely OOM)" + else + log "Process exited with code: $EXIT_CODE" + fi + + # Check for OOM events in cgroup + if [ -f "${CGROUP_PATH}/memory.events" ]; then + log "Memory events:" + cat "${CGROUP_PATH}/memory.events" | while read -r line; do + log " $line" + done + fi + + # Check dmesg for OOM messages + log "Checking dmesg for OOM messages..." 
+ dmesg | tail -20 | grep -i "killed process\|out of memory\|oom-kill" || log "No OOM messages found in recent dmesg" +fi + +log "OOM test completed" \ No newline at end of file diff --git a/reporter/parca_reporter.go b/reporter/parca_reporter.go index 4181941f74..12ff019f1e 100644 --- a/reporter/parca_reporter.go +++ b/reporter/parca_reporter.go @@ -31,6 +31,7 @@ import ( "github.com/apache/arrow/go/v16/arrow/memory" lru "github.com/elastic/go-freelru" "github.com/klauspost/compress/zstd" + "github.com/parca-dev/oomprof/oomprof" "github.com/parca-dev/parca-agent/metrics" "github.com/parca-dev/parca-agent/reporter/metadata" "github.com/prometheus/client_golang/prometheus" @@ -158,6 +159,9 @@ type ParcaReporter struct { // Set of stacks that are already in the current log, // meaning we don't need to log them again. offlineModeLoggedStacks *lru.SyncedLRU[libpf.TraceHash, struct{}] + + oomState *oomprof.State + reportAllocs bool // whether to report allocs in memory profiles } // hashString is a helper function for LRUs that use string as a key. 
@@ -226,42 +230,70 @@ func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace,
 	r.sampleWriterMu.Lock()
 	defer r.sampleWriterMu.Unlock()
 
-	for _, lbl := range labelRetrievalResult.labels {
-		r.sampleWriter.Label(lbl.Name).AppendString(lbl.Value)
-	}
+	buf := [16]byte{}
+	trace.Hash.PutBytes16(&buf)
 
-	for k, v := range trace.CustomLabels {
-		if !utf8.ValidString(k) {
-			log.Warnf("ignoring non-UTF8 label: %s", hex.EncodeToString([]byte(k)))
-			continue
-		}
-		v, ok := maybeFixTruncation(v, support.CustomLabelMaxValLen-1)
-		if !ok {
-			log.Warnf("ignoring non-UTF8 value for label %s: %s", k, hex.EncodeToString([]byte(v)))
-			continue
+	writeSample := func(value, duration, per int64, producer, sampleType, sampleUnit, periodType, periodUnit string) {
+		// Write labels
+		for _, lbl := range labelRetrievalResult.labels {
+			r.sampleWriter.Label(lbl.Name).AppendString(lbl.Value)
 		}
-		r.sampleWriter.Label(k).AppendString(v)
-	}
 
-	buf := [16]byte{}
-	trace.Hash.PutBytes16(&buf)
-	r.sampleWriter.StacktraceID.Append(buf[:])
+		// Write custom labels
+		for k, v := range trace.CustomLabels {
+			if !utf8.ValidString(k) {
+				log.Warnf("ignoring non-UTF8 label: %s", hex.EncodeToString([]byte(k)))
+				continue
+			}
+			v, ok := maybeFixTruncation(v, support.CustomLabelMaxValLen-1)
+			if !ok {
+				log.Warnf("ignoring non-UTF8 value for label %s: %s", k, hex.EncodeToString([]byte(v)))
+				continue
+			}
+			r.sampleWriter.Label(k).AppendString(v)
+		}
 
-	r.sampleWriter.Timestamp.Append(int64(meta.Timestamp))
+		// Write sample data
+		r.sampleWriter.StacktraceID.Append(buf[:])
+		r.sampleWriter.Timestamp.Append(int64(meta.Timestamp))
+		r.sampleWriter.Value.Append(value)
+		r.sampleWriter.SampleType.AppendString(sampleType)
+		r.sampleWriter.SampleUnit.AppendString(sampleUnit)
+		r.sampleWriter.PeriodType.AppendString(periodType)
+		r.sampleWriter.PeriodUnit.AppendString(periodUnit)
+		r.sampleWriter.Producer.AppendString(producer)
+		r.sampleWriter.Duration.Append(duration)
+		r.sampleWriter.Period.Append(per)
+	}
 
 	switch meta.Origin {
 	case support.TraceOriginSampling:
-		r.sampleWriter.Value.Append(1)
-		r.sampleWriter.SampleType.AppendString("samples")
-		r.sampleWriter.SampleUnit.AppendString("count")
-		r.sampleWriter.PeriodType.AppendString("cpu")
-		r.sampleWriter.PeriodUnit.AppendString("nanoseconds")
+		writeSample(1, time.Second.Nanoseconds(), 1e9/int64(r.samplesPerSecond), "parca_agent", "samples", "count", "cpu", "nanoseconds")
+		r.sampleWriter.Temporality.AppendString("delta")
 	case support.TraceOriginOffCPU:
-		r.sampleWriter.Value.Append(meta.OffTime)
-		r.sampleWriter.SampleType.AppendString("wallclock")
-		r.sampleWriter.SampleUnit.AppendString("nanoseconds")
-		r.sampleWriter.PeriodType.AppendString("samples")
-		r.sampleWriter.PeriodUnit.AppendString("count")
+		writeSample(meta.OffTime, time.Second.Nanoseconds(), 1e9/int64(r.samplesPerSecond), "parca_agent", "wallclock", "nanoseconds", "samples", "count")
+		r.sampleWriter.Temporality.AppendString("delta")
+	case support.TraceOriginMemory:
+		memPeriod := int64(512 * 1024) // 512 KiB
+		// Write up to 4 memory samples: zero in-use rows are skipped, alloc rows are opt-in.
+		// 1. inuse_objects (Allocs - Frees)
+		if meta.Allocs != meta.Frees {
+			r.sampleWriter.Temporality.AppendNull()
+			writeSample(int64(meta.Allocs-meta.Frees), 0, memPeriod, "memory", "inuse_objects", "count", "space", "bytes")
+		}
+		// 2. inuse_space (AllocBytes - FreeBytes)
+		if meta.AllocBytes != meta.FreeBytes {
+			r.sampleWriter.Temporality.AppendNull()
+			writeSample(int64(meta.AllocBytes-meta.FreeBytes), 0, memPeriod, "memory", "inuse_space", "bytes", "space", "bytes")
+		}
+		if r.reportAllocs {
+			// 3. alloc_objects
+			r.sampleWriter.Temporality.AppendNull()
+			writeSample(int64(meta.Allocs), 0, memPeriod, "memory", "alloc_objects", "count", "space", "bytes")
+			// 4. alloc_space
+			r.sampleWriter.Temporality.AppendNull()
+			writeSample(int64(meta.AllocBytes), 0, memPeriod, "memory", "alloc_space", "bytes", "space", "bytes")
+		}
 	}
 
 	return nil
@@ -293,6 +325,11 @@ func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm string, cpu int, e
 		lb.Set("__meta_env_var_"+k, v)
 	}
 
+	// oomState stays nil unless OOM profiling was enabled in New.
+	if r.oomState != nil && r.oomState.PidOomd(uint32(pid)) {
+		lb.Set("job", "oomprof")
+	}
+
 	cacheable := r.addMetadataForPID(pid, lb)
 
 	keep := relabel.ProcessBuilder(lb, r.relabelConfigs...)
@@ -365,6 +402,10 @@ func (r *ParcaReporter) ExecutableMetadata(args *reporter.ExecutableMetadataArgs
 		return
 	}
 
+	if r.oomState != nil {
+		r.oomState.RegisterBuildIDToFileID(args.GnuBuildID, args.FileID)
+	}
+
 	r.executables.Add(args.FileID, metadata.ExecInfo{
 		FileName: args.FileName,
 		BuildID:  args.GnuBuildID,
@@ -485,6 +526,10 @@ func (r *ParcaReporter) ReportMetrics(_ uint32, ids []uint32, values []int64) {
 // Stop triggers a graceful shutdown of ParcaReporter.
 func (r *ParcaReporter) Stop() {
 	close(r.stopSignal)
+	if r.oomState != nil {
+		r.oomState.Close()
+		r.oomState = nil
+	}
 }
 
 type Label struct {
@@ -532,6 +577,8 @@ func New(
 	agentRevision string,
 	reg prometheus.Registerer,
 	offlineModeConfig *OfflineModeConfig,
+	enableOOMProf bool,
+	enableAllocs bool,
 ) (*ParcaReporter, error) {
 	if offlineModeConfig != nil && !disableSymbolUpload {
 		return nil, errors.New("Illogical configuration: offline mode with symbol upload enabled")
@@ -636,6 +683,27 @@ func New(
 		r.uploader = u
 	}
 
+	if enableOOMProf {
+		// Set up oomprof with process scanning disabled - we'll feed PIDs from ReportTrace
+		config := &oomprof.Config{
+			ScanInterval: 0,
+			LogTracePipe: false, // Don't log trace pipe in child process
+			Verbose:      false,
+			Symbolize:    false,
+			ReportAlloc:  enableAllocs,
+		}
+
+		state, err := oomprof.SetupWithReporter(context.TODO(), config, r)
+		if err != nil {
+			close(r.stopSignal)
+			return nil, fmt.Errorf("failed to setup oomprof: %w", err)
+		}
+		r.oomState = state
+		// Propagate the flag so ReportTraceEvent emits alloc_objects/alloc_space rows.
+		r.reportAllocs = enableAllocs
+		log.Infof("OOM Profiler enabled, will report OOM traces to Parca")
+	}
+
 	return r, nil
 }
 
@@ -1128,16 +1196,6 @@ func (r *ParcaReporter) buildSampleRecord(ctx context.Context) (arrow.Record, in
 	// Completing the record with all values that are the same for all rows.
 	rows := uint64(w.Value.Len())
 	r.writeCommonLabels(w, rows)
-	w.Producer.ree.Append(rows)
-	w.Producer.bd.AppendString("parca_agent")
-	w.Temporality.ree.Append(rows)
-	w.Temporality.bd.AppendString("delta")
-	w.Period.ree.Append(rows)
-	// Since the period is of type cpu nanoseconds it is the time between
-	// samples.
-	w.Period.ib.Append(1e9 / int64(r.samplesPerSecond))
-	w.Duration.ree.Append(rows)
-	w.Duration.ib.Append(time.Second.Nanoseconds())
 
 	return w.NewRecord(), len(w.labelBuilders)
 }