Skip to content

Commit 1a8aa3c

Browse files
authored
[feat][backend]Add rmq compression. (#322)
* Add rmq compression.
1 parent 3678804 commit 1a8aa3c

File tree

4 files changed

+649
-0
lines changed

4 files changed

+649
-0
lines changed

backend/modules/observability/infra/mq/producer/span_with_annotation_producer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func newSpanWithAnnotationProducerImpl(traceConfig config.ITraceConfig, mqFactor
7171
ProduceTimeout: time.Duration(mqCfg.Timeout) * time.Millisecond,
7272
RetryTimes: mqCfg.RetryTimes,
7373
ProducerGroup: ptr.Of(mqCfg.ProducerGroup),
74+
Compression: mq.CompressionZSTD,
7475
})
7576
if err != nil {
7677
return nil, err
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
// Copyright (c) 2025 coze-dev Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package producer
5+
6+
import (
7+
"context"
8+
"sync"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"go.uber.org/mock/gomock"
13+
14+
"github.com/coze-dev/coze-loop/backend/infra/mq"
15+
"github.com/coze-dev/coze-loop/backend/infra/mq/mocks"
16+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config"
17+
confmocks "github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config/mocks"
18+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity"
19+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
20+
obErrorx "github.com/coze-dev/coze-loop/backend/modules/observability/pkg/errno"
21+
"github.com/coze-dev/coze-loop/backend/pkg/errorx"
22+
)
23+
24+
func TestSpanWithAnnotationProducerImpl_SendSpanWithAnnotation(t *testing.T) {
25+
type fields struct {
26+
topic string
27+
}
28+
type args struct {
29+
ctx context.Context
30+
message *entity.SpanEvent
31+
tag string
32+
}
33+
tests := []struct {
34+
name string
35+
fields fields
36+
args args
37+
mockSetup func(mqProducer *mocks.MockIProducer)
38+
expectedError error
39+
}{
40+
{
41+
name: "正常场景: 发送span annotation消息成功",
42+
fields: fields{
43+
topic: "test_topic",
44+
},
45+
args: args{
46+
ctx: context.Background(),
47+
message: &entity.SpanEvent{
48+
Span: &loop_span.Span{
49+
TraceID: "test_trace_id",
50+
SpanID: "test_span_id",
51+
},
52+
},
53+
tag: "test_tag",
54+
},
55+
mockSetup: func(mqProducer *mocks.MockIProducer) {
56+
mqProducer.EXPECT().Send(gomock.Any(), gomock.Any()).Return(mq.SendResponse{}, nil)
57+
},
58+
expectedError: nil,
59+
},
60+
{
61+
name: "异常场景: MQ发送失败",
62+
fields: fields{
63+
topic: "test_topic",
64+
},
65+
args: args{
66+
ctx: context.Background(),
67+
message: &entity.SpanEvent{
68+
Span: &loop_span.Span{
69+
TraceID: "test_trace_id",
70+
SpanID: "test_span_id",
71+
},
72+
},
73+
tag: "test_tag",
74+
},
75+
mockSetup: func(mqProducer *mocks.MockIProducer) {
76+
mqProducer.EXPECT().Send(gomock.Any(), gomock.Any()).Return(mq.SendResponse{}, assert.AnError)
77+
},
78+
expectedError: errorx.NewByCode(obErrorx.CommercialCommonRPCErrorCodeCode),
79+
},
80+
}
81+
82+
for _, tt := range tests {
83+
t.Run(tt.name, func(t *testing.T) {
84+
ctrl := gomock.NewController(t)
85+
defer ctrl.Finish()
86+
87+
mqProducerMock := mocks.NewMockIProducer(ctrl)
88+
tt.mockSetup(mqProducerMock)
89+
90+
producer := &SpanWithAnnotationProducerImpl{
91+
topic: tt.fields.topic,
92+
mqProducer: mqProducerMock,
93+
}
94+
95+
err := producer.SendSpanWithAnnotation(tt.args.ctx, tt.args.message, tt.args.tag)
96+
if tt.expectedError != nil {
97+
assert.Error(t, err)
98+
} else {
99+
assert.NoError(t, err)
100+
}
101+
})
102+
}
103+
}
104+
105+
func TestNewSpanWithAnnotationProducerImpl(t *testing.T) {
106+
type args struct {
107+
traceConfig config.ITraceConfig
108+
mqFactory mq.IFactory
109+
}
110+
tests := []struct {
111+
name string
112+
args args
113+
mockSetup func(traceConfig *confmocks.MockITraceConfig, mqFactory *mocks.MockIFactory)
114+
expectedError bool
115+
}{
116+
{
117+
name: "正常场景: 创建producer成功",
118+
args: args{
119+
traceConfig: nil, // 将在mockSetup中设置
120+
mqFactory: nil, // 将在mockSetup中设置
121+
},
122+
mockSetup: func(traceConfig *confmocks.MockITraceConfig, mqFactory *mocks.MockIFactory) {
123+
mqCfg := &config.MqProducerCfg{
124+
Topic: "test_topic",
125+
Addr: []string{"localhost:9876"},
126+
Timeout: 5000,
127+
RetryTimes: 3,
128+
ProducerGroup: "test_group",
129+
}
130+
traceConfig.EXPECT().GetSpanWithAnnotationMqProducerCfg(gomock.Any()).Return(mqCfg, nil)
131+
132+
producerMock := mocks.NewMockIProducer(gomock.NewController(t))
133+
producerMock.EXPECT().Start().Return(nil)
134+
135+
mqFactory.EXPECT().NewProducer(gomock.Any()).Return(producerMock, nil)
136+
},
137+
expectedError: false,
138+
},
139+
{
140+
name: "异常场景: 获取配置失败",
141+
args: args{
142+
traceConfig: nil,
143+
mqFactory: nil,
144+
},
145+
mockSetup: func(traceConfig *confmocks.MockITraceConfig, mqFactory *mocks.MockIFactory) {
146+
traceConfig.EXPECT().GetSpanWithAnnotationMqProducerCfg(gomock.Any()).Return(nil, assert.AnError)
147+
},
148+
expectedError: true,
149+
},
150+
{
151+
name: "异常场景: topic为空",
152+
args: args{
153+
traceConfig: nil,
154+
mqFactory: nil,
155+
},
156+
mockSetup: func(traceConfig *confmocks.MockITraceConfig, mqFactory *mocks.MockIFactory) {
157+
mqCfg := &config.MqProducerCfg{
158+
Topic: "", // 空topic
159+
Addr: []string{"localhost:9876"},
160+
Timeout: 5000,
161+
RetryTimes: 3,
162+
ProducerGroup: "test_group",
163+
}
164+
traceConfig.EXPECT().GetSpanWithAnnotationMqProducerCfg(gomock.Any()).Return(mqCfg, nil)
165+
},
166+
expectedError: true,
167+
},
168+
{
169+
name: "异常场景: 创建MQ producer失败",
170+
args: args{
171+
traceConfig: nil,
172+
mqFactory: nil,
173+
},
174+
mockSetup: func(traceConfig *confmocks.MockITraceConfig, mqFactory *mocks.MockIFactory) {
175+
mqCfg := &config.MqProducerCfg{
176+
Topic: "test_topic",
177+
Addr: []string{"localhost:9876"},
178+
Timeout: 5000,
179+
RetryTimes: 3,
180+
ProducerGroup: "test_group",
181+
}
182+
traceConfig.EXPECT().GetSpanWithAnnotationMqProducerCfg(gomock.Any()).Return(mqCfg, nil)
183+
mqFactory.EXPECT().NewProducer(gomock.Any()).Return(nil, assert.AnError)
184+
},
185+
expectedError: true,
186+
},
187+
{
188+
name: "异常场景: 启动producer失败",
189+
args: args{
190+
traceConfig: nil,
191+
mqFactory: nil,
192+
},
193+
mockSetup: func(traceConfig *confmocks.MockITraceConfig, mqFactory *mocks.MockIFactory) {
194+
mqCfg := &config.MqProducerCfg{
195+
Topic: "test_topic",
196+
Addr: []string{"localhost:9876"},
197+
Timeout: 5000,
198+
RetryTimes: 3,
199+
ProducerGroup: "test_group",
200+
}
201+
traceConfig.EXPECT().GetSpanWithAnnotationMqProducerCfg(gomock.Any()).Return(mqCfg, nil)
202+
203+
producerMock := mocks.NewMockIProducer(gomock.NewController(t))
204+
producerMock.EXPECT().Start().Return(assert.AnError)
205+
206+
mqFactory.EXPECT().NewProducer(gomock.Any()).Return(producerMock, nil)
207+
},
208+
expectedError: true,
209+
},
210+
}
211+
212+
for _, tt := range tests {
213+
t.Run(tt.name, func(t *testing.T) {
214+
ctrl := gomock.NewController(t)
215+
defer ctrl.Finish()
216+
217+
traceConfigMock := confmocks.NewMockITraceConfig(ctrl)
218+
mqFactoryMock := mocks.NewMockIFactory(ctrl)
219+
tt.mockSetup(traceConfigMock, mqFactoryMock)
220+
221+
// 重置单例
222+
singletonSpanWithAnnotationProducer = nil
223+
spanWithAnnotationProducerOnce = sync.Once{}
224+
225+
producer, err := NewSpanWithAnnotationProducerImpl(traceConfigMock, mqFactoryMock)
226+
227+
if tt.expectedError {
228+
assert.Error(t, err)
229+
assert.Nil(t, producer)
230+
} else {
231+
assert.NoError(t, err)
232+
assert.NotNil(t, producer)
233+
}
234+
})
235+
}
236+
}

backend/modules/observability/infra/mq/producer/trace_producer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func newTraceProducerImpl(traceConfig config.ITraceConfig, mqFactory mq.IFactory
107107
ProduceTimeout: time.Duration(mqCfg.Timeout) * time.Millisecond,
108108
RetryTimes: mqCfg.RetryTimes,
109109
ProducerGroup: ptr.Of(mqCfg.ProducerGroup),
110+
Compression: mq.CompressionZSTD,
110111
})
111112
if e != nil {
112113
return nil, e

0 commit comments

Comments
 (0)