Skip to content

Commit 3c8a7b8

Browse files
authored
fix(shard-distributor): separate watch event processing from the cache refresh (#7670)
<!-- 1-2 line summary of WHAT changed technically: - Always link the relevant project's GitHub issue, unless it is a minor bugfix - Good: "Modified FailoverDomain mapper to allow null ActiveClusterName #320" - Bad: "added nil check" --> **What changed?** * Watch event processing in `watch` function is separated from a call of `refreshCache` function <!-- Your goal is to provide all the required context for a future maintainer to understand the reasons for making this change (see https://cbea.ms/git-commit/#why-not-how). How did this work previously (and what was wrong with it)? What has changed, and why did you solve it this way? - Good: "Active-active domains have independent cluster attributes per region. Previously, modifying cluster attributes required specifying the default ActiveClusterName which updates the global domain default. This prevents operators from updating regional configurations without affecting the primary cluster designation. This change allows attribute updates to be independent of active cluster selection." - Bad: "Improves domain handling" --> **Why?** * We observed that intensive watch event updates may cause a growing backlog on the server side and etcd that may lead to OOMKills <!-- Include specific test commands and setup. Please include the exact commands such that another maintainer or contributor can reproduce the test steps taken. - e.g. Unit test commands with exact invocation `go test -v ./common/types/mapper/proto -run TestFailoverDomainRequest` - For integration tests include setup steps and test commands Example: "Started local server with `./cadence start`, then ran `make test_e2e`" - For local simulation testing include setup steps for the server and how you ran the tests - Good: Full commands that reviewers can copy-paste to verify - Bad: "Tested locally" or "Added tests" --> **How did you test it?** * Unit tests * Run on dev cluster <!-- If there are risks that the release engineer should know about document them here. 
For example: - Has an API/IDL been modified? Is it backwards/forwards compatible? If not, what are the repercussions? - Has a schema change been introduced? Is it possible to roll back? - Has a feature flag been re-used for a new purpose? - Is there a potential performance concern? Is the change modifying core task processing logic? - If truly N/A, you can mark it as such --> **Potential risks** N/A <!-- If this PR completes a user facing feature or changes functionality add release notes here. Your release notes should allow a user and the release engineer to understand the changes with little context. Always ensure that the description contains a link to the relevant GitHub issue. --> **Release notes** N/A <!-- Consider whether this change requires documentation updates in the Cadence-Docs repo - If yes: mention what needs updating (or link to docs PR in cadence-docs repo) - If in doubt, add a note about potential doc needs - Only mark N/A if you're certain no docs are affected --> **Documentation Changes** N/A --- ## Reviewer Validation **PR Description Quality** (check these before reviewing code): - [ ] **"What changed"** provides a clear 1-2 line summary - [ ] Project Issue is linked - [ ] **"Why"** explains the full motivation with sufficient context - [ ] **Testing is documented:** - [ ] Unit test commands are included (with exact `go test` invocation) - [ ] Integration test setup/commands included (if integration tests were run) - [ ] Canary testing details included (if canary was mentioned) - [ ] **Potential risks** section is thoughtfully filled out (or legitimately N/A) - [ ] **Release notes** included if this completes a user-facing feature - [ ] **Documentation** needs are addressed (or noted if uncertain)
1 parent 5863c5c commit 3c8a7b8

File tree

3 files changed

+195
-19
lines changed

3 files changed

+195
-19
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ require (
7676
github.com/ncruces/go-sqlite3 v0.22.0
7777
github.com/opensearch-project/opensearch-go/v4 v4.1.0
7878
github.com/robfig/cron/v3 v3.0.1
79+
go.etcd.io/etcd/api/v3 v3.5.5
7980
go.uber.org/mock v0.5.0
8081
)
8182

@@ -89,7 +90,6 @@ require (
8990
github.com/tetratelabs/wazero v1.8.2 // indirect
9091
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
9192
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
92-
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
9393
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
9494
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
9595
google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a // indirect

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -122,22 +122,52 @@ func (n *namespaceShardToExecutor) Subscribe(ctx context.Context) (<-chan map[*s
122122
}
123123

124124
func (n *namespaceShardToExecutor) namespaceRefreshLoop() {
125+
triggerCh := n.runWatchLoop()
126+
125127
for {
126-
if err := n.watch(); err != nil {
127-
n.logger.Error("error watching in namespaceRefreshLoop, retrying...", tag.Error(err))
128-
n.timeSource.Sleep(backoff.JitDuration(
129-
namespaceRefreshLoopWatchRetryInterval,
130-
namespaceRefreshLoopWatchJitterCoeff,
131-
))
132-
continue
133-
}
128+
select {
129+
case <-n.stopCh:
130+
n.logger.Info("stop channel closed, exiting namespaceRefreshLoop")
131+
return
134132

135-
n.logger.Info("namespaceRefreshLoop is exiting")
136-
return
133+
case _, ok := <-triggerCh:
134+
if !ok {
135+
n.logger.Info("trigger channel closed, exiting namespaceRefreshLoop")
136+
return
137+
}
138+
139+
if err := n.refresh(context.Background()); err != nil {
140+
n.logger.Error("failed to refresh namespace shard to executor", tag.Error(err))
141+
}
142+
}
137143
}
138144
}
139145

140-
func (n *namespaceShardToExecutor) watch() error {
146+
func (n *namespaceShardToExecutor) runWatchLoop() <-chan struct{} {
147+
triggerCh := make(chan struct{}, 1)
148+
149+
go func() {
150+
defer close(triggerCh)
151+
152+
for {
153+
if err := n.watch(triggerCh); err != nil {
154+
n.logger.Error("error watching in namespaceRefreshLoop, retrying...", tag.Error(err))
155+
n.timeSource.Sleep(backoff.JitDuration(
156+
namespaceRefreshLoopWatchRetryInterval,
157+
namespaceRefreshLoopWatchJitterCoeff,
158+
))
159+
continue
160+
}
161+
162+
n.logger.Info("namespaceRefreshLoop is exiting")
163+
return
164+
}
165+
}()
166+
167+
return triggerCh
168+
}
169+
170+
func (n *namespaceShardToExecutor) watch(triggerCh chan<- struct{}) error {
141171
ctx, cancel := context.WithCancel(context.Background())
142172
defer cancel()
143173

@@ -151,6 +181,7 @@ func (n *namespaceShardToExecutor) watch() error {
151181
for {
152182
select {
153183
case <-n.stopCh:
184+
n.logger.Info("stop channel closed, exiting watch loop")
154185
return nil
155186

156187
case watchResp, ok := <-watchChan:
@@ -170,10 +201,14 @@ func (n *namespaceShardToExecutor) watch() error {
170201
}
171202
}
172203

173-
if shouldRefresh {
174-
if err := n.refresh(context.Background()); err != nil {
175-
n.logger.Error("failed to refresh namespace shard to executor", tag.Error(err))
176-
}
204+
if !shouldRefresh {
205+
continue
206+
}
207+
208+
select {
209+
case triggerCh <- struct{}{}:
210+
default:
211+
n.logger.Info("Cache is being refreshed, skipping trigger")
177212
}
178213
}
179214
}

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go

Lines changed: 144 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"go.etcd.io/etcd/api/v3/mvccpb"
1314
clientv3 "go.etcd.io/etcd/client/v3"
1415
"go.uber.org/goleak"
1516
"go.uber.org/mock/gomock"
@@ -159,6 +160,8 @@ func TestNamespaceShardToExecutor_watch_watchChanErrors(t *testing.T) {
159160
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource())
160161
require.NoError(t, err)
161162

163+
triggerChan := make(chan struct{}, 1)
164+
162165
// Test Case #1
163166
// Test received compact revision error from watch channel
164167
{
@@ -168,7 +171,7 @@ func TestNamespaceShardToExecutor_watch_watchChanErrors(t *testing.T) {
168171
}
169172
}()
170173

171-
err = e.watch()
174+
err = e.watch(triggerChan)
172175
require.Error(t, err)
173176
assert.ErrorContains(t, err, "etcdserver: mvcc: required revision has been compacted")
174177
}
@@ -177,12 +180,147 @@ func TestNamespaceShardToExecutor_watch_watchChanErrors(t *testing.T) {
177180
// Test closed watch channel
178181
{
179182
close(watchChan)
180-
err = e.watch()
183+
err = e.watch(triggerChan)
181184
require.Error(t, err)
182185
assert.ErrorContains(t, err, "watch channel closed")
183186
}
184187
}
185188

189+
func TestNamespaceShardToExecutor_watch_triggerChBlocking(t *testing.T) {
190+
ctrl := gomock.NewController(t)
191+
defer ctrl.Finish()
192+
193+
logger := testlogger.New(t)
194+
mockClient := etcdclient.NewMockClient(ctrl)
195+
stopCh := make(chan struct{})
196+
testPrefix := "/test-prefix"
197+
testNamespace := "test-namespace"
198+
199+
watchChan := make(chan clientv3.WatchResponse)
200+
mockClient.EXPECT().
201+
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
202+
Return(watchChan)
203+
204+
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource())
205+
require.NoError(t, err)
206+
207+
// Create a triggerCh with buffer size 1, but never read from it
208+
triggerChan := make(chan struct{}, 1)
209+
210+
executorKey := etcdkeys.BuildExecutorKey(testPrefix, testNamespace, "executor-1", etcdkeys.ExecutorAssignedStateKey)
211+
212+
// Start watch in a goroutine
213+
watchDone := make(chan error, 1)
214+
go func() {
215+
watchDone <- e.watch(triggerChan)
216+
}()
217+
218+
// Send many events - the loop should not block even though triggerCh is full
219+
for i := 0; i < 100; i++ {
220+
select {
221+
case watchChan <- clientv3.WatchResponse{
222+
Events: []*clientv3.Event{
223+
{
224+
Type: clientv3.EventTypePut,
225+
Kv: &mvccpb.KeyValue{
226+
Key: []byte(executorKey),
227+
},
228+
},
229+
},
230+
}:
231+
case <-time.After(100 * time.Millisecond):
232+
t.Fatal("watch loop is stuck - could not send event to watchChan")
233+
}
234+
}
235+
236+
// Close stopCh to exit the watch loop
237+
close(stopCh)
238+
239+
select {
240+
case err := <-watchDone:
241+
assert.NoError(t, err)
242+
case <-time.After(1 * time.Second):
243+
t.Fatal("watch loop did not exit after stopCh was closed")
244+
}
245+
}
246+
247+
func TestNamespaceShardToExecutor_namespaceRefreshLoop_triggersRefresh(t *testing.T) {
248+
defer goleak.VerifyNone(t)
249+
250+
ctrl := gomock.NewController(t)
251+
defer ctrl.Finish()
252+
253+
logger := testlogger.New(t)
254+
mockClient := etcdclient.NewMockClient(ctrl)
255+
timeSource := clock.NewMockedTimeSource()
256+
stopCh := make(chan struct{})
257+
testPrefix := "/test-prefix"
258+
testNamespace := "test-namespace"
259+
executorID := "executor-1"
260+
261+
watchChan := make(chan clientv3.WatchResponse)
262+
mockClient.EXPECT().
263+
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
264+
Return(watchChan)
265+
266+
executorPrefix := etcdkeys.BuildExecutorsPrefix(testPrefix, testNamespace)
267+
executorKey := etcdkeys.BuildMetadataKey(
268+
testPrefix,
269+
testNamespace,
270+
executorID,
271+
"metadata-key",
272+
)
273+
274+
// Mock Get call for refresh
275+
mockClient.EXPECT().
276+
Get(gomock.Any(), executorPrefix, gomock.Any()).
277+
Return(
278+
&clientv3.GetResponse{Kvs: []*mvccpb.KeyValue{
279+
{
280+
Key: []byte(executorKey),
281+
Value: []byte("metadata-value"),
282+
},
283+
}},
284+
nil,
285+
)
286+
287+
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource)
288+
require.NoError(t, err)
289+
290+
wg := sync.WaitGroup{}
291+
wg.Add(1)
292+
293+
go func() {
294+
defer wg.Done()
295+
e.namespaceRefreshLoop()
296+
}()
297+
298+
// Send a watch event with ExecutorAssignedStateKey to trigger refresh
299+
go func() {
300+
watchChan <- clientv3.WatchResponse{
301+
Events: []*clientv3.Event{
302+
{
303+
Type: clientv3.EventTypePut,
304+
Kv: &mvccpb.KeyValue{
305+
Key: []byte(executorKey),
306+
},
307+
},
308+
},
309+
}
310+
}()
311+
312+
require.Eventually(t, func() bool {
313+
e.RLock()
314+
defer e.RUnlock()
315+
_, ok := e.shardOwners[executorID]
316+
return ok
317+
}, time.Second, 1*time.Millisecond, "expected executor to be added to shardOwners")
318+
319+
// Close stopCh to exit the loop
320+
close(stopCh)
321+
wg.Wait()
322+
}
323+
186324
func TestNamespaceShardToExecutor_namespaceRefreshLoop_watchError(t *testing.T) {
187325
defer goleak.VerifyNone(t)
188326

@@ -209,9 +347,12 @@ func TestNamespaceShardToExecutor_namespaceRefreshLoop_watchError(t *testing.T)
209347
Return(watchChanClosed)
210348

211349
// mock for third watch call that will be used when stopCh is closed
350+
// maybe called or not if stopCh is closed before retry interval
212351
mockClient.EXPECT().
213352
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
214-
Return(make(chan clientv3.WatchResponse))
353+
Return(make(chan clientv3.WatchResponse)).
354+
MinTimes(0).
355+
MaxTimes(1)
215356

216357
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, timeSource)
217358
require.NoError(t, err)

0 commit comments

Comments
 (0)