Skip to content

Commit 0cc917d

Browse files
authored
maintainer: Fix Data Race (#3494)
close #3481
1 parent 555e9cc commit 0cc917d

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

coordinator/operator/operator_controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (oc *Controller) pushStopChangefeedOperator(keyspaceID uint32, cfID common.
163163
zap.String("operator", old.OP.String()))
164164
old.OP.OnTaskRemoved()
165165
old.OP.PostFinish()
166-
old.IsRemoved = true
166+
old.IsRemoved.Store(true)
167167
delete(oc.operators, old.OP.ID())
168168
}
169169
oc.pushOperator(op)
@@ -242,15 +242,15 @@ func (oc *Controller) pollQueueingOperator() (operator.Operator[common.ChangeFee
242242
return nil, false
243243
}
244244
item := heap.Pop(&oc.runningQueue).(*operator.OperatorWithTime[common.ChangeFeedID, *heartbeatpb.MaintainerStatus])
245-
if item.IsRemoved {
245+
if item.IsRemoved.Load() {
246246
return nil, true
247247
}
248248
op := item.OP
249249
opID := item.OP.ID()
250250
// always call the PostFinish method to ensure the operator is cleaned up by itself.
251251
if op.IsFinished() {
252252
op.PostFinish()
253-
item.IsRemoved = true
253+
item.IsRemoved.Store(true)
254254
delete(oc.operators, opID)
255255
metrics.CoordinatorFinishedOperatorCount.WithLabelValues(op.Type()).Inc()
256256
metrics.CoordinatorOperatorDuration.WithLabelValues(op.Type()).Observe(time.Since(item.CreatedAt).Seconds())

maintainer/operator/operator_controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,13 @@ func (oc *Controller) pollQueueingOperator() (
245245
op := item.OP
246246
opID := op.ID()
247247
oc.mu.Unlock()
248-
if item.IsRemoved {
248+
if item.IsRemoved.Load() {
249249
return nil, true
250250
}
251251
// always call the PostFinish method to ensure the operator is cleaned up by itself.
252252
if op.IsFinished() {
253253
op.PostFinish()
254-
item.IsRemoved = true
254+
item.IsRemoved.Store(true)
255255

256256
oc.mu.Lock()
257257
delete(oc.operators, opID)
@@ -311,7 +311,7 @@ func (oc *Controller) removeReplicaSet(op *removeDispatcherOperator) {
311311
zap.String("operator", old.OP.String()))
312312
old.OP.OnTaskRemoved()
313313
old.OP.PostFinish()
314-
old.IsRemoved = true
314+
old.IsRemoved.Store(true)
315315

316316
oc.mu.Lock()
317317
delete(oc.operators, op.ID())

pkg/scheduler/operator/operator_queue.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package operator
1515

1616
import (
17+
"sync/atomic"
1718
"time"
1819

1920
"github.com/pingcap/ticdc/pkg/scheduler/replica"
@@ -28,7 +29,7 @@ type OperatorWithTime[T replica.ReplicationID, S replica.ReplicationStatus] stru
2829
// CreatedAt records when this operator was created
2930
CreatedAt time.Time
3031
// IsRemoved indicates whether this operator has been marked for removal
31-
IsRemoved bool
32+
IsRemoved atomic.Bool
3233
}
3334

3435
func NewOperatorWithTime[T replica.ReplicationID, S replica.ReplicationStatus](op Operator[T, S], time time.Time) *OperatorWithTime[T, S] {

0 commit comments

Comments
 (0)