Skip to content

Commit 44a877b

Browse files
committed
tests done
Signed-off-by: Jakob Haahr Taankvist <jht@uber.com>
1 parent 3adf38f commit 44a877b

File tree

4 files changed

+119
-44
lines changed

4 files changed

+119
-44
lines changed

service/sharddistributor/client/spectatorclient/client.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,20 @@ import (
66

77
"github.com/uber-go/tally"
88
"go.uber.org/fx"
9-
csync "github.com/uber/cadence/service/sharddistributor/client/spectatorclient/sync"
109

1110
"github.com/uber/cadence/client/sharddistributor"
1211
"github.com/uber/cadence/common/clock"
1312
"github.com/uber/cadence/common/log"
1413
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
14+
csync "github.com/uber/cadence/service/sharddistributor/client/spectatorclient/sync"
1515
)
1616

1717
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . Spectator
1818

19+
// EnabledFunc is a function that returns true if the spectator is enabled
20+
// This is used to disable the spectator in the case of a migration
21+
type EnabledFunc func() bool
22+
1923
type Spectators struct {
2024
spectators map[string]Spectator
2125
}
@@ -73,9 +77,7 @@ type Params struct {
7377
Config clientcommon.Config
7478
TimeSource clock.TimeSource
7579

76-
// Enabled is a function that returns true if the spectator is enabled
77-
// This is used to disable the spectator in the case of a migration
78-
Enabled func() bool `optional:"true"`
80+
Enabled EnabledFunc `optional:"true"`
7981
}
8082

8183
// NewSpectatorWithNamespace creates a spectator for a specific namespace
@@ -109,14 +111,14 @@ func newSpectatorWithConfig(params Params, namespaceConfig *clientcommon.Namespa
109111
}
110112

111113
impl := &spectatorImpl{
112-
namespace: namespaceConfig.Namespace,
113-
config: *namespaceConfig,
114-
client: params.Client,
115-
logger: params.Logger,
116-
scope: params.MetricsScope,
117-
timeSource: params.TimeSource,
114+
namespace: namespaceConfig.Namespace,
115+
config: *namespaceConfig,
116+
client: params.Client,
117+
logger: params.Logger,
118+
scope: params.MetricsScope,
119+
timeSource: params.TimeSource,
118120
firstStateSignal: csync.NewResettableSignal(),
119-
enabled: enabled,
121+
enabled: enabled,
120122
}
121123

122124
return impl, nil

service/sharddistributor/client/spectatorclient/clientimpl.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/uber/cadence/common/log/tag"
1616
"github.com/uber/cadence/common/types"
1717
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
18+
csync "github.com/uber/cadence/service/sharddistributor/client/spectatorclient/sync"
1819
)
1920

2021
const (
@@ -30,6 +31,7 @@ type ShardOwner struct {
3031

3132
type spectatorImpl struct {
3233
namespace string
34+
enabled EnabledFunc
3335
config clientcommon.NamespaceConfig
3436
client sharddistributor.Client
3537
scope tally.Scope
@@ -73,27 +75,37 @@ func (s *spectatorImpl) Stop() {
7375
}
7476

7577
func (s *spectatorImpl) watchLoop() {
78+
defer s.logger.Info("Shutting down, stopping watch loop", tag.ShardNamespace(s.namespace))
79+
7680
s.logger.Info("Starting watch loop for namespace", tag.ShardNamespace(s.namespace))
7781

7882
for {
7983
if s.ctx.Err() != nil {
80-
s.logger.Info("Shutting down, stopping watch loop", tag.ShardNamespace(s.namespace))
8184
return
8285
}
8386

87+
if !s.enabled() {
88+
// If spectator is disabled, sleep for a second and continue
89+
s.firstStateSignal.Reset()
90+
91+
err := s.timeSource.SleepWithContext(s.ctx, backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff))
92+
if err != nil {
93+
return
94+
}
95+
continue
96+
}
97+
8498
// Create new stream
8599
stream, err := s.client.WatchNamespaceState(s.ctx, &types.WatchNamespaceStateRequest{
86100
Namespace: s.namespace,
87101
})
88102
if err != nil {
89103
if s.ctx.Err() != nil {
90-
s.logger.Info("Shutting down during stream creation, exiting watch loop", tag.ShardNamespace(s.namespace))
91104
return
92105
}
93106

94107
s.logger.Error("Failed to create stream, retrying", tag.Error(err), tag.ShardNamespace(s.namespace))
95108
if err := s.timeSource.SleepWithContext(s.ctx, backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff)); err != nil {
96-
s.logger.Info("Shutting down waiting to retry stream creation, exiting watch loop", tag.ShardNamespace(s.namespace))
97109
return // Context cancelled during sleep
98110
}
99111
continue
@@ -102,7 +114,6 @@ func (s *spectatorImpl) watchLoop() {
102114
s.receiveLoop(stream)
103115

104116
if s.ctx.Err() != nil {
105-
s.logger.Info("Shutting down, exiting watch loop", tag.ShardNamespace(s.namespace))
106117
return
107118
}
108119

@@ -122,6 +133,12 @@ func (s *spectatorImpl) receiveLoop(stream sharddistributor.WatchNamespaceStateC
122133
}()
123134

124135
for {
136+
if !s.enabled() {
137+
// the loop was disabled, exit
138+
s.firstStateSignal.Reset()
139+
return
140+
}
141+
125142
response, err := stream.Recv()
126143
if err != nil {
127144
if s.ctx.Err() != nil {
@@ -176,7 +193,9 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon
176193
// If not found in cache, it falls back to querying the shard distributor directly.
177194
func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) {
178195
// Wait for first state to be received to avoid flooding shard distributor on startup
179-
s.firstStateSignal.Wait(ctx)
196+
if err := s.firstStateSignal.Wait(ctx); err != nil {
197+
return nil, fmt.Errorf("wait for first state: %w", err)
198+
}
180199

181200
// Check cache first
182201
s.stateMu.RLock()

service/sharddistributor/client/spectatorclient/clientimpl_test.go

Lines changed: 78 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package spectatorclient
33
import (
44
"context"
55
"errors"
6+
"sync"
67
"testing"
78
"time"
89

@@ -16,6 +17,7 @@ import (
1617
"github.com/uber/cadence/common/clock"
1718
"github.com/uber/cadence/common/log"
1819
"github.com/uber/cadence/common/types"
20+
csync "github.com/uber/cadence/service/sharddistributor/client/spectatorclient/sync"
1921
)
2022

2123
func TestWatchLoopBasicFlow(t *testing.T) {
@@ -26,12 +28,13 @@ func TestWatchLoopBasicFlow(t *testing.T) {
2628
mockStream := sharddistributor.NewMockWatchNamespaceStateClient(ctrl)
2729

2830
spectator := &spectatorImpl{
29-
namespace: "test-ns",
30-
client: mockClient,
31-
logger: log.NewNoop(),
32-
scope: tally.NoopScope,
33-
timeSource: clock.NewRealTimeSource(),
34-
firstStateCh: make(chan struct{}),
31+
namespace: "test-ns",
32+
client: mockClient,
33+
logger: log.NewNoop(),
34+
scope: tally.NoopScope,
35+
timeSource: clock.NewRealTimeSource(),
36+
firstStateSignal: csync.NewResettableSignal(),
37+
enabled: func() bool { return true },
3538
}
3639

3740
// Expect stream creation
@@ -70,7 +73,7 @@ func TestWatchLoopBasicFlow(t *testing.T) {
7073
defer spectator.Stop()
7174

7275
// Wait for first state
73-
<-spectator.firstStateCh
76+
require.NoError(t, spectator.firstStateSignal.Wait(context.Background()))
7477

7578
// Query shard owner
7679
owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
@@ -91,12 +94,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
9194
mockStream := sharddistributor.NewMockWatchNamespaceStateClient(ctrl)
9295

9396
spectator := &spectatorImpl{
94-
namespace: "test-ns",
95-
client: mockClient,
96-
logger: log.NewNoop(),
97-
scope: tally.NoopScope,
98-
timeSource: clock.NewRealTimeSource(),
99-
firstStateCh: make(chan struct{}),
97+
namespace: "test-ns",
98+
client: mockClient,
99+
logger: log.NewNoop(),
100+
scope: tally.NoopScope,
101+
timeSource: clock.NewRealTimeSource(),
102+
firstStateSignal: csync.NewResettableSignal(),
103+
enabled: func() bool { return true },
100104
}
101105

102106
// Setup stream
@@ -142,7 +146,7 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
142146
spectator.Start(context.Background())
143147
defer spectator.Stop()
144148

145-
<-spectator.firstStateCh
149+
require.NoError(t, spectator.firstStateSignal.Wait(context.Background()))
146150

147151
// Cache hit
148152
owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
@@ -166,12 +170,13 @@ func TestStreamReconnection(t *testing.T) {
166170
mockTimeSource := clock.NewMockedTimeSource()
167171

168172
spectator := &spectatorImpl{
169-
namespace: "test-ns",
170-
client: mockClient,
171-
logger: log.NewNoop(),
172-
scope: tally.NoopScope,
173-
timeSource: mockTimeSource,
174-
firstStateCh: make(chan struct{}),
173+
namespace: "test-ns",
174+
client: mockClient,
175+
logger: log.NewNoop(),
176+
scope: tally.NoopScope,
177+
timeSource: mockTimeSource,
178+
firstStateSignal: csync.NewResettableSignal(),
179+
enabled: func() bool { return true },
175180
}
176181

177182
// First stream fails immediately
@@ -208,7 +213,7 @@ func TestStreamReconnection(t *testing.T) {
208213
mockTimeSource.BlockUntil(1) // Wait for 1 goroutine to be blocked in Sleep
209214
mockTimeSource.Advance(2 * time.Second)
210215

211-
<-spectator.firstStateCh
216+
require.NoError(t, spectator.firstStateSignal.Wait(context.Background()))
212217
}
213218

214219
func TestGetShardOwner_TimeoutBeforeFirstState(t *testing.T) {
@@ -218,12 +223,13 @@ func TestGetShardOwner_TimeoutBeforeFirstState(t *testing.T) {
218223
mockClient := sharddistributor.NewMockClient(ctrl)
219224

220225
spectator := &spectatorImpl{
221-
namespace: "test-ns",
222-
client: mockClient,
223-
logger: log.NewNoop(),
224-
scope: tally.NoopScope,
225-
timeSource: clock.NewRealTimeSource(),
226-
firstStateCh: make(chan struct{}),
226+
namespace: "test-ns",
227+
client: mockClient,
228+
logger: log.NewNoop(),
229+
scope: tally.NoopScope,
230+
timeSource: clock.NewRealTimeSource(),
231+
firstStateSignal: csync.NewResettableSignal(),
232+
enabled: func() bool { return true },
227233
}
228234

229235
// Create a context with a short timeout
@@ -234,5 +240,49 @@ func TestGetShardOwner_TimeoutBeforeFirstState(t *testing.T) {
234240
// Should timeout and return an error
235241
_, err := spectator.GetShardOwner(ctx, "shard-1")
236242
assert.Error(t, err)
237-
assert.Contains(t, err.Error(), "context cancelled while waiting for first state")
243+
assert.Contains(t, err.Error(), "wait for first state")
244+
}
245+
246+
func TestWatchLoopDisabled(t *testing.T) {
247+
defer goleak.VerifyNone(t)
248+
249+
stateSignal := csync.NewResettableSignal()
250+
timeSource := clock.NewMockedTimeSource()
251+
252+
spectator := &spectatorImpl{
253+
firstStateSignal: stateSignal,
254+
timeSource: timeSource,
255+
logger: log.NewNoop(),
256+
enabled: func() bool { return false },
257+
}
258+
259+
spectator.Start(context.Background())
260+
defer spectator.Stop()
261+
262+
wg := sync.WaitGroup{}
263+
wg.Add(1)
264+
go func() {
265+
defer wg.Done()
266+
err := stateSignal.Wait(context.Background())
267+
assert.Error(t, err)
268+
269+
// Second wait might return ErrReset (if it observes the second reset)
270+
// or nil (if Stop() is called first). Both are acceptable.
271+
_ = stateSignal.Wait(context.Background())
272+
}()
273+
274+
// First sleep should reset the signal
275+
timeSource.BlockUntil(1)
276+
timeSource.Advance(1200 * time.Millisecond)
277+
278+
// Second sleep should reset the signal
279+
timeSource.BlockUntil(1)
280+
281+
// Ensure the loop is exited
282+
timeSource.Advance(1200 * time.Millisecond)
283+
284+
// Stop the spectator to unblock any waiting goroutines
285+
spectator.Stop()
286+
287+
wg.Wait()
238288
}

service/sharddistributor/client/spectatorclient/peer_chooser_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ func TestSpectatorPeerChooser_Choose_Success(t *testing.T) {
105105

106106
mockSpectator := NewMockSpectator(ctrl)
107107
peerTransport := grpc.NewTransport()
108+
require.NoError(t, peerTransport.Start())
109+
defer peerTransport.Stop()
108110

109111
chooser := &SpectatorPeerChooser{
110112
transport: peerTransport,
@@ -150,6 +152,8 @@ func TestSpectatorPeerChooser_Choose_ReusesPeer(t *testing.T) {
150152

151153
mockSpectator := NewMockSpectator(ctrl)
152154
peerTransport := grpc.NewTransport()
155+
require.NoError(t, peerTransport.Start())
156+
defer peerTransport.Stop()
153157

154158
chooser := &SpectatorPeerChooser{
155159
transport: peerTransport,

0 commit comments

Comments
 (0)