Skip to content

Commit 90b3c14

Browse files
committed
Merge remote-tracking branch 'origin/master' into arize-dev/50458f1-from-0f3a43-in-upstream
2 parents 4afa6e9 + 50458f1 commit 90b3c14

20 files changed

+1046
-369
lines changed

allocator/alloc_state.go

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package allocator
22

33
import (
4+
"cmp"
45
"hash/crc64"
56
"math"
7+
"slices"
68
"strconv"
79
"strings"
810

@@ -51,6 +53,13 @@ type State struct {
5153
// These share cardinality with |Members|.
5254
MemberTotalCount []int
5355
MemberPrimaryCount []int
56+
57+
// Number of item slots to shed from each member's ItemLimit, reducing its
58+
// effective capacity to ItemLimit - ShedCapacity. Exiting members are granted
59+
// ShedCapacity from available excess cluster capacity, ordered by age
60+
// (CreateRevision), so that the oldest exiting members drain first.
61+
// Shares cardinality with |Members|.
62+
ShedCapacity []int
5463
}
5564

5665
// NewObservedState returns a *State instance which extracts and updates itself
@@ -89,12 +98,15 @@ func (s *State) observe() {
8998
s.NetworkHash = 0
9099
s.MemberTotalCount = make([]int, len(s.Members))
91100
s.MemberPrimaryCount = make([]int, len(s.Members))
101+
s.ShedCapacity = make([]int, len(s.Members))
92102

93103
// Walk Members to:
94104
// * Group the set of ordered |Zones| across all Members.
95105
// * Initialize |ZoneSlots|.
96106
// * Initialize |MemberSlots|.
97107
// * Initialize |NetworkHash|.
108+
// * Collect indices of exiting members.
109+
var exiting []int
98110
for i := range s.Members {
99111
var m = memberAt(s.Members, i)
100112
var slots = m.ItemLimit()
@@ -114,6 +126,10 @@ func (s *State) observe() {
114126

115127
s.MemberSlots += slots
116128
s.NetworkHash = foldCRC(s.NetworkHash, s.Members[i].Raw.Key, slots)
129+
130+
if m.IsExiting() {
131+
exiting = append(exiting, i)
132+
}
117133
}
118134

119135
// Fetch |localMember| identified by |LocalKey|.
@@ -135,11 +151,14 @@ func (s *State) observe() {
135151
return strings.Compare(itemAt(s.Items, l).ID, assignmentAt(s.Assignments, r).ItemID)
136152
},
137153
}
154+
155+
var maxReplicationFactor int
138156
for cur, ok := it.Next(); ok; cur, ok = it.Next() {
139157
var item = itemAt(s.Items, cur.Left)
140158
var slots = item.DesiredReplication()
141159

142160
s.ItemSlots += slots
161+
maxReplicationFactor = max(maxReplicationFactor, slots)
143162
s.NetworkHash = foldCRC(s.NetworkHash, s.Items[cur.Left].Raw.Key, slots)
144163

145164
for r := cur.RightBegin; r != cur.RightEnd; r++ {
@@ -161,11 +180,39 @@ func (s *State) observe() {
161180
}
162181
}
163182
}
183+
184+
// Compute ShedCapacity for exiting members. ShedCapacity is granted to
185+
// exiting members, oldest first, up to each member's ItemLimit.
186+
// Excess slots available to grant as ShedCapacity. When the cluster is
187+
// overloaded (MemberSlots < ItemSlots), there is no excess to shed.
188+
var excessSlots = max(0, s.MemberSlots-s.ItemSlots)
189+
slices.SortFunc(exiting, func(a, b int) int {
190+
return cmp.Compare(s.Members[a].Raw.CreateRevision, s.Members[b].Raw.CreateRevision)
191+
})
192+
193+
// We must retain at least maxReplicationFactor members at full capacity
194+
// to satisfy replication requirements. This is a numerical bound, not
195+
// zone-aware: we shed the oldest members first, expecting that replacement
196+
// members will restore zone diversity as they join.
197+
var maxShedding = len(s.Members) - maxReplicationFactor
198+
if maxShedding < 0 {
199+
maxShedding = 0
200+
}
201+
for n, i := range exiting {
202+
if n == maxShedding || excessSlots == 0 {
203+
break
204+
}
205+
var shed = min(memberAt(s.Members, i).ItemLimit(), excessSlots)
206+
s.ShedCapacity[i] = shed
207+
s.MemberSlots -= shed
208+
excessSlots -= shed
209+
s.NetworkHash = foldCRC(s.NetworkHash, s.Members[i].Raw.Key, shed)
210+
}
164211
}
165212

166213
// shouldExit returns true iff the local Member is able to safely exit.
167214
func (s *State) shouldExit() bool {
168-
return memberAt(s.Members, s.LocalMemberInd).ItemLimit() == 0 && len(s.LocalItems) == 0
215+
return memberAt(s.Members, s.LocalMemberInd).IsExiting() && len(s.LocalItems) == 0
169216
}
170217

171218
// isLeader returns true iff the local Member key is ordered first on
@@ -201,16 +248,22 @@ func (s *State) debugLog() {
201248
}).Info("extracted State")
202249
}
203250

251+
// memberEffectiveLimit returns the effective item limit for the member at
252+
// index |ind|, accounting for any ShedCapacity granted to exiting members.
253+
func (s *State) memberEffectiveLimit(ind int) int {
254+
return memberAt(s.Members, ind).ItemLimit() - s.ShedCapacity[ind]
255+
}
256+
204257
// memberLoadRatio maps an |assignment| to a Member "load ratio". Given all
205258
// |Members| and their corresponding |counts| (1:1 with |Members|),
206259
// memberLoadRatio maps |assignment| to a Member and, if found, returns the
207-
// ratio of the Member's index in |counts| to the Member's ItemLimit. If the
208-
// Member is not found, infinity is returned.
260+
// ratio of the Member's index in |counts| to the Member's effective item
261+
// limit. If the Member is not found, infinity is returned.
209262
func (s *State) memberLoadRatio(assignment keyspace.KeyValue, counts []int) float32 {
210263
var a = assignment.Decoded.(Assignment)
211264

212265
if ind, found := s.Members.Search(MemberKey(s.KS, a.MemberZone, a.MemberSuffix)); found {
213-
return float32(counts[ind]) / float32(memberAt(s.Members, ind).ItemLimit())
266+
return float32(counts[ind]) / float32(s.memberEffectiveLimit(ind))
214267
}
215268
return math.MaxFloat32
216269
}

allocator/alloc_state_test.go

Lines changed: 200 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (s *AllocStateSuite) TestExitCondition(c *gc.C) {
109109
buildAllocKeySpaceFixture(c, ctx, client)
110110
defer etcdtest.Cleanup()
111111

112-
var _, err = client.Put(ctx, "/root/members/us-east#allowed-to-exit", `{"R": 0}`)
112+
var _, err = client.Put(ctx, "/root/members/us-east#allowed-to-exit", `{"R": 0, "E": true}`)
113113
c.Assert(err, gc.IsNil)
114114

115115
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
@@ -126,7 +126,7 @@ func (s *AllocStateSuite) TestExitCondition(c *gc.C) {
126126
c.Check(states[1].shouldExit(), gc.Equals, true)
127127

128128
// While we're at it, expect |NetworkHash| changed with the new member.
129-
c.Check(states[0].NetworkHash, gc.Equals, uint64(0xfce0237931d8c200))
129+
c.Check(states[0].NetworkHash, gc.Equals, uint64(0x554c9f4e9605a7a1))
130130
}
131131

132132
func (s *AllocStateSuite) TestLoadRatio(c *gc.C) {
@@ -153,6 +153,204 @@ func (s *AllocStateSuite) TestLoadRatio(c *gc.C) {
153153
}
154154
}
155155

156+
func (s *AllocStateSuite) TestLoadRatioWithShedding(c *gc.C) {
157+
var client, ctx = etcdtest.TestClient(), context.Background()
158+
defer etcdtest.Cleanup()
159+
160+
// m1 is exiting with R:5. m2 is not exiting with R:5.
161+
// 2 items at R:1. MemberSlots=10, ItemSlots=2, excessSlots=8.
162+
// m1 sheds its full capacity (5), so its effective limit is 0.
163+
for _, kv := range [][2]string{
164+
{"/root/items/item-a", `{"R": 1}`},
165+
{"/root/items/item-b", `{"R": 1}`},
166+
167+
{"/root/members/zone-a#m1", `{"R": 5, "E": true}`},
168+
{"/root/members/zone-a#m2", `{"R": 5, "E": false}`},
169+
170+
{"/root/assign/item-a#zone-a#m1#0", `consistent`},
171+
{"/root/assign/item-b#zone-a#m2#0", `consistent`},
172+
} {
173+
var _, err = client.Put(ctx, kv[0], kv[1])
174+
c.Assert(err, gc.IsNil)
175+
}
176+
177+
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
178+
var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
179+
c.Check(ks.Load(ctx, client, 0), gc.IsNil)
180+
181+
c.Check(state.ShedCapacity[0], gc.Equals, 5) // m1 fully shed.
182+
c.Check(state.ShedCapacity[1], gc.Equals, 0)
183+
184+
// m1 has 1 assignment but effective capacity 0: load ratio is +Inf.
185+
c.Check(state.memberLoadRatio(state.Assignments[0], state.MemberTotalCount), gc.Equals, float32(math.Inf(1)))
186+
// m2 has 1 assignment and effective capacity 5: load ratio is 1/5.
187+
c.Check(state.memberLoadRatio(state.Assignments[1], state.MemberTotalCount), gc.Equals, float32(1.0/5.0))
188+
}
189+
190+
func (s *AllocStateSuite) TestShedCapacityAllAtOnce(c *gc.C) {
191+
var client, ctx = etcdtest.TestClient(), context.Background()
192+
defer etcdtest.Cleanup()
193+
194+
// 5 members, capacity 5 each. MemberSlots = 25.
195+
// 2 items at R:1. ItemSlots = 2. excessSlots = 23.
196+
// maxReplicationFactor = 1, maxShedding = 4.
197+
// Both exiting members shed their full capacity.
198+
for _, kv := range [][2]string{
199+
{"/root/items/item-a", `{"R": 1}`},
200+
{"/root/items/item-b", `{"R": 1}`},
201+
202+
{"/root/members/zone-a#m1", `{"R": 5, "E": true}`},
203+
{"/root/members/zone-a#m2", `{"R": 5, "E": true}`},
204+
{"/root/members/zone-a#m3", `{"R": 5, "E": false}`},
205+
{"/root/members/zone-b#m4", `{"R": 5, "E": false}`},
206+
{"/root/members/zone-b#m5", `{"R": 5, "E": false}`},
207+
208+
{"/root/assign/item-a#zone-a#m1#0", `consistent`},
209+
{"/root/assign/item-b#zone-b#m4#0", `consistent`},
210+
} {
211+
var _, err = client.Put(ctx, kv[0], kv[1])
212+
c.Assert(err, gc.IsNil)
213+
}
214+
215+
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
216+
var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
217+
c.Check(ks.Load(ctx, client, 0), gc.IsNil)
218+
219+
c.Check(state.ShedCapacity[0], gc.Equals, 5) // m1 sheds fully.
220+
c.Check(state.ShedCapacity[1], gc.Equals, 5) // m2 sheds fully.
221+
c.Check(state.ShedCapacity[2], gc.Equals, 0)
222+
c.Check(state.ShedCapacity[3], gc.Equals, 0)
223+
c.Check(state.ShedCapacity[4], gc.Equals, 0)
224+
}
225+
226+
func (s *AllocStateSuite) TestShedCapacityReplicationConstrained(c *gc.C) {
227+
var client, ctx = etcdtest.TestClient(), context.Background()
228+
defer etcdtest.Cleanup()
229+
230+
// 4 members, capacity 10 each. MemberSlots = 40.
231+
// 3 items: R:3, R:2, R:1. ItemSlots = 6. excessSlots = 34.
232+
// maxReplicationFactor = 3, maxShedding = 1.
233+
// m1 and m2 are both exiting, but only the oldest (m1) sheds.
234+
for _, kv := range [][2]string{
235+
{"/root/items/item-a", `{"R": 3}`},
236+
{"/root/items/item-b", `{"R": 2}`},
237+
{"/root/items/item-c", `{"R": 1}`},
238+
239+
{"/root/members/zone-a#m1", `{"R": 10, "E": true}`},
240+
{"/root/members/zone-a#m2", `{"R": 10, "E": true}`},
241+
{"/root/members/zone-b#m3", `{"R": 10, "E": false}`},
242+
{"/root/members/zone-b#m4", `{"R": 10, "E": false}`},
243+
244+
{"/root/assign/item-a#zone-a#m1#0", `consistent`},
245+
{"/root/assign/item-a#zone-a#m2#1", `consistent`},
246+
{"/root/assign/item-a#zone-b#m3#2", `consistent`},
247+
{"/root/assign/item-b#zone-a#m1#0", `consistent`},
248+
{"/root/assign/item-b#zone-b#m4#1", `consistent`},
249+
{"/root/assign/item-c#zone-b#m3#0", `consistent`},
250+
} {
251+
var _, err = client.Put(ctx, kv[0], kv[1])
252+
c.Assert(err, gc.IsNil)
253+
}
254+
255+
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
256+
var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
257+
c.Check(ks.Load(ctx, client, 0), gc.IsNil)
258+
259+
c.Check(state.ShedCapacity[0], gc.Equals, 10) // m1 sheds fully (oldest).
260+
c.Check(state.ShedCapacity[1], gc.Equals, 0) // m2 blocked by maxShedding.
261+
c.Check(state.ShedCapacity[2], gc.Equals, 0)
262+
c.Check(state.ShedCapacity[3], gc.Equals, 0)
263+
}
264+
265+
func (s *AllocStateSuite) TestShedCapacityExcessConstrained(c *gc.C) {
266+
var client, ctx = etcdtest.TestClient(), context.Background()
267+
defer etcdtest.Cleanup()
268+
269+
// 4 members, capacity 3 each. MemberSlots = 12.
270+
// 5 items at R:1. ItemSlots = 5. excessSlots = 7.
271+
// maxReplicationFactor = 1, maxShedding = 3.
272+
// 3 exiting members need 9 total shed, but only 7 excess available.
273+
// m1: sheds 3 (full), m2: sheds 3 (full), m3: sheds 1 (partial).
274+
for _, kv := range [][2]string{
275+
{"/root/items/item-a", `{"R": 1}`},
276+
{"/root/items/item-b", `{"R": 1}`},
277+
{"/root/items/item-c", `{"R": 1}`},
278+
{"/root/items/item-d", `{"R": 1}`},
279+
{"/root/items/item-e", `{"R": 1}`},
280+
281+
{"/root/members/zone-a#m1", `{"R": 3, "E": true}`},
282+
{"/root/members/zone-a#m2", `{"R": 3, "E": true}`},
283+
{"/root/members/zone-b#m3", `{"R": 3, "E": true}`},
284+
{"/root/members/zone-b#m4", `{"R": 3, "E": false}`},
285+
286+
{"/root/assign/item-a#zone-a#m1#0", `consistent`},
287+
{"/root/assign/item-b#zone-a#m1#0", `consistent`},
288+
{"/root/assign/item-c#zone-a#m2#0", `consistent`},
289+
{"/root/assign/item-d#zone-b#m3#0", `consistent`},
290+
{"/root/assign/item-e#zone-b#m4#0", `consistent`},
291+
} {
292+
var _, err = client.Put(ctx, kv[0], kv[1])
293+
c.Assert(err, gc.IsNil)
294+
}
295+
296+
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
297+
var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
298+
c.Check(ks.Load(ctx, client, 0), gc.IsNil)
299+
300+
c.Check(state.ShedCapacity[0], gc.Equals, 3) // m1 sheds fully.
301+
c.Check(state.ShedCapacity[1], gc.Equals, 3) // m2 sheds fully.
302+
c.Check(state.ShedCapacity[2], gc.Equals, 1) // m3 gets remaining excess.
303+
c.Check(state.ShedCapacity[3], gc.Equals, 0) // non-exiting.
304+
}
305+
306+
func (s *AllocStateSuite) TestShedCapacityOverloadedCluster(c *gc.C) {
307+
var client, ctx = etcdtest.TestClient(), context.Background()
308+
defer etcdtest.Cleanup()
309+
310+
// 2 members, capacity 2 each. MemberSlots = 4.
311+
// 5 items at R:1. ItemSlots = 5. excessSlots = -1 (clamped to 0).
312+
// No shedding is possible because the cluster is overloaded.
313+
for _, kv := range [][2]string{
314+
{"/root/items/item-a", `{"R": 1}`},
315+
{"/root/items/item-b", `{"R": 1}`},
316+
{"/root/items/item-c", `{"R": 1}`},
317+
{"/root/items/item-d", `{"R": 1}`},
318+
{"/root/items/item-e", `{"R": 1}`},
319+
320+
{"/root/members/zone-a#m1", `{"R": 2, "E": true}`},
321+
{"/root/members/zone-b#m2", `{"R": 2, "E": false}`},
322+
323+
{"/root/assign/item-a#zone-a#m1#0", `consistent`},
324+
{"/root/assign/item-b#zone-a#m1#0", `consistent`},
325+
{"/root/assign/item-c#zone-b#m2#0", `consistent`},
326+
{"/root/assign/item-d#zone-b#m2#0", `consistent`},
327+
} {
328+
var _, err = client.Put(ctx, kv[0], kv[1])
329+
c.Assert(err, gc.IsNil)
330+
}
331+
332+
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
333+
var state = NewObservedState(ks, MemberKey(ks, "zone-a", "m1"), isConsistent)
334+
c.Check(ks.Load(ctx, client, 0), gc.IsNil)
335+
336+
c.Check(state.ShedCapacity[0], gc.Equals, 0) // m1: no excess to shed.
337+
c.Check(state.ShedCapacity[1], gc.Equals, 0)
338+
}
339+
340+
func (s *AllocStateSuite) TestShedCapacityNoExiting(c *gc.C) {
341+
var client, ctx = etcdtest.TestClient(), context.Background()
342+
defer etcdtest.Cleanup()
343+
buildAllocKeySpaceFixture(c, ctx, client)
344+
345+
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
346+
var state = NewObservedState(ks, MemberKey(ks, "us-west", "baz"), isConsistent)
347+
c.Check(ks.Load(ctx, client, 0), gc.IsNil)
348+
349+
for i := range state.ShedCapacity {
350+
c.Check(state.ShedCapacity[i], gc.Equals, 0)
351+
}
352+
}
353+
156354
var _ = gc.Suite(&AllocStateSuite{})
157355

158356
func TestMain(m *testing.M) { etcdtest.TestMainWithEtcd(m) }

allocator/allocator_key_space.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ const (
3434
type MemberValue interface {
	// ItemLimit is the maximum number of Items this Member may be assigned.
	ItemLimit() int
	// IsExiting returns true if this Member has been signaled to exit.
	IsExiting() bool
}
3840

3941
// ItemValue is a user-defined Item representation which also supports required

allocator/allocator_key_space_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -354,11 +354,16 @@ func isConsistent(_ Item, assignment keyspace.KeyValue, allAssignments keyspace.
354354
return assignment.Decoded.(Assignment).AssignmentValue.(testAssignment).consistent
355355
}
356356

357-
// testMember is a MemberValue fixture used by allocator tests.
type testMember struct {
	R int  // ItemLimit
	E bool // Exiting
}

func (m testMember) ItemLimit() int  { return m.R }
func (m testMember) Validate() error { return nil }

// TODO(whb): Zero'ing R is for backward compatibility; remove once deployment is complete.
func (m *testMember) SetExiting() {
	m.E = true
	m.R = 0
}

func (m testMember) IsExiting() bool { return m.E }
362367

363368
func (m *testMember) MarshalString() string {
364369
if b, err := json.Marshal(m); err != nil {

0 commit comments

Comments
 (0)