Skip to content

Commit 71832d6

Browse files
authored
Integration of removal of ephemeral shards in the executor library (#7216)
What changed? The executor library now asks the processor for a struct of status information, The new struct contains the status of the shard so the processor can signal the state of the shard Why? This enables the executors to tell if an ephemeral shard should be delete. It also enables us to add more fields to the status in a backwards compatible way How did you test it? Unit tests Potential risks This is not backwards compatible, but we are still in the prototype phase so noone is implementing this interface, except our own test service. Release notes Documentation Changes
1 parent d11e146 commit 71832d6

File tree

4 files changed

+20
-12
lines changed

4 files changed

+20
-12
lines changed

service/sharddistributor/executorclient/client.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,21 @@ import (
1515
"github.com/uber/cadence/common/clock"
1616
"github.com/uber/cadence/common/log"
1717
"github.com/uber/cadence/common/metrics"
18+
"github.com/uber/cadence/common/types"
1819
"github.com/uber/cadence/service/sharddistributor/executorclient/metricsconstants"
1920
)
2021

2122
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . ShardProcessorFactory,ShardProcessor,Executor
2223

24+
type ShardReport struct {
25+
ShardLoad float64
26+
Status types.ShardStatus
27+
}
28+
2329
type ShardProcessor interface {
2430
Start(ctx context.Context)
2531
Stop()
26-
GetShardLoad() float64
32+
GetShardReport() ShardReport
2733
}
2834

2935
type ShardProcessorFactory[SP ShardProcessor] interface {

service/sharddistributor/executorclient/clientimpl.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,11 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
130130
shardStatusReports := make(map[string]*types.ShardStatusReport)
131131
e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
132132
if managedProcessor.getState() == processorStateStarted {
133+
shardStatus := managedProcessor.processor.GetShardReport()
134+
133135
shardStatusReports[shardID] = &types.ShardStatusReport{
134-
ShardLoad: managedProcessor.processor.GetShardLoad(),
135-
Status: types.ShardStatusREADY,
136+
ShardLoad: shardStatus.ShardLoad,
137+
Status: shardStatus.Status,
136138
}
137139
}
138140
return true

service/sharddistributor/executorclient/clientimpl_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,9 @@ func TestHeartbeat(t *testing.T) {
119119
}, nil)
120120

121121
shardProcessorMock1 := NewMockShardProcessor(ctrl)
122-
shardProcessorMock1.EXPECT().GetShardLoad().Return(0.123)
122+
shardProcessorMock1.EXPECT().GetShardReport().Return(ShardReport{ShardLoad: 0.123, Status: types.ShardStatusREADY})
123123
shardProcessorMock2 := NewMockShardProcessor(ctrl)
124-
shardProcessorMock2.EXPECT().GetShardLoad().Return(0.456)
124+
shardProcessorMock2.EXPECT().GetShardReport().Return(ShardReport{ShardLoad: 0.456, Status: types.ShardStatusREADY})
125125

126126
// Create the executor
127127
executor := &executorImpl[*MockShardProcessor]{

service/sharddistributor/executorclient/interface_mock.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)