Skip to content

Commit bc11d79

Browse files
Merge #3723
3723: [Access] Add streaming API for BlockExecutionData r=peterargue a=peterargue Implement streaming gRPC APIs for `BlockExecutionData` and events. Protobuf: onflow/flow#1275 FLIP: onflow/flips#73 Co-authored-by: Peter Argue <[email protected]>
2 parents b93fee7 + 2f89541 commit bc11d79

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2479
-320
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ type AccessNodeConfig struct {
112112
apiRatelimits map[string]int
113113
apiBurstlimits map[string]int
114114
rpcConf rpc.Config
115+
stateStreamConf state_stream.Config
116+
stateStreamFilterConf map[string]int
115117
ExecutionNodeAddress string // deprecated
116118
HistoricalAccessRPCs []access.AccessAPIClient
117119
logTxTimeToFinalized bool
@@ -143,7 +145,6 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
143145
rpcConf: rpc.Config{
144146
UnsecureGRPCListenAddr: "0.0.0.0:9000",
145147
SecureGRPCListenAddr: "0.0.0.0:9001",
146-
StateStreamListenAddr: "",
147148
HTTPListenAddr: "0.0.0.0:8000",
148149
RESTListenAddr: "",
149150
CollectionAddr: "",
@@ -154,9 +155,17 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
154155
MaxHeightRange: backend.DefaultMaxHeightRange,
155156
PreferredExecutionNodeIDs: nil,
156157
FixedExecutionNodeIDs: nil,
157-
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
158158
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
159159
},
160+
stateStreamConf: state_stream.Config{
161+
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
162+
ExecutionDataCacheSize: state_stream.DefaultCacheSize,
163+
ClientSendTimeout: state_stream.DefaultSendTimeout,
164+
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
165+
MaxGlobalStreams: state_stream.DefaultMaxGlobalStreams,
166+
EventFilterConfig: state_stream.DefaultEventFilterConfig,
167+
},
168+
stateStreamFilterConf: nil,
160169
ExecutionNodeAddress: "localhost:9000",
161170
logTxTimeToFinalized: false,
162171
logTxTimeToExecuted: false,
@@ -425,6 +434,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
425434
var processedBlockHeight storage.ConsumerProgress
426435
var processedNotifications storage.ConsumerProgress
427436
var bsDependable *module.ProxiedReadyDoneAware
437+
var execDataDistributor *edrequester.ExecutionDataDistributor
428438

429439
builder.
430440
AdminCommand("read-execution-data", func(config *cmd.NodeConfig) commands.AdminCommand {
@@ -531,6 +541,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
531541
builder.executionDataConfig.InitialBlockHeight = builder.RootBlock.Header.Height
532542
}
533543

544+
execDataDistributor = edrequester.NewExecutionDataDistributor()
545+
534546
builder.ExecutionDataRequester = edrequester.New(
535547
builder.Logger,
536548
metrics.NewExecutionDataRequesterCollector(),
@@ -545,29 +557,50 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
545557
)
546558

547559
builder.FinalizationDistributor.AddOnBlockFinalizedConsumer(builder.ExecutionDataRequester.OnBlockFinalized)
560+
builder.ExecutionDataRequester.AddOnExecutionDataReceivedConsumer(execDataDistributor.OnExecutionDataReceived)
548561

549562
return builder.ExecutionDataRequester, nil
550563
})
551564

552-
if builder.rpcConf.StateStreamListenAddr != "" {
565+
if builder.stateStreamConf.ListenAddr != "" {
553566
builder.Component("exec state stream engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
554-
conf := state_stream.Config{
555-
ListenAddr: builder.rpcConf.StateStreamListenAddr,
556-
MaxExecutionDataMsgSize: builder.rpcConf.MaxExecutionDataMsgSize,
557-
RpcMetricsEnabled: builder.rpcMetricsEnabled,
567+
for key, value := range builder.stateStreamFilterConf {
568+
switch key {
569+
case "EventTypes":
570+
builder.stateStreamConf.MaxEventTypes = value
571+
case "Addresses":
572+
builder.stateStreamConf.MaxAddresses = value
573+
case "Contracts":
574+
builder.stateStreamConf.MaxContracts = value
575+
}
558576
}
577+
builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled
559578

560-
builder.StateStreamEng = state_stream.NewEng(
561-
conf,
579+
var heroCacheCollector module.HeroCacheMetrics = metrics.NewNoopCollector()
580+
if builder.HeroCacheMetricsEnable {
581+
heroCacheCollector = metrics.AccessNodeExecutionDataCacheMetrics(builder.MetricsRegisterer)
582+
}
583+
584+
stateStreamEng, err := state_stream.NewEng(
585+
node.Logger,
586+
builder.stateStreamConf,
562587
builder.ExecutionDataStore,
588+
node.State,
563589
node.Storage.Headers,
564590
node.Storage.Seals,
565591
node.Storage.Results,
566-
node.Logger,
567592
node.RootChainID,
568593
builder.apiRatelimits,
569594
builder.apiBurstlimits,
595+
heroCacheCollector,
570596
)
597+
if err != nil {
598+
return nil, fmt.Errorf("could not create state stream engine: %w", err)
599+
}
600+
builder.StateStreamEng = stateStreamEng
601+
602+
execDataDistributor.AddOnExecutionDataReceivedConsumer(builder.StateStreamEng.OnExecutionData)
603+
571604
return builder.StateStreamEng, nil
572605
})
573606
}
@@ -602,7 +635,7 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
602635
flags.UintVar(&builder.executionGRPCPort, "execution-ingress-port", defaultConfig.executionGRPCPort, "the grpc ingress port for all execution nodes")
603636
flags.StringVarP(&builder.rpcConf.UnsecureGRPCListenAddr, "rpc-addr", "r", defaultConfig.rpcConf.UnsecureGRPCListenAddr, "the address the unsecured gRPC server listens on")
604637
flags.StringVar(&builder.rpcConf.SecureGRPCListenAddr, "secure-rpc-addr", defaultConfig.rpcConf.SecureGRPCListenAddr, "the address the secure gRPC server listens on")
605-
flags.StringVar(&builder.rpcConf.StateStreamListenAddr, "state-stream-addr", defaultConfig.rpcConf.StateStreamListenAddr, "the address the state stream server listens on (if empty the server will not be started)")
638+
flags.StringVar(&builder.stateStreamConf.ListenAddr, "state-stream-addr", defaultConfig.stateStreamConf.ListenAddr, "the address the state stream server listens on (if empty the server will not be started)")
606639
flags.StringVarP(&builder.rpcConf.HTTPListenAddr, "http-addr", "h", defaultConfig.rpcConf.HTTPListenAddr, "the address the http proxy server listens on")
607640
flags.StringVar(&builder.rpcConf.RESTListenAddr, "rest-addr", defaultConfig.rpcConf.RESTListenAddr, "the address the REST server listens on (if empty the REST server will not be started)")
608641
flags.StringVarP(&builder.rpcConf.CollectionAddr, "static-collection-ingress-addr", "", defaultConfig.rpcConf.CollectionAddr, "the address (of the collection node) to send transactions to")
@@ -613,7 +646,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
613646
flags.UintVar(&builder.rpcConf.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
614647
flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", grpcutils.DefaultMaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
615648
flags.UintVar(&builder.rpcConf.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.MaxHeightRange, "maximum size for height range requests")
616-
flags.UintVar(&builder.rpcConf.MaxExecutionDataMsgSize, "max-block-msg-size", defaultConfig.rpcConf.MaxExecutionDataMsgSize, "maximum size for a gRPC message containing block execution data")
617649
flags.StringSliceVar(&builder.rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
618650
flags.StringSliceVar(&builder.rpcConf.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
619651
flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized")
@@ -637,6 +669,14 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
637669
flags.DurationVar(&builder.executionDataConfig.MaxFetchTimeout, "execution-data-max-fetch-timeout", defaultConfig.executionDataConfig.MaxFetchTimeout, "maximum timeout to use when fetching execution data from the network e.g. 300s")
638670
flags.DurationVar(&builder.executionDataConfig.RetryDelay, "execution-data-retry-delay", defaultConfig.executionDataConfig.RetryDelay, "initial delay for exponential backoff when fetching execution data fails e.g. 10s")
639671
flags.DurationVar(&builder.executionDataConfig.MaxRetryDelay, "execution-data-max-retry-delay", defaultConfig.executionDataConfig.MaxRetryDelay, "maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
672+
673+
// Execution State Streaming API
674+
flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size")
675+
flags.Uint32Var(&builder.stateStreamConf.MaxGlobalStreams, "state-stream-global-max-streams", defaultConfig.stateStreamConf.MaxGlobalStreams, "global maximum number of concurrent streams")
676+
flags.UintVar(&builder.stateStreamConf.MaxExecutionDataMsgSize, "state-stream-max-message-size", defaultConfig.stateStreamConf.MaxExecutionDataMsgSize, "maximum size for a gRPC message containing block execution data")
677+
flags.DurationVar(&builder.stateStreamConf.ClientSendTimeout, "state-stream-send-timeout", defaultConfig.stateStreamConf.ClientSendTimeout, "maximum wait before timing out while sending a response to a streaming client e.g. 30s")
678+
flags.UintVar(&builder.stateStreamConf.ClientSendBufferSize, "state-stream-send-buffer-size", defaultConfig.stateStreamConf.ClientSendBufferSize, "maximum number of responses to buffer within a stream")
679+
flags.StringToIntVar(&builder.stateStreamFilterConf, "state-stream-event-filter-limits", defaultConfig.stateStreamFilterConf, "event filter limits for ExecutionData SubscribeEvents API e.g. EventTypes=100,Addresses=100,Contracts=100 etc.")
640680
}).ValidateFlags(func() error {
641681
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
642682
return errors.New("public-network-address must be set if supports-observer is true")
@@ -658,6 +698,27 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
658698
return errors.New("execution-data-max-search-ahead must be greater than 0")
659699
}
660700
}
701+
if builder.stateStreamConf.ListenAddr != "" {
702+
if builder.stateStreamConf.ExecutionDataCacheSize == 0 {
703+
return errors.New("execution-data-cache-size must be greater than 0")
704+
}
705+
if builder.stateStreamConf.ClientSendBufferSize == 0 {
706+
return errors.New("state-stream-send-buffer-size must be greater than 0")
707+
}
708+
if len(builder.stateStreamFilterConf) > 3 {
709+
return errors.New("state-stream-event-filter-limits must have at most 3 keys (EventTypes, Addresses, Contracts)")
710+
}
711+
for key, value := range builder.stateStreamFilterConf {
712+
switch key {
713+
case "EventTypes", "Addresses", "Contracts":
714+
if value <= 0 {
715+
return fmt.Errorf("state-stream-event-filter-limits %s must be greater than 0", key)
716+
}
717+
default:
718+
return errors.New("state-stream-event-filter-limits may only contain the keys EventTypes, Addresses, Contracts")
719+
}
720+
}
721+
}
661722

662723
return nil
663724
})

engine/access/rpc/engine.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,12 @@ import (
3232
type Config struct {
3333
UnsecureGRPCListenAddr string // the non-secure GRPC server address as ip:port
3434
SecureGRPCListenAddr string // the secure GRPC server address as ip:port
35-
StateStreamListenAddr string // the state stream GRPC server address as ip:port
3635
TransportCredentials credentials.TransportCredentials // the secure GRPC credentials
3736
HTTPListenAddr string // the HTTP web proxy address as ip:port
3837
RESTListenAddr string // the REST server address as ip:port (if empty the REST server will not be started)
3938
CollectionAddr string // the address of the upstream collection node
4039
HistoricalAccessAddrs string // the list of all access nodes from previous spork
4140
MaxMsgSize uint // GRPC max message size
42-
MaxExecutionDataMsgSize uint // GRPC max message size for block execution data
4341
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
4442
CollectionClientTimeout time.Duration // collection API GRPC client timeout
4543
ConnectionPoolSize uint // size of the cache for storing collection and execution connections

engine/access/state_stream/api.go

Lines changed: 0 additions & 66 deletions
This file was deleted.

engine/access/state_stream/api_test.go

Lines changed: 0 additions & 121 deletions
This file was deleted.

0 commit comments

Comments
 (0)