Commit 15b7ac9

fix: [2.6] remove interactive logic between import and L0 compaction (#48114)

The interactive coordination between import and L0 compaction (pause/resume channels, skip collections, L0 import-task waiting) causes BulkInsert to fail with "too many input paths for binlog import" when L0 compaction triggers during an import. Remove this coordination entirely so that import and L0 compaction operate independently.

issue: #47762
pr: #47768

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

1 parent 2b4988a commit 15b7ac9

10 files changed: 36 additions & 520 deletions
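
For context, the interface methods deleted below (GetPauseCompactionChan and GetResumeCompactionChan) were how an import job parked L0 compaction for its collection. A minimal sketch of how an import-side caller consumed that handshake; the call site itself is not part of this commit, so startImportWithL0Paused and doImport are illustrative names:

```go
package sketch

import "context"

// Pre-change surface of TriggerManager used by the handshake (both methods
// are removed in this commit).
type TriggerManager interface {
	GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{}
	GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{}
}

// startImportWithL0Paused sketches the old import-side flow: block until L0
// compaction for the collection is paused, run the import, then wait for
// compaction to be resumed.
func startImportWithL0Paused(ctx context.Context, tm TriggerManager, jobID, collectionID int64, doImport func(context.Context) error) error {
	select {
	case <-tm.GetPauseCompactionChan(jobID, collectionID): // closed once paused
	case <-ctx.Done():
		return ctx.Err()
	}
	err := doImport(ctx)
	<-tm.GetResumeCompactionChan(jobID, collectionID) // closed once resumed
	return err
}
```

After this commit, the caller simply starts the import; no handshake remains on either side.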

internal/datacoord/compaction_policy_l0.go

Lines changed: 3 additions & 44 deletions

@@ -12,7 +12,6 @@ import (
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
-	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -23,53 +22,20 @@ type l0CompactionPolicy struct {
 
 	activeCollections *activeCollections
 	allocator         allocator.Allocator
-
-	// key: collectionID, value: reference count
-	skipCompactionCollections map[int64]int
-	skipLocker                sync.RWMutex
 }
 
 func newL0CompactionPolicy(meta *meta, allocator allocator.Allocator) *l0CompactionPolicy {
 	return &l0CompactionPolicy{
-		meta:                      meta,
-		activeCollections:         newActiveCollections(),
-		allocator:                 allocator,
-		skipCompactionCollections: make(map[int64]int),
+		meta:              meta,
+		activeCollections: newActiveCollections(),
+		allocator:         allocator,
 	}
 }
 
 func (policy *l0CompactionPolicy) Enable() bool {
 	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
 }
 
-func (policy *l0CompactionPolicy) AddSkipCollection(collectionID UniqueID) {
-	policy.skipLocker.Lock()
-	defer policy.skipLocker.Unlock()
-
-	if _, ok := policy.skipCompactionCollections[collectionID]; !ok {
-		policy.skipCompactionCollections[collectionID] = 1
-	} else {
-		policy.skipCompactionCollections[collectionID]++
-	}
-}
-
-func (policy *l0CompactionPolicy) RemoveSkipCollection(collectionID UniqueID) {
-	policy.skipLocker.Lock()
-	defer policy.skipLocker.Unlock()
-	refCount := policy.skipCompactionCollections[collectionID]
-	if refCount > 1 {
-		policy.skipCompactionCollections[collectionID]--
-	} else {
-		delete(policy.skipCompactionCollections, collectionID)
-	}
-}
-
-func (policy *l0CompactionPolicy) isSkipCollection(collectionID UniqueID) bool {
-	policy.skipLocker.RLock()
-	defer policy.skipLocker.RUnlock()
-	return policy.skipCompactionCollections[collectionID] > 0
-}
-
 // Notify policy to record the active updated(when adding a new L0 segment) collections.
 func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) {
 	policy.activeCollections.Record(collectionID)
@@ -94,10 +60,6 @@ func (policy *l0CompactionPolicy) Trigger(ctx context.Context) (events map[Compa
 	}
 	events = make(map[CompactionTriggerType][]CompactionView)
 	for collID, segments := range latestCollSegs {
-		if policy.isSkipCollection(collID) {
-			continue
-		}
-
 		policy.activeCollections.Read(collID)
 		levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
 			return info.GetLevel() == datapb.SegmentLevel_L0
@@ -126,9 +88,6 @@ func (policy *l0CompactionPolicy) Trigger(ctx context.Context) (events map[Compa
 func (policy *l0CompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64) ([]CompactionView, int64, error) {
 	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
 	log.Info("start trigger collection l0 compaction")
-	if policy.isSkipCollection(collectionID) {
-		return nil, 0, merr.WrapErrCollectionNotLoaded(collectionID, "the collection being paused by importing cannot do force l0 compaction")
-	}
 	allL0Segments := policy.meta.SelectSegments(ctx, WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
 		return isSegmentHealthy(segment) &&
 			isFlushed(segment) &&
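
The functions removed above implemented a ref-counted skip list: each importing job added one reference for its collection, and the collection stayed excluded from L0 triggering until every reference was released. The same semantics in isolation, as a standalone sketch using only the standard library (names are mine, not the removed identifiers):

```go
package sketch

import "sync"

// skipList mirrors the removed skipCompactionCollections map: a collection
// is skipped while its reference count is positive.
type skipList struct {
	mu   sync.RWMutex
	refs map[int64]int
}

func newSkipList() *skipList { return &skipList{refs: make(map[int64]int)} }

// Add takes one reference, same effect as the removed if/else branches.
func (s *skipList) Add(collectionID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs[collectionID]++
}

// Remove releases one reference; the entry disappears with the last one.
func (s *skipList) Remove(collectionID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.refs[collectionID] > 1 {
		s.refs[collectionID]--
	} else {
		delete(s.refs, collectionID)
	}
}

// Skipped reports whether any importing job still holds a reference.
func (s *skipList) Skipped(collectionID int64) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.refs[collectionID] > 0
}
```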

internal/datacoord/compaction_policy_l0_test.go

Lines changed: 0 additions & 23 deletions

@@ -117,29 +117,6 @@ func (s *L0CompactionPolicySuite) TestTriggerIdle() {
 		s.Equal(datapb.SegmentLevel_L0, view.Level)
 	}
 
-	// test for skip collection
-	s.l0_policy.AddSkipCollection(1)
-	s.l0_policy.AddSkipCollection(1)
-	// Test for skip collection
-	events, err = s.l0_policy.Trigger(context.Background())
-	s.NoError(err)
-	s.Empty(events)
-
-	// Test for skip collection with ref count
-	s.l0_policy.RemoveSkipCollection(1)
-	events, err = s.l0_policy.Trigger(context.Background())
-	s.NoError(err)
-	s.Empty(events)
-
-	s.l0_policy.RemoveSkipCollection(1)
-	events, err = s.l0_policy.Trigger(context.Background())
-	s.NoError(err)
-	s.Equal(1, len(events))
-	gotViews, ok = events[TriggerTypeLevelZeroViewIDLE]
-	s.True(ok)
-	s.NotNil(gotViews)
-	s.Equal(1, len(gotViews))
-
 	log.Info("cView", zap.String("string", cView.String()))
 }
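
The deleted assertions pinned down exactly that reference counting: after two AddSkipCollection calls and one RemoveSkipCollection, Trigger still returns nothing, and only the second remove lets an IDLE view through. The same contract expressed against the skipList sketch above, in testify style:

```go
package sketch

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestSkipRefCount(t *testing.T) {
	s := newSkipList()
	s.Add(1)
	s.Add(1)
	require.True(t, s.Skipped(1)) // two references held: still skipped

	s.Remove(1)
	require.True(t, s.Skipped(1)) // one reference remains: still skipped

	s.Remove(1)
	require.False(t, s.Skipped(1)) // all references released: triggering allowed
}
```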

internal/datacoord/compaction_trigger_v2.go

Lines changed: 6 additions & 168 deletions

@@ -31,7 +31,6 @@ import (
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
-	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/logutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 )
@@ -97,8 +96,6 @@ type TriggerManager interface {
 	Stop()
 	OnCollectionUpdate(collectionID int64)
 	ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool, l0Compaction bool, targetSize int64) (UniqueID, error)
-	GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{}
-	GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{}
 	InitForceMergeMemoryQuerier(nodeManager session.NodeManager, mixCoord types.MixCoord, session sessionutil.SessionInterface)
 }
 
@@ -113,7 +110,6 @@ type CompactionTriggerManager struct {
 	allocator allocator.Allocator
 
 	meta             *meta
-	importMeta       ImportMeta
 	l0Policy         *l0CompactionPolicy
 	clusteringPolicy *clusteringCompactionPolicy
 	singlePolicy     *singleCompactionPolicy
@@ -122,28 +118,15 @@ type CompactionTriggerManager struct {
 
 	cancel  context.CancelFunc
 	closeWg sync.WaitGroup
-
-	l0Triggering bool
-	l0SigLock    *sync.Mutex
-	l0TickSig    *sync.Cond
-
-	pauseCompactionChanMap  map[int64]chan struct{}
-	resumeCompactionChanMap map[int64]chan struct{}
-	compactionChanLock      sync.Mutex
 }
 
-func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, inspector CompactionInspector, meta *meta, importMeta ImportMeta) *CompactionTriggerManager {
+func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, inspector CompactionInspector, meta *meta) *CompactionTriggerManager {
 	m := &CompactionTriggerManager{
-		allocator:               alloc,
-		handler:                 handler,
-		inspector:               inspector,
-		meta:                    meta,
-		importMeta:              importMeta,
-		pauseCompactionChanMap:  make(map[int64]chan struct{}),
-		resumeCompactionChanMap: make(map[int64]chan struct{}),
-	}
-	m.l0SigLock = &sync.Mutex{}
-	m.l0TickSig = sync.NewCond(m.l0SigLock)
+		allocator: alloc,
+		handler:   handler,
+		inspector: inspector,
+		meta:      meta,
+	}
 	m.l0Policy = newL0CompactionPolicy(meta, alloc)
 	m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler)
 	m.singlePolicy = newSingleCompactionPolicy(meta, m.allocator, m.handler)
@@ -183,64 +166,6 @@ func (m *CompactionTriggerManager) Stop() {
 	m.closeWg.Wait()
 }
 
-func (m *CompactionTriggerManager) pauseL0SegmentCompacting(jobID, collectionID int64) {
-	m.l0Policy.AddSkipCollection(collectionID)
-	m.l0SigLock.Lock()
-	for m.l0Triggering {
-		m.l0TickSig.Wait()
-	}
-	m.l0SigLock.Unlock()
-	m.compactionChanLock.Lock()
-	if ch, ok := m.pauseCompactionChanMap[jobID]; ok {
-		close(ch)
-	}
-	m.compactionChanLock.Unlock()
-}
-
-func (m *CompactionTriggerManager) resumeL0SegmentCompacting(jobID, collectionID int64) {
-	m.compactionChanLock.Lock()
-	m.l0Policy.RemoveSkipCollection(collectionID)
-	if ch, ok := m.resumeCompactionChanMap[jobID]; ok {
-		close(ch)
-		delete(m.pauseCompactionChanMap, jobID)
-		delete(m.resumeCompactionChanMap, jobID)
-	}
-	m.compactionChanLock.Unlock()
-}
-
-func (m *CompactionTriggerManager) GetPauseCompactionChan(jobID, collectionID int64) <-chan struct{} {
-	m.compactionChanLock.Lock()
-	defer m.compactionChanLock.Unlock()
-	if ch, ok := m.pauseCompactionChanMap[jobID]; ok {
-		return ch
-	}
-	ch := make(chan struct{})
-	m.pauseCompactionChanMap[jobID] = ch
-	go m.pauseL0SegmentCompacting(jobID, collectionID)
-	return ch
-}
-
-func (m *CompactionTriggerManager) GetResumeCompactionChan(jobID, collectionID int64) <-chan struct{} {
-	m.compactionChanLock.Lock()
-	defer m.compactionChanLock.Unlock()
-	if ch, ok := m.resumeCompactionChanMap[jobID]; ok {
-		return ch
-	}
-	ch := make(chan struct{})
-	m.resumeCompactionChanMap[jobID] = ch
-	go m.resumeL0SegmentCompacting(jobID, collectionID)
-	return ch
-}
-
-func (m *CompactionTriggerManager) setL0Triggering(b bool) {
-	m.l0SigLock.Lock()
-	defer m.l0SigLock.Unlock()
-	m.l0Triggering = b
-	if !b {
-		m.l0TickSig.Broadcast()
-	}
-}
-
 func (m *CompactionTriggerManager) loop(ctx context.Context) {
 	defer logutil.LogPanic()
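
Alongside the channel maps, the block deleted above also removed a sync.Cond gate: setL0Triggering bracketed each trigger pass, and pauseL0SegmentCompacting blocked until any in-flight pass drained before closing the job's pause channel. The gate in isolation, as a minimal standard-library sketch (l0Gate is my name for the removed l0SigLock/l0TickSig/l0Triggering trio):

```go
package sketch

import "sync"

// l0Gate isolates the removed pause gate.
type l0Gate struct {
	mu         sync.Mutex
	cond       *sync.Cond
	triggering bool
}

func newL0Gate() *l0Gate {
	g := &l0Gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// setTriggering brackets one trigger pass; clearing it wakes any pauser.
func (g *l0Gate) setTriggering(b bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.triggering = b
	if !b {
		g.cond.Broadcast()
	}
}

// waitIdle blocks until no trigger pass is in flight, exactly as
// pauseL0SegmentCompacting did before closing the pause channel.
func (g *l0Gate) waitIdle() {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.triggering {
		g.cond.Wait()
	}
}
```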

@@ -267,19 +192,16 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) {
 				log.RatedInfo(10, "Skip trigger l0 compaction since inspector is full")
 				continue
 			}
-			m.setL0Triggering(true)
 			events, err := m.l0Policy.Trigger(ctx)
 			if err != nil {
 				log.Warn("Fail to trigger L0 policy", zap.Error(err))
-				m.setL0Triggering(false)
 				continue
 			}
 			if len(events) > 0 {
 				for triggerType, views := range events {
 					m.notify(ctx, triggerType, views)
 				}
 			}
-			m.setL0Triggering(false)
 		case <-clusteringTicker.C:
 			if !m.clusteringPolicy.Enable() {
 				continue
@@ -362,8 +284,6 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection
 		views, triggerID, err = m.forceMergePolicy.triggerOneCollection(ctx, collectionID, targetSize)
 		events[TriggerTypeForceMerge] = views
 	} else if isL0 {
-		m.setL0Triggering(true)
-		defer m.setL0Triggering(false)
 		views, triggerID, err = m.l0Policy.triggerOneCollection(ctx, collectionID)
 		events[TriggerTypeLevelZeroViewManual] = views
 	} else if isClustering {
@@ -444,12 +364,6 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
 		return
 	}
 
-	err = m.addL0ImportTaskForImport(ctx, collection, view)
-	if err != nil {
-		log.Warn("Failed to submit compaction view to scheduler because add l0 import task fail", zap.Error(err))
-		return
-	}
-
 	totalRows := lo.SumBy(view.GetSegmentsView(), func(segView *SegmentView) int64 {
 		return segView.NumOfRows
 	})
@@ -486,82 +400,6 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
 	)
 }
 
-func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, collection *collectionInfo, view CompactionView) error {
-	// add l0 import task for the collection if the collection is importing
-	importJobs := m.importMeta.GetJobBy(ctx,
-		WithCollectionID(collection.ID),
-		WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed),
-		WithoutL0Job(),
-	)
-	if len(importJobs) > 0 {
-		partitionID := view.GetGroupLabel().PartitionID
-		var (
-			fileSize        int64 = 0
-			totalRows       int64 = 0
-			totalMemorySize int64 = 0
-			importPaths     []string
-		)
-		idStart := time.Now().UnixMilli()
-		for _, segmentView := range view.GetSegmentsView() {
-			segInfo := m.meta.GetSegment(ctx, segmentView.ID)
-			if segInfo == nil {
-				continue
-			}
-			totalRows += int64(segmentView.DeltaRowCount)
-			totalMemorySize += int64(segmentView.DeltaSize)
-			for _, deltaLogs := range segInfo.GetDeltalogs() {
-				for _, binlog := range deltaLogs.GetBinlogs() {
-					fileSize += binlog.GetLogSize()
-					importPaths = append(importPaths, binlog.GetLogPath())
-				}
-			}
-		}
-		if len(importPaths) == 0 {
-			return nil
-		}
-
-		for i, job := range importJobs {
-			newTasks, err := NewImportTasks([][]*datapb.ImportFileStats{
-				{
-					{
-						ImportFile: &internalpb.ImportFile{
-							Id:    idStart + int64(i),
-							Paths: importPaths,
-						},
-						FileSize:        fileSize,
-						TotalRows:       totalRows,
-						TotalMemorySize: totalMemorySize,
-						HashedStats: map[string]*datapb.PartitionImportStats{
-							// which is vchannel
-							view.GetGroupLabel().Channel: {
-								PartitionRows: map[int64]int64{
-									partitionID: totalRows,
-								},
-								PartitionDataSize: map[int64]int64{
-									partitionID: totalMemorySize,
-								},
-							},
-						},
-					},
-				},
-			}, job, m.allocator, m.meta, m.importMeta, paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt())
-			if err != nil {
-				log.Warn("new import tasks failed", zap.Error(err))
-				return err
-			}
-			for _, t := range newTasks {
-				err = m.importMeta.AddTask(ctx, t)
-				if err != nil {
-					log.Warn("add new l0 import task from l0 compaction failed", WrapTaskLog(t, zap.Error(err))...)
-					return err
-				}
-				log.Info("add new l0 import task from l0 compaction", WrapTaskLog(t)...)
-			}
-		}
-	}
-	return nil
-}
-
 func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) {
 	log := log.Ctx(ctx).With(zap.String("view", view.String()))
 	taskID, _, err := m.allocator.AllocN(2)
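
The removed addL0ImportTaskForImport is the direct source of the reported failure: on every L0 compaction during an active import it gathered all delta-binlog paths in the view into one ImportFile and appended an L0 import task per in-flight job, so a long import with steady deletes kept growing the input path set. A toy model of that growth; maxPaths and the path names are invented here, since the real limit lives in Milvus's binlog-import validation, not in this commit:

```go
package main

import "fmt"

func main() {
	// Pretend each L0 compaction pass during one import contributes the
	// delta-binlog paths of 20 segments (the numbers are made up).
	const maxPaths = 1024 // stand-in for the real binlog-import limit
	var importPaths []string
	for pass := 0; pass < 100; pass++ {
		for seg := 0; seg < 20; seg++ {
			importPaths = append(importPaths, fmt.Sprintf("delta/pass%d/seg%d.binlog", pass, seg))
		}
		if len(importPaths) > maxPaths {
			fmt.Printf("pass %d: %d paths -> too many input paths for binlog import\n", pass, len(importPaths))
			return
		}
	}
}
```

With the function gone, SubmitL0ViewToScheduler submits the L0 view directly, and imported collections no longer accumulate compaction-generated import tasks.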
