Skip to content

Commit 50be664

Browse files
authored
[shard-distributor] Store rework (#7048)
* [shard-distributor] Refactor storage to support shard state
1 parent c424b53 commit 50be664

File tree

24 files changed

+1188
-370
lines changed

24 files changed

+1188
-370
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ ENV GOFLAGS="-mod=readonly"
1919
COPY go.* ./
2020
COPY cmd/server/go.* ./cmd/server/
2121
COPY common/archiver/gcloud/go.* ./common/archiver/gcloud/
22-
COPY service/sharddistributor/leader/leaderstore/etcd/go.* ./service/sharddistributor/leader/leaderstore/etcd/
22+
COPY service/sharddistributor/leader/store/etcd/go.* ./service/sharddistributor/leader/store/etcd/
2323
# go.work means this downloads everything, not just the top module
2424
RUN go mod download
2525

cmd/server/cadence/fx.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import (
4646
"github.com/uber/cadence/common/rpc/rpcfx"
4747
"github.com/uber/cadence/common/service"
4848
shardDistributorCfg "github.com/uber/cadence/service/sharddistributor/config"
49-
"github.com/uber/cadence/service/sharddistributor/leader/leaderstore"
49+
"github.com/uber/cadence/service/sharddistributor/leader/store"
5050
"github.com/uber/cadence/service/sharddistributor/sharddistributorfx"
5151
"github.com/uber/cadence/tools/cassandra"
5252
"github.com/uber/cadence/tools/sql"
@@ -75,7 +75,7 @@ func Module(serviceName string) fx.Option {
7575
fx.Decorate(func(z *zap.Logger, l log.Logger) (*zap.Logger, log.Logger) {
7676
return z.With(zap.String("service", service.ShardDistributor)), l.WithTags(tag.Service(service.ShardDistributor))
7777
}),
78-
leaderstore.StoreModule("etcd"),
78+
store.Module("etcd"),
7979

8080
rpcfx.Module,
8181
// PeerProvider could be overriden e.g. with a DNS based internal solution.

cmd/server/cadence/fx_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
"github.com/uber/cadence/common/service"
3737
"github.com/uber/cadence/testflags"
3838

39-
_ "github.com/uber/cadence/service/sharddistributor/leader/leaderstore/etcd" // needed for shard distributor leader election
39+
_ "github.com/uber/cadence/service/sharddistributor/leader/store/etcd" // needed for shard distributor leader election
4040
)
4141

4242
func TestFxDependencies(t *testing.T) {

cmd/server/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ replace github.com/uber/cadence => ../..
1313

1414
replace github.com/uber/cadence/common/archiver/gcloud => ../../common/archiver/gcloud
1515

16-
replace github.com/uber/cadence/service/sharddistributor/leader/leaderstore/etcd => ../../service/sharddistributor/leader/leaderstore/etcd
16+
replace github.com/uber/cadence/service/sharddistributor/leader/store/etcd => ../../service/sharddistributor/leader/store/etcd
1717

1818
require (
1919
github.com/Shopify/sarama v1.33.0 // indirect
@@ -71,7 +71,7 @@ require (
7171
require (
7272
github.com/uber/cadence v0.0.0-00010101000000-000000000000
7373
github.com/uber/cadence/common/archiver/gcloud v0.0.0-00010101000000-000000000000
74-
github.com/uber/cadence/service/sharddistributor/leader/leaderstore/etcd v0.0.0-00010101000000-000000000000
74+
github.com/uber/cadence/service/sharddistributor/leader/store/etcd v0.0.0-00010101000000-000000000000
7575
go.uber.org/automaxprocs v1.6.0
7676
go.uber.org/mock v0.5.0
7777
)

cmd/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin
3535
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/postgres" // needed to load postgres plugin
3636
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/sqlite" // needed to load sqlite plugin
37-
_ "github.com/uber/cadence/service/sharddistributor/leader/leaderstore/etcd" // needed for shard distributor leader election
37+
_ "github.com/uber/cadence/service/sharddistributor/leader/store/etcd" // needed for shard distributor leader election
3838
)
3939

4040
// main entry point for the cadence server

docker/github_actions/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ COPY go.* /cadence
3333
COPY internal/tools/go.* /cadence/internal/tools/
3434
COPY cmd/server/go.* /cadence/cmd/server/
3535
COPY common/archiver/gcloud/go.* /cadence/common/archiver/gcloud/
36-
COPY service/sharddistributor/leader/leaderstore/etcd/go.* ./service/sharddistributor/leader/leaderstore/etcd/
36+
COPY service/sharddistributor/leader/store/etcd/go.* ./service/sharddistributor/leader/store/etcd/
3737
# go.work means this downloads everything, not just the top module
3838
RUN go mod download

go.work

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use (
1818
.
1919
./cmd/server
2020
./common/archiver/gcloud
21-
./service/sharddistributor/leader/leaderstore/etcd
21+
./service/sharddistributor/leader/store/etcd
2222

2323
// DO NOT include, tools dependencies are intentionally separate.
2424
// ./internal/tools

service/sharddistributor/leader/election/election.go

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313
"github.com/uber/cadence/common/log"
1414
"github.com/uber/cadence/common/log/tag"
1515
"github.com/uber/cadence/service/sharddistributor/config"
16-
"github.com/uber/cadence/service/sharddistributor/leader/leaderstore"
16+
"github.com/uber/cadence/service/sharddistributor/leader/process"
17+
"github.com/uber/cadence/service/sharddistributor/leader/store"
1718
)
1819

1920
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination=election_mock.go Factory,Elector
@@ -30,7 +31,7 @@ type ProcessFunc func(ctx context.Context) error
3031

3132
// Elector handles leader election for a specific namespace
3233
type Elector interface {
33-
Run(ctx context.Context, OnLeader, OnResign ProcessFunc) <-chan bool
34+
Run(ctx context.Context) <-chan bool
3435
}
3536

3637
// Factory creates elector instances
@@ -39,60 +40,64 @@ type Factory interface {
3940
}
4041

4142
type electionFactory struct {
42-
hostname string
43-
cfg config.Election
44-
store leaderstore.Store
45-
logger log.Logger
46-
serviceID string
47-
clock clock.TimeSource
43+
hostname string
44+
cfg config.Election
45+
store store.Elector
46+
logger log.Logger
47+
serviceID string
48+
clock clock.TimeSource
49+
processFactory process.Factory
4850
}
4951

5052
type elector struct {
51-
hostname string
52-
namespace string
53-
store leaderstore.Store
54-
logger log.Logger
55-
cfg config.Election
56-
leaderStarted time.Time
57-
clock clock.TimeSource
53+
hostname string
54+
namespace string
55+
store store.Elector
56+
logger log.Logger
57+
cfg config.Election
58+
leaderStarted time.Time
59+
clock clock.TimeSource
60+
processFactory process.Factory
5861
}
5962

6063
type FactoryParams struct {
6164
fx.In
6265

63-
HostName string `name:"hostname"`
64-
Cfg config.LeaderElection
65-
Store leaderstore.Store
66-
Logger log.Logger
67-
Clock clock.TimeSource
66+
HostName string `name:"hostname"`
67+
Cfg config.LeaderElection
68+
Store store.Elector
69+
Logger log.Logger
70+
Clock clock.TimeSource
71+
ProcessFactory process.Factory
6872
}
6973

7074
// NewElectionFactory creates a new election factory
7175
func NewElectionFactory(p FactoryParams) Factory {
7276
return &electionFactory{
73-
cfg: p.Cfg.Election,
74-
store: p.Store,
75-
logger: p.Logger,
76-
clock: p.Clock,
77-
hostname: p.HostName,
77+
cfg: p.Cfg.Election,
78+
store: p.Store,
79+
logger: p.Logger,
80+
clock: p.Clock,
81+
hostname: p.HostName,
82+
processFactory: p.ProcessFactory,
7883
}
7984
}
8085

8186
// CreateElector creates a new elector for the given namespace
8287
func (f *electionFactory) CreateElector(ctx context.Context, namespace string) (Elector, error) {
8388
return &elector{
84-
namespace: namespace,
85-
store: f.store,
86-
logger: f.logger.WithTags(tag.ComponentLeaderElection, tag.ShardNamespace(namespace)),
87-
cfg: f.cfg,
88-
clock: f.clock,
89-
hostname: f.hostname,
89+
namespace: namespace,
90+
store: f.store,
91+
logger: f.logger.WithTags(tag.ComponentLeaderElection, tag.ShardNamespace(namespace)),
92+
cfg: f.cfg,
93+
clock: f.clock,
94+
hostname: f.hostname,
95+
processFactory: f.processFactory,
9096
}, nil
9197
}
9298

9399
// Run starts the leader election process it returns a channel that will return the value if the current instance becomes the leader or resigns from leadership.
94-
// OnLeader will be called once leadership is acquired. OnResign will be called once leadership is lost.
95-
func (e *elector) Run(ctx context.Context, OnLeader, OnResign ProcessFunc) <-chan bool {
100+
func (e *elector) Run(ctx context.Context) <-chan bool {
96101
leaderCh := make(chan bool, 1)
97102

98103
// Create a child context that we can explicitly cancel when errors occur
@@ -108,7 +113,7 @@ func (e *elector) Run(ctx context.Context, OnLeader, OnResign ProcessFunc) <-cha
108113
}()
109114

110115
for {
111-
if err := e.runElection(runCtx, leaderCh, OnLeader, OnResign); err != nil {
116+
if err := e.runElection(runCtx, leaderCh); err != nil {
112117
// Check if parent context is already canceled
113118
if runCtx.Err() != nil {
114119
e.logger.Info("Context canceled, stopping election loop", tag.Error(runCtx.Err()))
@@ -137,7 +142,7 @@ func (e *elector) Run(ctx context.Context, OnLeader, OnResign ProcessFunc) <-cha
137142
}
138143

139144
// runElection runs a single election attempt
140-
func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool, OnLeader, OnResign ProcessFunc) (err error) {
145+
func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool) (err error) {
141146
// Add random delay before campaigning to spread load across instances
142147
delay := time.Duration(rand.Intn(int(e.cfg.MaxRandomDelay)))
143148

@@ -150,12 +155,14 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool, OnLeade
150155
return fmt.Errorf("context cancelled during pre-campaign delay: %w", ctx.Err())
151156
}
152157

158+
leaderProcess := e.processFactory.CreateProcessor(e.namespace)
159+
153160
election, err := e.store.CreateElection(ctx, e.namespace)
154161
if err != nil {
155162
return fmt.Errorf("create session: %w", err)
156163
}
157164
defer func() {
158-
resignErr := e.resign(election, OnResign)
165+
resignErr := e.resign(election, leaderProcess.Terminate)
159166
if resignErr != nil {
160167
if err == nil {
161168
err = resignErr
@@ -175,7 +182,7 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool, OnLeade
175182
return fmt.Errorf("failed to campaign: %w", err)
176183
}
177184

178-
err = OnLeader(ctx)
185+
err = leaderProcess.Run(ctx)
179186
if err != nil {
180187
return fmt.Errorf("onLeader: %w", err)
181188
}
@@ -207,7 +214,7 @@ func (e *elector) runElection(ctx context.Context, leaderCh chan<- bool, OnLeade
207214
}
208215
}
209216

210-
func (e *elector) resign(election leaderstore.Election, onResign ProcessFunc) error {
217+
func (e *elector) resign(election store.Election, onResign ProcessFunc) error {
211218
ctx, cancel := e.clock.ContextWithTimeout(context.Background(), 3*time.Second)
212219
defer cancel()
213220

service/sharddistributor/leader/election/election_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)