Skip to content

Commit 0b55946

Browse files
bigsheeperclaude
andauthored
fix: [cp2.6] apply denylist retry to pack_writer writeLog and binlog import (#48436)
Cherry-pick from master pr: #48402 issue: #48153 ## Summary Cherry-picked from master PR #48402 (merged) - **fix(High)**: `pack_writer.go` `writeLog` now skips retry only for non-retryable errors (permission denied, bucket not found, invalid credentials, etc.), matching the denylist strategy in `retryable_reader.go`. - **fix(Medium)**: Binlog import's `WithDownloader` callbacks now use `multiReadWithRetry`, skipping retry only for non-retryable errors. Previously all transient failures were not retried. - **fix(Low)**: `IsMilvusError` in `merr/utils.go` switched from `errors.Cause` (root only) to `errors.As` (full chain traversal). ## Verification - [x] File count matches original PR (7/8 — .gitignore already in 2.6) - [x] Code changes verified - [x] No conflict markers - [x] Rebased onto latest upstream/2.6 --------- Signed-off-by: Yihao Dai <yihao.dai@zilliz.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 800b382 commit 0b55946

File tree

7 files changed

+172
-14
lines changed

7 files changed

+172
-14
lines changed

internal/flushcommon/syncmgr/pack_writer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/milvus-io/milvus/pkg/v2/common"
3535
"github.com/milvus-io/milvus/pkg/v2/log"
3636
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
37+
"github.com/milvus-io/milvus/pkg/v2/util/merr"
3738
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
3839
"github.com/milvus-io/milvus/pkg/v2/util/retry"
3940
)
@@ -103,8 +104,16 @@ func (bw *BulkPackWriter) writeLog(ctx context.Context, blob *storage.Blob,
103104
root, p string, pack *SyncPack,
104105
) (*datapb.Binlog, error) {
105106
key := path.Join(bw.chunkManager.RootPath(), root, p)
106-
err := retry.Do(ctx, func() error {
107-
return bw.chunkManager.Write(ctx, key, blob.Value)
107+
err := retry.Handle(ctx, func() (bool, error) {
108+
err := bw.chunkManager.Write(ctx, key, blob.Value)
109+
if err == nil {
110+
return false, nil
111+
}
112+
err = storage.ToMilvusIoError(key, err)
113+
if merr.IsNonRetryableErr(err) {
114+
return false, err
115+
}
116+
return true, err
108117
}, bw.writeRetryOpts...)
109118
if err != nil {
110119
return nil, err

internal/flushcommon/syncmgr/pack_writer_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@ import (
2121
"fmt"
2222
"reflect"
2323
"testing"
24+
"time"
2425

26+
"github.com/cockroachdb/errors"
27+
"github.com/minio/minio-go/v7"
28+
"github.com/stretchr/testify/assert"
2529
"github.com/stretchr/testify/mock"
30+
"github.com/stretchr/testify/require"
2631

2732
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2833
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -32,7 +37,9 @@ import (
3237
"github.com/milvus-io/milvus/internal/storage"
3338
"github.com/milvus-io/milvus/pkg/v2/common"
3439
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
40+
"github.com/milvus-io/milvus/pkg/v2/util/merr"
3541
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
42+
"github.com/milvus-io/milvus/pkg/v2/util/retry"
3643
)
3744

3845
func TestBulkPackWriter_Write(t *testing.T) {
@@ -161,3 +168,53 @@ func TestBulkPackWriter_Write(t *testing.T) {
161168
})
162169
}
163170
}
171+
172+
func TestBulkPackWriter_WriteLog_NonRetryableError(t *testing.T) {
173+
paramtable.Get().Init(paramtable.NewBaseTable())
174+
175+
mc := metacache.NewMockMetaCache(t)
176+
mc.EXPECT().Collection().Return(int64(1)).Maybe()
177+
178+
cm := mocks.NewChunkManager(t)
179+
cm.EXPECT().RootPath().Return("files").Maybe()
180+
// Return a permission-denied error — should NOT be retried
181+
callCount := 0
182+
cm.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).
183+
RunAndReturn(func(ctx context.Context, key string, data []byte) error {
184+
callCount++
185+
// Simulate MinIO AccessDenied — maps to ErrIoPermissionDenied via ToMilvusIoError
186+
return minio.ErrorResponse{Code: "AccessDenied"}
187+
})
188+
189+
schema := &schemapb.CollectionSchema{
190+
Fields: []*schemapb.FieldSchema{
191+
{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
192+
{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
193+
{FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
194+
{
195+
FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector,
196+
TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "4"}},
197+
},
198+
},
199+
}
200+
201+
bw := &BulkPackWriter{
202+
metaCache: mc,
203+
schema: schema,
204+
chunkManager: cm,
205+
allocator: allocator.NewLocalAllocator(10000, 100000),
206+
writeRetryOpts: []retry.Option{retry.AttemptAlways(), retry.MaxSleepTime(10 * time.Second)},
207+
}
208+
209+
// Use a timeout context so the test doesn't hang if retry loop is infinite
210+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
211+
defer cancel()
212+
213+
blob := &storage.Blob{Value: []byte("data"), RowNum: 1}
214+
_, err := bw.writeLog(ctx, blob, "insert_log", "1/2/3/100/1", nil)
215+
require.Error(t, err)
216+
// Must stop after exactly 1 attempt — not retried
217+
assert.Equal(t, 1, callCount, "non-retryable error should not be retried")
218+
assert.True(t, merr.IsNonRetryableErr(err))
219+
assert.True(t, errors.Is(err, merr.ErrIoPermissionDenied))
220+
}

internal/flushcommon/syncmgr/task_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/milvus-io/milvus/internal/util/initcore"
4242
"github.com/milvus-io/milvus/pkg/v2/common"
4343
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
44+
"github.com/milvus-io/milvus/pkg/v2/util/merr"
4445
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
4546
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
4647
"github.com/milvus-io/milvus/pkg/v2/util/retry"
@@ -368,7 +369,7 @@ func (s *SyncTaskSuite) TestRunError() {
368369
handler := func(_ error) { flag = true }
369370
s.chunkManager.ExpectedCalls = nil
370371
s.chunkManager.EXPECT().RootPath().Return("files")
371-
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked")))
372+
s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrIoPermissionDenied("mocked-key", errors.New("mocked")))
372373
task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})).
373374
WithFailureCallback(handler).
374375
WithWriteRetryOptions(retry.AttemptAlways(), retry.MaxSleepTime(10*time.Second))

internal/util/importutilv2/binlog/reader.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import (
3232
"github.com/milvus-io/milvus/pkg/v2/log"
3333
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
3434
"github.com/milvus-io/milvus/pkg/v2/util/merr"
35+
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
36+
"github.com/milvus-io/milvus/pkg/v2/util/retry"
3537
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
3638
)
3739

@@ -41,10 +43,13 @@ type reader struct {
4143
schema *schemapb.CollectionSchema
4244
storageVersion int64
4345

44-
fileSize *atomic.Int64
45-
bufferSize int
46-
deleteData map[any]typeutil.Timestamp // pk2ts
47-
insertLogs map[int64][]string // fieldID (or fieldGroupID if storage v2) -> binlogs
46+
fileSize *atomic.Int64
47+
bufferSize int
48+
retryAttempts uint
49+
storageConfig *indexpb.StorageConfig
50+
importEz string
51+
deleteData map[any]typeutil.Timestamp // pk2ts
52+
insertLogs map[int64][]string // fieldID (or fieldGroupID if storage v2) -> binlogs
4853

4954
filters []Filter
5055
dr storage.DeserializeReader[*storage.Value]
@@ -78,6 +83,9 @@ func NewReader(ctx context.Context,
7883
storageVersion: storageVersion,
7984
fileSize: atomic.NewInt64(0),
8085
bufferSize: bufferSize,
86+
storageConfig: storageConfig,
87+
importEz: importEz,
88+
retryAttempts: paramtable.Get().CommonCfg.StorageReadRetryAttempts.GetAsUint(),
8189
}
8290
err := r.init(paths, tsStart, tsEnd, storageConfig, importEz)
8391
if err != nil {
@@ -119,7 +127,7 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64, storageConfig *inde
119127
storage.WithVersion(r.storageVersion),
120128
storage.WithBufferSize(32 * 1024 * 1024),
121129
storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
122-
return r.cm.MultiRead(ctx, paths)
130+
return r.multiReadWithRetry(ctx, paths)
123131
}),
124132
storage.WithStorageConfig(storageConfig),
125133
}
@@ -206,6 +214,30 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (map[any]
206214
return deleteData, nil
207215
}
208216

217+
// multiReadWithRetry wraps MultiRead with denylist retry: retries all errors
218+
// except permanent/validation ones (permission denied, bucket not found, etc.),
219+
// matching the strategy used by parquet/json/csv imports via RetryableReader.
220+
func (r *reader) multiReadWithRetry(ctx context.Context, paths []string) ([][]byte, error) {
221+
var result [][]byte
222+
representative := ""
223+
if len(paths) > 0 {
224+
representative = paths[0]
225+
}
226+
err := retry.Handle(ctx, func() (bool, error) {
227+
var e error
228+
result, e = r.cm.MultiRead(ctx, paths)
229+
if e == nil {
230+
return false, nil
231+
}
232+
e = storage.ToMilvusIoError(representative, e)
233+
if merr.IsNonRetryableErr(e) {
234+
return false, e
235+
}
236+
return true, e
237+
}, retry.Attempts(r.retryAttempts))
238+
return result, err
239+
}
240+
209241
func (r *reader) Read() (*storage.InsertData, error) {
210242
insertData, err := storage.NewInsertDataWithFunctionOutputField(r.schema)
211243
if err != nil {

internal/util/importutilv2/binlog/reader_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/milvus-io/milvus/internal/util/testutil"
3737
"github.com/milvus-io/milvus/pkg/v2/common"
3838
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
39+
"github.com/milvus-io/milvus/pkg/v2/util/merr"
3940
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
4041
"github.com/milvus-io/milvus/pkg/v2/util/testutils"
4142
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -819,3 +820,44 @@ func (suite *ReaderSuite) TestZeroDeltaRead() {
819820
func TestBinlogReader(t *testing.T) {
820821
suite.Run(t, new(ReaderSuite))
821822
}
823+
824+
func TestMultiReadWithRetry_NonRetryableError(t *testing.T) {
825+
paramtable.Init()
826+
ctx := context.Background()
827+
828+
cm := mocks.NewChunkManager(t)
829+
callCount := 0
830+
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).
831+
RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
832+
callCount++
833+
return nil, merr.WrapErrIoPermissionDenied("test/path", fmt.Errorf("access denied"))
834+
})
835+
836+
r := &reader{ctx: ctx, cm: cm, retryAttempts: 3}
837+
_, err := r.multiReadWithRetry(ctx, []string{"test/path"})
838+
assert.Error(t, err)
839+
assert.True(t, merr.IsNonRetryableErr(err))
840+
assert.Equal(t, 1, callCount, "non-retryable error should not be retried")
841+
}
842+
843+
func TestMultiReadWithRetry_RetryableError(t *testing.T) {
844+
paramtable.Init()
845+
ctx := context.Background()
846+
847+
cm := mocks.NewChunkManager(t)
848+
callCount := 0
849+
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).
850+
RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
851+
callCount++
852+
if callCount < 3 {
853+
return nil, merr.WrapErrIoFailed("test/path", fmt.Errorf("transient error"))
854+
}
855+
return [][]byte{[]byte("data")}, nil
856+
})
857+
858+
r := &reader{ctx: ctx, cm: cm, retryAttempts: 3}
859+
result, err := r.multiReadWithRetry(ctx, []string{"test/path"})
860+
assert.NoError(t, err)
861+
assert.Equal(t, [][]byte{[]byte("data")}, result)
862+
assert.Equal(t, 3, callCount, "retryable error should be retried until success")
863+
}

pkg/util/merr/utils.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,8 @@ func IsNonRetryableErr(err error) bool {
9292
}
9393

9494
func IsMilvusError(err error) bool {
95-
if err == nil {
96-
return false
97-
}
98-
cause := errors.Cause(err)
99-
_, ok := cause.(milvusError)
100-
return ok
95+
var me milvusError
96+
return errors.As(err, &me)
10197
}
10298

10399
func IsCanceledOrTimeout(err error) bool {

pkg/util/merr/utils_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,24 @@ func TestIsNonRetryableErr_WrappedErrors(t *testing.T) {
7474
wrappedRetryable := fmt.Errorf("network issue: %w", ErrIoUnexpectEOF)
7575
assert.False(t, IsNonRetryableErr(wrappedRetryable))
7676
}
77+
78+
func TestIsMilvusError_WrappedChain(t *testing.T) {
79+
// Direct milvus error
80+
assert.True(t, IsMilvusError(ErrCollectionNotFound))
81+
82+
// Wrapped via errors.Wrap — milvusError is root, both impls handle this
83+
assert.True(t, IsMilvusError(errors.Wrap(ErrCollectionNotFound, "context")))
84+
85+
// Combined: plain error first, milvusError second.
86+
// errors.Cause: multiErrors has no Cause() method, returns multiErrors itself — type
87+
// assertion to milvusError fails. errors.As: multiErrors.Unwrap() returns errs[1] directly
88+
// (ErrCollectionNotFound), so errors.As finds it.
89+
// Known limitation: Combine(milvusErr, plain) is NOT covered — Unwrap exposes errs[1:] only,
90+
// so milvusError at index 0 would not be found by errors.As either.
91+
combined := Combine(errors.New("plain"), ErrCollectionNotFound)
92+
assert.True(t, IsMilvusError(combined), "milvusError at tail of Combine chain should be detected")
93+
94+
// Non-milvus error
95+
assert.False(t, IsMilvusError(errors.New("plain error")))
96+
assert.False(t, IsMilvusError(nil))
97+
}

0 commit comments

Comments
 (0)