Skip to content

Commit 3adf38f

Browse files
committed
Now using proper abstraction for the first state
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
1 parent 3db8cfa commit 3adf38f

File tree

2 files changed

+8
-18
lines changed

2 files changed

+8
-18
lines changed

service/sharddistributor/client/spectatorclient/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/uber-go/tally"
88
"go.uber.org/fx"
9+
csync "github.com/uber/cadence/service/sharddistributor/client/spectatorclient/sync"
910

1011
"github.com/uber/cadence/client/sharddistributor"
1112
"github.com/uber/cadence/common/clock"
@@ -114,7 +115,7 @@ func newSpectatorWithConfig(params Params, namespaceConfig *clientcommon.Namespa
114115
logger: params.Logger,
115116
scope: params.MetricsScope,
116117
timeSource: params.TimeSource,
117-
firstStateCh: make(chan struct{}),
118+
firstStateSignal: csync.NewResettableSignal(),
118119
enabled: enabled,
119120
}
120121

service/sharddistributor/client/spectatorclient/clientimpl.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ type spectatorImpl struct {
4545
stateMu sync.RWMutex
4646
shardToOwner map[string]*ShardOwner
4747

48-
// Channel to signal when first state is received
49-
firstStateCh chan struct{}
50-
firstStateOnce sync.Once
48+
// Signal to notify when first state is received
49+
firstStateSignal csync.ResettableSignal
5150
}
5251

5352
func (s *spectatorImpl) Start(ctx context.Context) error {
@@ -68,10 +67,8 @@ func (s *spectatorImpl) Stop() {
6867
if s.cancel != nil {
6968
s.cancel()
7069
}
71-
// Close the firstStateCh to unblock any goroutines waiting for first state
72-
s.firstStateOnce.Do(func() {
73-
close(s.firstStateCh)
74-
})
70+
// Close the firstStateSignal to unblock any goroutines waiting for first state
71+
s.firstStateSignal.Done()
7572
s.stopWG.Wait()
7673
}
7774

@@ -166,9 +163,7 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon
166163

167164
// Signal that first state has been received (only once)
168165
if isFirstState {
169-
s.firstStateOnce.Do(func() {
170-
close(s.firstStateCh)
171-
})
166+
s.firstStateSignal.Done()
172167
}
173168

174169
s.logger.Debug("Received namespace state update",
@@ -181,13 +176,7 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon
181176
// If not found in cache, it falls back to querying the shard distributor directly.
182177
func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) {
183178
// Wait for first state to be received to avoid flooding shard distributor on startup
184-
select {
185-
case <-s.firstStateCh:
186-
// First state received, continue
187-
case <-ctx.Done():
188-
// Context cancelled or timed out before first state received
189-
return nil, fmt.Errorf("context cancelled while waiting for first state: %w", ctx.Err())
190-
}
179+
s.firstStateSignal.Wait(ctx)
191180

192181
// Check cache first
193182
s.stateMu.RLock()

0 commit comments

Comments
 (0)