Skip to content

Commit c01e0d4

Browse files
ti-chi-botlidezhu
andauthored
logcoordinator: ignore uninitialized subscriptions for reuse candidates (#3313) (#3337)
* add more test * fix * fix * remove test --------- Co-authored-by: lidezhu <[email protected]>
1 parent 6287942 commit c01e0d4

File tree

4 files changed

+72
-8
lines changed

4 files changed

+72
-8
lines changed

logservice/coordinator/coordinator.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,14 @@ func (c *logCoordinator) getCandidateNodes(requestNodeID node.ID, span *heartbea
344344
var maxResolvedTs, subID uint64
345345
found := false
346346
for _, subsState := range subStates.GetSubscriptions() {
347+
// Only consider subscriptions which meet the following conditions:
348+
// 1. subscription's span covers the requested span
349+
// 2. subscription's checkpointTs <= request startTs
350+
// 3. subscription's checkpointTs < resolvedTs (meaning the subscription has finished incremental scan)
347351
if bytes.Compare(subsState.Span.StartKey, span.StartKey) <= 0 &&
348352
bytes.Compare(span.EndKey, subsState.Span.EndKey) <= 0 &&
349-
subsState.CheckpointTs <= startTs {
353+
subsState.CheckpointTs <= startTs &&
354+
subsState.CheckpointTs < subsState.ResolvedTs {
350355
if !found || subsState.ResolvedTs > maxResolvedTs {
351356
maxResolvedTs = subsState.ResolvedTs
352357
subID = subsState.SubID

logservice/coordinator/coordinator_test.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func TestGetCandidateNodes(t *testing.T) {
5151
tableID2 := int64(101)
5252
span1 := common.TableIDToComparableSpan(common.DefaultKeyspaceID, tableID1)
5353
span2 := common.TableIDToComparableSpan(common.DefaultKeyspaceID, tableID2)
54+
startTs := uint64(100)
5455

5556
// initialize event store states
5657
coordinator.updateEventStoreState(nodeID1, &logservicepb.EventStoreState{
@@ -129,22 +130,23 @@ func TestGetCandidateNodes(t *testing.T) {
129130
},
130131
},
131132
})
133+
require.Len(t, coordinator.eventStoreStates.m, 3)
132134

133135
// check get candidates
134136
{
135-
nodes := coordinator.getCandidateNodes(nodeID1, &span1, uint64(100))
137+
nodes := coordinator.getCandidateNodes(nodeID1, &span1, startTs)
136138
assert.Equal(t, []string{nodeID2.String()}, nodes)
137139
}
138140
{
139-
nodes := coordinator.getCandidateNodes(nodeID3, &span1, uint64(100))
141+
nodes := coordinator.getCandidateNodes(nodeID3, &span1, startTs)
140142
assert.Equal(t, []string{nodeID2.String(), nodeID1.String()}, nodes)
141143
}
142144
{
143-
nodes := coordinator.getCandidateNodes(nodeID1, &span2, uint64(100))
145+
nodes := coordinator.getCandidateNodes(nodeID1, &span2, startTs)
144146
assert.Equal(t, []string{nodeID3.String(), nodeID2.String()}, nodes)
145147
}
146148
{
147-
nodes := coordinator.getCandidateNodes(nodeID3, &span2, uint64(100))
149+
nodes := coordinator.getCandidateNodes(nodeID3, &span2, startTs)
148150
assert.Equal(t, []string{nodeID2.String()}, nodes)
149151
}
150152

@@ -164,7 +166,7 @@ func TestGetCandidateNodes(t *testing.T) {
164166
},
165167
})
166168
{
167-
nodes := coordinator.getCandidateNodes(nodeID3, &span1, uint64(100))
169+
nodes := coordinator.getCandidateNodes(nodeID3, &span1, startTs)
168170
assert.Equal(t, []string{nodeID1.String(), nodeID2.String()}, nodes)
169171
}
170172

@@ -190,18 +192,49 @@ func TestGetCandidateNodes(t *testing.T) {
190192
},
191193
})
192194
{
193-
nodes := coordinator.getCandidateNodes(nodeID3, &span1, uint64(100))
195+
nodes := coordinator.getCandidateNodes(nodeID3, &span1, startTs)
194196
assert.Equal(t, []string{nodeID2.String(), nodeID1.String()}, nodes)
195197
}
196198

197199
// remove node1 and check again
198200
delete(coordinator.nodes.m, nodeID1)
199201
{
200-
nodes := coordinator.getCandidateNodes(nodeID3, &span1, uint64(100))
202+
nodes := coordinator.getCandidateNodes(nodeID3, &span1, startTs)
201203
assert.Equal(t, []string{nodeID2.String()}, nodes)
202204
}
203205
}
204206

207+
func TestGetCandidateNodesIgnoreResolvedEqCheckpoint(t *testing.T) {
208+
coordinator := newLogCoordinatorForTest()
209+
210+
nodeID1 := node.ID("node-1")
211+
nodeID2 := node.ID("node-2")
212+
coordinator.nodes.m[nodeID1] = &node.Info{ID: nodeID1}
213+
coordinator.nodes.m[nodeID2] = &node.Info{ID: nodeID2}
214+
215+
tableID := int64(200)
216+
span := common.TableIDToComparableSpan(common.DefaultKeyspaceID, tableID)
217+
startTs := uint64(600)
218+
219+
coordinator.updateEventStoreState(nodeID2, &logservicepb.EventStoreState{
220+
TableStates: map[int64]*logservicepb.TableState{
221+
tableID: {
222+
Subscriptions: []*logservicepb.SubscriptionState{
223+
{
224+
SubID: 1,
225+
Span: &span,
226+
CheckpointTs: startTs - 1,
227+
ResolvedTs: startTs - 1,
228+
},
229+
},
230+
},
231+
},
232+
})
233+
234+
nodes := coordinator.getCandidateNodes(nodeID1, &span, startTs)
235+
require.Empty(t, nodes, "resolvedTs equal to checkpointTs should not be reused")
236+
}
237+
205238
func TestUpdateChangefeedStates(t *testing.T) {
206239
c := newLogCoordinatorForTest()
207240

logservice/eventstore/event_store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,11 @@ func (e *eventStore) RegisterDispatcher(
479479
if bytes.Compare(subStat.tableSpan.StartKey, dispatcherSpan.StartKey) <= 0 &&
480480
bytes.Compare(subStat.tableSpan.EndKey, dispatcherSpan.EndKey) >= 0 {
481481

482+
// For onlyReuse register request, we only consider initialized subStats
483+
if onlyReuse && !subStat.initialized.Load() {
484+
continue
485+
}
486+
482487
// Check whether the subStat ts range contains startTs
483488
if subStat.checkpointTs.Load() > startTs || startTs > subStat.resolvedTs.Load() {
484489
continue

logservice/eventstore/event_store_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ func TestEventStoreInteractionWithSubClient(t *testing.T) {
152152
}
153153
}
154154

155+
func markSubStatsInitializedForTest(store EventStore, tableID int64) {
156+
es := store.(*eventStore)
157+
subStats := es.dispatcherMeta.tableStats[tableID]
158+
for _, subStat := range subStats {
159+
subStat.initialized.Store(true)
160+
}
161+
}
162+
155163
func TestEventStoreOnlyReuseDispatcher(t *testing.T) {
156164
_, store := newEventStoreForTest(fmt.Sprintf("/tmp/%s", t.Name()))
157165

@@ -180,6 +188,18 @@ func TestEventStoreOnlyReuseDispatcher(t *testing.T) {
180188
ok := store.RegisterDispatcher(cfID, dispatcherID2, span, 100, func(watermark uint64, latestCommitTs uint64) {}, true, false)
181189
require.False(t, ok)
182190
}
191+
// when the existing subscription is not initialized, add a dispatcher(onlyReuse=true) should fail
192+
{
193+
span := &heartbeatpb.TableSpan{
194+
TableID: tableID,
195+
StartKey: []byte("b"),
196+
EndKey: []byte("h"),
197+
}
198+
ok := store.RegisterDispatcher(cfID, dispatcherID3, span, 100, func(watermark uint64, latestCommitTs uint64) {}, true, false)
199+
require.False(t, ok)
200+
}
201+
// mark existing subscription as initialized
202+
markSubStatsInitializedForTest(store, tableID)
183203
// add a dispatcher(onlyReuse=true) with a containing span which should success
184204
{
185205
span := &heartbeatpb.TableSpan{
@@ -229,6 +249,7 @@ func TestEventStoreOnlyReuseDispatcherSuccess(t *testing.T) {
229249
ok := es.RegisterDispatcher(cfID, dispatcherID1, span, 100, func(watermark uint64, latestCommitTs uint64) {}, false, false)
230250
require.True(t, ok)
231251
}
252+
markSubStatsInitializedForTest(store, tableID)
232253

233254
// 2. Register a second dispatcher with onlyReuse=true, whose span is contained
234255
// by the first subscription. This registration should succeed.

0 commit comments

Comments
 (0)