Skip to content

Commit ad7e7ac

Browse files
authored
add pprof options for opm serve command (#968)
* add cpu/mem profiling to serve command serves cached startup cpu profile data via custom endpoint Signed-off-by: Jordan Keister <[email protected]> * revert nit Signed-off-by: Jordan Keister <[email protected]> * package alias rename Signed-off-by: Jordan Keister <[email protected]>
1 parent b080d08 commit ad7e7ac

File tree

1 file changed

+134
-2
lines changed

1 file changed

+134
-2
lines changed

cmd/opm/serve/serve.go

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
package serve
22

33
import (
4+
"bytes"
45
"context"
6+
"errors"
57
"fmt"
68
"net"
79
"os"
10+
"sync"
11+
12+
"net/http"
13+
endpoint "net/http/pprof"
14+
"runtime/pprof"
815

916
"github.com/sirupsen/logrus"
1017
"github.com/spf13/cobra"
@@ -26,11 +33,17 @@ type serve struct {
2633

2734
port string
2835
terminationLog string
29-
debug bool
36+
37+
debug bool
38+
pprofAddr string
3039

3140
logger *logrus.Entry
3241
}
3342

43+
const (
44+
defaultCpuStartupPath string = "/debug/pprof/startup/cpu"
45+
)
46+
3447
func NewCmd() *cobra.Command {
3548
logger := logrus.New()
3649
s := serve{
@@ -59,12 +72,19 @@ will not be reflected in the served content.
5972
}
6073

6174
cmd.Flags().BoolVar(&s.debug, "debug", false, "enable debug logging")
62-
cmd.Flags().StringVarP(&s.port, "port", "p", "50051", "port number to serve on")
6375
cmd.Flags().StringVarP(&s.terminationLog, "termination-log", "t", "/dev/termination-log", "path to a container termination log file")
76+
cmd.Flags().StringVarP(&s.port, "port", "p", "50051", "port number to serve on")
77+
cmd.Flags().StringVar(&s.pprofAddr, "pprof-addr", "", "address of startup profiling endpoint (addr:port format)")
6478
return cmd
6579
}
6680

6781
func (s *serve) run(ctx context.Context) error {
82+
p := newProfilerInterface(s.pprofAddr, s.logger)
83+
p.startEndpoint()
84+
if err := p.startCpuProfileCache(); err != nil {
85+
return fmt.Errorf("could not start CPU profile: %v", err)
86+
}
87+
6888
// Immediately set up termination log
6989
err := log.AddDefaultWriterHooks(s.terminationLog)
7090
if err != nil {
@@ -103,9 +123,121 @@ func (s *serve) run(ctx context.Context) error {
103123
health.RegisterHealthServer(grpcServer, server.NewHealthServer())
104124
reflection.Register(grpcServer)
105125
s.logger.Info("serving registry")
126+
p.stopCpuProfileCache()
127+
106128
return graceful.Shutdown(s.logger, func() error {
107129
return grpcServer.Serve(lis)
108130
}, func() {
109131
grpcServer.GracefulStop()
132+
p.stopEndpoint(p.logger.Context)
110133
})
134+
135+
}
136+
137+
// manages an HTTP pprof endpoint served by `server`,
138+
// including default pprof handlers and custom cpu pprof cache stored in `cache`.
139+
// the cache is intended to sample CPU activity for a period and serve the data
140+
// via a custom pprof path once collection is complete (e.g. over process initialization)
141+
type profilerInterface struct {
142+
addr string
143+
cache bytes.Buffer
144+
145+
server http.Server
146+
147+
cacheReady bool
148+
cacheLock sync.RWMutex
149+
150+
logger *logrus.Entry
151+
}
152+
153+
func newProfilerInterface(a string, log *logrus.Entry) *profilerInterface {
154+
return &profilerInterface{
155+
addr: a,
156+
logger: log.WithFields(logrus.Fields{"address": a}),
157+
cache: bytes.Buffer{},
158+
}
159+
}
160+
161+
func (p *profilerInterface) isEnabled() bool {
162+
return p.addr != ""
163+
}
164+
165+
func (p *profilerInterface) startEndpoint() {
166+
// short-circuit if not enabled
167+
if !p.isEnabled() {
168+
return
169+
}
170+
171+
mux := http.NewServeMux()
172+
mux.HandleFunc("/debug/pprof/", endpoint.Index)
173+
mux.HandleFunc("/debug/pprof/cmdline", endpoint.Cmdline)
174+
mux.HandleFunc("/debug/pprof/profile", endpoint.Profile)
175+
mux.HandleFunc("/debug/pprof/symbol", endpoint.Symbol)
176+
mux.HandleFunc("/debug/pprof/trace", endpoint.Trace)
177+
mux.HandleFunc(defaultCpuStartupPath, p.httpHandler)
178+
179+
p.server = http.Server{
180+
Addr: p.addr,
181+
Handler: mux,
182+
}
183+
184+
// goroutine exits with main
185+
go func() {
186+
187+
p.logger.Info("starting pprof endpoint")
188+
if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
189+
p.logger.Fatal(err)
190+
}
191+
}()
192+
}
193+
194+
func (p *profilerInterface) startCpuProfileCache() error {
195+
// short-circuit if not enabled
196+
if !p.isEnabled() {
197+
return nil
198+
}
199+
200+
p.logger.Infof("start caching cpu profile data at %q", defaultCpuStartupPath)
201+
if err := pprof.StartCPUProfile(&p.cache); err != nil {
202+
return err
203+
}
204+
205+
return nil
206+
}
207+
208+
func (p *profilerInterface) stopCpuProfileCache() {
209+
// short-circuit if not enabled
210+
if !p.isEnabled() {
211+
return
212+
}
213+
pprof.StopCPUProfile()
214+
p.setCacheReady()
215+
p.logger.Info("stopped caching cpu profile data")
216+
}
217+
218+
func (p *profilerInterface) httpHandler(w http.ResponseWriter, r *http.Request) {
219+
if !p.isCacheReady() {
220+
http.Error(w, "cpu profile cache is not yet ready", http.StatusServiceUnavailable)
221+
}
222+
w.Write(p.cache.Bytes())
223+
}
224+
225+
func (p *profilerInterface) stopEndpoint(ctx context.Context) {
226+
if err := p.server.Shutdown(ctx); err != nil {
227+
p.logger.Fatal(err)
228+
}
229+
}
230+
231+
func (p *profilerInterface) isCacheReady() bool {
232+
p.cacheLock.RLock()
233+
isReady := p.cacheReady
234+
p.cacheLock.RUnlock()
235+
236+
return isReady
237+
}
238+
239+
func (p *profilerInterface) setCacheReady() {
240+
p.cacheLock.Lock()
241+
p.cacheReady = true
242+
p.cacheLock.Unlock()
111243
}

0 commit comments

Comments
 (0)