Skip to content

Commit b143833

Browse files
committed
test: add unit tests for salvage checkpoint in data salvage feature
Add unit test coverage for salvage checkpoint functionality introduced in PR #47599: - kv_catalog_test.go: TestCatalogSalvageCheckpoint (save/get success, errors, multiple clusters) and extend TestBuildPrefixAndKey with salvage key builders - replicate_service_test.go: TestReplicateServiceGetSalvageCheckpoint (success, error, closed_lifetime) - manager_test.go: TestSalvageCheckpointLoadedFromEtcd and TestSalvageCheckpointMultipleForcePromotes - salvage_checkpoint_test.go (new): TestUpdateCheckpointForcePromote and TestConsumeDirtySnapshotWithSalvageCheckpoint covering the force-promote path in recovery_storage_impl Signed-off-by: Yihao Dai <yihao.dai@zilliz.com>
1 parent 3851afb commit b143833

File tree

4 files changed

+446
-0
lines changed

4 files changed

+446
-0
lines changed

internal/distributed/streaming/replicate_service_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,74 @@ func TestReplicateServiceGetCheckpoint(t *testing.T) {
876876
})
877877
}
878878

879+
func TestReplicateServiceGetSalvageCheckpoint(t *testing.T) {
880+
t.Run("success", func(t *testing.T) {
881+
c := mock_client.NewMockClient(t)
882+
h := mock_handler.NewMockHandlerClient(t)
883+
884+
expected := []*wal.ReplicateCheckpoint{
885+
{ClusterID: "primary", PChannel: "primary-rootcoord-dml_0", TimeTick: 500},
886+
}
887+
h.EXPECT().GetSalvageCheckpoint(mock.Anything, "test-channel").Return(expected, nil)
888+
889+
rs := &replicateService{
890+
walAccesserImpl: &walAccesserImpl{
891+
lifetime: typeutil.NewLifetime(),
892+
clusterID: "by-dev",
893+
streamingCoordClient: c,
894+
handlerClient: h,
895+
producers: make(map[string]*producer.ResumableProducer),
896+
},
897+
}
898+
899+
checkpoints, err := rs.GetSalvageCheckpoint(context.Background(), "test-channel")
900+
assert.NoError(t, err)
901+
assert.Equal(t, expected, checkpoints)
902+
})
903+
904+
t.Run("error", func(t *testing.T) {
905+
c := mock_client.NewMockClient(t)
906+
h := mock_handler.NewMockHandlerClient(t)
907+
908+
h.EXPECT().GetSalvageCheckpoint(mock.Anything, "bad-channel").Return(nil, errors.New("not found"))
909+
910+
rs := &replicateService{
911+
walAccesserImpl: &walAccesserImpl{
912+
lifetime: typeutil.NewLifetime(),
913+
clusterID: "by-dev",
914+
streamingCoordClient: c,
915+
handlerClient: h,
916+
producers: make(map[string]*producer.ResumableProducer),
917+
},
918+
}
919+
920+
checkpoints, err := rs.GetSalvageCheckpoint(context.Background(), "bad-channel")
921+
assert.Error(t, err)
922+
assert.Nil(t, checkpoints)
923+
})
924+
925+
t.Run("closed_lifetime", func(t *testing.T) {
926+
c := mock_client.NewMockClient(t)
927+
h := mock_handler.NewMockHandlerClient(t)
928+
929+
rs := &replicateService{
930+
walAccesserImpl: &walAccesserImpl{
931+
lifetime: typeutil.NewLifetime(),
932+
clusterID: "by-dev",
933+
streamingCoordClient: c,
934+
handlerClient: h,
935+
producers: make(map[string]*producer.ResumableProducer),
936+
},
937+
}
938+
rs.lifetime.SetState(typeutil.LifetimeStateStopped)
939+
rs.lifetime.Wait()
940+
941+
checkpoints, err := rs.GetSalvageCheckpoint(context.Background(), "test-channel")
942+
assert.ErrorIs(t, err, ErrWALAccesserClosed)
943+
assert.Nil(t, checkpoints)
944+
})
945+
}
946+
879947
func TestReplicateServiceAppendClosed(t *testing.T) {
880948
c := mock_client.NewMockClient(t)
881949
h := mock_handler.NewMockHandlerClient(t)

internal/metastore/kv/streamingnode/kv_catalog_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stretchr/testify/mock"
1111
"google.golang.org/protobuf/proto"
1212

13+
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
1314
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
1415
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
1516
"github.com/milvus-io/milvus/internal/kv/mocks"
@@ -197,6 +198,99 @@ func TestCatalogVChannel(t *testing.T) {
197198
}
198199
}
199200

201+
func TestCatalogSalvageCheckpoint(t *testing.T) {
202+
ctx := context.Background()
203+
204+
t.Run("save_and_get_success", func(t *testing.T) {
205+
kv := mocks.NewMetaKv(t)
206+
catalog := NewCataLog(kv)
207+
208+
cp := &commonpb.ReplicateCheckpoint{
209+
ClusterId: "source-cluster",
210+
Pchannel: "source-cluster-rootcoord-dml_0",
211+
}
212+
cpBytes, err := proto.Marshal(cp)
213+
assert.NoError(t, err)
214+
215+
kv.EXPECT().Save(mock.Anything, mock.Anything, string(cpBytes)).Return(nil)
216+
err = catalog.SaveSalvageCheckpoint(ctx, "p1", cp)
217+
assert.NoError(t, err)
218+
219+
kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(
220+
[]string{"streamingnode-meta/wal/p1/salvage-checkpoint/source-cluster"},
221+
[]string{string(cpBytes)},
222+
nil,
223+
)
224+
checkpoints, err := catalog.GetSalvageCheckpoint(ctx, "p1")
225+
assert.NoError(t, err)
226+
assert.Len(t, checkpoints, 1)
227+
assert.Equal(t, "source-cluster", checkpoints[0].ClusterId)
228+
assert.Equal(t, "source-cluster-rootcoord-dml_0", checkpoints[0].Pchannel)
229+
})
230+
231+
t.Run("save_error", func(t *testing.T) {
232+
kv := mocks.NewMetaKv(t)
233+
catalog := NewCataLog(kv)
234+
235+
kv.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("etcd error"))
236+
err := catalog.SaveSalvageCheckpoint(ctx, "p1", &commonpb.ReplicateCheckpoint{ClusterId: "c1"})
237+
assert.Error(t, err)
238+
})
239+
240+
t.Run("get_load_error", func(t *testing.T) {
241+
kv := mocks.NewMetaKv(t)
242+
catalog := NewCataLog(kv)
243+
244+
kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, errors.New("etcd error"))
245+
checkpoints, err := catalog.GetSalvageCheckpoint(ctx, "p1")
246+
assert.Error(t, err)
247+
assert.Nil(t, checkpoints)
248+
})
249+
250+
t.Run("get_unmarshal_error", func(t *testing.T) {
251+
kv := mocks.NewMetaKv(t)
252+
catalog := NewCataLog(kv)
253+
254+
kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(
255+
[]string{"key"},
256+
[]string{"invalid-proto-bytes"},
257+
nil,
258+
)
259+
checkpoints, err := catalog.GetSalvageCheckpoint(ctx, "p1")
260+
assert.Error(t, err)
261+
assert.Nil(t, checkpoints)
262+
})
263+
264+
t.Run("get_empty", func(t *testing.T) {
265+
kv := mocks.NewMetaKv(t)
266+
catalog := NewCataLog(kv)
267+
268+
kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(nil, nil, nil)
269+
checkpoints, err := catalog.GetSalvageCheckpoint(ctx, "p1")
270+
assert.NoError(t, err)
271+
assert.Empty(t, checkpoints)
272+
})
273+
274+
t.Run("get_multiple_clusters", func(t *testing.T) {
275+
kv := mocks.NewMetaKv(t)
276+
catalog := NewCataLog(kv)
277+
278+
cp1 := &commonpb.ReplicateCheckpoint{ClusterId: "cluster-a"}
279+
cp2 := &commonpb.ReplicateCheckpoint{ClusterId: "cluster-b"}
280+
bytes1, _ := proto.Marshal(cp1)
281+
bytes2, _ := proto.Marshal(cp2)
282+
283+
kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).Return(
284+
[]string{"key1", "key2"},
285+
[]string{string(bytes1), string(bytes2)},
286+
nil,
287+
)
288+
checkpoints, err := catalog.GetSalvageCheckpoint(ctx, "p1")
289+
assert.NoError(t, err)
290+
assert.Len(t, checkpoints, 2)
291+
})
292+
}
293+
200294
func TestBuildPrefixAndKey(t *testing.T) {
201295
// Prefix functions
202296
assert.Equal(t, "streamingnode-meta/wal/p1/", buildWALPrefix("p1"))
@@ -208,6 +302,9 @@ func TestBuildPrefixAndKey(t *testing.T) {
208302
assert.Equal(t, "streamingnode-meta/wal/p1/segment-assign/", buildSegmentAssignmentPrefix("p1"))
209303
assert.Equal(t, "streamingnode-meta/wal/p2/segment-assign/", buildSegmentAssignmentPrefix("p2"))
210304

305+
assert.Equal(t, "streamingnode-meta/wal/p1/salvage-checkpoint/", buildSalvageCheckpointPrefix("p1"))
306+
assert.Equal(t, "streamingnode-meta/wal/p2/salvage-checkpoint/", buildSalvageCheckpointPrefix("p2"))
307+
211308
// Key functions
212309
assert.Equal(t, "streamingnode-meta/wal/p1/vchannel/v1", buildVChannelKey("p1", "v1"))
213310
assert.Equal(t, "streamingnode-meta/wal/p2/vchannel/v2", buildVChannelKey("p2", "v2"))
@@ -220,4 +317,7 @@ func TestBuildPrefixAndKey(t *testing.T) {
220317

221318
assert.Equal(t, "streamingnode-meta/wal/p1/consume-checkpoint", buildConsumeCheckpointKey("p1"))
222319
assert.Equal(t, "streamingnode-meta/wal/p2/consume-checkpoint", buildConsumeCheckpointKey("p2"))
320+
321+
assert.Equal(t, "streamingnode-meta/wal/p1/salvage-checkpoint/cluster-a", buildSalvageCheckpointPath("p1", "cluster-a"))
322+
assert.Equal(t, "streamingnode-meta/wal/p2/salvage-checkpoint/cluster-b", buildSalvageCheckpointPath("p2", "cluster-b"))
223323
}

internal/streamingnode/server/wal/interceptors/replicate/replicates/manager_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,101 @@ func TestSalvageCheckpointNotCapturedWhenAlreadyPrimary(t *testing.T) {
195195
assert.Empty(t, rm.GetSalvageCheckpoint())
196196
}
197197

198+
func TestSalvageCheckpointLoadedFromEtcd(t *testing.T) {
199+
// Simulate WAL recovery where salvage checkpoints were previously persisted to etcd
200+
// and loaded back into the ReplicateManagerRecoverParam.
201+
preLoaded := []*utility.ReplicateCheckpoint{
202+
{ClusterID: "cluster-a", PChannel: "cluster-a-rootcoord-dml_0", TimeTick: 100},
203+
{ClusterID: "cluster-b", PChannel: "cluster-b-rootcoord-dml_0", TimeTick: 200},
204+
}
205+
206+
rm, err := RecoverReplicateManager(&ReplicateManagerRecoverParam{
207+
ChannelInfo: types.PChannelInfo{Name: "test1-rootcoord-dml_0", Term: 1},
208+
CurrentClusterID: "test1",
209+
InitialRecoverSnapshot: &recovery.RecoverySnapshot{
210+
Checkpoint: &utility.WALCheckpoint{
211+
MessageID: walimplstest.NewTestMessageID(1),
212+
TimeTick: 1,
213+
ReplicateConfig: newReplicateConfiguration("test1", "test2"),
214+
},
215+
},
216+
SalvageCheckpoints: preLoaded,
217+
})
218+
assert.NoError(t, err)
219+
assert.Equal(t, replicateutil.RolePrimary, rm.Role())
220+
221+
salvageCPs := rm.GetSalvageCheckpoint()
222+
assert.Len(t, salvageCPs, 2)
223+
224+
byCluster := make(map[string]*utility.ReplicateCheckpoint, 2)
225+
for _, cp := range salvageCPs {
226+
byCluster[cp.ClusterID] = cp
227+
}
228+
assert.Equal(t, uint64(100), byCluster["cluster-a"].TimeTick)
229+
assert.Equal(t, "cluster-a-rootcoord-dml_0", byCluster["cluster-a"].PChannel)
230+
assert.Equal(t, uint64(200), byCluster["cluster-b"].TimeTick)
231+
assert.Equal(t, "cluster-b-rootcoord-dml_0", byCluster["cluster-b"].PChannel)
232+
}
233+
234+
func TestSalvageCheckpointMultipleForcePromotes(t *testing.T) {
235+
// Start as secondary of cluster-a, force promote to primary,
236+
// then become secondary of cluster-b, force promote again.
237+
// Both salvage checkpoints should accumulate (keyed by source cluster).
238+
txnBuffer := utility.NewTxnBuffer(log.With(), metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics())
239+
rm, err := RecoverReplicateManager(&ReplicateManagerRecoverParam{
240+
ChannelInfo: types.PChannelInfo{Name: "test1-rootcoord-dml_0", Term: 1},
241+
CurrentClusterID: "test1",
242+
InitialRecoverSnapshot: &recovery.RecoverySnapshot{
243+
Checkpoint: &utility.WALCheckpoint{
244+
MessageID: walimplstest.NewTestMessageID(1),
245+
TimeTick: 1,
246+
ReplicateCheckpoint: &utility.ReplicateCheckpoint{
247+
ClusterID: "cluster-a",
248+
PChannel: "cluster-a-rootcoord-dml_0",
249+
MessageID: walimplstest.NewTestMessageID(10),
250+
TimeTick: 100,
251+
},
252+
ReplicateConfig: newReplicateConfiguration("cluster-a", "test1"),
253+
},
254+
TxnBuffer: txnBuffer,
255+
},
256+
})
257+
assert.NoError(t, err)
258+
assert.Equal(t, replicateutil.RoleSecondary, rm.Role())
259+
assert.Empty(t, rm.GetSalvageCheckpoint())
260+
261+
// First force promote: secondary(cluster-a) -> primary
262+
err = rm.SwitchReplicateMode(context.Background(), newAlterReplicateConfigMessageWithForcePromote("test1", true, "cluster-a"))
263+
assert.NoError(t, err)
264+
assert.Equal(t, replicateutil.RolePrimary, rm.Role())
265+
cps := rm.GetSalvageCheckpoint()
266+
assert.Len(t, cps, 1)
267+
assert.Equal(t, "cluster-a", cps[0].ClusterID)
268+
269+
// Now become secondary of cluster-b
270+
err = rm.SwitchReplicateMode(context.Background(), newAlterReplicateConfigMessage("cluster-b", "test1"))
271+
assert.NoError(t, err)
272+
assert.Equal(t, replicateutil.RoleSecondary, rm.Role())
273+
274+
// Still one salvage checkpoint from first promote
275+
assert.Len(t, rm.GetSalvageCheckpoint(), 1)
276+
277+
// Second force promote: secondary(cluster-b) -> primary
278+
err = rm.SwitchReplicateMode(context.Background(), newAlterReplicateConfigMessageWithForcePromote("test1", true, "cluster-b"))
279+
assert.NoError(t, err)
280+
assert.Equal(t, replicateutil.RolePrimary, rm.Role())
281+
282+
// Both salvage checkpoints should now be present
283+
cps = rm.GetSalvageCheckpoint()
284+
assert.Len(t, cps, 2)
285+
byCluster := make(map[string]*utility.ReplicateCheckpoint, 2)
286+
for _, cp := range cps {
287+
byCluster[cp.ClusterID] = cp
288+
}
289+
assert.Contains(t, byCluster, "cluster-a")
290+
assert.Contains(t, byCluster, "cluster-b")
291+
}
292+
198293
func TestSecondaryReplicateManagerWithTxn(t *testing.T) {
199294
txnBuffer := utility.NewTxnBuffer(log.With(), metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics())
200295
txnMsgs := newReplicateTxnMessage("test1", "test2", 2)

0 commit comments

Comments
 (0)