diff --git a/proxyd/config.go b/proxyd/config.go index 677a1942..3a9335a7 100644 --- a/proxyd/config.go +++ b/proxyd/config.go @@ -26,11 +26,12 @@ type ServerConfig struct { MaxUpstreamBatchSize int `toml:"max_upstream_batch_size"` - EnableRequestLog bool `toml:"enable_request_log"` - MaxRequestBodyLogLen int `toml:"max_request_body_log_len"` - EnablePprof bool `toml:"enable_pprof"` - EnableXServedByHeader bool `toml:"enable_served_by_header"` - AllowAllOrigins bool `toml:"allow_all_origins"` + EnableRequestLog bool `toml:"enable_request_log"` + MaxRequestBodyLogLen int `toml:"max_request_body_log_len"` + EnablePprof bool `toml:"enable_pprof"` + EnableXServedByHeader bool `toml:"enable_served_by_header"` + AllowAllOrigins bool `toml:"allow_all_origins"` + EnableFlashblocksAwareRouting bool `toml:"enable_flashblocks_aware_routing"` } type CacheConfig struct { @@ -212,22 +213,23 @@ type SenderRateLimitConfig struct { } type Config struct { - WSBackendGroup string `toml:"ws_backend_group"` - Server ServerConfig `toml:"server"` - Cache CacheConfig `toml:"cache"` - Redis RedisConfig `toml:"redis"` - Metrics MetricsConfig `toml:"metrics"` - RateLimit RateLimitConfig `toml:"rate_limit"` - BackendOptions BackendOptions `toml:"backend"` - Backends BackendsConfig `toml:"backends"` - BatchConfig BatchConfig `toml:"batch"` - Authentication map[string]string `toml:"authentication"` - BackendGroups BackendGroupsConfig `toml:"backend_groups"` - RPCMethodMappings map[string]string `toml:"rpc_method_mappings"` - WSMethodWhitelist []string `toml:"ws_method_whitelist"` - WhitelistErrorMessage string `toml:"whitelist_error_message"` - SenderRateLimit SenderRateLimitConfig `toml:"sender_rate_limit"` - InteropValidationConfig InteropValidationConfig `toml:"interop_validation"` + WSBackendGroup string `toml:"ws_backend_group"` + FlashblocksAwareBackendGroup string `toml:"flashblocks_aware_backend_group"` + Server ServerConfig `toml:"server"` + Cache CacheConfig `toml:"cache"` + Redis RedisConfig `toml:"redis"` + Metrics MetricsConfig `toml:"metrics"` + RateLimit RateLimitConfig `toml:"rate_limit"` + BackendOptions BackendOptions `toml:"backend"` + Backends BackendsConfig `toml:"backends"` + BatchConfig BatchConfig `toml:"batch"` + Authentication map[string]string `toml:"authentication"` + BackendGroups BackendGroupsConfig `toml:"backend_groups"` + RPCMethodMappings map[string]string `toml:"rpc_method_mappings"` + WSMethodWhitelist []string `toml:"ws_method_whitelist"` + WhitelistErrorMessage string `toml:"whitelist_error_message"` + SenderRateLimit SenderRateLimitConfig `toml:"sender_rate_limit"` + InteropValidationConfig InteropValidationConfig `toml:"interop_validation"` } type InteropValidationConfig struct { diff --git a/proxyd/example.config.toml b/proxyd/example.config.toml index 26da42da..0879326d 100644 --- a/proxyd/example.config.toml +++ b/proxyd/example.config.toml @@ -1,11 +1,9 @@ # List of WS methods to whitelist. -ws_method_whitelist = [ - "eth_subscribe", - "eth_call", - "eth_chainId" -] +ws_method_whitelist = ["eth_subscribe", "eth_call", "eth_chainId"] # Enable WS on this backend group. There can only be one WS-enabled backend group. ws_backend_group = "main" +# Enable Flashblocks-aware routing on this backend group. There can only be one flashblocks-aware backend group. +flashblocks_aware_backend_group = "flashblocks" [server] # Host for the proxyd RPC server to listen on. @@ -22,6 +20,8 @@ max_body_size_bytes = 10485760 max_concurrent_rpcs = 1000 # Server log level log_level = "info" +# Enable flashblocks-aware routing for pending JSON-RPC calls. +enable_flashblocks_aware_routing = true [redis] # URL to a Redis instance. @@ -114,6 +114,9 @@ backends = ["infura"] [backend_groups.alchemy] backends = ["alchemy"] +[backend_groups.flashblocks] +backends = ["alchemy"] + # If the authentication group below is in the config, # proxyd will only accept authenticated requests. [authentication] diff --git a/proxyd/integration_tests/flashblocks_aware_test.go b/proxyd/integration_tests/flashblocks_aware_test.go new file mode 100644 index 00000000..bc8cf610 --- /dev/null +++ b/proxyd/integration_tests/flashblocks_aware_test.go @@ -0,0 +1,99 @@ +package integration_tests + +import ( + "os" + "testing" + + "github.com/ethereum-optimism/infra/proxyd" + "github.com/stretchr/testify/require" +) + +const flashblocksDummyRes = "{\"id\": 456, \"jsonrpc\": \"2.0\", \"result\": \"flashblocks\"}" + +func TestFlashblocksAwareBackend(t *testing.T) { + goodBackend := NewMockBackend(SingleResponseHandler(200, dummyRes)) + defer goodBackend.Close() + + flashblocksBackend := NewMockBackend(SingleResponseHandler(200, flashblocksDummyRes)) + defer flashblocksBackend.Close() + + require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL())) + require.NoError(t, os.Setenv("FLASHBLOCKS_AWARE_BACKEND_RPC_URL", flashblocksBackend.URL())) + + config := ReadConfig("flashblocks") + client := NewProxydClient("http://127.0.0.1:8545") + _, shutdown, err := proxyd.Start(config) + require.NoError(t, err) + defer shutdown() + + tests := []struct { + name string + method string + params []any + want string + }{ + { + "eth_getTransactionReceipt always routes to flashblocks backend", + "eth_getTransactionReceipt", + []any{"0x01"}, + flashblocksDummyRes, + }, + { + "flashblocks-incompatible RPC routes to regular backend", + "eth_chainId", + nil, + dummyRes, + }, + { + "pending eth_getBlockByNumber", + "eth_getBlockByNumber", + []any{"pending", true}, + flashblocksDummyRes, + }, + { + "pending eth_getBlockByNumber - non-standard parameters", + "eth_getBlockByNumber", + []any{"pending", "true"}, + flashblocksDummyRes, + }, + { + "pending eth_getBlockByNumber - no detail flag", + "eth_getBlockByNumber", + []any{"pending", false}, + dummyRes, + }, + { + "pending eth_getBalance", + "eth_getBalance", + []any{"0x01", "pending"}, + flashblocksDummyRes, + }, + { + "latest eth_getBalance", + "eth_getBalance", + []any{"0x01", "latest"}, + dummyRes, + }, + { + "pending eth_getTransactionCount", + "eth_getTransactionCount", + []any{"0x01", "pending"}, + flashblocksDummyRes, + }, + { + "latest eth_getTransactionCount", + "eth_getTransactionCount", + []any{"0x01", "latest"}, + dummyRes, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, code, err := client.SendRPC(tt.method, tt.params) + require.NoError(t, err) + require.Equal(t, 200, code) + RequireEqualJSON(t, []byte(tt.want), res) + }) + } +} diff --git a/proxyd/integration_tests/testdata/flashblocks.toml b/proxyd/integration_tests/testdata/flashblocks.toml new file mode 100644 index 00000000..0808efc4 --- /dev/null +++ b/proxyd/integration_tests/testdata/flashblocks.toml @@ -0,0 +1,29 @@ +flashblocks_aware_backend_group = "flashblocks" + +[server] +rpc_port = 8545 +enable_flashblocks_aware_routing = true + +[backend] +response_timeout_seconds = 1 + +[backends] +[backends.good] +rpc_url = "$GOOD_BACKEND_RPC_URL" + +[backends.flashblocks_aware] +rpc_url = "$FLASHBLOCKS_AWARE_BACKEND_RPC_URL" + +[backend_groups] +[backend_groups.main] +backends = ["good"] + +[backend_groups.flashblocks] +backends = ["flashblocks_aware"] + +[rpc_method_mappings] +eth_chainId = "main" +eth_getTransactionCount = "main" +eth_getTransactionReceipt = "main" +eth_getBlockByNumber = "main" +eth_getBalance = "main" diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go index c5134809..67bf4b35 100644 --- a/proxyd/proxyd.go +++ b/proxyd/proxyd.go @@ -311,6 +311,18 @@ func Start(config *Config) (*Server, func(), error) { return nil, nil, fmt.Errorf("a ws port was defined, but no ws group was defined") } + var flashblocksAwareBackendGroup *BackendGroup + if config.FlashblocksAwareBackendGroup != "" { + flashblocksAwareBackendGroup = backendGroups[config.FlashblocksAwareBackendGroup] + if flashblocksAwareBackendGroup == nil { + return nil, nil, fmt.Errorf("flashblocks-aware backend group %s does not exist", config.FlashblocksAwareBackendGroup) + } + } + + if flashblocksAwareBackendGroup == nil && config.Server.EnableFlashblocksAwareRouting { + return nil, nil, errors.New("flashblocks-aware routing is enabled, but no flashblock-aware group was defined") + } + for _, bg := range config.RPCMethodMappings { if backendGroups[bg] == nil { return nil, nil, fmt.Errorf("undefined backend group %s", bg) @@ -417,6 +429,8 @@ func Start(config *Config) (*Server, func(), error) { limiterFactory, config.InteropValidationConfig, interopStrategy, + config.Server.EnableFlashblocksAwareRouting, + flashblocksAwareBackendGroup, ) if err != nil { return nil, nil, fmt.Errorf("error creating server: %w", err) diff --git a/proxyd/rpc.go b/proxyd/rpc.go index a2319fd5..40c1c672 100644 --- a/proxyd/rpc.go +++ b/proxyd/rpc.go @@ -168,3 +168,50 @@ func IsBatch(raw []byte) bool { } return false } + +// IsPendingRequest returns true if req requests information available at the latest flashblock. +func IsPendingRequest(req *RPCReq) bool { + var params []any + if err := json.Unmarshal(req.Params, ¶ms); err != nil { + return false + } + + switch req.Method { + case "eth_getTransactionReceipt": + // When flashblocks-aware routing is enabled, any transaction receipt retrieval could be for a pending block. + return true + + case "eth_getBlockByNumber": + if len(params) != 2 { + break + } + + blockNumber, ok := params[0].(string) + if !ok { + break + } + + var detailFlag bool + switch flag := params[1].(type) { + case string: + detailFlag = flag == "true" + case bool: + detailFlag = flag + } + return blockNumber == "pending" && detailFlag + + case "eth_getBalance", "eth_getTransactionCount": + if len(params) != 2 { + break + } + + blockNumber, ok := params[1].(string) + if !ok { + break + } + return blockNumber == "pending" + } + + // Either the method is not flashblocks-compatible or the method's parameters do not request information at the latest flashblock. + return false +} diff --git a/proxyd/rpc_test.go b/proxyd/rpc_test.go index 9d6786e7..c554fcb7 100644 --- a/proxyd/rpc_test.go +++ b/proxyd/rpc_test.go @@ -87,3 +87,82 @@ func TestRPCResJSON(t *testing.T) { }) } } + +func TestIsPendingRequest(t *testing.T) { + tests := []struct { + name string + in *RPCReq + expected bool + }{ + { + "eth_getTransactionReceipt", + &RPCReq{ + Method: "eth_getTransactionReceipt", + Params: mustMarshalJSON([]string{"0x00"}), + }, + true, + }, + { + "eth_getBlockByNumber with pending block number and detail flag set", + &RPCReq{ + Method: "eth_getBlockByNumber", + Params: mustMarshalJSON([]any{"pending", "true"}), + }, + true, + }, + { + "eth_getBlockByNumber with pending block number and detail flag unset", + &RPCReq{ + Method: "eth_getBlockByNumber", + Params: mustMarshalJSON([]any{"pending", false}), + }, + false, + }, + { + "eth_getBlockByNumber with latest block number and detail flag set", + &RPCReq{ + Method: "eth_getBlockByNumber", + Params: mustMarshalJSON([]any{"latest", false}), + }, + false, + }, + { + "eth_getBalance with pending block number", + &RPCReq{ + Method: "eth_getBalance", + Params: mustMarshalJSON([]string{"0x01", "pending"}), + }, + true, + }, + { + "eth_getBalance with latest block number", + &RPCReq{ + Method: "eth_getBalance", + Params: mustMarshalJSON([]string{"0x01", "latest"}), + }, + false, + }, + { + "eth_getTransactionCount with pending block number", + &RPCReq{ + Method: "eth_getTransactionCount", + Params: mustMarshalJSON([]string{"0x01", "pending"}), + }, + true, + }, + { + "eth_getTransactionCount with latest block number", + &RPCReq{ + Method: "eth_getTransactionCount", + Params: mustMarshalJSON([]string{"0x01", "latest"}), + }, + false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, IsPendingRequest(tt.in), tt.expected) + }) + } +} diff --git a/proxyd/server.go b/proxyd/server.go index 8bbcc5f4..f249a5bc 100644 --- a/proxyd/server.go +++ b/proxyd/server.go @@ -58,34 +58,36 @@ const ( var emptyArrayResponse = json.RawMessage("[]") type Server struct { - BackendGroups map[string]*BackendGroup - wsBackendGroup *BackendGroup - wsMethodWhitelist *StringSet - rpcMethodMappings map[string]string - maxBodySize int64 - enableRequestLog bool - maxRequestBodyLogLen int - authenticatedPaths map[string]string - timeout time.Duration - maxUpstreamBatchSize int - maxBatchSize int - enableServedByHeader bool - upgrader *websocket.Upgrader - mainLim FrontendRateLimiter - overrideLims map[string]FrontendRateLimiter - senderLim FrontendRateLimiter - interopSenderLim FrontendRateLimiter - allowedChainIds []*big.Int - limExemptOrigins []*regexp.Regexp - limExemptUserAgents []*regexp.Regexp - globallyLimitedMethods map[string]bool - rpcServer *http.Server - wsServer *http.Server - cache RPCCache - srvMu sync.Mutex - rateLimitHeader string - interopValidatingConfig InteropValidationConfig - interopStrategy InteropStrategy + BackendGroups map[string]*BackendGroup + wsBackendGroup *BackendGroup + enableFlashblocksAwareRouting bool + flashblocksAwareBackendGroup *BackendGroup + wsMethodWhitelist *StringSet + rpcMethodMappings map[string]string + maxBodySize int64 + enableRequestLog bool + maxRequestBodyLogLen int + authenticatedPaths map[string]string + timeout time.Duration + maxUpstreamBatchSize int + maxBatchSize int + enableServedByHeader bool + upgrader *websocket.Upgrader + mainLim FrontendRateLimiter + overrideLims map[string]FrontendRateLimiter + senderLim FrontendRateLimiter + interopSenderLim FrontendRateLimiter + allowedChainIds []*big.Int + limExemptOrigins []*regexp.Regexp + limExemptUserAgents []*regexp.Regexp + globallyLimitedMethods map[string]bool + rpcServer *http.Server + wsServer *http.Server + cache RPCCache + srvMu sync.Mutex + rateLimitHeader string + interopValidatingConfig InteropValidationConfig + interopStrategy InteropStrategy } type limiterFunc func(method string) bool @@ -112,6 +114,8 @@ func NewServer( limiterFactory limiterFactoryFunc, interopValidatingConfig InteropValidationConfig, interopStrategy InteropStrategy, + enableFlashblocksAwareRouting bool, + flashblocksAwareBackendGroup *BackendGroup, ) (*Server, error) { if cache == nil { cache = &NoopRPCCache{} @@ -201,17 +205,19 @@ func NewServer( upgrader: &websocket.Upgrader{ HandshakeTimeout: defaultWSHandshakeTimeout, }, - mainLim: mainLim, - overrideLims: overrideLims, - globallyLimitedMethods: globalMethodLims, - senderLim: senderLim, - interopSenderLim: interopSenderLim, - allowedChainIds: senderRateLimitConfig.AllowedChainIds, - limExemptOrigins: limExemptOrigins, - limExemptUserAgents: limExemptUserAgents, - rateLimitHeader: rateLimitHeader, - interopValidatingConfig: interopValidatingConfig, - interopStrategy: interopStrategy, + mainLim: mainLim, + overrideLims: overrideLims, + globallyLimitedMethods: globalMethodLims, + senderLim: senderLim, + interopSenderLim: interopSenderLim, + allowedChainIds: senderRateLimitConfig.AllowedChainIds, + limExemptOrigins: limExemptOrigins, + limExemptUserAgents: limExemptUserAgents, + rateLimitHeader: rateLimitHeader, + interopValidatingConfig: interopValidatingConfig, + interopStrategy: interopStrategy, + enableFlashblocksAwareRouting: enableFlashblocksAwareRouting, + flashblocksAwareBackendGroup: flashblocksAwareBackendGroup, }, nil } @@ -536,6 +542,10 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL } group := s.rpcMethodMappings[parsedReq.Method] + if s.enableFlashblocksAwareRouting && IsPendingRequest(parsedReq) { + group = s.flashblocksAwareBackendGroup.Name + } + if group == "" { // use unknown below to prevent DOS vector that fills up memory // with arbitrary method names.