Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agent/core/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ package core

import (
//go:nolint
_ "bytes"
_ "encoding/base64"
_ "fmt"
_ "io"
_ "log"
_ "math"
_ "math/rand"
_ "net"
_ "os"
_ "path/filepath"
_ "reflect"
_ "runtime"
_ "runtime/debug"
_ "runtime/metrics"
_ "runtime/pprof"
_ "sort"
_ "strconv"
_ "strings"
Expand All @@ -55,4 +59,5 @@ import (
_ "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
)
1 change: 1 addition & 0 deletions agent/reporter/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,6 @@ import (
_ "github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
_ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
_ "github.com/apache/skywalking-go/protocols/collect/management/v3"
_ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
_ "github.com/apache/skywalking-go/protocols/collect/servicemesh/v3"
)
228 changes: 228 additions & 0 deletions plugins/core/pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package core

import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"

"github.com/apache/skywalking-go/plugins/core/operator"
"github.com/apache/skywalking-go/plugins/core/reporter"
)

const (
// Pprof event types
PprofEventsTypeCPU = "cpu"
PprofEventsTypeHeap = "heap"
PprofEventsTypeAllocs = "allocs"
PprofEventsTypeBlock = "block"
PprofEventsTypeMutex = "mutex"
PprofEventsTypeThread = "threadcreate"
PprofEventsTypeGoroutine = "goroutine"
)

// CPU profiling state to ensure only one CPU profiling task runs at a time
var profilingIsRunning atomic.Bool

func init() {
reporter.NewPprofTaskCommand = NewPprofTaskCommand
}

type PprofTaskCommandImpl struct {
// Pprof Task ID
taskID string
// Type of profiling (CPU/Heap/Block/Mutex)
events string
// unit is minute
duration time.Duration
// Unix timestamp in milliseconds when the task was created
createTime int64
dumpPeriod int

// for pprof task service
pprofFilePath string
logger operator.LogOperator
manager reporter.PprofReporter
}

func NewPprofTaskCommand(taskID, events string, duration time.Duration,
createTime int64, dumpPeriod int, pprofFilePath string,
logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
return &PprofTaskCommandImpl{
taskID: taskID,
events: events,
duration: duration,
createTime: createTime,
dumpPeriod: dumpPeriod,
pprofFilePath: pprofFilePath,
logger: logger,
manager: manager,
}
}

func (c *PprofTaskCommandImpl) GetTaskID() string {
return c.taskID
}

func (c *PprofTaskCommandImpl) GetCreateTime() int64 {
return c.createTime
}

func (c *PprofTaskCommandImpl) GetDuration() time.Duration {
return c.duration
}

func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
return c.events == PprofEventsTypeHeap ||
c.events == PprofEventsTypeAllocs ||
c.events == PprofEventsTypeGoroutine ||
c.events == PprofEventsTypeThread
}

func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
if file, ok := writer.(*os.File); ok {
if err := file.Close(); err != nil {
c.logger.Errorf("failed to close pprof file: %v", err)
}
}
}

func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
// sample data to buffer
if c.pprofFilePath == "" {
return &bytes.Buffer{}, nil
}

// sample data to file
pprofFileName := filepath.Join(c.taskID, ".pprof")
pprofFilePath := filepath.Join(c.pprofFilePath, pprofFileName)
if err := os.MkdirAll(filepath.Dir(pprofFilePath), os.ModePerm); err != nil {
return nil, err
}

writer, err := os.Create(pprofFilePath)
if err != nil {
return nil, err
}

return writer, nil
}

func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
// For CPU profiling, check global state first
if c.events == PprofEventsTypeCPU && !profilingIsRunning.CompareAndSwap(false, true) {
return nil, fmt.Errorf("CPU profiling is already running")
}

writer, err := c.getWriter()
if err != nil {
if c.events == PprofEventsTypeCPU {
profilingIsRunning.Store(false)
}
return nil, err
}

switch c.events {
case PprofEventsTypeCPU:
if err = pprof.StartCPUProfile(writer); err != nil {
profilingIsRunning.Store(false)
if c.pprofFilePath != "" {
c.closeFileWriter(writer)
}
return nil, err
}
case PprofEventsTypeBlock:
runtime.SetBlockProfileRate(c.dumpPeriod)
case PprofEventsTypeMutex:
runtime.SetMutexProfileFraction(c.dumpPeriod)
case PprofEventsTypeHeap:
runtime.MemProfileRate = c.dumpPeriod
case PprofEventsTypeAllocs:
runtime.MemProfileRate = c.dumpPeriod
}

return writer, nil
}

func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
switch c.events {
case PprofEventsTypeCPU:
pprof.StopCPUProfile()
profilingIsRunning.Store(false)
case PprofEventsTypeBlock:
if err := pprof.Lookup("block").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Block profile error %v", err)
}
runtime.SetBlockProfileRate(0)
case PprofEventsTypeMutex:
if err := pprof.Lookup("mutex").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Mutex profile error %v", err)
}
runtime.SetMutexProfileFraction(0)
case PprofEventsTypeHeap:
if err := pprof.Lookup("heap").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Heap profile error %v", err)
}
case PprofEventsTypeAllocs:
if err := pprof.Lookup("allocs").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Alloc profile error %v", err)
}
case PprofEventsTypeGoroutine:
if err := pprof.Lookup("goroutine").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Goroutine profile error %v", err)
}
case PprofEventsTypeThread:
if err := pprof.Lookup("threadcreate").WriteTo(writer, 0); err != nil {
c.logger.Errorf("write Thread profile error %v", err)
}
}

if c.pprofFilePath != "" {
c.closeFileWriter(writer)
}
c.readPprofData(c.taskID, writer)
}

func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
var data []byte
if c.pprofFilePath == "" {
if buf, ok := writer.(*bytes.Buffer); ok {
data = buf.Bytes()
}
} else {
if file, ok := writer.(*os.File); ok {
filePath := file.Name()
fileData, err := os.ReadFile(filePath)
if err != nil {
c.logger.Errorf("failed to read pprof file %s: %v", filePath, err)
}
data = fileData
if err := os.Remove(filePath); err != nil {
c.logger.Errorf("failed to remove pprof file %s: %v", filePath, err)
}
}
}
c.manager.ReportPprof(taskID, data)
}
28 changes: 16 additions & 12 deletions plugins/core/reporter/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,19 @@ func NewGRPCReporter(logger operator.LogOperator,
checkInterval time.Duration,
connManager *reporter.ConnectionManager,
cdsManager *reporter.CDSManager,
pprofTaskManager *reporter.PprofTaskManager,
opts ...ReporterOption,
) (reporter.Reporter, error) {
r := &gRPCReporter{
logger: logger,
serverAddr: serverAddr,
tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
checkInterval: checkInterval,
connManager: connManager,
cdsManager: cdsManager,
logger: logger,
serverAddr: serverAddr,
tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
checkInterval: checkInterval,
connManager: connManager,
cdsManager: cdsManager,
pprofTaskManager: pprofTaskManager,
}
for _, o := range opts {
o(r)
Expand Down Expand Up @@ -83,10 +85,11 @@ type gRPCReporter struct {
checkInterval time.Duration

// bootFlag is set if Boot be executed
bootFlag bool
transform *reporter.Transform
connManager *reporter.ConnectionManager
cdsManager *reporter.CDSManager
bootFlag bool
transform *reporter.Transform
connManager *reporter.ConnectionManager
cdsManager *reporter.CDSManager
pprofTaskManager *reporter.PprofTaskManager
}

func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
Expand All @@ -95,6 +98,7 @@ func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.Agen
r.initSendPipeline()
r.check()
r.cdsManager.InitCDS(entity, cdsWatchers)
r.pprofTaskManager.InitPprofTask(entity)
r.bootFlag = true
}

Expand Down
Loading
Loading