Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 133 additions & 1 deletion dbm-services/common/dbha-v2/internal/analysis/apm/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,145 @@ import (
"dbm-services/common/go-pubpkg/apm/metric"
)

var Metrics []*metric.Metric
const (
MetricLabelSwitchID = "switch_id"
MetricLabelActionScope = "action_scope"
MetricLabelDbType = "db_type"
)

var (
Metrics []*metric.Metric

ScanBusinessTotal *haapm.HaCounter
ScanBusinessTimeConsumingMs *haapm.HaHistogram
SwitchingTimeConsumingMs *haapm.HaHistogram
MysqlClusterSwitchingTimeConsumingMs *haapm.HaHistogram
MysqlHostSwitchingTimeConsumingMs *haapm.HaHistogram
MysqlInstanceSwitchingTimeConsumingMs *haapm.HaHistogram
SwitchingSuccessTotal *haapm.HaCounter
SwitchingErrorTotal *haapm.HaCounter
MysqlSwitchingSuccessTotal *haapm.HaCounter
MysqlSwitchingErrorTotal *haapm.HaCounter
RedisSwitchingSuccessTotal *haapm.HaCounter
RedisSwitchingErrorTotal *haapm.HaCounter
)

// Default histogram buckets for latency (milliseconds)
var defaultLatencyBuckets = []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000}

func init() {
// Scan business total counter
ScanBusinessTotal = haapm.NewHaCounter(
"scan_business_total",
"Total number of scan business",
haapm.MetricLabelServiceID, haapm.MetricLabelServiceName,
)

// Scan business time consuming histogram
ScanBusinessTimeConsumingMs = haapm.NewHaHistogramWithBuckets(
"scan_business_time_consuming_ms",
"Time consuming of scan business in milliseconds",
defaultLatencyBuckets,
haapm.MetricLabelServiceID, haapm.MetricLabelServiceName,
)

// Switching time consuming histogram
SwitchingTimeConsumingMs = haapm.NewHaHistogramWithBuckets(
"switching_time_consuming_ms",
"Time consuming of switching in milliseconds",
defaultLatencyBuckets,
MetricLabelDbType,
)

// Mysql cluster switching time consuming histogram
MysqlClusterSwitchingTimeConsumingMs = haapm.NewHaHistogramWithBuckets(
"mysql_cluster_switching_time_consuming_ms",
"Time consuming of MySQL cluster switching in milliseconds",
defaultLatencyBuckets,
MetricLabelSwitchID, MetricLabelActionScope, MetricLabelDbType,
)

// Mysql host switching time consuming histogram
MysqlHostSwitchingTimeConsumingMs = haapm.NewHaHistogramWithBuckets(
"mysql_host_switching_time_consuming_ms",
"Time consuming of MySQL host switching in milliseconds",
defaultLatencyBuckets,
MetricLabelSwitchID, MetricLabelActionScope, MetricLabelDbType,
)

// Mysql instance switching time consuming histogram
MysqlInstanceSwitchingTimeConsumingMs = haapm.NewHaHistogramWithBuckets(
"mysql_instance_switching_time_consuming_ms",
"Time consuming of MySQL instance switching in milliseconds",
defaultLatencyBuckets,
MetricLabelSwitchID, MetricLabelActionScope, MetricLabelDbType,
)

// Switching success total counter
SwitchingSuccessTotal = haapm.NewHaCounter("switching_success_total", "Total number of switching success")

// Switching error total counter
SwitchingErrorTotal = haapm.NewHaCounter("switching_error_total", "Total number of switching error")

// Mysql switching success total counter
MysqlSwitchingSuccessTotal = haapm.NewHaCounter(
"mysql_switching_success_total",
"Total number of MySQL switching success",
MetricLabelActionScope, MetricLabelDbType,
)

// Mysql switching error total counter
MysqlSwitchingErrorTotal = haapm.NewHaCounter(
"mysql_switching_error_total",
"Total number of MySQL switching error",
MetricLabelActionScope, MetricLabelDbType,
)

// Redis switching success total counter
RedisSwitchingSuccessTotal = haapm.NewHaCounter(
"redis_switching_success_total",
"Total number of Redis switching success",
MetricLabelActionScope, MetricLabelDbType,
)

// Redis switching error total counter
RedisSwitchingErrorTotal = haapm.NewHaCounter(
"redis_switching_error_total",
"Total number of Redis switching error",
MetricLabelActionScope, MetricLabelDbType,
)
}

// InitAPM init apm
func InitAPM(serviceID, serviceName string) {
haapm.AppStartupMetric.UpdateLabel(map[string]string{
haapm.MetricLabelServiceID: serviceID,
haapm.MetricLabelServiceName: serviceName,
})

Metrics = append(Metrics, haapm.AppStartupMetric.ToMetric())

// Scan business total counter
Metrics = append(Metrics, ScanBusinessTotal.ToMetric())
Metrics = append(Metrics, ScanBusinessTimeConsumingMs.ToMetric())

// Switching total counter
Metrics = append(Metrics, SwitchingSuccessTotal.ToMetric())
Metrics = append(Metrics, SwitchingErrorTotal.ToMetric())

// Switching time consuming histogram
Metrics = append(Metrics, SwitchingTimeConsumingMs.ToMetric())

// Mysql switching time consuming histogram
Metrics = append(Metrics, MysqlClusterSwitchingTimeConsumingMs.ToMetric())
Metrics = append(Metrics, MysqlHostSwitchingTimeConsumingMs.ToMetric())
Metrics = append(Metrics, MysqlInstanceSwitchingTimeConsumingMs.ToMetric())

// Mysql switching total counter
Metrics = append(Metrics, MysqlSwitchingSuccessTotal.ToMetric())
Metrics = append(Metrics, MysqlSwitchingErrorTotal.ToMetric())

// Redis switching total counter
Metrics = append(Metrics, RedisSwitchingSuccessTotal.ToMetric())
Metrics = append(Metrics, RedisSwitchingErrorTotal.ToMetric())
}
40 changes: 40 additions & 0 deletions dbm-services/common/dbha-v2/internal/analysis/switcher/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import (
"context"
"strings"
"sync"
"time"

"dbm-services/common/dbha-v2/internal/analysis/apm"
"dbm-services/common/dbha-v2/internal/analysis/dbm"
"dbm-services/common/dbha-v2/internal/analysis/switcher/mysql"
"dbm-services/common/dbha-v2/internal/analysis/switcher/switchcore"
"dbm-services/common/dbha-v2/internal/analysis/switcher/switchlogger"
"dbm-services/common/dbha-v2/pkg/gerrors"
"dbm-services/common/dbha-v2/pkg/haapm"
"dbm-services/common/dbha-v2/pkg/logger"
"dbm-services/common/dbha-v2/pkg/storage/hamodel"
"dbm-services/common/dbha-v2/pkg/storage/haprobe"
Expand Down Expand Up @@ -113,6 +116,8 @@ func (m *Mysql) NewSwitchLogger() ([]switchlogger.DbSwitchLogger, error) {

// InstanceLevelSwitch handles MySQL instance switching operations
func (m *Mysql) InstanceLevelSwitch(ctx context.Context, switchLoggers []switchlogger.DbSwitchLogger, req *Request) *Response {
start := time.Now()

rsp := &Response{
MySqlFailureInsts: map[switchcore.MetadataKey]*dbm.DbInstMetadata{},
}
Expand Down Expand Up @@ -169,6 +174,8 @@ func (m *Mysql) InstanceLevelSwitch(ctx context.Context, switchLoggers []switchl

wg.Wait()

m.reportMysqlSwitchingMetrics(apm.MysqlInstanceSwitchingTimeConsumingMs, start, req, rsp)

if len(rsp.MySqlFailureInsts) == 0 {
return rsp
}
Expand Down Expand Up @@ -253,6 +260,8 @@ func (m *Mysql) checkHostInstanceCompleteness(ctx context.Context, host switchco

// HostLevelSwitch handles MySQL host switching operations
func (m *Mysql) HostLevelSwitch(ctx context.Context, switchLoggers []switchlogger.DbSwitchLogger, req *Request) *Response {
start := time.Now()

rsp := &Response{
MySqlFailureInsts: map[switchcore.MetadataKey]*dbm.DbInstMetadata{},
}
Expand Down Expand Up @@ -318,13 +327,44 @@ func (m *Mysql) HostLevelSwitch(ctx context.Context, switchLoggers []switchlogge

wg.Wait()

m.reportMysqlSwitchingMetrics(apm.MysqlHostSwitchingTimeConsumingMs, start, req, rsp)

if len(rsp.MySqlFailureInsts) > 0 {
rsp.Err = ErrSwitchPartialSuccess
}

return rsp
}

// reportMysqlSwitchingMetrics reports the switching time consuming, success total and error total metrics
func (m *Mysql) reportMysqlSwitchingMetrics(timeConsumingMetric *haapm.HaHistogram, start time.Time, req *Request, rsp *Response) {

// report the mysql switching time consuming
if err := timeConsumingMetric.UpdateLabel(map[string]string{
apm.MetricLabelSwitchID: req.SwitchID,
apm.MetricLabelActionScope: string(req.ActionScope),
apm.MetricLabelDbType: string(m.DbTypeName()),
}).Observe(float64(time.Since(start).Milliseconds())); err != nil {
logger.Error("failed to update mysql switching time consuming metric, errmsg: %s", err.Error())
}

// report the mysql switching success total
if err := apm.MysqlSwitchingSuccessTotal.UpdateLabel(map[string]string{
apm.MetricLabelActionScope: string(req.ActionScope),
apm.MetricLabelDbType: string(m.DbTypeName()),
}).Add(float64(len(req.MySqlInstData) - len(rsp.MySqlFailureInsts))); err != nil {
logger.Error("failed to update mysql switching success total metric, errmsg: %s", err.Error())
}

// report the mysql switching error total
if err := apm.MysqlSwitchingErrorTotal.UpdateLabel(map[string]string{
apm.MetricLabelActionScope: string(req.ActionScope),
apm.MetricLabelDbType: string(m.DbTypeName()),
}).Add(float64(len(rsp.MySqlFailureInsts))); err != nil {
logger.Error("failed to update mysql switching error total metric, errmsg: %s", err.Error())
}
}

// Switch handles MySQL switching operations on different levels
func (m *Mysql) Switch(ctx context.Context, req *Request) *Response {
rsp := &Response{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sort"
"time"

"dbm-services/common/dbha-v2/internal/analysis/apm"
"dbm-services/common/dbha-v2/internal/analysis/config"
"dbm-services/common/dbha-v2/internal/analysis/dbm"
"dbm-services/common/dbha-v2/internal/analysis/storage"
Expand Down Expand Up @@ -168,11 +169,30 @@ func (e *SwitchExecutor) TriggerSwitching(dbType haprobe.DbType, req *switcher.R
return
}

start := time.Now()

rsp := sw.Switch(context.Background(), req)
if rsp.Err == nil {
logger.Info("switching success for the database type: %s", dbType)
}

// report the switching time consuming
if err := apm.SwitchingTimeConsumingMs.UpdateLabel(map[string]string{
apm.MetricLabelDbType: dbType.String(),
}).Observe(float64(time.Since(start).Milliseconds())); err != nil {
logger.Warn("failed to update switching time consuming metric, errmsg: %s", err)
}

// report the switching success total
if err := apm.SwitchingSuccessTotal.Add(float64(len(req.MySqlInstData) - len(rsp.MySqlFailureInsts))); err != nil {
logger.Error("failed to update switching success total metric: %s", err.Error())
}

// report the switching error total
if err := apm.SwitchingErrorTotal.Add(float64(len(rsp.MySqlFailureInsts))); err != nil {
logger.Error("failed to update switching error total metric: %s", err.Error())
}

// post the success alarm
for _, inst := range req.GetDbInstMetadata() {
instKey := switchcore.GenerateMetadataKey(inst.BkCloudID, inst.IP, inst.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"sync"
"time"

"dbm-services/common/dbha-v2/internal/analysis/apm"
"dbm-services/common/dbha-v2/internal/analysis/config"
"dbm-services/common/dbha-v2/internal/analysis/storage"
"dbm-services/common/dbha-v2/internal/analysis/switcher"
"dbm-services/common/dbha-v2/pkg/discovery"
"dbm-services/common/dbha-v2/pkg/gerrors"
"dbm-services/common/dbha-v2/pkg/haapm"
"dbm-services/common/dbha-v2/pkg/logger"
"dbm-services/common/dbha-v2/pkg/storage/hamysql"
"dbm-services/common/dbha-v2/pkg/storage/haprobe"
Expand Down Expand Up @@ -210,6 +212,8 @@ func (w *Workflow) CheckBusinessWithBizID(ctx context.Context, bizId int) error
// ScanBusinesses fetches business IDs, filters by instance sharding,
// and runs CheckBusinessWithBizID for each (with concurrency limit).
func (w *Workflow) ScanBusinesses(ctx context.Context) {
start := time.Now()

bizIDs, err := w.hadata.GetBizIDs()
if err != nil {
logger.Warn("failed to get business IDs, errmsg: %s", err)
Expand Down Expand Up @@ -240,6 +244,22 @@ func (w *Workflow) ScanBusinesses(ctx context.Context) {
}

wg.Wait()

// report the scan business time consuming
if err := apm.ScanBusinessTimeConsumingMs.UpdateLabel(map[string]string{
haapm.MetricLabelServiceID: w.myServiceID,
haapm.MetricLabelServiceName: "analysis",
}).Observe(float64(time.Since(start).Milliseconds())); err != nil {
logger.Warn("failed to report the scan business time consuming, errmsg: %s", err)
}

// report the scan business total
if err := apm.ScanBusinessTotal.UpdateLabel(map[string]string{
haapm.MetricLabelServiceID: w.myServiceID,
haapm.MetricLabelServiceName: "analysis",
}).Add(float64(len(assigned))); err != nil {
logger.Warn("failed to report the scan business total, errmsg: %s", err)
}
}

// instanceKey builds a unique instance identifier from cloud id, IP and port.
Expand Down