diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index e063032ca3c24..f370ab4764789 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -137,6 +137,14 @@ func (t *clusteringCompactionTask) CreateTaskOnWorker(nodeID int64, cluster sess } func (t *clusteringCompactionTask) QueryTaskOnWorker(cluster session.Cluster) { + // If task is in analyzing state, skip querying the DataNode — the compaction has not been + // submitted yet. The state transition (analyzing → pipelining) is driven by Process() / + // processAnalyzing(). Once the state becomes pipelining, the scheduler will move the task + // back to pendingTasks and CreateTaskOnWorker will call doCompact. + if t.GetTaskProto().GetState() == datapb.CompactionTaskState_analyzing { + return + } + log := log.Ctx(context.TODO()).With(zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String())) diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 990977f8ec78c..e3b2e973149a7 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -552,6 +552,18 @@ func (s *ClusteringCompactionTaskSuite) TestQueryTaskOnWorker() { }) } +func (s *ClusteringCompactionTaskSuite) TestQueryTaskOnWorkerSkipAnalyzing() { + s.Run("QueryTaskOnWorker skips when state is analyzing", func() { + task := s.generateBasicTask(true) // vector clustering key + task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing)) + cluster := session.NewMockCluster(s.T()) + // No QueryCompaction mock — if QueryTaskOnWorker calls it, the mock will panic. + task.QueryTaskOnWorker(cluster) + // State should remain analyzing, not be reset to pipelining. + s.Equal(datapb.CompactionTaskState_analyzing, task.GetTaskProto().GetState()) + }) +} + func (s *ClusteringCompactionTaskSuite) TestProcess() { s.Run("test process states", func() { testCases := []struct { diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go index 7fcdb1c5bfe70..90d7c8e8e9f60 100644 --- a/internal/datacoord/task_analyze.go +++ b/internal/datacoord/task_analyze.go @@ -18,17 +18,23 @@ package datacoord import ( "context" + "fmt" + "math" "time" + "github.com/samber/lo" "go.uber.org/zap" + "golang.org/x/exp/slices" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/session" globalTask "github.com/milvus-io/milvus/internal/datacoord/task" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/taskcommon" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) type analyzeTask struct { @@ -122,7 +128,8 @@ func (at *analyzeTask) dropAndResetTaskOnWorker(cluster session.Cluster, reason } func (at *analyzeTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) { - log := log.Ctx(context.TODO()).With(zap.Int64("taskID", at.GetTaskID())) + ctx := context.TODO() + log := log.Ctx(ctx).With(zap.Int64("taskID", at.GetTaskID())) // Check if task still exists in meta task := at.meta.analyzeMeta.GetTask(at.GetTaskID()) @@ -150,6 +157,70 @@ func (at *analyzeTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) Version: task.Version + 1, StorageConfig: createStorageConfig(), } + + // Populate SegmentStats with binlog IDs and row counts from segment metadata. + segments := at.meta.SelectSegments(ctx, SegmentFilterFunc(func(info *SegmentInfo) bool { + return isSegmentHealthy(info) && slices.Contains(task.SegmentIDs, info.ID) + })) + segmentsMap := lo.SliceToMap(segments, func(t *SegmentInfo) (int64, *SegmentInfo) { + return t.ID, t + }) + + totalSegmentsRows := int64(0) + for _, segID := range task.SegmentIDs { + info := segmentsMap[segID] + if info == nil { + log.Warn("analyze task is processing, but segment is nil, fail the task", + zap.Int64("segmentID", segID)) + at.SetState(indexpb.JobState_JobStateFailed, fmt.Sprintf("segmentInfo with ID: %d is nil", segID)) + return + } + totalSegmentsRows += info.GetNumOfRows() + binlogIDs := getBinLogIDs(info, task.FieldID) + req.SegmentStats[segID] = &indexpb.SegmentStats{ + ID: segID, + NumRows: info.GetNumOfRows(), + LogIDs: binlogIDs, + } + } + + // Extract dim from schema field TypeParams for vector clustering key. + if at.schema != nil { + for _, f := range at.schema.Fields { + if f.FieldID == task.FieldID { + dim, err := storage.GetDimFromParams(f.TypeParams) + if err != nil { + at.SetState(indexpb.JobState_JobStateInit, err.Error()) + return + } + req.Dim = int64(dim) + + // Calculate the number of clusters based on total data size. + totalSegmentsRawDataSize := float64(totalSegmentsRows) * float64(dim) * typeutil.VectorTypeSize(task.FieldType) + numClusters := int64(math.Ceil(totalSegmentsRawDataSize / (Params.DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024 * Params.DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.GetAsFloat()))) + if numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() { + log.Info("data size is too small, skip analyze task", + zap.Float64("raw data size", totalSegmentsRawDataSize), + zap.Int64("num clusters", numClusters), + zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64())) + at.SetState(indexpb.JobState_JobStateFinished, "") + return + } + if numClusters > Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64() { + numClusters = Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64() + } + req.NumClusters = numClusters + break + } + } + } + + req.MaxTrainSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxTrainSizeRatio.GetAsFloat() + req.MinClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMinClusterSizeRatio.GetAsFloat() + req.MaxClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxClusterSizeRatio.GetAsFloat() + req.MaxClusterSize = Params.DataCoordCfg.ClusteringCompactionMaxClusterSize.GetAsSize() + req.TaskSlot = Params.DataCoordCfg.AnalyzeTaskSlotUsage.GetAsInt64() + WrapPluginContext(task.CollectionID, at.schema.GetProperties(), req) var err error diff --git a/internal/datacoord/task_analyze_test.go b/internal/datacoord/task_analyze_test.go index d7e105acb9c38..d2f77b9d3b73b 100644 --- a/internal/datacoord/task_analyze_test.go +++ b/internal/datacoord/task_analyze_test.go @@ -25,9 +25,12 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/session" catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/taskcommon" @@ -80,9 +83,52 @@ func (s *analyzeTaskSuite) SetupSuite() { } analyzeMt.tasks[s.taskID] = analyzeTask + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: s.fieldID, + Name: "vector_field", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + + collections := typeutil.NewConcurrentMap[int64, *collectionInfo]() + collections.Insert(s.collID, &collectionInfo{Schema: schema}) + + segments := NewSegmentsInfo() + segments.SetSegment(101, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 101, + CollectionID: s.collID, + PartitionID: s.partID, + State: commonpb.SegmentState_Flushed, + NumOfRows: 1000, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: s.fieldID, Binlogs: []*datapb.Binlog{{LogID: 1001}, {LogID: 1002}}}, + }, + }, + }) + segments.SetSegment(102, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 102, + CollectionID: s.collID, + PartitionID: s.partID, + State: commonpb.SegmentState_Flushed, + NumOfRows: 2000, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: s.fieldID, Binlogs: []*datapb.Binlog{{LogID: 2001}, {LogID: 2002}}}, + }, + }, + }) + s.mt = &meta{ analyzeMeta: analyzeMt, - collections: typeutil.NewConcurrentMap[int64, *collectionInfo](), + collections: collections, + segments: segments, } } @@ -149,6 +195,177 @@ func (s *analyzeTaskSuite) TestCreateTaskOnWorker() { }) } +func (s *analyzeTaskSuite) newTask() *analyzeTask { + return newAnalyzeTask(&indexpb.AnalyzeTask{ + CollectionID: s.collID, + TaskID: s.taskID, + State: indexpb.JobState_JobStateInit, + }, s.mt) +} + +func (s *analyzeTaskSuite) TestCreateTaskOnWorker_SegmentNil() { + // Replace segment 102 with a dropped segment so it's filtered out by isSegmentHealthy + s.mt.segments.SetSegment(102, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 102, + State: commonpb.SegmentState_Dropped, + }, + }) + defer func() { + s.mt.segments.SetSegment(102, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 102, + CollectionID: s.collID, + PartitionID: s.partID, + State: commonpb.SegmentState_Flushed, + NumOfRows: 2000, + Binlogs: []*datapb.FieldBinlog{ + {FieldID: s.fieldID, Binlogs: []*datapb.Binlog{{LogID: 2001}, {LogID: 2002}}}, + }, + }, + }) + }() + + at := s.newTask() + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.On("SaveAnalyzeTask", mock.Anything, mock.Anything).Return(nil) + s.mt.analyzeMeta.catalog = catalog + + at.CreateTaskOnWorker(1, session.NewMockCluster(s.T())) + s.Equal(indexpb.JobState_JobStateFailed, at.GetState()) + s.Contains(at.GetFailReason(), "102") +} + +func (s *analyzeTaskSuite) TestCreateTaskOnWorker_DimExtractionError() { + // Use a schema with missing dim TypeParams + badSchema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: s.fieldID, + Name: "vector_field", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{}, // no dim + }, + }, + } + origCollections := s.mt.collections + collections := typeutil.NewConcurrentMap[int64, *collectionInfo]() + collections.Insert(s.collID, &collectionInfo{Schema: badSchema}) + s.mt.collections = collections + defer func() { s.mt.collections = origCollections }() + + // Must create task AFTER swapping collections so schema is the bad one + at := s.newTask() + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.On("SaveAnalyzeTask", mock.Anything, mock.Anything).Return(nil) + s.mt.analyzeMeta.catalog = catalog + + at.CreateTaskOnWorker(1, session.NewMockCluster(s.T())) + // Should reset to Init state on dim error + s.Equal(indexpb.JobState_JobStateInit, at.GetState()) +} + +func (s *analyzeTaskSuite) TestCreateTaskOnWorker_DataTooSmall() { + // Set MinCentroidsNum very high so data is considered too small + origMin := Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue("999999999") + defer Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue(origMin) + + at := s.newTask() + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.On("SaveAnalyzeTask", mock.Anything, mock.Anything).Return(nil) + s.mt.analyzeMeta.catalog = catalog + + at.CreateTaskOnWorker(1, session.NewMockCluster(s.T())) + // data too small → skip → mark as finished + s.Equal(indexpb.JobState_JobStateFinished, at.GetState()) +} + +func (s *analyzeTaskSuite) TestCreateTaskOnWorker_NumClustersCapped() { + // Set MaxCentroidsNum=1, MinCentroidsNum=1, SegmentMaxSize very small to force numClusters > max + origMax := Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.SwapTempValue("1") + defer Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.SwapTempValue(origMax) + origMin := Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue("1") + defer Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue(origMin) + origSegSize := Params.DataCoordCfg.SegmentMaxSize.SwapTempValue("0.0001") + defer Params.DataCoordCfg.SegmentMaxSize.SwapTempValue(origSegSize) + + at := s.newTask() + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.On("SaveAnalyzeTask", mock.Anything, mock.Anything).Return(nil) + s.mt.analyzeMeta.catalog = catalog + + cluster := session.NewMockCluster(s.T()) + cluster.EXPECT().CreateAnalyze(mock.Anything, mock.MatchedBy(func(req *workerpb.AnalyzeRequest) bool { + return req.NumClusters == 1 // capped at MaxCentroidsNum=1 + })).Return(nil) + + at.CreateTaskOnWorker(1, cluster) + s.Equal(indexpb.JobState_JobStateInProgress, at.GetState()) +} + +func (s *analyzeTaskSuite) TestCreateTaskOnWorker_CreateAnalyzeError() { + // Ensure numClusters passes the min check + origMin := Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue("1") + defer Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue(origMin) + + at := s.newTask() + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.On("SaveAnalyzeTask", mock.Anything, mock.Anything).Return(nil) + s.mt.analyzeMeta.catalog = catalog + + cluster := session.NewMockCluster(s.T()) + cluster.EXPECT().CreateAnalyze(mock.Anything, mock.Anything).Return(fmt.Errorf("node down")) + cluster.EXPECT().DropAnalyze(mock.Anything, mock.Anything).Return(nil) + + at.CreateTaskOnWorker(1, cluster) + // Should NOT be InProgress since CreateAnalyze failed + s.NotEqual(indexpb.JobState_JobStateInProgress, at.GetState()) +} + +func (s *analyzeTaskSuite) TestCreateTaskOnWorker_SegmentStatsPopulated() { + // Ensure numClusters passes the min check + origMin := Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue("1") + defer Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.SwapTempValue(origMin) + + at := s.newTask() + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.On("SaveAnalyzeTask", mock.Anything, mock.Anything).Return(nil) + s.mt.analyzeMeta.catalog = catalog + + cluster := session.NewMockCluster(s.T()) + cluster.EXPECT().CreateAnalyze(mock.Anything, mock.MatchedBy(func(req *workerpb.AnalyzeRequest) bool { + // Verify SegmentStats are populated correctly + if len(req.SegmentStats) != 2 { + return false + } + stat101 := req.SegmentStats[101] + stat102 := req.SegmentStats[102] + if stat101 == nil || stat102 == nil { + return false + } + // segment 101: 1000 rows, binlogs [1001, 1002] + if stat101.NumRows != 1000 || len(stat101.LogIDs) != 2 { + return false + } + // segment 102: 2000 rows, binlogs [2001, 2002] + if stat102.NumRows != 2000 || len(stat102.LogIDs) != 2 { + return false + } + // Dim should be 128 + if req.Dim != 128 { + return false + } + // Clustering params should be populated + if req.MaxTrainSizeRatio == 0 || req.MaxClusterSize == 0 || req.TaskSlot == 0 { + return false + } + return true + })).Return(nil) + + at.CreateTaskOnWorker(1, cluster) + s.Equal(indexpb.JobState_JobStateInProgress, at.GetState()) +} + func (s *analyzeTaskSuite) TestQueryTaskOnWorker() { at := newAnalyzeTask(&indexpb.AnalyzeTask{ TaskID: s.taskID, diff --git a/pkg/taskcommon/state.go b/pkg/taskcommon/state.go index a9b997b815690..3bdd0582f9344 100644 --- a/pkg/taskcommon/state.go +++ b/pkg/taskcommon/state.go @@ -68,7 +68,7 @@ func FromCompactionState(s datapb.CompactionTaskState) State { switch s { case datapb.CompactionTaskState_pipelining: return Init - case datapb.CompactionTaskState_executing: + case datapb.CompactionTaskState_executing, datapb.CompactionTaskState_analyzing: return InProgress case datapb.CompactionTaskState_completed, datapb.CompactionTaskState_meta_saved, datapb.CompactionTaskState_statistic, datapb.CompactionTaskState_indexing, datapb.CompactionTaskState_cleaned: diff --git a/pkg/taskcommon/state_test.go b/pkg/taskcommon/state_test.go new file mode 100644 index 0000000000000..de216a65c79e8 --- /dev/null +++ b/pkg/taskcommon/state_test.go @@ -0,0 +1,51 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package taskcommon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" +) + +func TestFromCompactionState(t *testing.T) { + tests := []struct { + name string + input datapb.CompactionTaskState + expected State + }{ + {"pipelining maps to Init", datapb.CompactionTaskState_pipelining, Init}, + {"executing maps to InProgress", datapb.CompactionTaskState_executing, InProgress}, + {"analyzing maps to InProgress", datapb.CompactionTaskState_analyzing, InProgress}, + {"completed maps to Finished", datapb.CompactionTaskState_completed, Finished}, + {"meta_saved maps to Finished", datapb.CompactionTaskState_meta_saved, Finished}, + {"statistic maps to Finished", datapb.CompactionTaskState_statistic, Finished}, + {"indexing maps to Finished", datapb.CompactionTaskState_indexing, Finished}, + {"cleaned maps to Finished", datapb.CompactionTaskState_cleaned, Finished}, + {"failed maps to Failed", datapb.CompactionTaskState_failed, Failed}, + {"timeout maps to Retry", datapb.CompactionTaskState_timeout, Retry}, + {"unknown maps to None", datapb.CompactionTaskState_unknown, None}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, FromCompactionState(tt.input)) + }) + } +} diff --git a/tests/integration/compaction/clustering_compaction_vector_test.go b/tests/integration/compaction/clustering_compaction_vector_test.go new file mode 100644 index 0000000000000..d3c286b22dcd2 --- /dev/null +++ b/tests/integration/compaction/clustering_compaction_vector_test.go @@ -0,0 +1,305 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compaction + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/metric" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +type VectorClusteringCompactionSuite struct { + integration.MiniClusterSuite +} + +func (s *VectorClusteringCompactionSuite) SetupSuite() { + s.WithMilvusConfig(paramtable.Get().PulsarCfg.MaxMessageSize.Key, strconv.Itoa(500*1024)) + s.WithMilvusConfig(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.Key, strconv.Itoa(8)) + s.WithMilvusConfig(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "false") + s.WithMilvusConfig(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key, "1.0") + s.WithMilvusConfig(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1.0") + s.WithMilvusConfig(paramtable.Get().DataCoordCfg.TaskCheckInterval.Key, "1") + s.WithMilvusConfig(paramtable.Get().DataCoordCfg.TaskScheduleInterval.Key, "100") + s.WithMilvusConfig(paramtable.Get().CommonCfg.EnableVectorClusteringKey.Key, "true") + s.MiniClusterSuite.SetupSuite() +} + +// TestVectorClusteringCompaction verifies the end-to-end flow of clustering compaction +// when FloatVector is used as the clustering key. This exercises the full pipeline: +// pipelining → analyzing → pipelining (with AnalyzeVersion) → executing → meta_saved → completed +func (s *VectorClusteringCompactionSuite) TestVectorClusteringCompaction() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + c := s.Cluster + + const ( + dim = 128 + dbName = "" + rowNum = 30000 + ) + + collectionName := "TestVectorClusteringCompaction" + funcutil.GenRandomStr() + + schema := ConstructVectorClusteringSchema(collectionName, dim, true) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + createCollectionStatus, err := c.MilvusClient.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, createCollectionStatus.GetErrorCode()) + + showCollectionsResp, err := c.MilvusClient.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + CollectionNames: []string{collectionName}, + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, showCollectionsResp.GetStatus().GetErrorCode()) + + // insert — autoID=true so only vector field is needed + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.MilvusClient.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, insertResult.GetStatus().GetErrorCode()) + + // flush + flushResp, err := c.MilvusClient.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + ids := segmentIDs.GetData() + s.Require().NotEmpty(segmentIDs) + s.Require().True(has) + flushTs, has := flushResp.GetCollFlushTs()[collectionName] + s.True(has) + + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + + // record segment count before compaction + segmentsBefore, err := c.ShowSegments(collectionName) + s.NoError(err) + s.NotEmpty(segmentsBefore) + segCountBefore := 0 + for _, seg := range segmentsBefore { + if seg.GetState() == commonpb.SegmentState_Flushed { + segCountBefore++ + } + log.Info("segment before compaction", + zap.Int64("id", seg.ID), + zap.String("state", seg.GetState().String()), + zap.String("level", seg.GetLevel().String()), + zap.Int64("numOfRows", seg.GetNumOfRows())) + } + log.Info("segments before compaction", zap.Int("count", segCountBefore)) + + // reduce SegmentMaxSize to force clustering compaction + revertGuard := s.Cluster.MustModifyMilvusConfig(map[string]string{ + paramtable.Get().DataCoordCfg.SegmentMaxSize.Key: "1", + }) + defer revertGuard() + + indexType := integration.IndexFaissIvfFlat + metricType := metric.L2 + vecType := schemapb.DataType_FloatVector + + // create index + createIndexStatus, err := c.MilvusClient.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: fVecColumn.FieldName, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) + + s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName) + + // load + loadStatus, err := c.MilvusClient.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) + s.WaitForLoad(ctx, collectionName) + + // trigger clustering compaction + compactReq := &milvuspb.ManualCompactionRequest{ + CollectionID: showCollectionsResp.CollectionIds[0], + MajorCompaction: true, + } + compactResp, err := c.MilvusClient.ManualCompaction(ctx, compactReq) + s.NoError(err) + s.Greater(compactResp.GetCompactionID(), int64(0), "compaction should be triggered, got zero compactionID") + log.Info("compaction triggered", zap.Int64("compactionID", compactResp.GetCompactionID())) + + // wait for compaction to complete with timeout + compacted := func() bool { + resp, err := c.MilvusClient.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{ + CompactionID: compactResp.GetCompactionID(), + }) + if err != nil { + return false + } + if resp.GetState() == commonpb.CompactionState_Completed { + return true + } + log.Info("waiting for compaction", zap.String("state", resp.GetState().String())) + return false + } + for !compacted() { + select { + case <-ctx.Done(): + s.FailNow("compaction did not complete within timeout") + default: + time.Sleep(3 * time.Second) + } + } + log.Info("compaction completed") + + // verify: get segment info after compaction + desCollResp, err := c.MilvusClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + CollectionName: collectionName, + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, desCollResp.GetStatus().GetErrorCode()) + + flushedSegmentsResp, err := c.MixCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{ + CollectionID: desCollResp.GetCollectionID(), + PartitionID: -1, + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, flushedSegmentsResp.GetStatus().GetErrorCode()) + + segsInfoResp, err := c.MixCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ + SegmentIDs: flushedSegmentsResp.GetSegments(), + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, segsInfoResp.GetStatus().GetErrorCode()) + + // verify 1: total row count unchanged + totalRows := int64(0) + segCountAfter := 0 + hasPartitionStats := false + for _, segInfo := range segsInfoResp.GetInfos() { + totalRows += segInfo.GetNumOfRows() + if segInfo.GetState() == commonpb.SegmentState_Flushed { + segCountAfter++ + } + if segInfo.GetPartitionStatsVersion() > 0 { + hasPartitionStats = true + } + log.Info("segment after compaction", + zap.Int64("id", segInfo.GetID()), + zap.String("state", segInfo.GetState().String()), + zap.String("level", segInfo.GetLevel().String()), + zap.Int64("numOfRows", segInfo.GetNumOfRows()), + zap.Int64("partitionStatsVersion", segInfo.GetPartitionStatsVersion())) + } + s.Equal(int64(rowNum), totalRows, "total row count should be unchanged after compaction") + log.Info("segments after compaction", zap.Int("count", segCountAfter)) + + // verify 2: segment count changed (compaction merged segments) + s.NotEqual(segCountBefore, segCountAfter, "segment count should change after clustering compaction") + + // verify 3: result segments have PartitionStatsVersion > 0 (proves clustering compaction ran) + s.True(hasPartitionStats, "at least one result segment should have PartitionStatsVersion > 0") + + // verify 4: search still works correctly + expr := fmt.Sprintf("%s > 0", integration.Int64Field) + nq := 10 + topk := 10 + roundDecimal := -1 + + params := integration.GetSearchParams(indexType, metricType) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + fVecColumn.FieldName, vecType, nil, metricType, params, nq, dim, topk, roundDecimal) + + searchResult, err := c.MilvusClient.Search(ctx, searchReq) + err = merr.CheckRPCCall(searchResult, err) + s.NoError(err) + + // verify 5: query segment info matches total row count + checkWaitGroup := sync.WaitGroup{} + checkWaitGroup.Add(1) + go func() { + defer checkWaitGroup.Done() + timeoutCtx, cancelFunc := context.WithTimeout(ctx, time.Minute*2) + defer cancelFunc() + + for { + select { + case <-timeoutCtx.Done(): + s.Fail("check query segment info timeout") + return + default: + querySegmentInfo, err := c.MilvusClient.GetQuerySegmentInfo(timeoutCtx, &milvuspb.GetQuerySegmentInfoRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + + var queryRows int64 + for _, seg := range querySegmentInfo.Infos { + queryRows += seg.NumRows + } + if queryRows == rowNum { + return + } + } + time.Sleep(time.Second * 3) + } + }() + checkWaitGroup.Wait() + + log.Info("TestVectorClusteringCompaction succeed") +} + +func TestVectorClusteringCompaction(t *testing.T) { + suite.Run(t, new(VectorClusteringCompactionSuite)) +}