Skip to content

Commit ff51019

Browse files
authored
[fix][backend] fix backfill filter (#318)
* fix backfill filter * fix callback userid
1 parent 69e4503 commit ff51019

File tree

4 files changed

+356
-3
lines changed

4 files changed

+356
-3
lines changed

backend/modules/observability/domain/task/entity/event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ func (s *OnlineExptTurnEvalResult) GetRunID() (int64, error) {
272272
}
273273

274274
func (s *OnlineExptTurnEvalResult) GetUserID() string {
275-
if s.BaseInfo == nil || s.BaseInfo.UpdatedBy == nil {
275+
if s.BaseInfo == nil || s.BaseInfo.CreatedBy == nil {
276276
return ""
277277
}
278-
return s.BaseInfo.UpdatedBy.UserID
278+
return s.BaseInfo.CreatedBy.UserID
279279
}
280280

281281
type EvaluatorRunError struct {

backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ type ListSpansReq struct {
206206
func (h *TraceHubServiceImpl) buildSpanFilters(ctx context.Context, taskConfig *entity.ObservabilityTask) *loop_span.FilterFields {
207207
// More complex filters can be built based on the task configuration
208208
// Simplified here: return nil to indicate no additional filters
209-
210209
platformFilter, err := h.buildHelper.BuildPlatformRelatedFilter(ctx, taskConfig.SpanFilter.PlatformType)
211210
if err != nil {
212211
logs.CtxError(ctx, "build platform filter failed, task_id=%d, err=%v", taskConfig.ID, err)
@@ -222,6 +221,10 @@ func (h *TraceHubServiceImpl) buildSpanFilters(ctx context.Context, taskConfig *
222221
// 不需要重试
223222
return nil
224223
}
224+
if err = taskConfig.SpanFilter.Filters.Traverse(processSpecificFilter); err != nil {
225+
logs.CtxError(ctx, "traverse filter fields failed, task_id=%d, err=%v", taskConfig.ID, err)
226+
return nil
227+
}
225228
filters := h.combineFilters(builtinFilter, &taskConfig.SpanFilter.Filters)
226229

227230
return filters

backend/modules/observability/domain/task/service/taskexe/tracehub/utils.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@ package tracehub
55

66
import (
77
"context"
8+
"fmt"
9+
"strconv"
810

911
"github.com/bytedance/sonic"
1012
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
13+
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
1114
"github.com/coze-dev/coze-loop/backend/pkg/logs"
15+
timeutil "github.com/coze-dev/coze-loop/backend/pkg/time"
1216
)
1317

1418
func ToJSONString(ctx context.Context, obj interface{}) string {
@@ -27,3 +31,75 @@ func ToJSONString(ctx context.Context, obj interface{}) string {
2731
func (h *TraceHubServiceImpl) getTenants(ctx context.Context, platform loop_span.PlatformType) ([]string, error) {
2832
return h.tenantProvider.GetTenantsByPlatformType(ctx, platform)
2933
}
34+
35+
// todo tyf TraceService里有相同实现,待合并
36+
func processSpecificFilter(f *loop_span.FilterField) error {
37+
if f == nil {
38+
return nil
39+
}
40+
switch f.FieldName {
41+
case loop_span.SpanFieldStatus:
42+
if err := processStatusFilter(f); err != nil {
43+
return err
44+
}
45+
case loop_span.SpanFieldDuration,
46+
loop_span.SpanFieldLatencyFirstResp,
47+
loop_span.SpanFieldStartTimeFirstResp,
48+
loop_span.SpanFieldStartTimeFirstTokenResp,
49+
loop_span.SpanFieldLatencyFirstTokenResp,
50+
loop_span.SpanFieldReasoningDuration:
51+
if err := processLatencyFilter(f); err != nil {
52+
return err
53+
}
54+
}
55+
return nil
56+
}
57+
58+
func processStatusFilter(f *loop_span.FilterField) error {
59+
if f.QueryType == nil || *f.QueryType != loop_span.QueryTypeEnumIn {
60+
return fmt.Errorf("status filter should use in operator")
61+
}
62+
f.FieldName = loop_span.SpanFieldStatusCode
63+
f.FieldType = loop_span.FieldTypeLong
64+
checkSuccess, checkError := false, false
65+
for _, val := range f.Values {
66+
switch val {
67+
case loop_span.SpanStatusSuccess:
68+
checkSuccess = true
69+
case loop_span.SpanStatusError:
70+
checkError = true
71+
default:
72+
return fmt.Errorf("invalid status code field value")
73+
}
74+
}
75+
if checkSuccess && checkError {
76+
f.QueryType = ptr.Of(loop_span.QueryTypeEnumAlwaysTrue)
77+
f.Values = nil
78+
} else if checkSuccess {
79+
f.Values = []string{"0"}
80+
} else if checkError {
81+
f.QueryType = ptr.Of(loop_span.QueryTypeEnumNotIn)
82+
f.Values = []string{"0"}
83+
} else {
84+
return fmt.Errorf("invalid status code query")
85+
}
86+
return nil
87+
}
88+
89+
// ms -> us
90+
func processLatencyFilter(f *loop_span.FilterField) error {
91+
if f.FieldType != loop_span.FieldTypeLong {
92+
return fmt.Errorf("latency field type should be long ")
93+
}
94+
micros := make([]string, 0)
95+
for _, val := range f.Values {
96+
integer, err := strconv.ParseInt(val, 10, 64)
97+
if err != nil {
98+
return fmt.Errorf("fail to parse long value %s, %v", val, err)
99+
}
100+
integer = timeutil.MillSec2MicroSec(integer)
101+
micros = append(micros, strconv.FormatInt(integer, 10))
102+
}
103+
f.Values = micros
104+
return nil
105+
}
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
// Copyright (c) 2025 coze-dev Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package tracehub
5+
6+
import (
7+
"testing"
8+
9+
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
10+
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestProcessSpecificFilter_StatusFilter(t *testing.T) {
15+
tests := []struct {
16+
name string
17+
filter *loop_span.FilterField
18+
expectError bool
19+
validate func(t *testing.T, result *loop_span.FilterField)
20+
}{
21+
{
22+
name: "status filter with success and error - should convert to always true",
23+
filter: &loop_span.FilterField{
24+
FieldName: loop_span.SpanFieldStatus,
25+
FieldType: loop_span.FieldTypeString,
26+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
27+
Values: []string{loop_span.SpanStatusSuccess, loop_span.SpanStatusError},
28+
},
29+
expectError: false,
30+
validate: func(t *testing.T, result *loop_span.FilterField) {
31+
assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName)
32+
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
33+
assert.Equal(t, loop_span.QueryTypeEnumAlwaysTrue, *result.QueryType)
34+
assert.Nil(t, result.Values)
35+
},
36+
},
37+
{
38+
name: "status filter with only success - should convert to value 0",
39+
filter: &loop_span.FilterField{
40+
FieldName: loop_span.SpanFieldStatus,
41+
FieldType: loop_span.FieldTypeString,
42+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
43+
Values: []string{loop_span.SpanStatusSuccess},
44+
},
45+
expectError: false,
46+
validate: func(t *testing.T, result *loop_span.FilterField) {
47+
assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName)
48+
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
49+
assert.Equal(t, loop_span.QueryTypeEnumIn, *result.QueryType)
50+
assert.Equal(t, []string{"0"}, result.Values)
51+
},
52+
},
53+
{
54+
name: "status filter with only error - should convert to not in value 0",
55+
filter: &loop_span.FilterField{
56+
FieldName: loop_span.SpanFieldStatus,
57+
FieldType: loop_span.FieldTypeString,
58+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
59+
Values: []string{loop_span.SpanStatusError},
60+
},
61+
expectError: false,
62+
validate: func(t *testing.T, result *loop_span.FilterField) {
63+
assert.Equal(t, loop_span.SpanFieldStatusCode, result.FieldName)
64+
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
65+
assert.Equal(t, loop_span.QueryTypeEnumNotIn, *result.QueryType)
66+
assert.Equal(t, []string{"0"}, result.Values)
67+
},
68+
},
69+
{
70+
name: "status filter without in operator - should return error",
71+
filter: &loop_span.FilterField{
72+
FieldName: loop_span.SpanFieldStatus,
73+
FieldType: loop_span.FieldTypeString,
74+
QueryType: ptr.Of(loop_span.QueryTypeEnumEq),
75+
Values: []string{loop_span.SpanStatusSuccess},
76+
},
77+
expectError: true,
78+
},
79+
{
80+
name: "status filter with invalid status value - should return error",
81+
filter: &loop_span.FilterField{
82+
FieldName: loop_span.SpanFieldStatus,
83+
FieldType: loop_span.FieldTypeString,
84+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
85+
Values: []string{"invalid_status"},
86+
},
87+
expectError: true,
88+
},
89+
{
90+
name: "status filter with empty values - should return error",
91+
filter: &loop_span.FilterField{
92+
FieldName: loop_span.SpanFieldStatus,
93+
FieldType: loop_span.FieldTypeString,
94+
QueryType: ptr.Of(loop_span.QueryTypeEnumIn),
95+
Values: []string{},
96+
},
97+
expectError: true,
98+
},
99+
}
100+
101+
for _, tt := range tests {
102+
t.Run(tt.name, func(t *testing.T) {
103+
// Make a copy to avoid modifying the original
104+
filterCopy := *tt.filter
105+
err := processSpecificFilter(&filterCopy)
106+
107+
if tt.expectError {
108+
assert.Error(t, err)
109+
} else {
110+
assert.NoError(t, err)
111+
if tt.validate != nil {
112+
tt.validate(t, &filterCopy)
113+
}
114+
}
115+
})
116+
}
117+
}
118+
119+
func TestProcessSpecificFilter_LatencyFilter(t *testing.T) {
120+
tests := []struct {
121+
name string
122+
filter *loop_span.FilterField
123+
expectError bool
124+
validate func(t *testing.T, result *loop_span.FilterField)
125+
}{
126+
{
127+
name: "duration filter - should convert ms to us",
128+
filter: &loop_span.FilterField{
129+
FieldName: loop_span.SpanFieldDuration,
130+
FieldType: loop_span.FieldTypeLong,
131+
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
132+
Values: []string{"100", "200"},
133+
},
134+
expectError: false,
135+
validate: func(t *testing.T, result *loop_span.FilterField) {
136+
assert.Equal(t, loop_span.SpanFieldDuration, result.FieldName)
137+
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
138+
assert.Equal(t, loop_span.QueryTypeEnumGte, *result.QueryType)
139+
assert.Equal(t, []string{"100000", "200000"}, result.Values) // 100ms -> 100000us
140+
},
141+
},
142+
{
143+
name: "latency_first_resp filter - should convert ms to us",
144+
filter: &loop_span.FilterField{
145+
FieldName: loop_span.SpanFieldLatencyFirstResp,
146+
FieldType: loop_span.FieldTypeLong,
147+
QueryType: ptr.Of(loop_span.QueryTypeEnumLte),
148+
Values: []string{"50"},
149+
},
150+
expectError: false,
151+
validate: func(t *testing.T, result *loop_span.FilterField) {
152+
assert.Equal(t, loop_span.SpanFieldLatencyFirstResp, result.FieldName)
153+
assert.Equal(t, loop_span.FieldTypeLong, result.FieldType)
154+
assert.Equal(t, loop_span.QueryTypeEnumLte, *result.QueryType)
155+
assert.Equal(t, []string{"50000"}, result.Values) // 50ms -> 50000us
156+
},
157+
},
158+
{
159+
name: "start_time_first_resp filter - should convert ms to us",
160+
filter: &loop_span.FilterField{
161+
FieldName: loop_span.SpanFieldStartTimeFirstResp,
162+
FieldType: loop_span.FieldTypeLong,
163+
QueryType: ptr.Of(loop_span.QueryTypeEnumEq),
164+
Values: []string{"1000"},
165+
},
166+
expectError: false,
167+
validate: func(t *testing.T, result *loop_span.FilterField) {
168+
assert.Equal(t, loop_span.SpanFieldStartTimeFirstResp, result.FieldName)
169+
assert.Equal(t, []string{"1000000"}, result.Values) // 1000ms -> 1000000us
170+
},
171+
},
172+
{
173+
name: "start_time_first_token_resp filter - should convert ms to us",
174+
filter: &loop_span.FilterField{
175+
FieldName: loop_span.SpanFieldStartTimeFirstTokenResp,
176+
FieldType: loop_span.FieldTypeLong,
177+
QueryType: ptr.Of(loop_span.QueryTypeEnumGt),
178+
Values: []string{"10"},
179+
},
180+
expectError: false,
181+
validate: func(t *testing.T, result *loop_span.FilterField) {
182+
assert.Equal(t, []string{"10000"}, result.Values) // 10ms -> 10000us
183+
},
184+
},
185+
{
186+
name: "latency_first_token_resp filter - should convert ms to us",
187+
filter: &loop_span.FilterField{
188+
FieldName: loop_span.SpanFieldLatencyFirstTokenResp,
189+
FieldType: loop_span.FieldTypeLong,
190+
QueryType: ptr.Of(loop_span.QueryTypeEnumLt),
191+
Values: []string{"5"},
192+
},
193+
expectError: false,
194+
validate: func(t *testing.T, result *loop_span.FilterField) {
195+
assert.Equal(t, []string{"5000"}, result.Values) // 5ms -> 5000us
196+
},
197+
},
198+
{
199+
name: "reasoning_duration filter - should convert ms to us",
200+
filter: &loop_span.FilterField{
201+
FieldName: loop_span.SpanFieldReasoningDuration,
202+
FieldType: loop_span.FieldTypeLong,
203+
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
204+
Values: []string{"30"},
205+
},
206+
expectError: false,
207+
validate: func(t *testing.T, result *loop_span.FilterField) {
208+
assert.Equal(t, []string{"30000"}, result.Values) // 30ms -> 30000us
209+
},
210+
},
211+
{
212+
name: "latency filter with wrong field type - should return error",
213+
filter: &loop_span.FilterField{
214+
FieldName: loop_span.SpanFieldDuration,
215+
FieldType: loop_span.FieldTypeString,
216+
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
217+
Values: []string{"100"},
218+
},
219+
expectError: true,
220+
},
221+
{
222+
name: "latency filter with invalid value - should return error",
223+
filter: &loop_span.FilterField{
224+
FieldName: loop_span.SpanFieldDuration,
225+
FieldType: loop_span.FieldTypeLong,
226+
QueryType: ptr.Of(loop_span.QueryTypeEnumGte),
227+
Values: []string{"invalid"},
228+
},
229+
expectError: true,
230+
},
231+
}
232+
233+
for _, tt := range tests {
234+
t.Run(tt.name, func(t *testing.T) {
235+
// Make a copy to avoid modifying the original
236+
filterCopy := *tt.filter
237+
err := processSpecificFilter(&filterCopy)
238+
239+
if tt.expectError {
240+
assert.Error(t, err)
241+
} else {
242+
assert.NoError(t, err)
243+
if tt.validate != nil {
244+
tt.validate(t, &filterCopy)
245+
}
246+
}
247+
})
248+
}
249+
}
250+
251+
func TestProcessSpecificFilter_UnknownField(t *testing.T) {
252+
// Test with unknown field name - should not modify the filter
253+
filter := &loop_span.FilterField{
254+
FieldName: "unknown_field",
255+
FieldType: loop_span.FieldTypeString,
256+
QueryType: ptr.Of(loop_span.QueryTypeEnumEq),
257+
Values: []string{"test"},
258+
}
259+
260+
original := *filter
261+
err := processSpecificFilter(filter)
262+
263+
assert.NoError(t, err)
264+
assert.Equal(t, original.FieldName, filter.FieldName)
265+
assert.Equal(t, original.FieldType, filter.FieldType)
266+
assert.Equal(t, original.QueryType, filter.QueryType)
267+
assert.Equal(t, original.Values, filter.Values)
268+
}
269+
270+
func TestProcessSpecificFilter_NilFilter(t *testing.T) {
271+
// Test with nil filter - should not panic
272+
err := processSpecificFilter(nil)
273+
assert.NoError(t, err)
274+
}

0 commit comments

Comments
 (0)