
Commit d7a246b

cascading SMIGRATED notifications will trigger multiple reloads
1 parent b3a3bdd commit d7a246b
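
Before this change, LazyReload dropped any request that arrived while a reload (or its 200 ms cooldown) was already in flight, so a cascade of SMIGRATED notifications could leave the client holding a stale cluster state. This commit adds a reloadPending flag: requests that land during an active reload or its cooldown set the flag, and the reload goroutine loops to run one more collapsed reload before clearing reloading and exiting.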

File tree

2 files changed: +229 -7 lines changed


osscluster.go

Lines changed: 27 additions & 7 deletions
@@ -946,8 +946,9 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
 type clusterStateHolder struct {
 	load func(ctx context.Context) (*clusterState, error)
 
-	state     atomic.Value
-	reloading uint32 // atomic
+	state         atomic.Value
+	reloading     uint32 // atomic
+	reloadPending uint32 // atomic - set to 1 when reload is requested during active reload
 }
 
 func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
@@ -966,17 +967,36 @@ func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error)
 }
 
 func (c *clusterStateHolder) LazyReload() {
+	// If already reloading, mark that another reload is pending
 	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
+		atomic.StoreUint32(&c.reloadPending, 1)
 		return
 	}
+
 	go func() {
-		defer atomic.StoreUint32(&c.reloading, 0)
+		for {
+			_, err := c.Reload(context.Background())
+			if err != nil {
+				atomic.StoreUint32(&c.reloading, 0)
+				return
+			}
 
-		_, err := c.Reload(context.Background())
-		if err != nil {
-			return
+			// Clear pending flag after reload completes, before cooldown
+			// This captures notifications that arrived during the reload
+			atomic.StoreUint32(&c.reloadPending, 0)
+
+			// Wait cooldown period
+			time.Sleep(200 * time.Millisecond)
+
+			// Check if another reload was requested during cooldown
+			if atomic.LoadUint32(&c.reloadPending) == 0 {
+				// No pending reload, we're done
+				atomic.StoreUint32(&c.reloading, 0)
+				return
+			}
+
+			// Pending reload requested, loop to reload again
 		}
-		time.Sleep(200 * time.Millisecond)
 	}()
 }
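
For reference, below is a minimal, self-contained sketch of the same coalescing pattern. The reloader type, its reload field, and the burst in main are illustrative stand-ins rather than go-redis API, but the CAS guard, pending flag, and cooldown loop mirror the patched LazyReload above.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// reloader coalesces bursts of reload requests the same way the patched
// clusterStateHolder.LazyReload does: one CAS-guarded goroutine performs
// the reload, and a pending flag remembers requests that arrive meanwhile.
type reloader struct {
	reloading uint32 // 1 while the reload goroutine is alive
	pending   uint32 // 1 if a request arrived during reload/cooldown
	reload    func() error
}

func (r *reloader) LazyReload() {
	if !atomic.CompareAndSwapUint32(&r.reloading, 0, 1) {
		// A reload is already in flight; remember that another was asked for.
		atomic.StoreUint32(&r.pending, 1)
		return
	}
	go func() {
		for {
			if err := r.reload(); err != nil {
				atomic.StoreUint32(&r.reloading, 0)
				return
			}
			// Requests that arrived during the reload were satisfied by it.
			atomic.StoreUint32(&r.pending, 0)
			time.Sleep(200 * time.Millisecond) // cooldown
			if atomic.LoadUint32(&r.pending) == 0 {
				atomic.StoreUint32(&r.reloading, 0)
				return
			}
			// A request arrived during the cooldown: loop and reload again.
		}
	}()
}

func main() {
	var n atomic.Int32
	r := &reloader{reload: func() error {
		n.Add(1)
		time.Sleep(20 * time.Millisecond)
		return nil
	}}
	// A burst of 5 "notifications" within ~50ms.
	for i := 0; i < 5; i++ {
		r.LazyReload()
		time.Sleep(10 * time.Millisecond)
	}
	time.Sleep(600 * time.Millisecond)
	fmt.Println("reloads:", n.Load()) // typically 2: initial + one collapsed follow-up
}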

osscluster_lazy_reload_test.go

Lines changed: 202 additions & 0 deletions
@@ -0,0 +1,202 @@
+package redis
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// TestLazyReloadQueueBehavior tests that LazyReload properly queues reload requests
+func TestLazyReloadQueueBehavior(t *testing.T) {
+	t.Run("SingleReload", func(t *testing.T) {
+		var reloadCount atomic.Int32
+
+		holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+			reloadCount.Add(1)
+			time.Sleep(50 * time.Millisecond) // Simulate reload work
+			return &clusterState{}, nil
+		})
+
+		// Trigger one reload
+		holder.LazyReload()
+
+		// Wait for reload to complete
+		time.Sleep(300 * time.Millisecond)
+
+		if count := reloadCount.Load(); count != 1 {
+			t.Errorf("Expected 1 reload, got %d", count)
+		}
+	})
+
+	t.Run("ConcurrentReloadsDeduplication", func(t *testing.T) {
+		var reloadCount atomic.Int32
+
+		holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+			reloadCount.Add(1)
+			time.Sleep(50 * time.Millisecond) // Simulate reload work
+			return &clusterState{}, nil
+		})
+
+		// Trigger multiple reloads concurrently
+		for i := 0; i < 10; i++ {
+			go holder.LazyReload()
+		}
+
+		// Wait for all to complete
+		time.Sleep(100 * time.Millisecond)
+
+		// Should only reload once (all concurrent calls deduplicated)
+		if count := reloadCount.Load(); count != 1 {
+			t.Errorf("Expected 1 reload (deduplication), got %d", count)
+		}
+	})
+
+	t.Run("PendingReloadDuringCooldown", func(t *testing.T) {
+		var reloadCount atomic.Int32
+
+		holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+			reloadCount.Add(1)
+			time.Sleep(10 * time.Millisecond) // Simulate reload work
+			return &clusterState{}, nil
+		})
+
+		// Trigger first reload
+		holder.LazyReload()
+
+		// Wait for reload to complete but still in cooldown
+		time.Sleep(50 * time.Millisecond)
+
+		// Trigger second reload during cooldown period
+		holder.LazyReload()
+
+		// Wait for second reload to complete
+		time.Sleep(300 * time.Millisecond)
+
+		// Should have reloaded twice (second request queued and executed)
+		if count := reloadCount.Load(); count != 2 {
+			t.Errorf("Expected 2 reloads (queued during cooldown), got %d", count)
+		}
+	})
+
+	t.Run("MultiplePendingReloadsCollapsed", func(t *testing.T) {
+		var reloadCount atomic.Int32
+
+		holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+			reloadCount.Add(1)
+			time.Sleep(10 * time.Millisecond) // Simulate reload work
+			return &clusterState{}, nil
+		})
+
+		// Trigger first reload
+		holder.LazyReload()
+
+		// Wait for reload to start
+		time.Sleep(5 * time.Millisecond)
+
+		// Trigger multiple reloads during active reload + cooldown
+		for i := 0; i < 10; i++ {
+			holder.LazyReload()
+			time.Sleep(5 * time.Millisecond)
+		}
+
+		// Wait for all to complete
+		time.Sleep(400 * time.Millisecond)
+
+		// Should have reloaded exactly twice:
+		// 1. Initial reload
+		// 2. One more reload for all the pending requests (collapsed into one)
+		if count := reloadCount.Load(); count != 2 {
+			t.Errorf("Expected 2 reloads (initial + collapsed pending), got %d", count)
+		}
+	})
+
+	t.Run("ReloadAfterCooldownPeriod", func(t *testing.T) {
+		var reloadCount atomic.Int32
+
+		holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+			reloadCount.Add(1)
+			time.Sleep(10 * time.Millisecond) // Simulate reload work
+			return &clusterState{}, nil
+		})
+
+		// Trigger first reload
+		holder.LazyReload()
+
+		// Wait for reload + cooldown to complete
+		time.Sleep(300 * time.Millisecond)
+
+		// Trigger second reload after cooldown
+		holder.LazyReload()
+
+		// Wait for second reload to complete
+		time.Sleep(300 * time.Millisecond)
+
+		// Should have reloaded twice (separate reload cycles)
+		if count := reloadCount.Load(); count != 2 {
+			t.Errorf("Expected 2 reloads (separate cycles), got %d", count)
+		}
+	})
+
+	t.Run("ErrorDuringReload", func(t *testing.T) {
+		var reloadCount atomic.Int32
+		var shouldFail atomic.Bool
+		shouldFail.Store(true)
+
+		holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+			reloadCount.Add(1)
+			if shouldFail.Load() {
+				return nil, context.DeadlineExceeded
+			}
+			return &clusterState{}, nil
+		})
+
+		// Trigger reload that will fail
+		holder.LazyReload()
+
+		// Wait for failed reload
+		time.Sleep(50 * time.Millisecond)
+
+		// Trigger another reload (should succeed now)
+		shouldFail.Store(false)
+		holder.LazyReload()
+
+		// Wait for successful reload
+		time.Sleep(300 * time.Millisecond)
+
+		// Should have attempted reload twice (first failed, second succeeded)
+		if count := reloadCount.Load(); count != 2 {
+			t.Errorf("Expected 2 reload attempts, got %d", count)
+		}
+	})
+
+	t.Run("CascadingSMigratedScenario", func(t *testing.T) {
+		// Simulate the real-world scenario: multiple SMIGRATED notifications
+		// arriving in quick succession from different node clients
+		var reloadCount atomic.Int32
+
+		holder := newClusterStateHolder(func(ctx context.Context) (*clusterState, error) {
+			reloadCount.Add(1)
+			time.Sleep(20 * time.Millisecond) // Simulate realistic reload time
+			return &clusterState{}, nil
+		})
+
+		// Simulate 5 SMIGRATED notifications arriving within 100ms
+		for i := 0; i < 5; i++ {
+			go holder.LazyReload()
+			time.Sleep(20 * time.Millisecond)
+		}
+
+		// Wait for all reloads to complete
+		time.Sleep(500 * time.Millisecond)
+
+		// Should reload at most 2 times:
+		// 1. First notification triggers reload
+		// 2. Notifications 2-5 collapse into one pending reload
+		count := reloadCount.Load()
+		if count < 1 || count > 2 {
+			t.Errorf("Expected 1-2 reloads for cascading scenario, got %d", count)
+		}
+	})
+}
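
The new tests are timing-based and need no live cluster; assuming a checkout of the repository root (where the redis package lives), they can be run in isolation with the race detector:

go test -race -run TestLazyReloadQueueBehavior .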
