Skip to content

Commit dbcdecb

Browse files
authored
fix(rpc): dns updater should not update current peer on failures (#7424)
<!-- Describe what has changed in this PR --> **What changed?** Cross-DC calls to frontend rely on DNSChooser (rather than the single outbound call for standard frontend calls). A user reported a stale peer list in the frontend service after a Disaster Recovery test in DNS and pointed out that the error came from the domain replication processor, which uses the cross-DC client. ``` {"level":"error","ts":"2025-10-27T15:17:00.285Z","msg":"Failed to get replication tasks","service":"cadence-worker","component":"replicator","component":"replication-task-processor","xdc-source-cluster":"primary","error":"code:unavailable message:\"round-robin\" peer list has 2 peers but none are responsive, timed out waiting for a connection to open (fail-fast is not enabled): context deadline exceeded","logging-call-at":"domain_replication_processor.go:171","stacktrace":"[github.com/uber/cadence/service/worker/replicator.(*domainReplicationProcessor).fetchDomainReplicationTasks](http://github.com/uber/cadence/service/worker/replicator.(*domainReplicationProcessor).fetchDomainReplicationTasks)\n\t/cadence/service/worker/replicator/domain_replication_processor.go:171\[ngithub.com/uber/cadence/service/worker/replicator.(*domainReplicationProcessor).processorLoop](http://ngithub.com/uber/cadence/service/worker/replicator.(*domainReplicationProcessor).processorLoop)\n\t/cadence/service/worker/replicator/domain_replication_processor.go:136 ``` * Do not update dnsUpdater's current peer list if the transport update failed. <!-- Tell your future self why you have made these changes --> **Why?** If the transport fails to update its peers, the locally stored currentPeers would be out of sync with reality. Thus, only set currentPeers on a successful update. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit Test <!-- Assuming the worst case, what can be broken when deploying this change to production? 
--> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Shijie Sheng <[email protected]>
1 parent 2b0f8e8 commit dbcdecb

File tree

2 files changed

+176
-3
lines changed

2 files changed

+176
-3
lines changed

common/rpc/dns_updater.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636

3737
type (
3838
dnsUpdater struct {
39+
resolver dnsHostResolver
3940
interval time.Duration
4041
dnsAddress string
4142
port string
@@ -56,13 +57,18 @@ type (
5657
}
5758
)
5859

60+
// dnsHostResolver is the host-lookup seam used by dnsUpdater. newDNSUpdater
// wires in net.DefaultResolver; any value with a matching LookupHost method
// can be substituted.
type dnsHostResolver interface {
	// LookupHost resolves host and returns the addresses found for it, or an
	// error when the lookup fails.
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
}
63+
5964
func newDNSUpdater(list peer.List, dnsPort string, interval time.Duration, logger log.Logger) (*dnsUpdater, error) {
6065
ss := strings.Split(dnsPort, ":")
6166
if len(ss) != 2 {
6267
return nil, fmt.Errorf("incorrect DNS:Port format")
6368
}
6469
ctx, cancel := context.WithCancel(context.Background())
6570
return &dnsUpdater{
71+
resolver: net.DefaultResolver,
6672
interval: interval,
6773
logger: logger,
6874
list: list,
@@ -95,8 +101,9 @@ func (d *dnsUpdater) Start() {
95101
err := d.list.Update(res.updates)
96102
if err != nil {
97103
d.logger.Error("Failed to update peerList", tag.Error(err), tag.Address(d.dnsAddress))
104+
} else {
105+
d.currentPeers = res.newPeers
98106
}
99-
d.currentPeers = res.newPeers
100107
}
101108
sleepDu := now.Add(d.interval).Sub(now)
102109
t := time.NewTimer(sleepDu)
@@ -120,8 +127,7 @@ func (d *dnsUpdater) Stop() {
120127
}
121128

122129
func (d *dnsUpdater) refresh() (*dnsRefreshResult, error) {
123-
resolver := net.DefaultResolver
124-
ips, err := resolver.LookupHost(d.ctx, d.dnsAddress)
130+
ips, err := d.resolver.LookupHost(d.ctx, d.dnsAddress)
125131
if err != nil {
126132
return nil, err
127133
}

common/rpc/dns_updater_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package rpc
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"go.uber.org/yarpc/api/peer"
12+
13+
"github.com/uber/cadence/common/log"
14+
)
15+
16+
// mockDNSHostResolver is a canned resolver for tests: every lookup yields the
// configured addresses and error, regardless of the host asked for.
type mockDNSHostResolver struct {
	addrsToReturn []string // addresses handed back by LookupHost
	errToReturn   error    // error handed back by LookupHost
}

// LookupHost ignores its arguments and returns the preconfigured
// addrsToReturn and errToReturn.
func (m *mockDNSHostResolver) LookupHost(_ context.Context, _ string) ([]string, error) {
	return m.addrsToReturn, m.errToReturn
}
24+
25+
type dummyPeerList struct {
26+
updates []peer.ListUpdates
27+
errToReturn error
28+
}
29+
30+
func (d *dummyPeerList) Update(updates peer.ListUpdates) error {
31+
d.updates = append(d.updates, updates)
32+
return d.errToReturn
33+
}
34+
35+
func TestDNSUpdater_Refresh(t *testing.T) {
36+
tests := []struct {
37+
name string
38+
initialPeers []string
39+
newPeers []string
40+
currentPeers map[string]struct{}
41+
wantChanged bool
42+
wantNewPeers map[string]struct{}
43+
wantAdditions []string
44+
wantRemovals []string
45+
wantDNSFailure bool
46+
}{
47+
{
48+
name: "Initial state: two peers",
49+
initialPeers: nil,
50+
newPeers: []string{"10.0.0.1", "10.0.0.2"},
51+
currentPeers: nil,
52+
wantChanged: true,
53+
wantNewPeers: map[string]struct{}{
54+
"10.0.0.1:1234": {},
55+
"10.0.0.2:1234": {},
56+
},
57+
wantAdditions: []string{"10.0.0.1:1234", "10.0.0.2:1234"},
58+
wantRemovals: nil,
59+
},
60+
{
61+
name: "Change DNS, one removed, one added, one persistent",
62+
initialPeers: []string{"10.0.0.1", "10.0.0.2"},
63+
newPeers: []string{"10.0.0.2", "10.0.0.3"},
64+
currentPeers: map[string]struct{}{
65+
"10.0.0.1:1234": {},
66+
"10.0.0.2:1234": {},
67+
},
68+
wantChanged: true,
69+
wantNewPeers: map[string]struct{}{
70+
"10.0.0.2:1234": {},
71+
"10.0.0.3:1234": {},
72+
},
73+
wantAdditions: []string{"10.0.0.3:1234"},
74+
wantRemovals: []string{"10.0.0.1:1234"},
75+
},
76+
{
77+
name: "No changes",
78+
initialPeers: []string{"10.0.0.2", "10.0.0.3"},
79+
newPeers: []string{"10.0.0.2", "10.0.0.3"},
80+
currentPeers: map[string]struct{}{
81+
"10.0.0.2:1234": {},
82+
"10.0.0.3:1234": {},
83+
},
84+
wantChanged: false,
85+
wantNewPeers: map[string]struct{}{
86+
"10.0.0.2:1234": {},
87+
"10.0.0.3:1234": {},
88+
},
89+
wantAdditions: nil,
90+
wantRemovals: nil,
91+
},
92+
{
93+
name: "DNS failure",
94+
initialPeers: []string{"10.0.0.1", "10.0.0.2"},
95+
newPeers: nil,
96+
currentPeers: map[string]struct{}{"10.0.0.1:1234": {}, "10.0.0.2:1234": {}},
97+
wantChanged: false,
98+
wantNewPeers: nil,
99+
wantAdditions: nil,
100+
wantRemovals: nil,
101+
wantDNSFailure: true,
102+
},
103+
}
104+
105+
for _, tt := range tests {
106+
t.Run(tt.name, func(t *testing.T) {
107+
mockResolver := &mockDNSHostResolver{}
108+
dnsAddr := "test.service.com:1234"
109+
interval := 1 * time.Second
110+
logger := log.NewNoop()
111+
testList := &dummyPeerList{}
112+
113+
updater, err := newDNSUpdater(testList, dnsAddr, interval, logger)
114+
require.NoError(t, err, "should create dnsUpdater")
115+
updater.resolver = mockResolver
116+
117+
// Optionally set initial currentPeers
118+
if tt.currentPeers != nil {
119+
updater.currentPeers = tt.currentPeers
120+
}
121+
122+
if tt.wantDNSFailure {
123+
mockResolver.errToReturn = errors.New("mock DNS error")
124+
} else {
125+
mockResolver.errToReturn = nil
126+
mockResolver.addrsToReturn = tt.newPeers
127+
}
128+
129+
res, err := updater.refresh()
130+
131+
if tt.wantDNSFailure {
132+
require.Error(t, err)
133+
assert.Nil(t, res)
134+
} else {
135+
require.NoError(t, err)
136+
assert.Equal(t, tt.wantChanged, res.changed)
137+
assert.Equal(t, tt.wantNewPeers, res.newPeers)
138+
actualAdditions := identifiersToStringList(res.updates.Additions)
139+
actualRemovals := identifiersToStringList(res.updates.Removals)
140+
assert.ElementsMatch(t, tt.wantAdditions, actualAdditions)
141+
assert.ElementsMatch(t, tt.wantRemovals, actualRemovals)
142+
}
143+
})
144+
}
145+
}
146+
147+
func TestDNSUpdater_UpdaterFailure(t *testing.T) {
148+
mockResolver := &mockDNSHostResolver{
149+
addrsToReturn: []string{"10.0.0.3", "10.0.0.4"},
150+
}
151+
logger := log.NewNoop()
152+
testList := &dummyPeerList{errToReturn: errors.New("mock updater error")}
153+
154+
updater, err := newDNSUpdater(testList, "host.test:1234", 100*time.Millisecond, logger)
155+
require.NoError(t, err)
156+
updater.resolver = mockResolver
157+
updater.currentPeers = map[string]struct{}{
158+
"10.0.0.1:1234": {},
159+
"10.0.0.2:1234": {},
160+
}
161+
162+
updater.Start()
163+
defer updater.Stop()
164+
165+
time.Sleep(1 * time.Second)
166+
assert.Equal(t, map[string]struct{}{"10.0.0.1:1234": {}, "10.0.0.2:1234": {}}, updater.currentPeers)
167+
}

0 commit comments

Comments
 (0)