Skip to content

Commit d5dc444

Browse files
committed
feat(observability): 添加HTTP延迟注入功能并优化指标收集
重构指标注入器以支持服务版本维度；移除冗余的 exported_job 标签和实例 ID 生成；新增 HTTP 延迟注入器并与中间件集成。
1 parent 23f91e4 commit d5dc444

File tree

6 files changed

+275
-40
lines changed

6 files changed

+275
-40
lines changed

mock/s3/shared/middleware/error_injection/error_injection.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type CacheConfig struct {
3434
type MetricInjector struct {
3535
mockErrorClient *client.BaseHTTPClient
3636
serviceName string
37+
serviceVersion string // 添加服务版本字段
3738
logger *observability.Logger
3839

3940
// 缓存
@@ -56,7 +57,7 @@ type CachedAnomaly struct {
5657
}
5758

5859
// NewMetricInjector 从YAML配置创建指标异常注入器
59-
func NewMetricInjector(configPath string, serviceName string, logger *observability.Logger) (*MetricInjector, error) {
60+
func NewMetricInjector(configPath string, serviceName string, serviceVersion string, logger *observability.Logger) (*MetricInjector, error) {
6061
// 加载配置文件
6162
var config MetricInjectorConfig
6263
if err := utils.LoadConfig(configPath, &config); err != nil {
@@ -84,6 +85,7 @@ func NewMetricInjector(configPath string, serviceName string, logger *observabil
8485
injector := &MetricInjector{
8586
mockErrorClient: client,
8687
serviceName: serviceName,
88+
serviceVersion: serviceVersion,
8789
logger: logger,
8890
cache: make(map[string]*CachedAnomaly),
8991
cacheTTL: config.Cache.TTL,
@@ -102,12 +104,13 @@ func NewMetricInjector(configPath string, serviceName string, logger *observabil
102104
}
103105

104106
// NewMetricInjectorWithDefaults 使用默认配置创建指标异常注入器
105-
func NewMetricInjectorWithDefaults(mockErrorServiceURL string, serviceName string, logger *observability.Logger) *MetricInjector {
107+
func NewMetricInjectorWithDefaults(mockErrorServiceURL string, serviceName string, serviceVersion string, logger *observability.Logger) *MetricInjector {
106108
client := client.NewBaseHTTPClient(mockErrorServiceURL, 5*time.Second, "metric-injector", logger)
107109

108110
injector := &MetricInjector{
109111
mockErrorClient: client,
110112
serviceName: serviceName,
113+
serviceVersion: serviceVersion,
111114
logger: logger,
112115
cache: make(map[string]*CachedAnomaly),
113116
cacheTTL: 30 * time.Second,
@@ -125,11 +128,9 @@ func NewMetricInjectorWithDefaults(mockErrorServiceURL string, serviceName strin
125128

126129
// InjectMetricAnomaly 检查并注入指标异常
127130
func (mi *MetricInjector) InjectMetricAnomaly(ctx context.Context, metricName string, originalValue float64) float64 {
128-
// 计算实例标识,用于实例级注入与缓存
129-
instanceID := utils.GetInstanceID(mi.serviceName)
130-
131-
// 检查缓存(加入实例维度)
132-
cacheKey := mi.serviceName + ":" + instanceID + ":" + metricName
131+
// 使用服务版本作为注入维度,同一版本的所有实例共享相同的异常注入
132+
// 检查缓存(基于服务版本)
133+
cacheKey := mi.serviceName + ":" + mi.serviceVersion + ":" + metricName
133134
mi.cacheMu.RLock()
134135
if cached, exists := mi.cache[cacheKey]; exists && time.Now().Before(cached.ExpiresAt) {
135136
mi.cacheMu.RUnlock()
@@ -140,18 +141,18 @@ func (mi *MetricInjector) InjectMetricAnomaly(ctx context.Context, metricName st
140141
}
141142
mi.cacheMu.RUnlock()
142143

143-
// 查询Mock Error Service获取异常规则
144+
// 查询Mock Error Service获取异常规则(基于版本)
144145
request := map[string]string{
145146
"service": mi.serviceName,
147+
"version": mi.serviceVersion,
146148
"metric_name": metricName,
147-
"instance": instanceID,
148149
}
149150

150151
var response struct {
151152
ShouldInject bool `json:"should_inject"`
152153
Service string `json:"service"`
154+
Version string `json:"version"`
153155
MetricName string `json:"metric_name"`
154-
Instance string `json:"instance"`
155156
Anomaly map[string]any `json:"anomaly,omitempty"`
156157
}
157158

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package error_injection
2+
3+
import (
	"context"
	"math/rand"
	"sync"
	"time"

	"mocks3/shared/client"
	"mocks3/shared/observability"
)
10+
11+
// HTTPLatencyInjector HTTP请求延迟注入器
12+
type HTTPLatencyInjector struct {
13+
mockErrorClient *client.BaseHTTPClient
14+
serviceName string
15+
serviceVersion string
16+
logger *observability.Logger
17+
18+
// 缓存
19+
cache map[string]*CachedLatencyConfig
20+
cacheMu sync.RWMutex
21+
cacheTTL time.Duration
22+
}
23+
24+
// CachedLatencyConfig 缓存的延迟配置
25+
type CachedLatencyConfig struct {
26+
Config *LatencyConfig
27+
ExpiresAt time.Time
28+
}
29+
30+
// LatencyConfig describes one latency-injection rule.
type LatencyConfig struct {
	// ShouldInject reports whether latency should be injected at all.
	ShouldInject bool `json:"should_inject"`
	// Latency is the delay to inject.
	Latency time.Duration `json:"latency"`
	// Probability is the chance of injecting, in the range 0-1.
	Probability float64 `json:"probability"`
	// Pattern is an optional path-matching pattern.
	Pattern string `json:"pattern"`
}
37+
38+
// NewHTTPLatencyInjector 创建HTTP延迟注入器
39+
func NewHTTPLatencyInjector(mockErrorServiceURL string, serviceName, serviceVersion string, logger *observability.Logger) *HTTPLatencyInjector {
40+
client := client.NewBaseHTTPClient(mockErrorServiceURL, 5*time.Second, "latency-injector", logger)
41+
42+
injector := &HTTPLatencyInjector{
43+
mockErrorClient: client,
44+
serviceName: serviceName,
45+
serviceVersion: serviceVersion,
46+
logger: logger,
47+
cache: make(map[string]*CachedLatencyConfig),
48+
cacheTTL: 30 * time.Second,
49+
}
50+
51+
// 启动缓存清理
52+
go injector.cleanupCache()
53+
54+
return injector
55+
}
56+
57+
// GetLatencyConfig 获取延迟配置
58+
func (h *HTTPLatencyInjector) GetLatencyConfig(ctx context.Context, path string) (*LatencyConfig, error) {
59+
// 构建缓存键(基于版本)
60+
cacheKey := h.serviceName + ":" + h.serviceVersion + ":" + path
61+
62+
// 检查缓存
63+
h.cacheMu.RLock()
64+
if cached, exists := h.cache[cacheKey]; exists && time.Now().Before(cached.ExpiresAt) {
65+
h.cacheMu.RUnlock()
66+
return cached.Config, nil
67+
}
68+
h.cacheMu.RUnlock()
69+
70+
// 查询Mock Error Service获取延迟配置
71+
request := map[string]string{
72+
"service": h.serviceName,
73+
"version": h.serviceVersion,
74+
"path": path,
75+
"type": "http_latency",
76+
}
77+
78+
var response struct {
79+
ShouldInject bool `json:"should_inject"`
80+
Latency int64 `json:"latency_ms"` // 毫秒
81+
Probability float64 `json:"probability"`
82+
Pattern string `json:"pattern"`
83+
}
84+
85+
opts := client.RequestOptions{
86+
Method: "POST",
87+
Path: "/api/v1/latency-inject/check",
88+
Body: request,
89+
}
90+
91+
err := h.mockErrorClient.DoRequestWithJSON(ctx, opts, &response)
92+
if err != nil {
93+
h.logger.Debug(ctx, "Failed to check latency injection",
94+
observability.Error(err),
95+
observability.String("path", path))
96+
// 失败时缓存空结果
97+
h.updateCache(cacheKey, nil)
98+
return nil, nil
99+
}
100+
101+
// 构建配置
102+
var config *LatencyConfig
103+
if response.ShouldInject {
104+
config = &LatencyConfig{
105+
ShouldInject: true,
106+
Latency: time.Duration(response.Latency) * time.Millisecond,
107+
Probability: response.Probability,
108+
Pattern: response.Pattern,
109+
}
110+
}
111+
112+
// 更新缓存
113+
h.updateCache(cacheKey, config)
114+
115+
return config, nil
116+
}
117+
118+
// InjectLatency 注入延迟(如果需要)
119+
func (h *HTTPLatencyInjector) InjectLatency(ctx context.Context, path string) time.Duration {
120+
config, err := h.GetLatencyConfig(ctx, path)
121+
if err != nil || config == nil || !config.ShouldInject {
122+
return 0
123+
}
124+
125+
// 基于概率决定是否注入
126+
if config.Probability < 1.0 {
127+
// 简单的概率实现(生产环境应使用更好的随机数)
128+
if time.Now().UnixNano()%100 >= int64(config.Probability*100) {
129+
return 0
130+
}
131+
}
132+
133+
// 执行真实的延迟
134+
if config.Latency > 0 {
135+
h.logger.Info(ctx, "Injecting HTTP latency",
136+
observability.String("service", h.serviceName),
137+
observability.String("version", h.serviceVersion),
138+
observability.String("path", path),
139+
observability.Duration("latency", config.Latency))
140+
141+
// 真实的延迟注入
142+
time.Sleep(config.Latency)
143+
144+
return config.Latency
145+
}
146+
147+
return 0
148+
}
149+
150+
// updateCache 更新缓存
151+
func (h *HTTPLatencyInjector) updateCache(key string, config *LatencyConfig) {
152+
h.cacheMu.Lock()
153+
defer h.cacheMu.Unlock()
154+
155+
h.cache[key] = &CachedLatencyConfig{
156+
Config: config,
157+
ExpiresAt: time.Now().Add(h.cacheTTL),
158+
}
159+
}
160+
161+
// cleanupCache 定期清理过期缓存
162+
func (h *HTTPLatencyInjector) cleanupCache() {
163+
ticker := time.NewTicker(1 * time.Minute)
164+
defer ticker.Stop()
165+
166+
for range ticker.C {
167+
h.cacheMu.Lock()
168+
now := time.Now()
169+
for key, cached := range h.cache {
170+
if now.After(cached.ExpiresAt) {
171+
delete(h.cache, key)
172+
}
173+
}
174+
h.cacheMu.Unlock()
175+
}
176+
}
177+
178+
// Cleanup 清理资源
179+
func (h *HTTPLatencyInjector) Cleanup() {
180+
h.cacheMu.Lock()
181+
defer h.cacheMu.Unlock()
182+
h.cache = make(map[string]*CachedLatencyConfig)
183+
}

mock/s3/shared/observability/metrics.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -439,11 +439,8 @@ func (c *MetricCollector) collectNetworkMetrics(ctx context.Context) {
439439
finalValue = c.metricInjector.InjectMetricAnomaly(ctx, "system_network_qps", qps)
440440
}
441441

442-
// 添加服务属性作为标签
442+
// 添加服务版本标签(exported_job 冗余,已通过 service_name 资源属性暴露)
443443
attrs := []attribute.KeyValue{}
444-
if c.serviceName != "" {
445-
attrs = append(attrs, attribute.String("exported_job", c.serviceName))
446-
}
447444
if c.serviceVersion != "" {
448445
attrs = append(attrs, attribute.String("service_version", c.serviceVersion))
449446
}
@@ -524,17 +521,14 @@ func (c *MetricCollector) updateMachineStatus(ctx context.Context) {
524521

525522
// RecordHTTPRequestDuration 记录 HTTP 请求时延
526523
func (c *MetricCollector) RecordHTTPRequestDuration(ctx context.Context, duration float64, method, path string, statusCode int) {
527-
// 构建属性标签
524+
// 构建属性标签(移除 exported_job,保留 service_version)
528525
attrs := []attribute.KeyValue{
529526
attribute.String("http.method", method),
530527
attribute.String("http.route", path),
531528
attribute.Int("http.status_code", statusCode),
532529
}
533530

534-
// 添加服务属性
535-
if c.serviceName != "" {
536-
attrs = append(attrs, attribute.String("exported_job", c.serviceName))
537-
}
531+
// 添加服务版本(必要标签,用于版本区分)
538532
if c.serviceVersion != "" {
539533
attrs = append(attrs, attribute.String("service_version", c.serviceVersion))
540534
}

0 commit comments

Comments
 (0)