Skip to content

Commit 18161c1

Browse files
committed
cre-1601: Transmitter notifies Arbiter
1 parent effb57a commit 18161c1

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

pkg/workflows/ring/transmitter.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,26 @@ var _ ocr3types.ContractTransmitter[[]byte] = (*Transmitter)(nil)
1515

1616
// Transmitter handles transmission of shard orchestration outcomes
1717
type Transmitter struct {
18-
lggr logger.Logger
19-
store *Store
20-
fromAccount types.Account
18+
lggr logger.Logger
19+
store *Store
20+
arbiterScaler pb.ArbiterScalerClient
21+
fromAccount types.Account
2122
}
2223

23-
func NewTransmitter(lggr logger.Logger, store *Store, fromAccount types.Account) *Transmitter {
24-
return &Transmitter{lggr: lggr, store: store, fromAccount: fromAccount}
24+
func NewTransmitter(lggr logger.Logger, store *Store, arbiterScaler pb.ArbiterScalerClient, fromAccount types.Account) *Transmitter {
25+
return &Transmitter{lggr: lggr, store: store, arbiterScaler: arbiterScaler, fromAccount: fromAccount}
2526
}
2627

27-
func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
28+
func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
2829
outcome := &pb.Outcome{}
2930
if err := proto.Unmarshal(r.Report, outcome); err != nil {
3031
t.lggr.Errorf("failed to unmarshal report")
3132
return err
3233
}
3334

34-
if outcome.State != nil {
35-
if routableShards, ok := outcome.State.State.(*pb.RoutingState_RoutableShards); ok {
36-
t.lggr.Infow("Transmitting shard routing", "routableShards", routableShards.RoutableShards)
37-
}
35+
if err := t.notifyArbiter(ctx, outcome.State); err != nil {
36+
t.lggr.Errorf("failed to notify arbiter", "err", err)
37+
return err
3838
}
3939

4040
for workflowID, route := range outcome.Routes {
@@ -45,6 +45,30 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64
4545
return nil
4646
}
4747

48+
func (t *Transmitter) notifyArbiter(ctx context.Context, state *pb.RoutingState) error {
49+
if state == nil {
50+
return nil
51+
}
52+
53+
var nShards uint32
54+
switch s := state.State.(type) {
55+
case *pb.RoutingState_RoutableShards:
56+
nShards = s.RoutableShards
57+
t.lggr.Infow("Transmitting shard routing", "routableShards", nShards)
58+
case *pb.RoutingState_Transition:
59+
nShards = s.Transition.WantShards
60+
t.lggr.Infow("Transmitting shard routing (in transition)", "wantShards", nShards)
61+
}
62+
63+
if t.arbiterScaler != nil && nShards > 0 {
64+
if _, err := t.arbiterScaler.ConsensusWantShards(ctx, &pb.ConsensusWantShardsRequest{NShards: nShards}); err != nil {
65+
return err
66+
}
67+
}
68+
69+
return nil
70+
}
71+
4872
func (t *Transmitter) FromAccount(ctx context.Context) (types.Account, error) {
4973
return t.fromAccount, nil
5074
}

0 commit comments

Comments
 (0)