|
4 | 4 | "context" |
5 | 5 | "fmt" |
6 | 6 | "strings" |
| 7 | + "sync/atomic" |
7 | 8 |
|
8 | 9 | "google.golang.org/grpc" |
9 | 10 |
|
@@ -41,9 +42,9 @@ type Balancer struct { |
41 | 42 | discoveryRepeater repeater.Repeater |
42 | 43 | localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error) |
43 | 44 |
|
44 | | - mu xsync.RWMutex |
45 | | - connectionsState *connectionsState |
| 45 | + connectionsState atomic.Pointer[connectionsState] |
46 | 46 |
|
| 47 | + mu xsync.RWMutex |
47 | 48 | onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info) |
48 | 49 | } |
49 | 50 |
|
@@ -158,8 +159,9 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi |
158 | 159 | endpointsInfo[i] = e |
159 | 160 | } |
160 | 161 |
|
| 162 | + b.connectionsState.Store(state) |
| 163 | + |
161 | 164 | b.mu.WithLock(func() { |
162 | | - b.connectionsState = state |
163 | 165 | for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints { |
164 | 166 | onApplyDiscoveredEndpoints(ctx, endpointsInfo) |
165 | 167 | } |
@@ -216,8 +218,7 @@ func New( |
216 | 218 | discoveryClient: internalDiscovery.New(ctx, pool.Get( |
217 | 219 | endpoint.New(driverConfig.Endpoint()), |
218 | 220 | ), discoveryConfig), |
219 | | - connectionsState: &connectionsState{}, |
220 | | - localDCDetector: detectLocalDC, |
| 221 | + localDCDetector: detectLocalDC, |
221 | 222 | } |
222 | 223 |
|
223 | 224 | if config := driverConfig.Balancer(); config == nil { |
@@ -319,10 +320,7 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc |
319 | 320 | } |
320 | 321 |
|
321 | 322 | func (b *Balancer) connections() *connectionsState { |
322 | | - b.mu.RLock() |
323 | | - defer b.mu.RUnlock() |
324 | | - |
325 | | - return b.connectionsState |
| 323 | + return b.connectionsState.Load() |
326 | 324 | } |
327 | 325 |
|
328 | 326 | func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) { |
|
0 commit comments