Skip to content

Commit 6f80ba2

Browse files
authored
[fix][backend] Fix task npe. (#321)
* Fix NPE. * Fix task finish bug. * Fix no backfill detail bug.
1 parent 90f6f7e commit 6f80ba2

File tree

13 files changed

+386
-36
lines changed

13 files changed

+386
-36
lines changed

.github/workflows/ai-cr-required.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
2929
run: |
3030
echo "PR number: $PR_NUMBER"
31-
COMMENTS=$(gh api -H "Accept: application/vnd.github+json" /repos/${{ github.repository }}/pulls/${PR_NUMBER}/comments --jq '.[].user.login')
31+
COMMENTS=$(gh api -H "Accept: application/vnd.github+json" /repos/${{ github.repository }}/pulls/${PR_NUMBER}/reviews --jq '.[].user.login')
3232
echo "Comment authors: $COMMENTS"
3333
echo "$COMMENTS" | grep -q "^CozeLoop$"
3434
if [ $? -ne 0 ]; then

backend/modules/observability/application/convertor/task/task.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,11 @@ func BackfillRunDetailDO2DTO(backfillDetail *entity.BackfillDetail) *task.Backfi
243243
return nil
244244
}
245245
return &task.BackfillDetail{
246-
SuccessCount: backfillDetail.SuccessCount,
247-
FailedCount: backfillDetail.FailedCount,
248-
TotalCount: backfillDetail.TotalCount,
249-
BackfillStatus: backfillDetail.BackfillStatus,
250-
LastSpanPageToken: backfillDetail.LastSpanPageToken,
246+
SuccessCount: &backfillDetail.SuccessCount,
247+
FailedCount: &backfillDetail.FailedCount,
248+
TotalCount: &backfillDetail.TotalCount,
249+
BackfillStatus: &backfillDetail.BackfillStatus,
250+
LastSpanPageToken: &backfillDetail.LastSpanPageToken,
251251
}
252252
}
253253

@@ -460,6 +460,7 @@ func TaskConfigDTO2DO(taskConfig *task.TaskConfig) *entity.TaskConfig {
460460
}
461461
}
462462

463+
/*
463464
func TaskRunDTO2DO(taskRun *task.TaskRun) *entity.TaskRun {
464465
if taskRun == nil {
465466
return nil
@@ -479,6 +480,7 @@ func TaskRunDTO2DO(taskRun *task.TaskRun) *entity.TaskRun {
479480
UpdatedAt: time.UnixMilli(taskRun.GetBaseInfo().GetUpdatedAt()),
480481
}
481482
}
483+
*/
482484

483485
func TaskRunConfigDTO2DO(v *task.TaskRunConfig) *entity.TaskRunConfig {
484486
if v == nil {
@@ -515,18 +517,20 @@ func TaskRunConfigDTO2DO(v *task.TaskRunConfig) *entity.TaskRunConfig {
515517
}
516518
}
517519

520+
/*
518521
func BackfillRunDetailDTO2DO(v *task.BackfillDetail) *entity.BackfillDetail {
519522
if v == nil {
520523
return nil
521524
}
522525
return &entity.BackfillDetail{
523-
SuccessCount: v.SuccessCount,
524-
FailedCount: v.FailedCount,
525-
TotalCount: v.TotalCount,
526-
BackfillStatus: v.BackfillStatus,
527-
LastSpanPageToken: v.LastSpanPageToken,
526+
SuccessCount: v.GetSuccessCount(),
527+
FailedCount: v.GetFailedCount(),
528+
TotalCount: v.GetTotalCount(),
529+
BackfillStatus: v.GetBackfillStatus(),
530+
LastSpanPageToken: v.GetLastSpanPageToken(),
528531
}
529532
}
533+
*/
530534

531535
func getLastPartAfterDot(s string) string {
532536
s = strings.TrimRight(s, ".")

backend/modules/observability/application/convertor/task/task_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ func TestTaskDOs2DTOs(t *testing.T) {
4646
FailedCount: 1,
4747
TotalCount: 4,
4848
},
49+
BackfillDetail: &entity.BackfillDetail{
50+
SuccessCount: 3,
51+
FailedCount: 1,
52+
TotalCount: 4,
53+
BackfillStatus: kitTask.RunStatusRunning,
54+
LastSpanPageToken: "abc",
55+
},
4956
CreatedAt: now,
5057
UpdatedAt: now,
5158
}

backend/modules/observability/domain/task/entity/task.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ type TaskRun struct {
145145
UpdatedAt time.Time // update time
146146
}
147147
type BackfillDetail struct {
148-
SuccessCount *int64 `json:"success_count"`
149-
FailedCount *int64 `json:"failed_count"`
150-
TotalCount *int64 `json:"total_count"`
151-
BackfillStatus *string `json:"backfill_status"`
152-
LastSpanPageToken *string `json:"last_span_page_token"`
148+
SuccessCount int64 `json:"success_count,omitempty"`
149+
FailedCount int64 `json:"failed_count,omitempty"`
150+
TotalCount int64 `json:"total_count,omitempty"`
151+
BackfillStatus string `json:"backfill_status,omitempty"`
152+
LastSpanPageToken string `json:"last_span_page_token,omitempty"`
153153
}
154154

155155
type TaskRunConfig struct {

backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,11 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub
153153
NotQueryAnnotation: true, // No annotation query required during backfill
154154
}
155155

156-
if sub.tr.BackfillDetail != nil && sub.tr.BackfillDetail.LastSpanPageToken != nil {
157-
listParam.PageToken = *sub.tr.BackfillDetail.LastSpanPageToken
156+
if sub.tr.BackfillDetail != nil && sub.tr.BackfillDetail.LastSpanPageToken != "" {
157+
listParam.PageToken = sub.tr.BackfillDetail.LastSpanPageToken
158+
}
159+
if sub.tr.BackfillDetail == nil {
160+
sub.tr.BackfillDetail = &entity.BackfillDetail{}
158161
}
159162

160163
totalCount := int64(0)
@@ -174,6 +177,11 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub
174177
totalCount += int64(len(spans))
175178
logs.CtxInfo(ctx, "Processed %d spans completed, total=%d, task_id=%d", len(spans), totalCount, sub.t.ID)
176179

180+
if pageToken != "" {
181+
listParam.PageToken = pageToken
182+
sub.tr.BackfillDetail.LastSpanPageToken = pageToken
183+
}
184+
177185
// TODO: PO (persistence object) fields should not be written directly here
178186
err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{
179187
"backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail),
@@ -188,14 +196,12 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub
188196
if err = sub.processor.OnTaskFinished(ctx, taskexe.OnTaskFinishedReq{
189197
Task: sub.t,
190198
TaskRun: sub.tr,
191-
IsFinish: false,
199+
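IsFinish: false, // The task may have both a historical backfill and new incoming work at the same time, so it must not be closed directly here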
192200
}); err != nil {
193201
return err
194202
}
195203
return nil
196204
}
197-
listParam.PageToken = pageToken
198-
sub.tr.BackfillDetail.LastSpanPageToken = &pageToken
199205
}
200206
}
201207

@@ -424,8 +430,6 @@ func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans
424430
func (h *TraceHubServiceImpl) processBatchSpans(ctx context.Context, spans []*loop_span.Span, sub *spanSubscriber) (err error, shouldFinish bool) {
425431
for _, span := range spans {
426432
// Execute processing logic according to the task type
427-
logs.CtxInfo(ctx, "processing span for backfill, span_id=%s, trace_id=%s, task_id=%d",
428-
span.SpanID, span.TraceID, sub.t.ID)
429433
taskCount, _ := h.taskRepo.GetTaskCount(ctx, sub.taskID)
430434
sampler := sub.t.Sampler
431435
if taskCount+1 > sampler.SampleSize {

backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,66 @@ func TestTraceHubServiceImpl_ListAndSendSpans_GetTenantsError(t *testing.T) {
301301
require.ErrorIs(t, err, tenantErr)
302302
}
303303

304+
func TestTraceHubServiceImpl_ListAndSendSpans_WithoutLastSpanPageToken(t *testing.T) {
305+
ctrl := gomock.NewController(t)
306+
t.Cleanup(ctrl.Finish)
307+
308+
mockTaskRepo := repo_mocks.NewMockITaskRepo(ctrl)
309+
mockTraceRepo := trepo_mocks.NewMockITraceRepo(ctrl)
310+
mockTenant := tenant_mocks.NewMockITenantProvider(ctrl)
311+
mockBuilder := builder_mocks.NewMockTraceFilterProcessorBuilder(ctrl)
312+
filterMock := spanfilter_mocks.NewMockFilter(ctrl)
313+
314+
impl := &TraceHubServiceImpl{
315+
taskRepo: mockTaskRepo,
316+
traceRepo: mockTraceRepo,
317+
tenantProvider: mockTenant,
318+
buildHelper: mockBuilder,
319+
}
320+
321+
now := time.Now()
322+
sub, proc := newBackfillSubscriber(mockTaskRepo, now)
323+
domainRun := newDomainBackfillTaskRun(now)
324+
span := newTestSpan(now)
325+
326+
mockBuilder.EXPECT().BuildPlatformRelatedFilter(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).
327+
Return(filterMock, nil)
328+
filterMock.EXPECT().BuildBasicSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, true, nil)
329+
filterMock.EXPECT().BuildRootSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, nil)
330+
mockBuilder.EXPECT().BuildGetTraceProcessors(gomock.Any(), gomock.Any()).Return([]span_processor.Processor(nil), nil).Times(2)
331+
mockTenant.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).Return([]string{"tenant"}, nil)
332+
333+
mockTraceRepo.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) {
334+
switch param.PageToken {
335+
case "":
336+
return &repo.ListSpansResult{
337+
Spans: loop_span.SpanList{span},
338+
PageToken: "next",
339+
HasMore: true,
340+
}, nil
341+
case "next":
342+
return &repo.ListSpansResult{
343+
Spans: loop_span.SpanList{span},
344+
PageToken: "",
345+
HasMore: false,
346+
}, nil
347+
default:
348+
return nil, errors.New("invalid token")
349+
}
350+
}).Times(2)
351+
352+
mockTaskRepo.EXPECT().GetTaskCount(gomock.Any(), int64(1)).Return(int64(0), nil).Times(2)
353+
mockTaskRepo.EXPECT().GetBackfillTaskRun(gomock.Any(), gomock.Nil(), int64(1)).Return(domainRun, nil).Times(2)
354+
mockTaskRepo.EXPECT().UpdateTaskRunWithOCC(gomock.Any(), sub.tr.ID, sub.tr.WorkspaceID, gomock.AssignableToTypeOf(map[string]interface{}{})).Return(nil).Times(2)
355+
356+
err := impl.listAndSendSpans(context.Background(), sub)
357+
require.NoError(t, err)
358+
require.True(t, proc.invokeCalled)
359+
require.NotNil(t, sub.tr.BackfillDetail)
360+
require.NotNil(t, sub.tr.BackfillDetail.LastSpanPageToken)
361+
require.Equal(t, "next", sub.tr.BackfillDetail.LastSpanPageToken)
362+
}
363+
304364
func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
305365
ctrl := gomock.NewController(t)
306366
t.Cleanup(ctrl.Finish)
@@ -320,7 +380,7 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
320380

321381
now := time.Now()
322382
sub, proc := newBackfillSubscriber(mockTaskRepo, now)
323-
sub.tr.BackfillDetail = &entity.BackfillDetail{LastSpanPageToken: ptr.Of("prev")}
383+
sub.tr.BackfillDetail = &entity.BackfillDetail{LastSpanPageToken: "prev"}
324384
domainRun := newDomainBackfillTaskRun(now)
325385
span := newTestSpan(now)
326386

@@ -350,7 +410,7 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
350410
require.True(t, proc.invokeCalled)
351411
require.NotNil(t, sub.tr.BackfillDetail)
352412
require.NotNil(t, sub.tr.BackfillDetail.LastSpanPageToken)
353-
require.Equal(t, "prev", ptr.From(sub.tr.BackfillDetail.LastSpanPageToken))
413+
require.Equal(t, "prev", sub.tr.BackfillDetail.LastSpanPageToken)
354414
}
355415

356416
func TestTraceHubServiceImpl_ListAndSendSpans_ListError(t *testing.T) {

backend/modules/observability/domain/task/service/taskexe/tracehub/local_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,6 @@ func (l *LocalCache) LoadTaskCache(ctx context.Context) TaskCacheInfo {
5050
return TaskCacheInfo{}
5151
}
5252

53-
logs.CtxInfo(ctx, "Retrieve task list from cache, taskCount=%d, spaceCount=%d, botCount=%d", len(cacheInfo.Tasks), len(cacheInfo.WorkspaceIDs), len(cacheInfo.BotIDs))
53+
logs.CtxDebug(ctx, "Retrieve task list from cache, taskCount=%d, spaceCount=%d, botCount=%d", len(cacheInfo.Tasks), len(cacheInfo.WorkspaceIDs), len(cacheInfo.BotIDs))
5454
return cacheInfo
5555
}

backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@ import (
1919

2020
func (h *TraceHubServiceImpl) SpanTrigger(ctx context.Context, span *loop_span.Span) error {
2121
logSuffix := fmt.Sprintf("log_id=%s, trace_id=%s, span_id=%s", span.LogID, span.TraceID, span.SpanID)
22-
logs.CtxInfo(ctx, "auto_task start, %s", logSuffix)
2322

2423
// 1. perform initial filtering based on space_id
2524
// 1.1 Filter out spans that do not belong to any space or bot
2625
cacheInfo := h.localCache.LoadTaskCache(ctx)
2726
spaceIDs, botIDs := cacheInfo.WorkspaceIDs, cacheInfo.BotIDs
2827
if !gslice.Contains(spaceIDs, span.WorkspaceID) && !gslice.Contains(botIDs, span.TagsString["bot_id"]) {
29-
logs.CtxInfo(ctx, "no space or bot found for span, space_id=%s, bot_id=%s, %s", span.WorkspaceID, span.TagsString["bot_id"], logSuffix)
28+
logs.CtxDebug(ctx, "no space or bot found for span, space_id=%s, bot_id=%s, %s", span.WorkspaceID, span.TagsString["bot_id"], logSuffix)
3029
return nil
3130
}
3231
// 1.2 Filter out spans of type Evaluator
@@ -253,7 +252,6 @@ func (h *TraceHubServiceImpl) dispatch(ctx context.Context, span *loop_span.Span
253252
if sub.t.TaskStatus != entity.TaskStatusRunning {
254253
continue
255254
}
256-
logs.CtxInfo(ctx, " sub.AddSpan: %v", sub)
257255
if err := sub.AddSpan(ctx, span); err != nil {
258256
merr = multierror.Append(merr, errors.WithMessagef(err, "add span to subscriber, log_id=%s, trace_id=%s, span_id=%s, task_id=%d",
259257
span.LogID, span.TraceID, span.SpanID, sub.taskID))

backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func (s *spanSubscriber) AddSpan(ctx context.Context, span *loop_span.Span) erro
180180
return nil
181181
}
182182
trigger := &taskexe.Trigger{Task: s.t, Span: span, TaskRun: taskRunConfig}
183-
logs.CtxInfo(ctx, "invoke processor, trigger: %v", trigger)
183+
logs.CtxDebug(ctx, "invoke processor, trigger: %v", trigger)
184184
err = s.processor.Invoke(ctx, trigger)
185185
if err != nil {
186186
logs.CtxWarn(ctx, "invoke processor failed, trace_id=%s, span_id=%s, err: %v", span.TraceID, span.SpanID, err)

backend/modules/observability/infra/mq/consumer/task_consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ func (e *TaskConsumer) HandleMessage(ctx context.Context, ext *mq.MessageExt) er
5656
ctx = logs.SetLogID(ctx, logID)
5757
event := new(entity.RawSpan)
5858
if err := json.Unmarshal(ext.Body, event); err != nil {
59-
logs.CtxError(ctx, "Task msg json unmarshal fail, raw: %v, err: %s", conv.UnsafeBytesToString(ext.Body), err)
59+
logs.CtxWarn(ctx, "Task msg json unmarshal fail, raw: %v, err: %s", conv.UnsafeBytesToString(ext.Body), err)
6060
return nil
6161
}
62-
logs.CtxInfo(ctx, "Span msg,log_id=%s, trace_id=%s, span_id=%s,msgID=%s", event.LogID, event.TraceID, event.SpanID, ext.MsgID)
62+
logs.CtxDebug(ctx, "Span msg,log_id=%s, trace_id=%s, span_id=%s,msgID=%s", event.LogID, event.TraceID, event.SpanID, ext.MsgID)
6363
return e.handler.SpanTrigger(ctx, event, nil)
6464
}

0 commit comments

Comments
 (0)