Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/services/gateway/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ type ShardedDONConfig struct {
F int
Shards []Shard
}

// ShardDONID returns the donID for a given shard
func ShardDONID(donName string, shardIdx int) string {
if shardIdx == 0 {
// NOTE: special case for backward compatibility - shard 0 doesn't have an index suffix
return donName
}
return fmt.Sprintf("%s_%d", donName, shardIdx)
}

type Shard struct {
Nodes []NodeConfig
}
Expand Down
89 changes: 70 additions & 19 deletions core/services/gateway/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (m *connectionManager) Name() string { return m.lggr.Name() }
type donConnectionManager struct {
donConfig *config.DONConfig
nodes map[string]*nodeState
handler handlers.Handler
handlers map[string]handlers.Handler // service name -> handler
closeWait sync.WaitGroup
shutdownCh services.StopChan
gMetrics *monitoring.GatewayMetrics
Expand Down Expand Up @@ -102,30 +102,43 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
if ok {
return nil, fmt.Errorf("duplicate DON ID %s", donConfig.DonId)
}
nodes := make(map[string]*nodeState)
for _, nodeConfig := range donConfig.Members {
nodeAddress := strings.ToLower(nodeConfig.Address)
_, ok := nodes[nodeAddress]
if ok {
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId)
}
connWrapper := network.NewWSConnectionWrapper(lggr)
if connWrapper == nil {
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
}
nodes[nodeAddress] = &nodeState{
name: nodeConfig.Name,
conn: connWrapper,
}
nodes, err := buildNodeStates(donConfig.Members, donConfig.DonId, lggr)
if err != nil {
return nil, err
}
dons[donConfig.DonId] = &donConnectionManager{
donConfig: &donConfig,
nodes: nodes,
handlers: make(map[string]handlers.Handler),
shutdownCh: make(chan struct{}),
gMetrics: gMetrics,
lggr: logger.Named(lggr, "DONConnectionManager."+donConfig.DonId),
}
}
for _, shardedDON := range gwConfig.ShardedDONs {
for shardIdx, shard := range shardedDON.Shards {
donID := config.ShardDONID(shardedDON.DonName, shardIdx)
if _, ok := dons[donID]; ok {
return nil, fmt.Errorf("duplicate DON ID %s", donID)
}
nodes, err := buildNodeStates(shard.Nodes, donID, lggr)
if err != nil {
return nil, err
}
dons[donID] = &donConnectionManager{
donConfig: &config.DONConfig{
DonId: donID,
F: shardedDON.F,
Members: shard.Nodes,
},
nodes: nodes,
handlers: make(map[string]handlers.Handler),
shutdownCh: make(chan struct{}),
gMetrics: gMetrics,
lggr: logger.Named(lggr, "DONConnectionManager."+donID),
}
}
}
connMgr := &connectionManager{
config: &gwConfig.ConnectionManagerConfig,
dons: dons,
Expand All @@ -142,6 +155,25 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
return connMgr, nil
}

func buildNodeStates(members []config.NodeConfig, donID string, lggr logger.Logger) (map[string]*nodeState, error) {
nodes := make(map[string]*nodeState)
for _, nodeConfig := range members {
nodeAddress := strings.ToLower(nodeConfig.Address)
if _, ok := nodes[nodeAddress]; ok {
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donID)
}
connWrapper := network.NewWSConnectionWrapper(lggr)
if connWrapper == nil {
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
}
nodes[nodeAddress] = &nodeState{
name: nodeConfig.Name,
conn: connWrapper,
}
}
return nodes, nil
}

func (m *connectionManager) DONConnectionManager(donId string) *donConnectionManager {
return m.dons[donId]
}
Expand Down Expand Up @@ -263,8 +295,22 @@ func (m *connectionManager) GetPort() int {
return m.wsServer.GetPort()
}

func (m *donConnectionManager) SetHandler(handler handlers.Handler) {
m.handler = handler
func (m *donConnectionManager) SetHandler(serviceName string, handler handlers.Handler) {
m.handlers[serviceName] = handler
}

func (m *donConnectionManager) getHandler(method string) (handlers.Handler, error) {
if len(m.handlers) == 1 {
for _, h := range m.handlers {
return h, nil // supports legacy single-handler case
}
}
serviceName := strings.Split(method, ".")[0]
handler, ok := m.handlers[serviceName]
if !ok {
return nil, fmt.Errorf("no handler for service %q (method %q)", serviceName, method)
}
return handler, nil
}

func (m *donConnectionManager) SendToNode(ctx context.Context, nodeAddress string, req *jsonrpc.Request[json.RawMessage]) error {
Expand Down Expand Up @@ -297,8 +343,13 @@ func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState
m.lggr.Errorw("parse error when reading from node", "nodeAddress", nodeAddress, "err", err)
break
}
handler, err := m.getHandler(resp.Method)
if err != nil {
m.lggr.Errorw("no handler for node message", "nodeAddress", nodeAddress, "method", resp.Method, "err", err)
break
}
startTime := time.Now()
err = m.handler.HandleNodeMessage(ctx, &resp, nodeAddress)
err = handler.HandleNodeMessage(ctx, &resp, nodeAddress)
m.gMetrics.RecordNodeMsgHandlerInvocation(ctx, nodeAddress, nodeState.name, err == nil)
m.gMetrics.RecordNodeMsgHandlerDuration(ctx, nodeAddress, nodeState.name, time.Since(startTime), err == nil)
if err != nil {
Expand Down
170 changes: 170 additions & 0 deletions core/services/gateway/connectionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,176 @@ func TestConnectionManager_CleanStartClose(t *testing.T) {
require.NoError(t, err)
}

func TestConnectionManager_ShardedDONs_CreatesPerShardManagers(t *testing.T) {
t.Parallel()

tomlConfig := `
[nodeServerConfig]
Path = "/node"

[[shardedDONs]]
DonName = "myDON"
F = 1

[[shardedDONs.Shards]]
[[shardedDONs.Shards.Nodes]]
Name = "s0_n0"
Address = "0x0001020304050607080900010203040506070809"
[[shardedDONs.Shards.Nodes]]
Name = "s0_n1"
Address = "0x0002020304050607080900010203040506070809"
[[shardedDONs.Shards.Nodes]]
Name = "s0_n2"
Address = "0x0003020304050607080900010203040506070809"
[[shardedDONs.Shards.Nodes]]
Name = "s0_n3"
Address = "0x0004020304050607080900010203040506070809"

[[shardedDONs.Shards]]
[[shardedDONs.Shards.Nodes]]
Name = "s1_n0"
Address = "0x0005020304050607080900010203040506070809"
[[shardedDONs.Shards.Nodes]]
Name = "s1_n1"
Address = "0x0006020304050607080900010203040506070809"
[[shardedDONs.Shards.Nodes]]
Name = "s1_n2"
Address = "0x0007020304050607080900010203040506070809"
[[shardedDONs.Shards.Nodes]]
Name = "s1_n3"
Address = "0x0008020304050607080900010203040506070809"
`

cfg := parseTOMLConfig(t, tomlConfig)
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())

require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("myDON", 0)), "shard 0 connection manager should exist")
require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("myDON", 1)), "shard 1 connection manager should exist")
require.Nil(t, mgr.DONConnectionManager("myDON_2"), "shard 2 should not exist")
}

func TestConnectionManager_ShardedDONs_MultipleDONs(t *testing.T) {
t.Parallel()

tomlConfig := `
[nodeServerConfig]
Path = "/node"

[[shardedDONs]]
DonName = "donA"
F = 0

[[shardedDONs.Shards]]
[[shardedDONs.Shards.Nodes]]
Name = "a_n0"
Address = "0x0001020304050607080900010203040506070809"

[[shardedDONs]]
DonName = "donB"
F = 0

[[shardedDONs.Shards]]
[[shardedDONs.Shards.Nodes]]
Name = "b_n0"
Address = "0x0002020304050607080900010203040506070809"
`

cfg := parseTOMLConfig(t, tomlConfig)
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())

require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("donA", 0)))
require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("donB", 0)))
}

func TestConnectionManager_ShardedDONs_DuplicateNodeAddress(t *testing.T) {
t.Parallel()

tomlConfig := `
[nodeServerConfig]
Path = "/node"

[[shardedDONs]]
DonName = "myDON"
F = 0

[[shardedDONs.Shards]]
[[shardedDONs.Shards.Nodes]]
Name = "n0"
Address = "0x0001020304050607080900010203040506070809"
[[shardedDONs.Shards.Nodes]]
Name = "n1"
Address = "0x0001020304050607080900010203040506070809"
`

cfg := parseTOMLConfig(t, tomlConfig)
lggr := logger.Test(t)
gMetrics, err := monitoring.NewGatewayMetrics()
require.NoError(t, err)
_, err = gateway.NewConnectionManager(cfg, clockwork.NewFakeClock(), gMetrics, lggr, limits.Factory{Logger: lggr})
require.Error(t, err)
require.Contains(t, err.Error(), "duplicate node address")
}

func TestConnectionManager_ShardedDONs_SendToNode(t *testing.T) {
t.Parallel()

tomlConfig := `
[nodeServerConfig]
Path = "/node"

[[shardedDONs]]
DonName = "myDON"
F = 0

[[shardedDONs.Shards]]
[[shardedDONs.Shards.Nodes]]
Name = "n0"
Address = "0x0001020304050607080900010203040506070809"
`

cfg := parseTOMLConfig(t, tomlConfig)
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())

donMgr := mgr.DONConnectionManager(config.ShardDONID("myDON", 0))
require.NotNil(t, donMgr)

err := donMgr.SendToNode(testutils.Context(t), "0x0001020304050607080900010203040506070809", nil)
require.Error(t, err, "nil request should fail")

message := &jsonrpc.Request[json.RawMessage]{}
err = donMgr.SendToNode(testutils.Context(t), "0xdeadbeef", message)
require.Error(t, err, "unknown node should fail")
}

func TestConnectionManager_ShardedDONs_StartClose(t *testing.T) {
t.Parallel()

tomlConfig := `
[nodeServerConfig]
Path = "/node"
[connectionManagerConfig]
HeartbeatIntervalSec = 1

[[shardedDONs]]
DonName = "myDON"
F = 0

[[shardedDONs.Shards]]
[[shardedDONs.Shards.Nodes]]
Name = "n0"
Address = "0x0001020304050607080900010203040506070809"
`

cfg := parseTOMLConfig(t, tomlConfig)
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())

err := mgr.Start(testutils.Context(t))
require.NoError(t, err)

err = mgr.Close()
require.NoError(t, err)
}

func newConnectionManager(t *testing.T, gwConfig *config.GatewayConfig, clock clockwork.Clock) gateway.ConnectionManager {
lggr := logger.Test(t)
gMetrics, err := monitoring.NewGatewayMetrics()
Expand Down
10 changes: 5 additions & 5 deletions core/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func setupFromNewConfig(

var shardConnMgrs []handlers.DON
for shardIdx := range donCfg.Shards {
donID := fmt.Sprintf("%s_%d", donName, shardIdx)
donID := config.ShardDONID(donName, shardIdx)
donConnMgr := connMgr.DONConnectionManager(donID)
if donConnMgr == nil {
return nil, fmt.Errorf("connection manager for DON %s shard %d not found", donName, shardIdx)
Expand All @@ -158,15 +158,15 @@ func setupFromNewConfig(

serviceToMultiHandler[svc.ServiceName] = handler

// Set (multi)handler on all associated DON connection managers
// Set (multi)handler on all associated DON connection managers, keyed by service name
for i, donName := range svc.DONs {
for shardIdx := range shardsConnMgrs[i] {
donID := fmt.Sprintf("%s_%d", donName, shardIdx)
donID := config.ShardDONID(donName, shardIdx)
donConnMgr := connMgr.DONConnectionManager(donID)
if donConnMgr == nil {
return nil, fmt.Errorf("connection manager for DON %s shard %d not found", donName, shardIdx)
}
donConnMgr.SetHandler(handler)
donConnMgr.SetHandler(svc.ServiceName, handler)
}
}

Expand Down Expand Up @@ -235,7 +235,7 @@ func setupFromLegacyConfig(
}
}

donConnMgr.SetHandler(handler)
donConnMgr.SetHandler("", handler)
}

return handlerMap, serviceNameToDonID, nil
Expand Down
Loading
Loading