Skip to content

Commit ec6a632

Browse files
authored
feat(executor-client): split function for local shard assignment (#7446)
<!-- Describe what has changed in this PR --> **What changed?** Instead of having a function that replaces the assignment generated by the local logic, we now have two different functions: one to assign a new shard and the other to remove an assignment. These functions are also used in the canary to simulate shard assignment. The changes to the canary are made to make it more similar to the matching service. <!-- Tell your future self why have you made these changes --> **Why?** We need to onboard matching, and modeling the local logic with assignment replacement adds too much complexity. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Local testing and unit testing. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** The change is only used in the canary service, which has been tested. <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation update that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** Signed-off-by: edigregorio <[email protected]>
1 parent 748fc7d commit ec6a632

File tree

7 files changed

+322
-107
lines changed

7 files changed

+322
-107
lines changed

service/sharddistributor/canary/executors/executors.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,6 @@ func NewExecutorsModule(params ExecutorsParams) {
8585
func Module(fixedNamespace, ephemeralNamespace, externalAssignmentNamespace string) fx.Option {
8686
return fx.Module(
8787
"Executors",
88-
// Executors that are used for testing namespaces with the different modes of the migration
89-
fx.Provide(
90-
NewExecutorLocalPassthroughNamespace,
91-
NewExecutorLocalPassthroughShadowNamespace,
92-
NewExecutorDistributedPassthroughNamespace,
93-
),
9488
// Executor that is used for testing a namespace with fixed shards
9589
fx.Provide(
9690
func(params executorclient.Params[*processor.ShardProcessor]) (ExecutorResult, error) {

service/sharddistributor/canary/externalshardassignment/shardassigner.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package externalshardassignment
22

33
import (
44
"context"
5+
"math/rand"
56
"strconv"
67
"sync"
78
"time"
@@ -19,17 +20,19 @@ import (
1920
)
2021

2122
const (
22-
shardAssignmentInterval = 1 * time.Second
23+
shardAssignmentInterval = 4 * time.Second
24+
minimumAssignedShards = 3
2325
)
2426

2527
// ShardAssigner assigns shards to the executor for canary testing
2628
type ShardAssigner struct {
27-
logger log.Logger
28-
timeSource clock.TimeSource
29-
executorclient executorclient.Executor[*processorephemeral.ShardProcessor]
30-
stopChan chan struct{}
31-
goRoutineWg sync.WaitGroup
32-
namespace string
29+
logger log.Logger
30+
timeSource clock.TimeSource
31+
shardProcessors map[string]*processorephemeral.ShardProcessor
32+
executorclient executorclient.Executor[*processorephemeral.ShardProcessor]
33+
stopChan chan struct{}
34+
goRoutineWg sync.WaitGroup
35+
namespace string
3336
}
3437

3538
// ShardAssignerParams contains the dependencies needed to create a ShardParam
@@ -42,13 +45,15 @@ type ShardAssignerParams struct {
4245

4346
// NewShardAssigner creates a new ShardAssigner instance with the given parameters and namespace
4447
func NewShardAssigner(params ShardAssignerParams, namespace string) *ShardAssigner {
48+
sp := make(map[string]*processorephemeral.ShardProcessor)
4549
return &ShardAssigner{
46-
logger: params.Logger,
47-
timeSource: params.TimeSource,
48-
executorclient: params.Executorclient,
49-
stopChan: make(chan struct{}),
50-
goRoutineWg: sync.WaitGroup{},
51-
namespace: namespace,
50+
logger: params.Logger,
51+
timeSource: params.TimeSource,
52+
shardProcessors: sp,
53+
executorclient: params.Executorclient,
54+
stopChan: make(chan struct{}),
55+
goRoutineWg: sync.WaitGroup{},
56+
namespace: namespace,
5257
}
5358
}
5459

@@ -92,21 +97,46 @@ func (s *ShardAssigner) process(ctx context.Context) {
9297
case <-s.stopChan:
9398
return
9499
case <-ticker.Chan():
100+
if len(s.shardProcessors) > minimumAssignedShards {
101+
var shardToRemove string
102+
shardToRemoveIndex := rand.Intn(len(s.shardProcessors))
103+
for shardID := range s.shardProcessors {
104+
if shardToRemoveIndex == 0 {
105+
shardToRemove = shardID
106+
break
107+
}
108+
shardToRemoveIndex--
109+
}
110+
err := s.executorclient.RemoveShardsFromLocalLogic([]string{shardToRemove})
111+
if err != nil {
112+
s.logger.Error("Failed to remove shards", tag.Error(err))
113+
continue
114+
}
115+
delete(s.shardProcessors, shardToRemove)
116+
s.logger.Info("Removed a shard from external source", tag.ShardKey(shardToRemove))
117+
118+
}
119+
120+
// Simulate the assignment of new shards
95121
newAssignedShard := uuid.New().String()
96122
s.logger.Info("Assign a new shard from external source", tag.ShardKey(newAssignedShard))
97123
shardAssignment := map[string]*types.ShardAssignment{
98124
newAssignedShard: {
99125
Status: types.AssignmentStatusREADY,
100126
},
101127
}
102-
s.executorclient.AssignShardsFromLocalLogic(context.Background(), shardAssignment)
128+
err := s.executorclient.AssignShardsFromLocalLogic(context.Background(), shardAssignment)
129+
if err != nil {
130+
s.logger.Error("Failed to assign shard from external source", tag.Error(err))
131+
continue
132+
}
103133
sp, err := s.executorclient.GetShardProcess(ctx, newAssignedShard)
104134
if err != nil {
105135
s.logger.Error("failed to get shard assigned", tag.ShardKey(newAssignedShard), tag.Error(err))
106136
} else {
107137
s.logger.Info("shard assigned", tag.ShardStatus(string(sp.GetShardReport().Status)), tag.ShardLoad(strconv.FormatFloat(sp.GetShardReport().ShardLoad, 'f', -1, 64)))
108138
}
109-
139+
s.shardProcessors[newAssignedShard] = sp
110140
}
111141
}
112142
}

service/sharddistributor/canary/module.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ func opts(names NamespacesNames) fx.Option {
4848
// Instantiate executors for multiple namespaces
4949
executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace),
5050

51-
processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace, names.ExternalAssignmentNamespace, "test-local-passthrough-shadow"}),
51+
processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}),
5252
)
5353
}

service/sharddistributor/client/executorclient/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ type Executor[SP ShardProcessor] interface {
5050
// Get the current metadata of the executor
5151
GetMetadata() map[string]string
5252

53-
// Used during the migration during local-passthrough and local-passthrough-shadow
54-
AssignShardsFromLocalLogic(ctx context.Context, shardAssignment map[string]*types.ShardAssignment)
53+
// AssignShardsFromLocalLogic is used for the migration during local-passthrough, local-passthrough-shadow, distributed-passthrough
54+
AssignShardsFromLocalLogic(ctx context.Context, shardAssignment map[string]*types.ShardAssignment) error
55+
// RemoveShardsFromLocalLogic is used for the migration during local-passthrough, local-passthrough-shadow, distributed-passthrough
56+
RemoveShardsFromLocalLogic(shardIDs []string) error
5557
}
5658

5759
type Params[SP ShardProcessor] struct {

service/sharddistributor/client/executorclient/clientimpl.go

Lines changed: 88 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,26 @@ func (e *executorImpl[SP]) GetShardProcess(ctx context.Context, shardID string)
156156
return shardProcess.processor, nil
157157
}
158158

159-
func (e *executorImpl[SP]) AssignShardsFromLocalLogic(ctx context.Context, shardAssignment map[string]*types.ShardAssignment) {
159+
func (e *executorImpl[SP]) AssignShardsFromLocalLogic(ctx context.Context, shardAssignment map[string]*types.ShardAssignment) error {
160160
e.assignmentMutex.Lock()
161161
defer e.assignmentMutex.Unlock()
162-
162+
if e.getMigrationMode() == types.MigrationModeONBOARDED {
163+
return fmt.Errorf("migration mode is onborded, no local assignemnt allowed")
164+
}
163165
e.logger.Info("Executing external shard assignment")
164-
e.updateShardAssignment(ctx, shardAssignment)
166+
e.addNewShards(ctx, shardAssignment)
167+
return nil
168+
}
169+
170+
func (e *executorImpl[SP]) RemoveShardsFromLocalLogic(shardIDs []string) error {
171+
e.assignmentMutex.Lock()
172+
defer e.assignmentMutex.Unlock()
173+
if e.getMigrationMode() == types.MigrationModeONBOARDED {
174+
return fmt.Errorf("migration mode is onborded, no local assignemnt allowed")
175+
}
176+
e.logger.Info("Executing external shard deletion assignment")
177+
e.deleteShards(shardIDs)
178+
return nil
165179
}
166180

167181
func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
@@ -291,7 +305,9 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
291305
if previousMode != currentMode {
292306
e.logger.Info("migration mode transition",
293307
tag.Dynamic("previous", previousMode),
294-
tag.Dynamic("current", currentMode))
308+
tag.Dynamic("current", currentMode),
309+
tag.ShardNamespace(e.namespace),
310+
tag.ShardExecutor(e.executorID))
295311
e.setMigrationMode(currentMode)
296312
}
297313

@@ -304,71 +320,102 @@ func (e *executorImpl[SP]) updateShardAssignment(ctx context.Context, shardAssig
304320
// Stop shard processing for shards not assigned to this executor
305321
e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
306322
if assignment, ok := shardAssignments[shardID]; !ok || assignment.Status != types.AssignmentStatusREADY {
307-
e.metrics.Counter(metricsconstants.ShardDistributorExecutorShardsStopped).Inc(1)
308-
309323
wg.Add(1)
310-
go func() {
324+
go func(shardID string) {
311325
defer wg.Done()
312-
managedProcessor.setState(processorStateStopping)
313-
managedProcessor.processor.Stop()
314-
e.managedProcessors.Delete(shardID)
315-
}()
326+
e.stopManagerProcessor(shardID)
327+
}(shardID)
316328
}
317329
return true
318330
})
319331

320332
// Start shard processing for shards assigned to this executor
321333
for shardID, assignment := range shardAssignments {
322334
if assignment.Status == types.AssignmentStatusREADY {
323-
if _, ok := e.managedProcessors.Load(shardID); !ok {
324-
e.metrics.Counter(metricsconstants.ShardDistributorExecutorShardsStarted).Inc(1)
325-
326-
wg.Add(1)
327-
go func() {
328-
defer wg.Done()
329-
processor, err := e.shardProcessorFactory.NewShardProcessor(shardID)
330-
if err != nil {
331-
e.logger.Error("failed to create shard processor", tag.Error(err))
332-
e.metrics.Counter(metricsconstants.ShardDistributorExecutorProcessorCreationFailures).Inc(1)
333-
return
334-
}
335-
managedProcessor := newManagedProcessor(processor, processorStateStarting)
336-
e.managedProcessors.Store(shardID, managedProcessor)
337-
338-
processor.Start(ctx)
339-
340-
managedProcessor.setState(processorStateStarted)
341-
}()
342-
}
335+
wg.Add(1)
336+
go func(shardID string) {
337+
defer wg.Done()
338+
e.addManagerProcessor(ctx, shardID)
339+
}(shardID)
343340
}
344341
}
345342

346343
wg.Wait()
347344
}
348345

349-
func (e *executorImpl[SP]) stopShardProcessors() {
346+
func (e *executorImpl[SP]) addNewShards(ctx context.Context, shardAssignments map[string]*types.ShardAssignment) {
350347
wg := sync.WaitGroup{}
351348

352-
e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
353-
// If the processor is already stopping, skip it
354-
if managedProcessor.getState() == processorStateStopping {
355-
return true
349+
for shardID, assignment := range shardAssignments {
350+
if assignment.Status == types.AssignmentStatusREADY {
351+
wg.Add(1)
352+
go func(shardID string) {
353+
defer wg.Done()
354+
e.addManagerProcessor(ctx, shardID)
355+
}(shardID)
356356
}
357+
}
358+
359+
wg.Wait()
360+
}
357361

362+
func (e *executorImpl[SP]) deleteShards(shardIDs []string) {
363+
wg := sync.WaitGroup{}
364+
for _, shardID := range shardIDs {
358365
wg.Add(1)
359-
go func() {
366+
go func(shardID string) {
360367
defer wg.Done()
368+
e.stopManagerProcessor(shardID)
369+
}(shardID)
370+
}
371+
wg.Wait()
372+
}
361373

362-
managedProcessor.setState(processorStateStopping)
363-
managedProcessor.processor.Stop()
364-
e.managedProcessors.Delete(shardID)
365-
}()
374+
func (e *executorImpl[SP]) stopShardProcessors() {
375+
wg := sync.WaitGroup{}
376+
377+
e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
378+
wg.Add(1)
379+
go func(shardID string) {
380+
defer wg.Done()
381+
e.stopManagerProcessor(shardID)
382+
}(shardID)
366383
return true
367384
})
368385

369386
wg.Wait()
370387
}
371388

389+
func (e *executorImpl[SP]) addManagerProcessor(ctx context.Context, shardID string) {
390+
if _, ok := e.managedProcessors.Load(shardID); !ok {
391+
e.metrics.Counter(metricsconstants.ShardDistributorExecutorShardsStarted).Inc(1)
392+
processor, err := e.shardProcessorFactory.NewShardProcessor(shardID)
393+
if err != nil {
394+
e.logger.Error("failed to create shard processor", tag.Error(err))
395+
e.metrics.Counter(metricsconstants.ShardDistributorExecutorProcessorCreationFailures).Inc(1)
396+
return
397+
}
398+
managedProcessor := newManagedProcessor(processor, processorStateStarting)
399+
e.managedProcessors.Store(shardID, managedProcessor)
400+
401+
processor.Start(ctx)
402+
403+
managedProcessor.setState(processorStateStarted)
404+
405+
}
406+
}
407+
func (e *executorImpl[SP]) stopManagerProcessor(shardID string) {
408+
managedProcessor, ok := e.managedProcessors.Load(shardID)
409+
// If the processor does not exist for the shard, or it is already stopping, skip it
410+
if !ok || managedProcessor.getState() == processorStateStopping {
411+
return
412+
}
413+
e.metrics.Counter(metricsconstants.ShardDistributorExecutorShardsStopped).Inc(1)
414+
managedProcessor.setState(processorStateStopping)
415+
managedProcessor.processor.Stop()
416+
e.managedProcessors.Delete(shardID)
417+
}
418+
372419
// compareAssignments compares the local assignments with the heartbeat response assignments
373420
// and emits convergence or divergence metrics
374421
func (e *executorImpl[SP]) compareAssignments(heartbeatAssignments map[string]*types.ShardAssignment) {

0 commit comments

Comments
 (0)