Skip to content

Commit aa94837

Browse files
authored
Support pprof profiling (#232)
1 parent 0d8a5be commit aa94837

File tree

11 files changed

+678
-32
lines changed

11 files changed

+678
-32
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Release Notes.
1111
* Add recover to goroutine to prevent unexpected panics.
1212
* Add mutex to fix some data race.
1313
* Replace external `goapi` dependency with in-repo generated protocols.
14+
* Support pprof profiling.
1415
#### Plugins
1516

1617
#### Documentation

agent/core/compile.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@ package core
1919

2020
import (
2121
//go:nolint
22+
_ "bytes"
2223
_ "encoding/base64"
2324
_ "fmt"
25+
_ "io"
2426
_ "log"
2527
_ "math"
2628
_ "math/rand"
2729
_ "net"
2830
_ "os"
31+
_ "path/filepath"
2932
_ "reflect"
3033
_ "runtime"
3134
_ "runtime/debug"
3235
_ "runtime/metrics"
36+
_ "runtime/pprof"
3337
_ "sort"
3438
_ "strconv"
3539
_ "strings"
@@ -55,4 +59,5 @@ import (
5559
_ "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
5660
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
5761
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
62+
_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
5863
)

agent/reporter/imports.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,6 @@ import (
7171
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
7272
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
7373
_ "github.com/apache/skywalking-go/protocols/collect/management/v3"
74+
_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
7475
_ "github.com/apache/skywalking-go/protocols/collect/servicemesh/v3"
7576
)

plugins/core/pprof.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
// Licensed to Apache Software Foundation (ASF) under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Apache Software Foundation (ASF) licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package core
19+
20+
import (
21+
"bytes"
22+
"fmt"
23+
"io"
24+
"os"
25+
"path/filepath"
26+
"runtime"
27+
"runtime/pprof"
28+
"sync/atomic"
29+
"time"
30+
31+
"github.com/apache/skywalking-go/plugins/core/operator"
32+
"github.com/apache/skywalking-go/plugins/core/reporter"
33+
)
34+
35+
// Event type names a pprof task may carry in its events field.
// Apart from "cpu", each value is also the name of the runtime/pprof profile
// that StopTask looks up for that event.
const (
	// PprofEventsTypeCPU selects CPU profiling, which is started in StartTask
	// and stopped in StopTask.
	PprofEventsTypeCPU = "cpu"

	// PprofEventsTypeBlock selects the block profile; it uses the dump period.
	PprofEventsTypeBlock = "block"
	// PprofEventsTypeMutex selects the mutex profile; it uses the dump period.
	PprofEventsTypeMutex = "mutex"

	// PprofEventsTypeHeap selects the heap profile.
	PprofEventsTypeHeap = "heap"
	// PprofEventsTypeAllocs selects the allocs profile.
	PprofEventsTypeAllocs = "allocs"
	// PprofEventsTypeThread selects the threadcreate profile.
	PprofEventsTypeThread = "threadcreate"
	// PprofEventsTypeGoroutine selects the goroutine profile.
	PprofEventsTypeGoroutine = "goroutine"
)
45+
46+
// profilingIsRunning is the process-wide flag that lets at most one CPU
// profiling task run at a time: StartTask flips it from false to true with a
// compare-and-swap before starting a CPU profile, and it is reset to false
// when the profile is stopped or could not be started.
var profilingIsRunning atomic.Bool
48+
49+
func init() {
50+
reporter.NewPprofTaskCommand = NewPprofTaskCommand
51+
}
52+
53+
type PprofTaskCommandImpl struct {
54+
// Pprof Task ID
55+
taskID string
56+
// Type of profiling (CPU/Heap/Block/Mutex/Goroutine/Threadcreate/Allocs)
57+
events string
58+
// Unit is minute, required for CPU, Block and Mutex events
59+
duration time.Duration
60+
// Unix timestamp in milliseconds when the task was created
61+
createTime int64
62+
// Define the period of the pprof dump, required for Block and Mutex events
63+
dumpPeriod int
64+
65+
// for pprof task service
66+
pprofFilePath string
67+
logger operator.LogOperator
68+
manager reporter.PprofReporter
69+
}
70+
71+
func NewPprofTaskCommand(taskID, events string, duration time.Duration,
72+
createTime int64, dumpPeriod int, pprofFilePath string,
73+
logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
74+
return &PprofTaskCommandImpl{
75+
taskID: taskID,
76+
events: events,
77+
duration: duration,
78+
createTime: createTime,
79+
dumpPeriod: dumpPeriod,
80+
pprofFilePath: pprofFilePath,
81+
logger: logger,
82+
manager: manager,
83+
}
84+
}
85+
86+
func (c *PprofTaskCommandImpl) GetTaskID() string {
87+
return c.taskID
88+
}
89+
90+
func (c *PprofTaskCommandImpl) GetCreateTime() int64 {
91+
return c.createTime
92+
}
93+
94+
func (c *PprofTaskCommandImpl) GetDuration() time.Duration {
95+
return c.duration
96+
}
97+
98+
func (c *PprofTaskCommandImpl) GetDumpPeriod() int {
99+
return c.dumpPeriod
100+
}
101+
102+
func (c *PprofTaskCommandImpl) IsInvalidEvent() bool {
103+
return !(c.events == PprofEventsTypeHeap ||
104+
c.events == PprofEventsTypeAllocs ||
105+
c.events == PprofEventsTypeGoroutine ||
106+
c.events == PprofEventsTypeThread ||
107+
c.events == PprofEventsTypeCPU ||
108+
c.events == PprofEventsTypeBlock ||
109+
c.events == PprofEventsTypeMutex)
110+
}
111+
112+
func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
113+
return c.events == PprofEventsTypeHeap ||
114+
c.events == PprofEventsTypeAllocs ||
115+
c.events == PprofEventsTypeGoroutine ||
116+
c.events == PprofEventsTypeThread
117+
}
118+
119+
func (c *PprofTaskCommandImpl) HasDumpPeriod() bool {
120+
return c.events == PprofEventsTypeBlock ||
121+
c.events == PprofEventsTypeMutex
122+
}
123+
124+
func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
125+
if file, ok := writer.(*os.File); ok {
126+
if err := file.Close(); err != nil {
127+
c.logger.Errorf("failed to close pprof file: %v", err)
128+
}
129+
}
130+
}
131+
132+
func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
133+
// sample data to buffer
134+
if c.pprofFilePath == "" {
135+
return &bytes.Buffer{}, nil
136+
}
137+
138+
// sample data to file
139+
pprofFileName := filepath.Join(c.taskID, ".pprof")
140+
pprofFilePath := filepath.Join(c.pprofFilePath, pprofFileName)
141+
if err := os.MkdirAll(filepath.Dir(pprofFilePath), os.ModePerm); err != nil {
142+
return nil, err
143+
}
144+
145+
writer, err := os.Create(pprofFilePath)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
return writer, nil
151+
}
152+
153+
func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
154+
c.logger.Infof("start pprof task %s", c.taskID)
155+
// For CPU profiling, check global state first
156+
if c.events == PprofEventsTypeCPU && !profilingIsRunning.CompareAndSwap(false, true) {
157+
return nil, fmt.Errorf("CPU profiling is already running")
158+
}
159+
160+
writer, err := c.getWriter()
161+
if err != nil {
162+
if c.events == PprofEventsTypeCPU {
163+
profilingIsRunning.Store(false)
164+
}
165+
return nil, err
166+
}
167+
168+
switch c.events {
169+
case PprofEventsTypeCPU:
170+
if err = pprof.StartCPUProfile(writer); err != nil {
171+
profilingIsRunning.Store(false)
172+
if c.pprofFilePath != "" {
173+
c.closeFileWriter(writer)
174+
}
175+
return nil, err
176+
}
177+
case PprofEventsTypeBlock:
178+
runtime.SetBlockProfileRate(c.dumpPeriod)
179+
case PprofEventsTypeMutex:
180+
runtime.SetMutexProfileFraction(c.dumpPeriod)
181+
}
182+
183+
return writer, nil
184+
}
185+
186+
func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
187+
c.logger.Infof("stop pprof task %s", c.taskID)
188+
switch c.events {
189+
case PprofEventsTypeCPU:
190+
pprof.StopCPUProfile()
191+
profilingIsRunning.Store(false)
192+
case PprofEventsTypeBlock:
193+
if err := pprof.Lookup("block").WriteTo(writer, 0); err != nil {
194+
c.logger.Errorf("write Block profile error %v", err)
195+
}
196+
runtime.SetBlockProfileRate(0)
197+
case PprofEventsTypeMutex:
198+
if err := pprof.Lookup("mutex").WriteTo(writer, 0); err != nil {
199+
c.logger.Errorf("write Mutex profile error %v", err)
200+
}
201+
runtime.SetMutexProfileFraction(0)
202+
case PprofEventsTypeHeap:
203+
if err := pprof.Lookup("heap").WriteTo(writer, 0); err != nil {
204+
c.logger.Errorf("write Heap profile error %v", err)
205+
}
206+
case PprofEventsTypeAllocs:
207+
if err := pprof.Lookup("allocs").WriteTo(writer, 0); err != nil {
208+
c.logger.Errorf("write Alloc profile error %v", err)
209+
}
210+
case PprofEventsTypeGoroutine:
211+
if err := pprof.Lookup("goroutine").WriteTo(writer, 0); err != nil {
212+
c.logger.Errorf("write Goroutine profile error %v", err)
213+
}
214+
case PprofEventsTypeThread:
215+
if err := pprof.Lookup("threadcreate").WriteTo(writer, 0); err != nil {
216+
c.logger.Errorf("write Thread profile error %v", err)
217+
}
218+
}
219+
220+
if c.pprofFilePath != "" {
221+
c.closeFileWriter(writer)
222+
}
223+
c.readPprofData(c.taskID, writer)
224+
}
225+
226+
func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
227+
var data []byte
228+
if c.pprofFilePath == "" {
229+
if buf, ok := writer.(*bytes.Buffer); ok {
230+
data = buf.Bytes()
231+
}
232+
} else {
233+
if file, ok := writer.(*os.File); ok {
234+
filePath := file.Name()
235+
fileData, err := os.ReadFile(filePath)
236+
if err != nil {
237+
c.logger.Errorf("failed to read pprof file %s: %v", filePath, err)
238+
}
239+
data = fileData
240+
if err := os.Remove(filePath); err != nil {
241+
c.logger.Errorf("failed to remove pprof file %s: %v", filePath, err)
242+
}
243+
}
244+
}
245+
c.manager.ReportPprof(taskID, data)
246+
}

plugins/core/reporter/grpc/grpc.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,19 @@ func NewGRPCReporter(logger operator.LogOperator,
4242
checkInterval time.Duration,
4343
connManager *reporter.ConnectionManager,
4444
cdsManager *reporter.CDSManager,
45+
pprofTaskManager *reporter.PprofTaskManager,
4546
opts ...ReporterOption,
4647
) (reporter.Reporter, error) {
4748
r := &gRPCReporter{
48-
logger: logger,
49-
serverAddr: serverAddr,
50-
tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
51-
metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
52-
logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
53-
checkInterval: checkInterval,
54-
connManager: connManager,
55-
cdsManager: cdsManager,
49+
logger: logger,
50+
serverAddr: serverAddr,
51+
tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
52+
metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
53+
logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
54+
checkInterval: checkInterval,
55+
connManager: connManager,
56+
cdsManager: cdsManager,
57+
pprofTaskManager: pprofTaskManager,
5658
}
5759
for _, o := range opts {
5860
o(r)
@@ -83,10 +85,11 @@ type gRPCReporter struct {
8385
checkInterval time.Duration
8486

8587
// bootFlag is set if Boot be executed
86-
bootFlag bool
87-
transform *reporter.Transform
88-
connManager *reporter.ConnectionManager
89-
cdsManager *reporter.CDSManager
88+
bootFlag bool
89+
transform *reporter.Transform
90+
connManager *reporter.ConnectionManager
91+
cdsManager *reporter.CDSManager
92+
pprofTaskManager *reporter.PprofTaskManager
9093
}
9194

9295
func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
@@ -95,6 +98,7 @@ func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.Agen
9598
r.initSendPipeline()
9699
r.check()
97100
r.cdsManager.InitCDS(entity, cdsWatchers)
101+
r.pprofTaskManager.InitPprofTask(entity)
98102
r.bootFlag = true
99103
}
100104

0 commit comments

Comments
 (0)