@@ -20,13 +20,10 @@ import (
2020 "context"
2121 "encoding/json"
2222 "errors"
23- "expvar"
2423 "flag"
2524 "fmt"
2625 "io"
2726 "net"
28- "net/http"
29- "net/http/pprof"
3027 "os"
3128 "path/filepath"
3229 "runtime"
@@ -43,7 +40,6 @@ import (
4340 "github.com/containerd/containerd/v2/plugins"
4441 "github.com/containerd/containerd/v2/version"
4542 "github.com/containerd/log"
46- "github.com/containerd/otelttrpc"
4743 "github.com/containerd/plugin"
4844 "github.com/containerd/plugin/registry"
4945 "github.com/containerd/ttrpc"
@@ -112,10 +108,12 @@ type TTRPCService interface {
112108 RegisterTTRPC (* ttrpc.Server ) error
113109}
114110
115- type TTRPCServerOptioner interface {
116- TTRPCService
111+ type TTRPCServerUnaryOptioner interface {
112+ UnaryServerInterceptor () ttrpc.UnaryServerInterceptor
113+ }
117114
118- UnaryInterceptor () ttrpc.UnaryServerInterceptor
115+ type TTRPCClientUnaryOptioner interface {
116+ UnaryClientInterceptor () ttrpc.UnaryClientInterceptor
119117}
120118
121119var (
@@ -249,13 +247,6 @@ func run(ctx context.Context, manager Manager, config Config) error {
249247 }
250248
251249 ttrpcAddress := os .Getenv (ttrpcAddressEnv )
252- publisher , err := NewPublisher (ttrpcAddress , WithPublishTTRPCOpts (
253- ttrpc .WithUnaryClientInterceptor (otelttrpc .UnaryClientInterceptor ()),
254- ))
255- if err != nil {
256- return err
257- }
258- defer publisher .Close ()
259250
260251 ctx = namespaces .WithNamespace (ctx , namespaceFlag )
261252 ctx = context .WithValue (ctx , OptsKey {}, Opts {BundlePath : bundlePath , Debug : debugFlag })
@@ -333,7 +324,15 @@ func run(ctx context.Context, manager Manager, config Config) error {
333324 Type : plugins .EventPlugin ,
334325 ID : "publisher" ,
335326 InitFn : func (ic * plugin.InitContext ) (interface {}, error ) {
336- return publisher , nil
327+ return NewPublisher (ttrpcAddress , func (cfg * publisherConfig ) {
328+ p , _ := ic .GetByID (plugins .TTRPCPlugin , "otelttrpc" )
329+ if p == nil {
330+ return
331+ }
332+
333+ opts := ttrpc .WithUnaryClientInterceptor (p .(TTRPCClientUnaryOptioner ).UnaryClientInterceptor ())
334+ WithPublishTTRPCOpts (opts )(cfg )
335+ })
337336 },
338337 })
339338
@@ -342,6 +341,8 @@ func run(ctx context.Context, manager Manager, config Config) error {
342341 ttrpcServices = []TTRPCService {}
343342
344343 ttrpcUnaryInterceptors = []ttrpc.UnaryServerInterceptor {}
344+
345+ pprofHandler server
345346 )
346347
347348 for _ , p := range registry .Graph (func (* plugin.Registration ) bool { return false }) {
@@ -389,20 +390,23 @@ func run(ctx context.Context, manager Manager, config Config) error {
389390 if src , ok := instance .(TTRPCService ); ok {
390391 log .G (ctx ).WithField ("id" , pID ).Debug ("registering ttrpc service" )
391392 ttrpcServices = append (ttrpcServices , src )
393+ }
392394
395+ if src , ok := instance .(TTRPCServerUnaryOptioner ); ok {
396+ ttrpcUnaryInterceptors = append (ttrpcUnaryInterceptors , src .UnaryServerInterceptor ())
393397 }
394398
395- if src , ok := instance .(TTRPCServerOptioner ); ok {
396- ttrpcUnaryInterceptors = append (ttrpcUnaryInterceptors , src .UnaryInterceptor ())
399+ if result .Registration .ID == "pprof" {
400+ if src , ok := instance .(server ); ok {
401+ pprofHandler = src
402+ }
397403 }
398404 }
399405
400406 if len (ttrpcServices ) == 0 {
401407 return fmt .Errorf ("required that ttrpc service" )
402408 }
403409
404- ttrpcUnaryInterceptors = append (ttrpcUnaryInterceptors , otelttrpc .UnaryServerInterceptor ())
405-
406410 unaryInterceptor := chainUnaryServerInterceptors (ttrpcUnaryInterceptors ... )
407411 server , err := newServer (ttrpc .WithUnaryServerInterceptor (unaryInterceptor ))
408412 if err != nil {
@@ -415,7 +419,7 @@ func run(ctx context.Context, manager Manager, config Config) error {
415419 }
416420 }
417421
418- if err := serve (ctx , server , signals , sd .Shutdown ); err != nil {
422+ if err := serve (ctx , server , signals , sd .Shutdown , pprofHandler ); err != nil {
419423 if ! errors .Is (err , shutdown .ErrShutdown ) {
420424 cleanupSockets (ctx )
421425 return err
@@ -436,7 +440,7 @@ func run(ctx context.Context, manager Manager, config Config) error {
436440
437441// serve serves the ttrpc API over a unix socket in the current working directory
438442// and blocks until the context is canceled
439- func serve (ctx context.Context , server * ttrpc.Server , signals chan os.Signal , shutdown func ()) error {
443+ func serve (ctx context.Context , server * ttrpc.Server , signals chan os.Signal , shutdown func (), pprof server ) error {
440444 dump := make (chan os.Signal , 32 )
441445 setupDumpStacks (dump )
442446
@@ -456,9 +460,9 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, sh
456460 }
457461 }()
458462
459- if debugFlag {
460- if err := serveDebug (ctx ); err != nil {
461- return err
463+ if debugFlag && pprof != nil {
464+ if err := setupPprof (ctx , pprof ); err != nil {
465+ log . G ( ctx ). WithError ( err ). Warn ( "Could not setup pprof" )
462466 }
463467 }
464468
@@ -477,31 +481,6 @@ func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, sh
477481 return reap (ctx , logger , signals )
478482}
479483
480- func serveDebug (ctx context.Context ) error {
481- l , err := serveListener (debugSocketFlag , 4 )
482- if err != nil {
483- return err
484- }
485- go func () {
486- defer l .Close ()
487- m := http .NewServeMux ()
488- m .Handle ("/debug/vars" , expvar .Handler ())
489- m .Handle ("/debug/pprof/" , http .HandlerFunc (pprof .Index ))
490- m .Handle ("/debug/pprof/cmdline" , http .HandlerFunc (pprof .Cmdline ))
491- m .Handle ("/debug/pprof/profile" , http .HandlerFunc (pprof .Profile ))
492- m .Handle ("/debug/pprof/symbol" , http .HandlerFunc (pprof .Symbol ))
493- m .Handle ("/debug/pprof/trace" , http .HandlerFunc (pprof .Trace ))
494- srv := & http.Server {
495- Handler : m ,
496- ReadHeaderTimeout : 5 * time .Minute ,
497- }
498- if err := srv .Serve (l ); err != nil && ! errors .Is (err , net .ErrClosed ) {
499- log .G (ctx ).WithError (err ).Fatal ("containerd-shim: pprof endpoint failure" )
500- }
501- }()
502- return nil
503- }
504-
505484func dumpStacks (logger * log.Entry ) {
506485 var (
507486 buf []byte
@@ -516,3 +495,22 @@ func dumpStacks(logger *log.Entry) {
516495 buf = buf [:stackSize ]
517496 logger .Infof ("=== BEGIN goroutine stack dump ===\n %s\n === END goroutine stack dump ===" , buf )
518497}
498+
499+ type server interface {
500+ Serve (net.Listener ) error
501+ }
502+
503+ func setupPprof (ctx context.Context , srv server ) error {
504+ l , err := serveListener (debugSocketFlag , 4 )
505+ if err != nil {
506+ return fmt .Errorf ("could not setup pprof listener: %w" , err )
507+ }
508+
509+ go func () {
510+ if err := srv .Serve (l ); err != nil && ! errors .Is (err , net .ErrClosed ) {
511+ log .G (ctx ).WithError (err ).Fatal ("containerd-shim: pprof endpoint failure" )
512+ }
513+ }()
514+
515+ return nil
516+ }
0 commit comments