Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ func (t *clusteringCompactionTask) CreateTaskOnWorker(nodeID int64, cluster sess
}

func (t *clusteringCompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
// If task is in analyzing state, skip querying the DataNode — the compaction has not been
// submitted yet. The state transition (analyzing → pipelining) is driven by Process() /
// processAnalyzing(). Once the state becomes pipelining, the scheduler will move the task
// back to pendingTasks and CreateTaskOnWorker will call doCompact.
if t.GetTaskProto().GetState() == datapb.CompactionTaskState_analyzing {
return
}

log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.String("type", t.GetTaskProto().GetType().String()))

Expand Down
94 changes: 94 additions & 0 deletions internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/objectstorage"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
)
Expand Down Expand Up @@ -552,6 +553,18 @@ func (s *ClusteringCompactionTaskSuite) TestQueryTaskOnWorker() {
})
}

func (s *ClusteringCompactionTaskSuite) TestQueryTaskOnWorkerSkipAnalyzing() {
	s.Run("QueryTaskOnWorker skips when state is analyzing", func() {
		// Use a vector clustering key: the analyzing state is only reachable
		// on the vector path (doAnalyze).
		task := s.generateBasicTask(true) // vector clustering key
		// Require the state save to succeed instead of discarding the error;
		// otherwise a failed save would leave the task in pipelining and the
		// test would pass for the wrong reason.
		s.Require().NoError(task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing)))
		cluster := session.NewMockCluster(s.T())
		// No QueryCompaction mock — if QueryTaskOnWorker calls it, the mock will panic.
		task.QueryTaskOnWorker(cluster)
		// State should remain analyzing, not be reset to pipelining.
		s.Equal(datapb.CompactionTaskState_analyzing, task.GetTaskProto().GetState())
	})
}

func (s *ClusteringCompactionTaskSuite) TestProcess() {
s.Run("test process states", func() {
testCases := []struct {
Expand Down Expand Up @@ -805,6 +818,87 @@ func ConstructClusteringSchema(collection string, dim int, autoID bool, vectorCl
}
}

// TestVectorClusteringKeyLifecycle walks a vector-clustering-key compaction task
// through its full state machine. It covers the fix for #47540, where such tasks
// stalled for two reasons:
//   - FromCompactionState(analyzing) mapped to None rather than InProgress
//   - QueryTaskOnWorker handled the analyzing state, resetting it to pipelining
//
// Expected transitions:
//
//	pipelining → (CreateTaskOnWorker/doAnalyze) → analyzing
//	→ (processAnalyzing detects finished) → pipelining (with AnalyzeVersion)
//	→ (CreateTaskOnWorker/doCompact) → executing
func (s *ClusteringCompactionTaskSuite) TestVectorClusteringKeyLifecycle() {
	// Step 1: start from a pipelining vector-clustering-key task.
	ct := s.generateBasicTask(true)
	analyzeID := ct.GetTaskProto().GetAnalyzeTaskID()

	// Register the segments that doCompact/BuildCompactionRequest will look up in meta.
	for _, seg := range []*SegmentInfo{
		{SegmentInfo: &datapb.SegmentInfo{
			ID:    101,
			State: commonpb.SegmentState_Flushed,
			Level: datapb.SegmentLevel_L1,
		}},
		{SegmentInfo: &datapb.SegmentInfo{
			ID:                    102,
			State:                 commonpb.SegmentState_Flushed,
			Level:                 datapb.SegmentLevel_L2,
			PartitionStatsVersion: 10000,
		}},
	} {
		s.meta.AddSegment(context.TODO(), seg)
	}

	// Step 2: with a vector key and AnalyzeVersion == 0, CreateTaskOnWorker
	// takes the doAnalyze path and moves the task into analyzing.
	analyzePhaseCluster := session.NewMockCluster(s.T())
	// No CreateCompaction mock — doAnalyze does not call it.
	ct.CreateTaskOnWorker(1, analyzePhaseCluster)
	s.Equal(datapb.CompactionTaskState_analyzing, ct.GetTaskProto().GetState(),
		"after CreateTaskOnWorker with vector key and AnalyzeVersion==0, state should be analyzing")

	// Step 3: the scheduler-level mapping for analyzing must be InProgress,
	// not None — the root cause of #47540.
	s.Equal(taskcommon.InProgress, taskcommon.FromCompactionState(datapb.CompactionTaskState_analyzing),
		"FromCompactionState(analyzing) must return InProgress so the scheduler treats it as a running task")

	// Step 4: a mock cluster with NO QueryCompaction expectation — it panics
	// if QueryTaskOnWorker wrongly queries the worker while analyzing. The fix
	// makes QueryTaskOnWorker return early in this state.
	queryPhaseCluster := session.NewMockCluster(s.T())
	ct.QueryTaskOnWorker(queryPhaseCluster)
	s.Equal(datapb.CompactionTaskState_analyzing, ct.GetTaskProto().GetState(),
		"QueryTaskOnWorker should skip when state is analyzing, leaving state unchanged")

	// Step 5: pretend the analyze job finished. doAnalyze already registered
	// the analyze task under the task's AnalyzeTaskID; mark it finished with a
	// centroids file and a non-zero version.
	finished := s.meta.GetAnalyzeMeta().GetTask(analyzeID)
	s.Require().NotNil(finished, "analyze task should exist in analyzeMeta after doAnalyze")
	finished.State = indexpb.JobState_JobStateFinished
	finished.CentroidsFile = "analyze/centroids/file"
	finished.Version = 42

	// Step 6: Process() → processAnalyzing() observes the finished analyze job
	// and moves the task back to pipelining, recording the AnalyzeVersion.
	s.False(ct.Process(), "Process should return false because pipelining is not a terminal state")
	s.Equal(datapb.CompactionTaskState_pipelining, ct.GetTaskProto().GetState(),
		"after processAnalyzing detects finished analyze, state should be pipelining")
	s.Greater(ct.GetTaskProto().GetAnalyzeVersion(), int64(0),
		"AnalyzeVersion should be set after analyze finishes")

	// Step 7: with AnalyzeVersion > 0, CreateTaskOnWorker now takes the
	// doCompact path, which does call CreateCompaction on the cluster.
	compactPhaseCluster := session.NewMockCluster(s.T())
	compactPhaseCluster.EXPECT().CreateCompaction(mock.Anything, mock.Anything).Return(nil)
	ct.CreateTaskOnWorker(1, compactPhaseCluster)
	s.Equal(datapb.CompactionTaskState_executing, ct.GetTaskProto().GetState(),
		"after CreateTaskOnWorker with AnalyzeVersion>0, state should be executing (doCompact path)")
}

func (s *ClusteringCompactionTaskSuite) TestProcessStatsState() {
s.Run("compaction to not exist", func() {
task := s.generateBasicTask(false)
Expand Down
73 changes: 72 additions & 1 deletion internal/datacoord/task_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@ package datacoord

import (
"context"
"fmt"
"math"
"time"

"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/exp/slices"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/session"
globalTask "github.com/milvus-io/milvus/internal/datacoord/task"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

type analyzeTask struct {
Expand Down Expand Up @@ -122,7 +128,8 @@ func (at *analyzeTask) dropAndResetTaskOnWorker(cluster session.Cluster, reason
}

func (at *analyzeTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
log := log.Ctx(context.TODO()).With(zap.Int64("taskID", at.GetTaskID()))
ctx := context.TODO()
log := log.Ctx(ctx).With(zap.Int64("taskID", at.GetTaskID()))

// Check if task still exists in meta
task := at.meta.analyzeMeta.GetTask(at.GetTaskID())
Expand Down Expand Up @@ -150,6 +157,70 @@ func (at *analyzeTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster)
Version: task.Version + 1,
StorageConfig: createStorageConfig(),
}

// Populate SegmentStats with binlog IDs and row counts from segment metadata.
segments := at.meta.SelectSegments(ctx, SegmentFilterFunc(func(info *SegmentInfo) bool {
return isSegmentHealthy(info) && slices.Contains(task.SegmentIDs, info.ID)
}))
segmentsMap := lo.SliceToMap(segments, func(t *SegmentInfo) (int64, *SegmentInfo) {
return t.ID, t
})

totalSegmentsRows := int64(0)
for _, segID := range task.SegmentIDs {
info := segmentsMap[segID]
if info == nil {
log.Warn("analyze task is processing, but segment is nil, fail the task",
zap.Int64("segmentID", segID))
at.SetState(indexpb.JobState_JobStateFailed, fmt.Sprintf("segmentInfo with ID: %d is nil", segID))
return
}
totalSegmentsRows += info.GetNumOfRows()
binlogIDs := getBinLogIDs(info, task.FieldID)
req.SegmentStats[segID] = &indexpb.SegmentStats{
ID: segID,
NumRows: info.GetNumOfRows(),
LogIDs: binlogIDs,
}
}

// Extract dim from schema field TypeParams for vector clustering key.
if at.schema != nil {
for _, f := range at.schema.Fields {
if f.FieldID == task.FieldID {
dim, err := storage.GetDimFromParams(f.TypeParams)
if err != nil {
at.SetState(indexpb.JobState_JobStateInit, err.Error())
return
}
req.Dim = int64(dim)

// Calculate the number of clusters based on total data size.
totalSegmentsRawDataSize := float64(totalSegmentsRows) * float64(dim) * typeutil.VectorTypeSize(task.FieldType)
numClusters := int64(math.Ceil(totalSegmentsRawDataSize / (Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024 * Params.DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat())))
if numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() {
log.Info("data size is too small, skip analyze task",
zap.Float64("raw data size", totalSegmentsRawDataSize),
zap.Int64("num clusters", numClusters),
zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64()))
at.SetState(indexpb.JobState_JobStateFinished, "")
return
}
if numClusters > Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64() {
numClusters = Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64()
}
req.NumClusters = numClusters
break
}
}
}

req.MaxTrainSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxTrainSizeRatio.GetAsFloat()
req.MinClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMinClusterSizeRatio.GetAsFloat()
req.MaxClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxClusterSizeRatio.GetAsFloat()
req.MaxClusterSize = Params.DataCoordCfg.ClusteringCompactionMaxClusterSize.GetAsSize()
req.TaskSlot = Params.DataCoordCfg.AnalyzeTaskSlotUsage.GetAsInt64()

WrapPluginContext(task.CollectionID, at.schema.GetProperties(), req)

var err error
Expand Down
48 changes: 47 additions & 1 deletion internal/datacoord/task_analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/session"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
Expand Down Expand Up @@ -80,9 +83,52 @@ func (s *analyzeTaskSuite) SetupSuite() {
}
analyzeMt.tasks[s.taskID] = analyzeTask

schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: s.fieldID,
Name: "vector_field",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}

collections := typeutil.NewConcurrentMap[int64, *collectionInfo]()
collections.Insert(s.collID, &collectionInfo{Schema: schema})

segments := NewSegmentsInfo()
segments.SetSegment(101, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
CollectionID: s.collID,
PartitionID: s.partID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 1000,
Binlogs: []*datapb.FieldBinlog{
{FieldID: s.fieldID, Binlogs: []*datapb.Binlog{{LogID: 1001}, {LogID: 1002}}},
},
},
})
segments.SetSegment(102, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
CollectionID: s.collID,
PartitionID: s.partID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 2000,
Binlogs: []*datapb.FieldBinlog{
{FieldID: s.fieldID, Binlogs: []*datapb.Binlog{{LogID: 2001}, {LogID: 2002}}},
},
},
})

s.mt = &meta{
analyzeMeta: analyzeMt,
collections: typeutil.NewConcurrentMap[int64, *collectionInfo](),
collections: collections,
segments: segments,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/taskcommon/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func FromCompactionState(s datapb.CompactionTaskState) State {
switch s {
case datapb.CompactionTaskState_pipelining:
return Init
case datapb.CompactionTaskState_executing:
case datapb.CompactionTaskState_executing, datapb.CompactionTaskState_analyzing:
return InProgress
case datapb.CompactionTaskState_completed, datapb.CompactionTaskState_meta_saved,
datapb.CompactionTaskState_statistic, datapb.CompactionTaskState_indexing, datapb.CompactionTaskState_cleaned:
Expand Down
51 changes: 51 additions & 0 deletions pkg/taskcommon/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package taskcommon

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)

// TestFromCompactionState checks the mapping from every compaction task state
// to its scheduler-level task state, one subtest per state.
func TestFromCompactionState(t *testing.T) {
	type mapping struct {
		name  string
		state datapb.CompactionTaskState
		want  State
	}

	cases := []mapping{
		{"pipelining maps to Init", datapb.CompactionTaskState_pipelining, Init},
		{"executing maps to InProgress", datapb.CompactionTaskState_executing, InProgress},
		{"analyzing maps to InProgress", datapb.CompactionTaskState_analyzing, InProgress},
		{"completed maps to Finished", datapb.CompactionTaskState_completed, Finished},
		{"meta_saved maps to Finished", datapb.CompactionTaskState_meta_saved, Finished},
		{"statistic maps to Finished", datapb.CompactionTaskState_statistic, Finished},
		{"indexing maps to Finished", datapb.CompactionTaskState_indexing, Finished},
		{"cleaned maps to Finished", datapb.CompactionTaskState_cleaned, Finished},
		{"failed maps to Failed", datapb.CompactionTaskState_failed, Failed},
		{"timeout maps to Retry", datapb.CompactionTaskState_timeout, Retry},
		{"unknown maps to None", datapb.CompactionTaskState_unknown, None},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.want, FromCompactionState(tc.state))
		})
	}
}
Loading
Loading