Skip to content

Commit 0230922

Browse files
committed
add global stream limit
1 parent 648dfdf commit 0230922

File tree

13 files changed

+188
-62
lines changed

13 files changed

+188
-62
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ type AccessNodeConfig struct {
112112
apiRatelimits map[string]int
113113
apiBurstlimits map[string]int
114114
rpcConf rpc.Config
115+
stateStreamConf state_stream.Config
116+
stateStreamFilterConf map[string]int
115117
ExecutionNodeAddress string // deprecated
116118
HistoricalAccessRPCs []access.AccessAPIClient
117119
logTxTimeToFinalized bool
@@ -143,7 +145,6 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
143145
rpcConf: rpc.Config{
144146
UnsecureGRPCListenAddr: "0.0.0.0:9000",
145147
SecureGRPCListenAddr: "0.0.0.0:9001",
146-
StateStreamListenAddr: "",
147148
HTTPListenAddr: "0.0.0.0:8000",
148149
RESTListenAddr: "",
149150
CollectionAddr: "",
@@ -154,9 +155,17 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
154155
MaxHeightRange: backend.DefaultMaxHeightRange,
155156
PreferredExecutionNodeIDs: nil,
156157
FixedExecutionNodeIDs: nil,
157-
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
158158
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
159159
},
160+
stateStreamConf: state_stream.Config{
161+
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
162+
ExecutionDataCacheSize: state_stream.DefaultCacheSize,
163+
ClientSendTimeout: state_stream.DefaultSendTimeout,
164+
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
165+
MaxGlobalStreams: state_stream.DefaultMaxGlobalStreams,
166+
EventFilterConfig: state_stream.DefaultEventFilterConfig,
167+
},
168+
stateStreamFilterConf: nil,
160169
ExecutionNodeAddress: "localhost:9000",
161170
logTxTimeToFinalized: false,
162171
logTxTimeToExecuted: false,
@@ -553,27 +562,33 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
553562
return builder.ExecutionDataRequester, nil
554563
})
555564

556-
if builder.rpcConf.StateStreamListenAddr != "" {
565+
if builder.stateStreamConf.ListenAddr != "" {
557566
builder.Component("exec state stream engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
558-
conf := state_stream.Config{
559-
ListenAddr: builder.rpcConf.StateStreamListenAddr,
560-
MaxExecutionDataMsgSize: builder.rpcConf.MaxExecutionDataMsgSize,
561-
RpcMetricsEnabled: builder.rpcMetricsEnabled,
567+
for key, value := range builder.stateStreamFilterConf {
568+
switch key {
569+
case "EventTypes":
570+
builder.stateStreamConf.MaxEventTypes = value
571+
case "Addresses":
572+
builder.stateStreamConf.MaxAddresses = value
573+
case "Contracts":
574+
builder.stateStreamConf.MaxContracts = value
575+
}
562576
}
577+
builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled
563578

564579
var heroCacheCollector module.HeroCacheMetrics = metrics.NewNoopCollector()
565580
if builder.HeroCacheMetricsEnable {
566581
heroCacheCollector = metrics.AccessNodeExecutionDataCacheMetrics(builder.MetricsRegisterer)
567582
}
568583

569584
stateStreamEng, err := state_stream.NewEng(
570-
conf,
585+
node.Logger,
586+
builder.stateStreamConf,
571587
builder.ExecutionDataStore,
572588
node.State,
573589
node.Storage.Headers,
574590
node.Storage.Seals,
575591
node.Storage.Results,
576-
node.Logger,
577592
node.RootChainID,
578593
builder.apiRatelimits,
579594
builder.apiBurstlimits,
@@ -620,7 +635,7 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
620635
flags.UintVar(&builder.executionGRPCPort, "execution-ingress-port", defaultConfig.executionGRPCPort, "the grpc ingress port for all execution nodes")
621636
flags.StringVarP(&builder.rpcConf.UnsecureGRPCListenAddr, "rpc-addr", "r", defaultConfig.rpcConf.UnsecureGRPCListenAddr, "the address the unsecured gRPC server listens on")
622637
flags.StringVar(&builder.rpcConf.SecureGRPCListenAddr, "secure-rpc-addr", defaultConfig.rpcConf.SecureGRPCListenAddr, "the address the secure gRPC server listens on")
623-
flags.StringVar(&builder.rpcConf.StateStreamListenAddr, "state-stream-addr", defaultConfig.rpcConf.StateStreamListenAddr, "the address the state stream server listens on (if empty the server will not be started)")
638+
flags.StringVar(&builder.stateStreamConf.ListenAddr, "state-stream-addr", defaultConfig.stateStreamConf.ListenAddr, "the address the state stream server listens on (if empty the server will not be started)")
624639
flags.StringVarP(&builder.rpcConf.HTTPListenAddr, "http-addr", "h", defaultConfig.rpcConf.HTTPListenAddr, "the address the http proxy server listens on")
625640
flags.StringVar(&builder.rpcConf.RESTListenAddr, "rest-addr", defaultConfig.rpcConf.RESTListenAddr, "the address the REST server listens on (if empty the REST server will not be started)")
626641
flags.StringVarP(&builder.rpcConf.CollectionAddr, "static-collection-ingress-addr", "", defaultConfig.rpcConf.CollectionAddr, "the address (of the collection node) to send transactions to")
@@ -631,7 +646,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
631646
flags.UintVar(&builder.rpcConf.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
632647
flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", grpcutils.DefaultMaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
633648
flags.UintVar(&builder.rpcConf.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.MaxHeightRange, "maximum size for height range requests")
634-
flags.UintVar(&builder.rpcConf.MaxExecutionDataMsgSize, "max-block-msg-size", defaultConfig.rpcConf.MaxExecutionDataMsgSize, "maximum size for a gRPC message containing block execution data")
635649
flags.StringSliceVar(&builder.rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
636650
flags.StringSliceVar(&builder.rpcConf.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
637651
flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized")
@@ -655,6 +669,14 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
655669
flags.DurationVar(&builder.executionDataConfig.MaxFetchTimeout, "execution-data-max-fetch-timeout", defaultConfig.executionDataConfig.MaxFetchTimeout, "maximum timeout to use when fetching execution data from the network e.g. 300s")
656670
flags.DurationVar(&builder.executionDataConfig.RetryDelay, "execution-data-retry-delay", defaultConfig.executionDataConfig.RetryDelay, "initial delay for exponential backoff when fetching execution data fails e.g. 10s")
657671
flags.DurationVar(&builder.executionDataConfig.MaxRetryDelay, "execution-data-max-retry-delay", defaultConfig.executionDataConfig.MaxRetryDelay, "maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
672+
673+
// Execution State Streaming API
674+
flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size")
675+
flags.Uint32Var(&builder.stateStreamConf.MaxGlobalStreams, "state-stream-global-max-streams", defaultConfig.stateStreamConf.MaxGlobalStreams, "global maximum number of concurrent streams")
676+
flags.UintVar(&builder.stateStreamConf.MaxExecutionDataMsgSize, "state-stream-max-message-size", defaultConfig.stateStreamConf.MaxExecutionDataMsgSize, "maximum size for a gRPC message containing block execution data")
677+
flags.DurationVar(&builder.stateStreamConf.ClientSendTimeout, "state-stream-send-timeout", defaultConfig.stateStreamConf.ClientSendTimeout, "maximum wait before timing out while sending a response to a streaming client e.g. 30s")
678+
flags.UintVar(&builder.stateStreamConf.ClientSendBufferSize, "state-stream-send-buffer-size", defaultConfig.stateStreamConf.ClientSendBufferSize, "maximum number of responses to buffer within a stream")
679+
flags.StringToIntVar(&builder.stateStreamFilterConf, "state-stream-event-filter-limits", defaultConfig.stateStreamFilterConf, "event filter limits for ExecutionData SubscribeEvents API e.g. EventTypes=100,Addresses=100,Contracts=100 etc.")
658680
}).ValidateFlags(func() error {
659681
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
660682
return errors.New("public-network-address must be set if supports-observer is true")
@@ -676,6 +698,27 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
676698
return errors.New("execution-data-max-search-ahead must be greater than 0")
677699
}
678700
}
701+
if builder.stateStreamConf.ListenAddr != "" {
702+
if builder.stateStreamConf.ExecutionDataCacheSize == 0 {
703+
return errors.New("execution-data-cache-size must be greater than 0")
704+
}
705+
if builder.stateStreamConf.ClientSendBufferSize == 0 {
706+
return errors.New("state-stream-send-buffer-size must be greater than 0")
707+
}
708+
if len(builder.stateStreamFilterConf) > 3 {
709+
return errors.New("state-stream-event-filter-limits must have at most 3 keys (EventTypes, Addresses, Contracts)")
710+
}
711+
for key, value := range builder.stateStreamFilterConf {
712+
switch key {
713+
case "EventTypes", "Addresses", "Contracts":
714+
if value <= 0 {
715+
return fmt.Errorf("state-stream-event-filter-limits %s must be greater than 0", key)
716+
}
717+
default:
718+
return errors.New("state-stream-event-filter-limits may only contain the keys EventTypes, Addresses, Contracts")
719+
}
720+
}
721+
}
679722

680723
return nil
681724
})

engine/access/rpc/engine.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,12 @@ import (
3232
type Config struct {
3333
UnsecureGRPCListenAddr string // the non-secure GRPC server address as ip:port
3434
SecureGRPCListenAddr string // the secure GRPC server address as ip:port
35-
StateStreamListenAddr string // the state stream GRPC server address as ip:port
3635
TransportCredentials credentials.TransportCredentials // the secure GRPC credentials
3736
HTTPListenAddr string // the HTTP web proxy address as ip:port
3837
RESTListenAddr string // the REST server address as ip:port (if empty the REST server will not be started)
3938
CollectionAddr string // the address of the upstream collection node
4039
HistoricalAccessAddrs string // the list of all access nodes from previous spork
4140
MaxMsgSize uint // GRPC max message size
42-
MaxExecutionDataMsgSize uint // GRPC max message size for block execution data
4341
ExecutionClientTimeout time.Duration // execution API GRPC client timeout
4442
CollectionClientTimeout time.Duration // collection API GRPC client timeout
4543
ConnectionPoolSize uint // size of the cache for storing collection and execution connections

engine/access/state_stream/backend.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import (
2020
)
2121

2222
const (
23+
// DefaultMaxGlobalStreams defines the default max number of streams that can be open at the same time.
24+
DefaultMaxGlobalStreams = 1000
25+
26+
// DefaultCacheSize defines the default max number of objects for the execution data cache.
2327
DefaultCacheSize = 100
2428

2529
// DefaultSendTimeout is the default timeout for sending a message to the client. After the timeout
@@ -48,11 +52,11 @@ type StateStreamBackend struct {
4852
execDataStore execution_data.ExecutionDataStore
4953
execDataCache *herocache.Cache
5054
broadcaster *engine.Broadcaster
51-
sendTimeout time.Duration
5255
}
5356

5457
func New(
5558
log zerolog.Logger,
59+
config Config,
5660
state protocol.State,
5761
headers storage.Headers,
5862
seals storage.Seals,
@@ -72,14 +76,14 @@ func New(
7276
execDataStore: execDataStore,
7377
execDataCache: execDataCache,
7478
broadcaster: broadcaster,
75-
sendTimeout: DefaultSendTimeout,
7679
}
7780

7881
b.ExecutionDataBackend = ExecutionDataBackend{
7982
log: logger,
8083
headers: headers,
8184
broadcaster: broadcaster,
82-
sendTimeout: DefaultSendTimeout,
85+
sendTimeout: config.ClientSendTimeout,
86+
sendBufferSize: int(config.ClientSendBufferSize),
8387
getExecutionData: b.getExecutionData,
8488
getStartHeight: b.getStartHeight,
8589
}
@@ -88,7 +92,8 @@ func New(
8892
log: logger,
8993
headers: headers,
9094
broadcaster: broadcaster,
91-
sendTimeout: DefaultSendTimeout,
95+
sendTimeout: config.ClientSendTimeout,
96+
sendBufferSize: int(config.ClientSendBufferSize),
9297
getExecutionData: b.getExecutionData,
9398
getStartHeight: b.getStartHeight,
9499
}

engine/access/state_stream/backend_events.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ type EventsResponse struct {
2121
}
2222

2323
type EventsBackend struct {
24-
log zerolog.Logger
25-
headers storage.Headers
26-
broadcaster *engine.Broadcaster
27-
sendTimeout time.Duration
24+
log zerolog.Logger
25+
headers storage.Headers
26+
broadcaster *engine.Broadcaster
27+
sendTimeout time.Duration
28+
sendBufferSize int
2829

2930
getExecutionData GetExecutionDataFunc
3031
getStartHeight GetStartHeightFunc
@@ -33,7 +34,7 @@ type EventsBackend struct {
3334
func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter EventFilter) Subscription {
3435
nextHeight, err := b.getStartHeight(startBlockID, startHeight)
3536
if err != nil {
36-
sub := NewSubscription()
37+
sub := NewSubscription(b.sendBufferSize)
3738
if st, ok := status.FromError(err); ok {
3839
sub.Fail(status.Errorf(st.Code(), "could not get start height: %s", st.Message()))
3940
return sub
@@ -43,7 +44,7 @@ func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Id
4344
return sub
4445
}
4546

46-
sub := NewHeightBasedSubscription(nextHeight, b.getResponseFactory(filter))
47+
sub := NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponseFactory(filter))
4748

4849
go NewStreamer(b.log, b.broadcaster, b.sendTimeout, sub).Stream(ctx)
4950

engine/access/state_stream/backend_events_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,13 @@ func (s *BackendEventsSuite) TestSubscribeEvents() {
7777

7878
t2 := test
7979
t2.name = fmt.Sprintf("%s - some events", test.name)
80-
t2.filters, err = NewEventFilter(chain, []string{string(testEventTypes[0])}, nil, nil)
80+
t2.filters, err = NewEventFilter(DefaultEventFilterConfig, chain, []string{string(testEventTypes[0])}, nil, nil)
8181
require.NoError(s.T(), err)
8282
tests = append(tests, t2)
8383

8484
t3 := test
8585
t3.name = fmt.Sprintf("%s - no events", test.name)
86-
t3.filters, err = NewEventFilter(chain, []string{"A.0x1.NonExistent.Event"}, nil, nil)
86+
t3.filters, err = NewEventFilter(DefaultEventFilterConfig, chain, []string{"A.0x1.NonExistent.Event"}, nil, nil)
8787
require.NoError(s.T(), err)
8888
tests = append(tests, t3)
8989
}

engine/access/state_stream/backend_executiondata.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ type ExecutionDataResponse struct {
2323
}
2424

2525
type ExecutionDataBackend struct {
26-
log zerolog.Logger
27-
headers storage.Headers
28-
broadcaster *engine.Broadcaster
29-
sendTimeout time.Duration
26+
log zerolog.Logger
27+
headers storage.Headers
28+
broadcaster *engine.Broadcaster
29+
sendTimeout time.Duration
30+
sendBufferSize int
3031

3132
getExecutionData GetExecutionDataFunc
3233
getStartHeight GetStartHeightFunc
@@ -50,7 +51,7 @@ func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, bl
5051
func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, startBlockID flow.Identifier, startHeight uint64) Subscription {
5152
nextHeight, err := b.getStartHeight(startBlockID, startHeight)
5253
if err != nil {
53-
sub := NewSubscription()
54+
sub := NewSubscription(b.sendBufferSize)
5455
if st, ok := status.FromError(err); ok {
5556
sub.Fail(status.Errorf(st.Code(), "could not get start height: %s", st.Message()))
5657
return sub
@@ -60,7 +61,7 @@ func (b *ExecutionDataBackend) SubscribeExecutionData(ctx context.Context, start
6061
return sub
6162
}
6263

63-
sub := NewHeightBasedSubscription(nextHeight, b.getResponse)
64+
sub := NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponse)
6465

6566
go NewStreamer(b.log, b.broadcaster, b.sendTimeout, sub).Stream(ctx)
6667

engine/access/state_stream/backend_executiondata_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,15 @@ func (s *BackendExecutionDataSuite) SetupTest() {
9292
metrics.NewNoopCollector(),
9393
)
9494

95+
conf := Config{
96+
ClientSendTimeout: DefaultSendTimeout,
97+
ClientSendBufferSize: DefaultSendBufferSize,
98+
}
99+
95100
var err error
96101
s.backend, err = New(
97102
logger,
103+
conf,
98104
s.state,
99105
s.headers,
100106
s.seals,

engine/access/state_stream/engine.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package state_stream
33
import (
44
"fmt"
55
"net"
6+
"time"
67

78
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
89
access "github.com/onflow/flow/protobuf/go/flow/executiondata"
@@ -25,9 +26,29 @@ import (
2526

2627
// Config defines the configurable options for the ingress server.
2728
type Config struct {
28-
ListenAddr string
29-
MaxExecutionDataMsgSize uint // in bytes
30-
RpcMetricsEnabled bool // enable GRPC metrics
29+
EventFilterConfig
30+
31+
// ListenAddr is the address the GRPC server will listen on as host:port
32+
ListenAddr string
33+
34+
// MaxExecutionDataMsgSize is the max message size for block execution data API
35+
MaxExecutionDataMsgSize uint
36+
37+
// RpcMetricsEnabled specifies whether to enable the GRPC metrics
38+
RpcMetricsEnabled bool
39+
40+
// MaxGlobalStreams defines the global max number of streams that can be open at the same time.
41+
MaxGlobalStreams uint32
42+
43+
// ExecutionDataCacheSize is the max number of objects for the execution data cache.
44+
ExecutionDataCacheSize uint32
45+
46+
// ClientSendTimeout is the timeout for sending a message to the client. After the timeout,
47+
// the stream is closed with an error.
48+
ClientSendTimeout time.Duration
49+
50+
// ClientSendBufferSize is the size of the response buffer for sending messages to the client.
51+
ClientSendBufferSize uint
3152
}
3253

3354
// Engine exposes the server with the state stream API.
@@ -48,15 +69,15 @@ type Engine struct {
4869
stateStreamGrpcAddress net.Addr
4970
}
5071

51-
// New returns a new ingress server.
72+
// NewEng returns a new ingress server.
5273
func NewEng(
74+
log zerolog.Logger,
5375
config Config,
5476
execDataStore execution_data.ExecutionDataStore,
5577
state protocol.State,
5678
headers storage.Headers,
5779
seals storage.Seals,
5880
results storage.ExecutionResults,
59-
log zerolog.Logger,
6081
chainID flow.ChainID,
6182
apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the gRPC API e.g. Ping->100, GetExecutionDataByBlockID->300
6283
apiBurstLimits map[string]int, // the api burst limit (max calls at the same time) for each of the gRPC API e.g. Ping->50, GetExecutionDataByBlockID->10
@@ -93,7 +114,7 @@ func NewEng(
93114
server := grpc.NewServer(grpcOpts...)
94115

95116
execDataCache := herocache.NewCache(
96-
DefaultCacheSize,
117+
config.ExecutionDataCacheSize,
97118
herocache.DefaultOversizeFactor,
98119
heropool.LRUEjection,
99120
logger,
@@ -102,7 +123,7 @@ func NewEng(
102123

103124
broadcaster := engine.NewBroadcaster()
104125

105-
backend, err := New(logger, state, headers, seals, results, execDataStore, execDataCache, broadcaster)
126+
backend, err := New(logger, config, state, headers, seals, results, execDataStore, execDataCache, broadcaster)
106127
if err != nil {
107128
return nil, fmt.Errorf("could not create state stream backend: %w", err)
108129
}
@@ -113,7 +134,7 @@ func NewEng(
113134
server: server,
114135
chain: chainID.Chain(),
115136
config: config,
116-
handler: NewHandler(backend, chainID.Chain()),
137+
handler: NewHandler(backend, chainID.Chain(), config.EventFilterConfig, config.MaxGlobalStreams),
117138
execDataBroadcaster: broadcaster,
118139
execDataCache: execDataCache,
119140
}
@@ -127,6 +148,7 @@ func NewEng(
127148
return e, nil
128149
}
129150

151+
// OnExecutionData is called to notify the engine when a new execution data is received.
130152
func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDataEntity) {
131153
e.log.Trace().
132154
Hex("block_id", logging.ID(executionData.BlockID)).

0 commit comments

Comments
 (0)