Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,14 @@ func UpdateSegments(ctx context.Context, cli kv.MetaKV, basePath string, collect
return nil
}

// UpdateSegment serializes segment.SegmentInfo with proto.Marshal and stores
// the resulting bytes, converted to a string, through cli.Save under the key
// returned by segment.GetKey().
//
// It returns the marshal error if serialization fails; otherwise it returns
// whatever cli.Save returns.
func UpdateSegment(ctx context.Context, cli kv.MetaKV, segment *models.Segment) error {
	payload, marshalErr := proto.Marshal(segment.SegmentInfo)
	if marshalErr != nil {
		// Nothing is written when the segment info cannot be serialized.
		return marshalErr
	}

	saveErr := cli.Save(ctx, segment.GetKey(), string(payload))
	return saveErr
}

// WalkAllSegments walks all segment info from etcd with the given func.
func WalkAllSegments(ctx context.Context, cli kv.MetaKV, basePath string, filter func(*datapb.SegmentInfo) bool, op func(*datapb.SegmentInfo) error, limit int64) error {
cnt := int64(0)
Expand Down
56 changes: 56 additions & 0 deletions states/etcd/repair/zero_part_l0.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package repair

import (
"context"
"fmt"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)

// ZeroParitiionL0Param carries the options of the `repair zero-partition-l0`
// command, which sets a partition ID on L0 segments whose partition ID is 0.
//
// NOTE: the type name spells "Partition" as "Paritiion"; it is kept as-is
// because the name is part of the package's interface.
type ZeroParitiionL0Param struct {
	framework.ParamBase `use:"repair zero-partition-l0" desc:"Set partitionID to non-zero value for L0 segments with partitionID 0"`

	// Collection restricts the repair to one collection ID (tag default: 0).
	Collection int64 `name:"collection" default:"0" desc:"collection id to filter with"`

	// Segment restricts the repair to one segment ID (tag default: 0).
	Segment int64 `name:"segment" default:"0" desc:"segment id to filter with"`

	// TargetPartitionID is the partition ID written into each matched segment
	// (tag default: -1).
	TargetPartitionID int64 `name:"targetPartitionID" default:"-1" desc:"the partition id to set for the segment"`

	// Run must be true for the command to write changes; otherwise the command
	// only lists the matched segments (tag default: false).
	Run bool `name:"run" default:"false" desc:"actual do repair"`
}

// ZeroPartitionL0SegmentCommand finds L0 segments whose partition ID is 0 and,
// when p.Run is set, rewrites their partition ID to p.TargetPartitionID.
//
// Segments are listed through common.ListSegments and narrowed by p.Collection
// and p.Segment; a value of 0 for either one disables that filter. Every
// matched segment is printed to stdout. Without p.Run the command stops after
// printing (dry run).
//
// Failures are reported on stdout only: a listing error makes the command
// return nil, and a failed update of one segment is printed and skipped so the
// remaining segments are still processed.
//
// NOTE(review): p.TargetPartitionID is not validated here; its declared
// default is -1, so running with --run and no explicit target would write -1.
// Confirm whether that is intended.
func (c *ComponentRepair) ZeroPartitionL0SegmentCommand(ctx context.Context, p *ZeroParitiionL0Param) error {
// Select only L0 segments with partition ID 0, optionally limited to a
// single collection and/or a single segment.
segments, err := common.ListSegments(ctx, c.client, c.basePath, func(info *models.Segment) bool {
return info.GetLevel() == datapb.SegmentLevel_L0 &&
(p.Collection == 0 || info.GetCollectionID() == p.Collection) &&
(p.Segment == 0 || info.GetID() == p.Segment) &&
info.GetPartitionID() == 0
})
if err != nil {
// The error is printed rather than returned to the caller.
fmt.Println("failed to list segments", err.Error())
return nil
}

// Report every candidate before changing anything.
fmt.Printf("%d segment found\n", len(segments))
for _, info := range segments {
fmt.Printf("suspect segment %d found:\n", info.GetID())
fmt.Printf("SegmentID: %d CollectionID: %d PartitionID: %d, State: %s, Row Count:%d\n",
info.ID, info.CollectionID, info.PartitionID, info.State.String(), info.NumOfRows)
}

// Dry run: nothing is written unless the run flag is set.
if !p.Run {
return nil
}

for _, info := range segments {
// The partition ID is changed on the in-memory segment, which is then
// saved via common.UpdateSegment under the key from info.GetKey().
// NOTE(review): whether that key encodes the partition ID is not visible
// here — confirm the segment is not left under a key for partition 0.
info.PartitionID = p.TargetPartitionID
err := common.UpdateSegment(ctx, c.client, info)
if err != nil {
// Best effort: report the failure and move on to the next segment.
fmt.Printf("update segment %d partitionID failed: %s\n", info.GetID(), err.Error())
continue
}
fmt.Printf("update segment %d partitionID to %d succeed\n", info.GetID(), p.TargetPartitionID)
}
Comment on lines +45 to +53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current repair logic is critically flawed and will likely lead to metadata corruption. The etcd key for a segment and its associated metadata (binlogs, deltalogs, statslogs) is dependent on the partitionID. This implementation changes the partitionID within the SegmentInfo protobuf message but then writes it back to the old etcd key (which is based on partitionID=0). This creates a severe inconsistency between the key and the data.

A correct implementation must perform a "move" operation in etcd:

  1. Read all metadata for the segment from keys with partitionID=0.
  2. Delete all of this old metadata.
  3. Update the partitionID in the SegmentInfo and any other affected data structures.
  4. Write the updated metadata to new etcd keys corresponding to the new partitionID.

Without this, the system's metadata will be in a corrupt state.


return nil
}