Skip to content

Commit 1249e4e

Browse files
authored
Support for creating ephemeral shards (#7209)
What changed? We now support ephemeral shards. Shards are created if they do not already exist. An executor's shards are no longer deleted when the executor disappears. We now load the set of shards from the database for ephemeral shards. Future PRs will: support removing shards when they are no longer needed; support the executor library heartbeating immediately if an unknown shard is requested (so we do not have to wait for the next heartbeat to process shard requests). Why? We need to support ephemeral shards for use cases where the set of shards is changing (e.g. Cadence Matching). How did you test it? Potential risks Release notes Documentation Changes
1 parent e774d17 commit 1249e4e

File tree

6 files changed

+196
-114
lines changed

6 files changed

+196
-114
lines changed

service/sharddistributor/handler/handler.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"context"
2727
"errors"
2828
"fmt"
29+
"math"
2930
"slices"
3031
"sync"
3132

@@ -92,6 +93,10 @@ func (h *handlerImpl) GetShardOwner(ctx context.Context, request *types.GetShard
9293

9394
executorID, err := h.storage.GetShardOwner(ctx, request.Namespace, request.ShardKey)
9495
if errors.Is(err, store.ErrShardNotFound) {
96+
if h.shardDistributionCfg.Namespaces[namespaceIdx].Type == config.NamespaceTypeEphemeral {
97+
return h.assignEphemeralShard(ctx, request.Namespace, request.ShardKey)
98+
}
99+
95100
return nil, &types.ShardNotFoundError{
96101
Namespace: request.Namespace,
97102
ShardKey: request.ShardKey,
@@ -108,3 +113,33 @@ func (h *handlerImpl) GetShardOwner(ctx context.Context, request *types.GetShard
108113

109114
return resp, nil
110115
}
116+
117+
func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string, shardID string) (*types.GetShardOwnerResponse, error) {
118+
119+
// Get the current state of the namespace and find the executor with the least assigned shards
120+
state, err := h.storage.GetState(ctx, namespace)
121+
if err != nil {
122+
return nil, fmt.Errorf("get state: %w", err)
123+
}
124+
125+
var executor string
126+
minAssignedShards := math.MaxInt
127+
128+
for assignedExecutor, assignment := range state.ShardAssignments {
129+
if len(assignment.AssignedShards) < minAssignedShards {
130+
minAssignedShards = len(assignment.AssignedShards)
131+
executor = assignedExecutor
132+
}
133+
}
134+
135+
// Assign the shard to the executor with the least assigned shards
136+
err = h.storage.AssignShard(ctx, namespace, shardID, executor)
137+
if err != nil {
138+
return nil, fmt.Errorf("assign ephemeral shard: %w", err)
139+
}
140+
141+
return &types.GetShardOwnerResponse{
142+
Owner: executor,
143+
Namespace: namespace,
144+
}, nil
145+
}

service/sharddistributor/handler/handler_test.go

Lines changed: 95 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,22 @@ import (
3737
)
3838

3939
const (
40-
_testNamespace = "test-matching"
40+
_testNamespaceFixed = "test-fixed"
41+
_testNamespaceEphemeral = "test-ephemeral"
4142
)
4243

4344
func TestGetShardOwner(t *testing.T) {
4445
cfg := config.ShardDistribution{
4546
Enabled: true,
4647
Namespaces: []config.Namespace{
4748
{
48-
Name: _testNamespace,
49+
Name: _testNamespaceFixed,
50+
Type: config.NamespaceTypeFixed,
51+
ShardNum: 32,
52+
},
53+
{
54+
Name: _testNamespaceEphemeral,
55+
Type: config.NamespaceTypeEphemeral,
4956
},
5057
},
5158
}
@@ -59,50 +66,119 @@ func TestGetShardOwner(t *testing.T) {
5966
expectedErrMsg string
6067
}{
6168
{
62-
name: "Existing_Success",
69+
name: "InvalidNamespace",
70+
request: &types.GetShardOwnerRequest{
71+
Namespace: "namespace not found invalidNamespace",
72+
ShardKey: "1",
73+
},
74+
expectedError: true,
75+
expectedErrMsg: "namespace not found",
76+
},
77+
{
78+
name: "LookupError",
79+
request: &types.GetShardOwnerRequest{
80+
Namespace: _testNamespaceFixed,
81+
ShardKey: "1",
82+
},
83+
setupMocks: func(mockStore *store.MockStore) {
84+
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceFixed, "1").Return("", errors.New("lookup error"))
85+
},
86+
expectedError: true,
87+
expectedErrMsg: "lookup error",
88+
},
89+
{
90+
name: "Existing_Success_Fixed",
6391
request: &types.GetShardOwnerRequest{
64-
Namespace: _testNamespace,
92+
Namespace: _testNamespaceFixed,
6593
ShardKey: "123",
6694
},
6795
setupMocks: func(mockStore *store.MockStore) {
68-
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespace, "123").Return("owner1", nil)
96+
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceFixed, "123").Return("owner1", nil)
6997
},
7098
expectedOwner: "owner1",
7199
expectedError: false,
72100
},
73-
74101
{
75-
name: "InvalidNamespace",
102+
name: "ShardNotFound_Fixed",
76103
request: &types.GetShardOwnerRequest{
77-
Namespace: "namespace not found invalidNamespace",
78-
ShardKey: "1",
104+
Namespace: _testNamespaceFixed,
105+
ShardKey: "NON-EXISTING-SHARD",
106+
},
107+
setupMocks: func(mockStore *store.MockStore) {
108+
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceFixed, "NON-EXISTING-SHARD").Return("", store.ErrShardNotFound)
79109
},
80110
expectedError: true,
81-
expectedErrMsg: "namespace not found",
111+
expectedErrMsg: "shard not found",
82112
},
83113
{
84-
name: "LookupError",
114+
name: "Existing_Success_Ephemeral",
85115
request: &types.GetShardOwnerRequest{
86-
Namespace: _testNamespace,
87-
ShardKey: "1",
116+
Namespace: _testNamespaceEphemeral,
117+
ShardKey: "123",
118+
},
119+
setupMocks: func(mockStore *store.MockStore) {
120+
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceEphemeral, "123").Return("owner1", nil)
121+
},
122+
expectedOwner: "owner1",
123+
expectedError: false,
124+
},
125+
{
126+
name: "ShardNotFound_Ephemeral",
127+
request: &types.GetShardOwnerRequest{
128+
Namespace: _testNamespaceEphemeral,
129+
ShardKey: "NON-EXISTING-SHARD",
88130
},
89131
setupMocks: func(mockStore *store.MockStore) {
90-
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespace, "1").Return("", errors.New("lookup error"))
132+
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceEphemeral, "NON-EXISTING-SHARD").Return("", store.ErrShardNotFound)
133+
mockStore.EXPECT().GetState(gomock.Any(), _testNamespaceEphemeral).Return(&store.NamespaceState{
134+
ShardAssignments: map[string]store.AssignedState{
135+
"owner1": {
136+
AssignedShards: map[string]*types.ShardAssignment{
137+
"shard1": {Status: types.AssignmentStatusREADY},
138+
"shard2": {Status: types.AssignmentStatusREADY},
139+
"shard3": {Status: types.AssignmentStatusREADY},
140+
},
141+
},
142+
"owner2": {
143+
AssignedShards: map[string]*types.ShardAssignment{
144+
"shard4": {Status: types.AssignmentStatusREADY},
145+
},
146+
},
147+
},
148+
}, nil)
149+
// owner2 has the fewest shards assigned, so we assign the shard to it
150+
mockStore.EXPECT().AssignShard(gomock.Any(), _testNamespaceEphemeral, "NON-EXISTING-SHARD", "owner2").Return(nil)
151+
},
152+
expectedOwner: "owner2",
153+
expectedError: false,
154+
},
155+
{
156+
name: "ShardNotFound_Ephemeral_GetStateFailure",
157+
request: &types.GetShardOwnerRequest{
158+
Namespace: _testNamespaceEphemeral,
159+
ShardKey: "NON-EXISTING-SHARD",
160+
},
161+
setupMocks: func(mockStore *store.MockStore) {
162+
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceEphemeral, "NON-EXISTING-SHARD").Return("", store.ErrShardNotFound)
163+
mockStore.EXPECT().GetState(gomock.Any(), _testNamespaceEphemeral).Return(nil, errors.New("get state failure"))
91164
},
92165
expectedError: true,
93-
expectedErrMsg: "lookup error",
166+
expectedErrMsg: "get state failure",
94167
},
95168
{
96-
name: "ShardNotFound",
169+
name: "ShardNotFound_Ephemeral_AssignShardFailure",
97170
request: &types.GetShardOwnerRequest{
98-
Namespace: _testNamespace,
171+
Namespace: _testNamespaceEphemeral,
99172
ShardKey: "NON-EXISTING-SHARD",
100173
},
101174
setupMocks: func(mockStore *store.MockStore) {
102-
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespace, "NON-EXISTING-SHARD").Return("", store.ErrShardNotFound)
175+
mockStore.EXPECT().GetShardOwner(gomock.Any(), _testNamespaceEphemeral, "NON-EXISTING-SHARD").Return("", store.ErrShardNotFound)
176+
mockStore.EXPECT().GetState(gomock.Any(), _testNamespaceEphemeral).Return(&store.NamespaceState{
177+
ShardAssignments: map[string]store.AssignedState{"owner1": {AssignedShards: map[string]*types.ShardAssignment{}}}}, nil)
178+
mockStore.EXPECT().AssignShard(gomock.Any(), _testNamespaceEphemeral, "NON-EXISTING-SHARD", "owner1").Return(errors.New("assign shard failure"))
103179
},
104180
expectedError: true,
105-
expectedErrMsg: "shard not found",
181+
expectedErrMsg: "assign shard failure",
106182
},
107183
}
108184

service/sharddistributor/leader/process/processor.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,8 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
316316

317317
func (p *namespaceProcessor) findShardsToReassign(activeExecutors []string, namespaceState *store.NamespaceState) ([]string, map[string][]string) {
318318
allShards := make(map[string]struct{})
319-
for _, shardID := range getShards(p.namespaceCfg) {
320-
allShards[strconv.FormatInt(shardID, 10)] = struct{}{}
319+
for _, shardID := range getShards(p.namespaceCfg, namespaceState) {
320+
allShards[shardID] = struct{}{}
321321
}
322322

323323
shardsToReassign := make([]string, 0)
@@ -449,17 +449,23 @@ func assignShardsToEmptyExecutors(currentAssignments map[string][]string) bool {
449449
return true
450450
}
451451

452-
func getShards(cfg config.Namespace) []int64 {
452+
func getShards(cfg config.Namespace, namespaceState *store.NamespaceState) []string {
453453
if cfg.Type == config.NamespaceTypeFixed {
454-
return makeRange(0, cfg.ShardNum-1)
454+
return makeShards(cfg.ShardNum)
455+
} else if cfg.Type == config.NamespaceTypeEphemeral {
456+
shards := make([]string, 0)
457+
for shardID := range namespaceState.Shards {
458+
shards = append(shards, shardID)
459+
}
460+
return shards
455461
}
456462
return nil
457463
}
458464

459-
func makeRange(min, max int64) []int64 {
460-
a := make([]int64, max-min+1)
461-
for i := range a {
462-
a[i] = min + int64(i)
465+
func makeShards(num int64) []string {
466+
shards := make([]string, num)
467+
for i := range num {
468+
shards[i] = strconv.FormatInt(i, 10)
463469
}
464-
return a
470+
return shards
465471
}

service/sharddistributor/leader/process/processor_test.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package process
33
import (
44
"context"
55
"errors"
6+
"slices"
67
"sync"
78
"testing"
89
"time"
@@ -386,15 +387,34 @@ func TestRebalanceShards_WithUnassignedShards(t *testing.T) {
386387
}
387388

388389
func TestGetShards_Utility(t *testing.T) {
389-
// Fixed type
390-
cfg := config.Namespace{Type: config.NamespaceTypeFixed, ShardNum: 5}
391-
shards := getShards(cfg)
392-
assert.Equal(t, []int64{0, 1, 2, 3, 4}, shards)
393-
394-
// Other type
395-
cfg = config.Namespace{Type: "other"}
396-
shards = getShards(cfg)
397-
assert.Nil(t, shards)
390+
t.Run("Fixed type", func(t *testing.T) {
391+
cfg := config.Namespace{Type: config.NamespaceTypeFixed, ShardNum: 5}
392+
shards := getShards(cfg, nil)
393+
assert.Equal(t, []string{"0", "1", "2", "3", "4"}, shards)
394+
})
395+
396+
t.Run("Ephemeral type", func(t *testing.T) {
397+
cfg := config.Namespace{Type: config.NamespaceTypeEphemeral}
398+
nsState := &store.NamespaceState{
399+
Shards: map[string]store.ShardState{
400+
"s0": {ExecutorID: "exec-1"},
401+
"s1": {ExecutorID: "exec-1"},
402+
"s2": {ExecutorID: "exec-1"},
403+
"s3": {ExecutorID: "exec-1"},
404+
"s4": {ExecutorID: "exec-1"},
405+
},
406+
}
407+
shards := getShards(cfg, nsState)
408+
slices.Sort(shards)
409+
assert.Equal(t, []string{"s0", "s1", "s2", "s3", "s4"}, shards)
410+
})
411+
412+
// Unknown type
413+
t.Run("Other type", func(t *testing.T) {
414+
cfg := config.Namespace{Type: "other"}
415+
shards := getShards(cfg, nil)
416+
assert.Nil(t, shards)
417+
})
398418
}
399419

400420
func TestAssignShardsToEmptyExecutors(t *testing.T) {

service/sharddistributor/store/etcd/etcdstore.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -465,44 +465,15 @@ func (s *Store) AssignShard(ctx context.Context, namespace, shardID, executorID
465465
}
466466
}
467467

468+
// DeleteExecutors deletes the given executors from the store. It does not delete the shards owned by the executors, this
469+
// should be handled by the namespace processor loop as we want to reassign, not delete the shards.
468470
func (s *Store) DeleteExecutors(ctx context.Context, namespace string, executorIDs []string, guard store.GuardFunc) error {
469471
if len(executorIDs) == 0 {
470472
return nil
471473
}
472474
var ops []clientv3.Op
473475

474-
// For each executor, find all shards it owns and create conditional deletion operations.
475476
for _, executorID := range executorIDs {
476-
// 1. Find the shards currently assigned to this executor.
477-
assignedStateKey := s.buildExecutorKey(namespace, executorID, executorAssignedStateKey)
478-
resp, err := s.client.Get(ctx, assignedStateKey)
479-
if err != nil {
480-
return fmt.Errorf("get assigned state for executor %s: %w", executorID, err)
481-
}
482-
483-
if len(resp.Kvs) > 0 {
484-
var state store.AssignedState
485-
if err := json.Unmarshal(resp.Kvs[0].Value, &state); err != nil {
486-
return fmt.Errorf("unmarshal assigned state for executor %s: %w", executorID, err)
487-
}
488-
489-
// 2. For each shard, create a nested transaction to conditionally delete it.
490-
for shardID := range state.AssignedShards {
491-
shardOwnerKey := s.buildShardKey(namespace, shardID, shardAssignedKey)
492-
493-
// This is an atomic check-and-delete operation for a single shard.
494-
// IF the shard owner is still the executor being deleted,
495-
// THEN delete the shard ownership key.
496-
conditionalDelete := clientv3.OpTxn(
497-
[]clientv3.Cmp{clientv3.Compare(clientv3.Value(shardOwnerKey), "=", executorID)}, // IF condition
498-
[]clientv3.Op{clientv3.OpDelete(shardOwnerKey)}, // THEN operations
499-
nil, // ELSE do nothing
500-
)
501-
ops = append(ops, conditionalDelete)
502-
}
503-
}
504-
505-
// 3. Add the unconditional deletion for the executor's entire record.
506477
executorPrefix := s.buildExecutorKey(namespace, executorID, "")
507478
ops = append(ops, clientv3.OpDelete(executorPrefix, clientv3.WithPrefix()))
508479
}

0 commit comments

Comments
 (0)