Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions internal/flushcommon/syncmgr/pack_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
"github.com/milvus-io/milvus/pkg/v2/util/retry"
)
Expand Down Expand Up @@ -103,8 +104,16 @@ func (bw *BulkPackWriter) writeLog(ctx context.Context, blob *storage.Blob,
root, p string, pack *SyncPack,
) (*datapb.Binlog, error) {
key := path.Join(bw.chunkManager.RootPath(), root, p)
err := retry.Do(ctx, func() error {
return bw.chunkManager.Write(ctx, key, blob.Value)
err := retry.Handle(ctx, func() (bool, error) {
err := bw.chunkManager.Write(ctx, key, blob.Value)
if err == nil {
return false, nil
}
err = storage.ToMilvusIoError(key, err)
if merr.IsNonRetryableErr(err) {
return false, err
}
return true, err
}, bw.writeRetryOpts...)
if err != nil {
return nil, err
Expand Down
57 changes: 57 additions & 0 deletions internal/flushcommon/syncmgr/pack_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ import (
"fmt"
"reflect"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/minio/minio-go/v7"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
Expand All @@ -32,7 +37,9 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/retry"
)

func TestBulkPackWriter_Write(t *testing.T) {
Expand Down Expand Up @@ -161,3 +168,53 @@ func TestBulkPackWriter_Write(t *testing.T) {
})
}
}

func TestBulkPackWriter_WriteLog_NonRetryableError(t *testing.T) {
	paramtable.Get().Init(paramtable.NewBaseTable())

	// A deadline guards against an infinite retry loop hanging the test run.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	mc := metacache.NewMockMetaCache(t)
	mc.EXPECT().Collection().Return(int64(1)).Maybe()

	// Every write fails with a MinIO AccessDenied response, which
	// storage.ToMilvusIoError maps to merr.ErrIoPermissionDenied —
	// a permanent error that must not be retried.
	attempts := 0
	cm := mocks.NewChunkManager(t)
	cm.EXPECT().RootPath().Return("files").Maybe()
	cm.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).
		RunAndReturn(func(ctx context.Context, key string, data []byte) error {
			attempts++
			return minio.ErrorResponse{Code: "AccessDenied"}
		})

	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
			{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
			{FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
			{
				FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "4"}},
			},
		},
	}

	bw := &BulkPackWriter{
		metaCache:      mc,
		schema:         schema,
		chunkManager:   cm,
		allocator:      allocator.NewLocalAllocator(10000, 100000),
		writeRetryOpts: []retry.Option{retry.AttemptAlways(), retry.MaxSleepTime(10 * time.Second)},
	}

	blob := &storage.Blob{Value: []byte("data"), RowNum: 1}
	_, err := bw.writeLog(ctx, blob, "insert_log", "1/2/3/100/1", nil)

	require.Error(t, err)
	// Exactly one attempt: the permission error short-circuits the retry loop.
	assert.Equal(t, 1, attempts, "non-retryable error should not be retried")
	assert.True(t, merr.IsNonRetryableErr(err))
	assert.True(t, errors.Is(err, merr.ErrIoPermissionDenied))
}
3 changes: 2 additions & 1 deletion internal/flushcommon/syncmgr/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/retry"
Expand Down Expand Up @@ -368,7 +369,7 @@ func (s *SyncTaskSuite) TestRunError() {
handler := func(_ error) { flag = true }
s.chunkManager.ExpectedCalls = nil
s.chunkManager.EXPECT().RootPath().Return("files")
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked")))
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrIoPermissionDenied("mocked-key", errors.New("mocked")))
task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})).
WithFailureCallback(handler).
WithWriteRetryOptions(retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second))
Expand Down
42 changes: 37 additions & 5 deletions internal/util/importutilv2/binlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/retry"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

Expand All @@ -41,10 +43,13 @@ type reader struct {
schema *schemapb.CollectionSchema
storageVersion int64

fileSize *atomic.Int64
bufferSize int
deleteData map[any]typeutil.Timestamp // pk2ts
insertLogs map[int64][]string // fieldID (or fieldGroupID if storage v2) -> binlogs
fileSize *atomic.Int64
bufferSize int
retryAttempts uint
storageConfig *indexpb.StorageConfig
importEz string
deleteData map[any]typeutil.Timestamp // pk2ts
insertLogs map[int64][]string // fieldID (or fieldGroupID if storage v2) -> binlogs

filters []Filter
dr storage.DeserializeReader[*storage.Value]
Expand Down Expand Up @@ -78,6 +83,9 @@ func NewReader(ctx context.Context,
storageVersion: storageVersion,
fileSize: atomic.NewInt64(0),
bufferSize: bufferSize,
storageConfig: storageConfig,
importEz: importEz,
retryAttempts: paramtable.Get().CommonCfg.StorageReadRetryAttempts.GetAsUint(),
}
err := r.init(paths, tsStart, tsEnd, storageConfig, importEz)
if err != nil {
Expand Down Expand Up @@ -119,7 +127,7 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64, storageConfig *inde
storage.WithVersion(r.storageVersion),
storage.WithBufferSize(32 * 1024 * 1024),
storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
return r.cm.MultiRead(ctx, paths)
return r.multiReadWithRetry(ctx, paths)
}),
storage.WithStorageConfig(storageConfig),
}
Expand Down Expand Up @@ -206,6 +214,30 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (map[any]
return deleteData, nil
}

// multiReadWithRetry wraps MultiRead with denylist retry: every error is
// retried except permanent/validation failures (permission denied, bucket
// not found, etc.), mirroring the strategy the parquet/json/csv import
// paths use via RetryableReader.
func (r *reader) multiReadWithRetry(ctx context.Context, paths []string) ([][]byte, error) {
	// ToMilvusIoError wants a path for error context; use the first path
	// as the representative of the whole batch.
	var key string
	if len(paths) != 0 {
		key = paths[0]
	}

	var result [][]byte
	attempt := func() (retryable bool, err error) {
		result, err = r.cm.MultiRead(ctx, paths)
		if err == nil {
			return false, nil
		}
		err = storage.ToMilvusIoError(key, err)
		// Retry exactly when the classified error is not permanent.
		return !merr.IsNonRetryableErr(err), err
	}
	err := retry.Handle(ctx, attempt, retry.Attempts(r.retryAttempts))
	return result, err
}

func (r *reader) Read() (*storage.InsertData, error) {
insertData, err := storage.NewInsertDataWithFunctionOutputField(r.schema)
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions internal/util/importutilv2/binlog/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/util/testutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/testutils"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
Expand Down Expand Up @@ -819,3 +820,44 @@ func (suite *ReaderSuite) TestZeroDeltaRead() {
func TestBinlogReader(t *testing.T) {
suite.Run(t, new(ReaderSuite))
}

func TestMultiReadWithRetry_NonRetryableError(t *testing.T) {
	paramtable.Init()
	ctx := context.Background()

	// Every read fails with a permission error — the denylist classifies
	// this as permanent, so no retry may happen.
	attempts := 0
	cm := mocks.NewChunkManager(t)
	cm.EXPECT().MultiRead(mock.Anything, mock.Anything).
		RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
			attempts++
			return nil, merr.WrapErrIoPermissionDenied("test/path", fmt.Errorf("access denied"))
		})

	r := &reader{ctx: ctx, cm: cm, retryAttempts: 3}

	_, err := r.multiReadWithRetry(ctx, []string{"test/path"})
	assert.Error(t, err)
	assert.True(t, merr.IsNonRetryableErr(err))
	assert.Equal(t, 1, attempts, "non-retryable error should not be retried")
}

func TestMultiReadWithRetry_RetryableError(t *testing.T) {
	paramtable.Init()
	ctx := context.Background()

	// The first two reads fail with a transient IO error; the third succeeds.
	attempts := 0
	cm := mocks.NewChunkManager(t)
	cm.EXPECT().MultiRead(mock.Anything, mock.Anything).
		RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
			attempts++
			if attempts >= 3 {
				return [][]byte{[]byte("data")}, nil
			}
			return nil, merr.WrapErrIoFailed("test/path", fmt.Errorf("transient error"))
		})

	r := &reader{ctx: ctx, cm: cm, retryAttempts: 3}

	result, err := r.multiReadWithRetry(ctx, []string{"test/path"})
	assert.NoError(t, err)
	assert.Equal(t, [][]byte{[]byte("data")}, result)
	assert.Equal(t, 3, attempts, "retryable error should be retried until success")
}
8 changes: 2 additions & 6 deletions pkg/util/merr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,8 @@ func IsNonRetryableErr(err error) bool {
}

func IsMilvusError(err error) bool {
if err == nil {
return false
}
cause := errors.Cause(err)
_, ok := cause.(milvusError)
return ok
var me milvusError
return errors.As(err, &me)
}

func IsCanceledOrTimeout(err error) bool {
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/merr/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,24 @@ func TestIsNonRetryableErr_WrappedErrors(t *testing.T) {
wrappedRetryable := fmt.Errorf("network issue: %w", ErrIoUnexpectEOF)
assert.False(t, IsNonRetryableErr(wrappedRetryable))
}

func TestIsMilvusError_WrappedChain(t *testing.T) {
	// Nil and plain errors are not milvus errors.
	assert.False(t, IsMilvusError(nil))
	assert.False(t, IsMilvusError(errors.New("plain error")))

	// A milvusError is detected directly...
	assert.True(t, IsMilvusError(ErrCollectionNotFound))
	// ...and through an errors.Wrap layer, since both the errors.Cause
	// and errors.As implementations follow single-wrap chains.
	assert.True(t, IsMilvusError(errors.Wrap(ErrCollectionNotFound, "context")))

	// Combined: plain error first, milvusError second.
	// errors.Cause: multiErrors has no Cause() method, returns multiErrors itself — type
	// assertion to milvusError fails. errors.As: multiErrors.Unwrap() returns errs[1] directly
	// (ErrCollectionNotFound), so errors.As finds it.
	// Known limitation: Combine(milvusErr, plain) is NOT covered — Unwrap exposes errs[1:] only,
	// so milvusError at index 0 would not be found by errors.As either.
	combined := Combine(errors.New("plain"), ErrCollectionNotFound)
	assert.True(t, IsMilvusError(combined), "milvusError at tail of Combine chain should be detected")
}
Loading