Skip to content

Commit 19c96c4

Browse files
authored
*: use atomic value to store coordinator info (pingcap#1414)
ref pingcap#1328
1 parent 1f59580 commit 19c96c4

File tree

2 files changed

+13
-16
lines changed

2 files changed

+13
-16
lines changed

downstreamadapter/eventcollector/event_collector.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package eventcollector
1616
import (
1717
"context"
1818
"sync"
19+
"sync/atomic"
1920
"time"
2021

2122
"github.com/pingcap/log"
@@ -112,8 +113,7 @@ type EventCollector struct {
112113
ds dynstream.DynamicStream[common.GID, common.DispatcherID, dispatcher.DispatcherEvent, *dispatcherStat, *EventsHandler]
113114

114115
coordinatorInfo struct {
115-
sync.Mutex
116-
id node.ID
116+
value atomic.Value
117117
}
118118

119119
wg sync.WaitGroup
@@ -401,15 +401,14 @@ func (c *EventCollector) processDispatcherRequests(ctx context.Context) {
401401
}
402402

403403
func (c *EventCollector) setCoordinatorInfo(id node.ID) {
404-
c.coordinatorInfo.Lock()
405-
defer c.coordinatorInfo.Unlock()
406-
c.coordinatorInfo.id = id
404+
c.coordinatorInfo.value.Store(id)
407405
}
408406

409407
func (c *EventCollector) getCoordinatorInfo() node.ID {
410-
c.coordinatorInfo.Lock()
411-
defer c.coordinatorInfo.Unlock()
412-
return c.coordinatorInfo.id
408+
if v := c.coordinatorInfo.value.Load(); v != nil {
409+
return v.(node.ID)
410+
}
411+
return ""
413412
}
414413

415414
func (c *EventCollector) processLogCoordinatorRequest(ctx context.Context) {

logservice/eventstore/event_store.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ type eventStore struct {
163163
messageCenter messaging.MessageCenter
164164

165165
coordinatorInfo struct {
166-
sync.Mutex
167-
id node.ID
166+
value atomic.Value
168167
}
169168
// The channel is used to gather the subscription info
170169
// which need to be uploaded to log coordinator periodically.
@@ -270,15 +269,14 @@ func (p *writeTaskPool) run(ctx context.Context) {
270269
}
271270

272271
func (e *eventStore) setCoordinatorInfo(id node.ID) {
273-
e.coordinatorInfo.Lock()
274-
defer e.coordinatorInfo.Unlock()
275-
e.coordinatorInfo.id = id
272+
e.coordinatorInfo.value.Store(id)
276273
}
277274

278275
func (e *eventStore) getCoordinatorInfo() node.ID {
279-
e.coordinatorInfo.Lock()
280-
defer e.coordinatorInfo.Unlock()
281-
return e.coordinatorInfo.id
276+
if v := e.coordinatorInfo.value.Load(); v != nil {
277+
return v.(node.ID)
278+
}
279+
return ""
282280
}
283281

284282
func (e *eventStore) Name() string {

0 commit comments

Comments
 (0)