Skip to content

Commit f10eabb

Browse files
committed
[CRE][Gateway] Support for multi-handlers: part 2
1 parent ea39291 commit f10eabb

File tree

6 files changed

+409
-26
lines changed

6 files changed

+409
-26
lines changed

core/services/gateway/config/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ type ShardedDONConfig struct {
6363
F int
6464
Shards []Shard
6565
}
66+
67+
// ShardDONID returns the donID for a given shard
68+
func ShardDONID(donName string, shardIdx int) string {
69+
if shardIdx == 0 {
70+
// NOTE: special case for backward compatibility - shard 0 doesn't have an index suffix
71+
return donName
72+
}
73+
return fmt.Sprintf("%s_%d", donName, shardIdx)
74+
}
75+
6676
type Shard struct {
6777
Nodes []NodeConfig
6878
}

core/services/gateway/connectionmanager.go

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (m *connectionManager) Name() string { return m.lggr.Name() }
7272
type donConnectionManager struct {
7373
donConfig *config.DONConfig
7474
nodes map[string]*nodeState
75-
handler handlers.Handler
75+
handlers map[string]handlers.Handler // service name -> handler
7676
closeWait sync.WaitGroup
7777
shutdownCh services.StopChan
7878
gMetrics *monitoring.GatewayMetrics
@@ -102,30 +102,43 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
102102
if ok {
103103
return nil, fmt.Errorf("duplicate DON ID %s", donConfig.DonId)
104104
}
105-
nodes := make(map[string]*nodeState)
106-
for _, nodeConfig := range donConfig.Members {
107-
nodeAddress := strings.ToLower(nodeConfig.Address)
108-
_, ok := nodes[nodeAddress]
109-
if ok {
110-
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId)
111-
}
112-
connWrapper := network.NewWSConnectionWrapper(lggr)
113-
if connWrapper == nil {
114-
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
115-
}
116-
nodes[nodeAddress] = &nodeState{
117-
name: nodeConfig.Name,
118-
conn: connWrapper,
119-
}
105+
nodes, err := buildNodeStates(donConfig.Members, donConfig.DonId, lggr)
106+
if err != nil {
107+
return nil, err
120108
}
121109
dons[donConfig.DonId] = &donConnectionManager{
122110
donConfig: &donConfig,
123111
nodes: nodes,
112+
handlers: make(map[string]handlers.Handler),
124113
shutdownCh: make(chan struct{}),
125114
gMetrics: gMetrics,
126115
lggr: logger.Named(lggr, "DONConnectionManager."+donConfig.DonId),
127116
}
128117
}
118+
for _, shardedDON := range gwConfig.ShardedDONs {
119+
for shardIdx, shard := range shardedDON.Shards {
120+
donID := config.ShardDONID(shardedDON.DonName, shardIdx)
121+
if _, ok := dons[donID]; ok {
122+
return nil, fmt.Errorf("duplicate DON ID %s", donID)
123+
}
124+
nodes, err := buildNodeStates(shard.Nodes, donID, lggr)
125+
if err != nil {
126+
return nil, err
127+
}
128+
dons[donID] = &donConnectionManager{
129+
donConfig: &config.DONConfig{
130+
DonId: donID,
131+
F: shardedDON.F,
132+
Members: shard.Nodes,
133+
},
134+
nodes: nodes,
135+
handlers: make(map[string]handlers.Handler),
136+
shutdownCh: make(chan struct{}),
137+
gMetrics: gMetrics,
138+
lggr: logger.Named(lggr, "DONConnectionManager."+donID),
139+
}
140+
}
141+
}
129142
connMgr := &connectionManager{
130143
config: &gwConfig.ConnectionManagerConfig,
131144
dons: dons,
@@ -142,6 +155,25 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
142155
return connMgr, nil
143156
}
144157

158+
func buildNodeStates(members []config.NodeConfig, donID string, lggr logger.Logger) (map[string]*nodeState, error) {
159+
nodes := make(map[string]*nodeState)
160+
for _, nodeConfig := range members {
161+
nodeAddress := strings.ToLower(nodeConfig.Address)
162+
if _, ok := nodes[nodeAddress]; ok {
163+
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donID)
164+
}
165+
connWrapper := network.NewWSConnectionWrapper(lggr)
166+
if connWrapper == nil {
167+
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
168+
}
169+
nodes[nodeAddress] = &nodeState{
170+
name: nodeConfig.Name,
171+
conn: connWrapper,
172+
}
173+
}
174+
return nodes, nil
175+
}
176+
145177
func (m *connectionManager) DONConnectionManager(donId string) *donConnectionManager {
146178
return m.dons[donId]
147179
}
@@ -263,8 +295,22 @@ func (m *connectionManager) GetPort() int {
263295
return m.wsServer.GetPort()
264296
}
265297

266-
func (m *donConnectionManager) SetHandler(handler handlers.Handler) {
267-
m.handler = handler
298+
func (m *donConnectionManager) SetHandler(serviceName string, handler handlers.Handler) {
299+
m.handlers[serviceName] = handler
300+
}
301+
302+
func (m *donConnectionManager) getHandler(method string) (handlers.Handler, error) {
303+
if len(m.handlers) == 1 {
304+
for _, h := range m.handlers {
305+
return h, nil // supports legacy single-handler case
306+
}
307+
}
308+
serviceName := strings.Split(method, ".")[0]
309+
handler, ok := m.handlers[serviceName]
310+
if !ok {
311+
return nil, fmt.Errorf("no handler for service %q (method %q)", serviceName, method)
312+
}
313+
return handler, nil
268314
}
269315

270316
func (m *donConnectionManager) SendToNode(ctx context.Context, nodeAddress string, req *jsonrpc.Request[json.RawMessage]) error {
@@ -297,8 +343,13 @@ func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState
297343
m.lggr.Errorw("parse error when reading from node", "nodeAddress", nodeAddress, "err", err)
298344
break
299345
}
346+
handler, err := m.getHandler(resp.Method)
347+
if err != nil {
348+
m.lggr.Errorw("no handler for node message", "nodeAddress", nodeAddress, "method", resp.Method, "err", err)
349+
break
350+
}
300351
startTime := time.Now()
301-
err = m.handler.HandleNodeMessage(ctx, &resp, nodeAddress)
352+
err = handler.HandleNodeMessage(ctx, &resp, nodeAddress)
302353
m.gMetrics.RecordNodeMsgHandlerInvocation(ctx, nodeAddress, nodeState.name, err == nil)
303354
m.gMetrics.RecordNodeMsgHandlerDuration(ctx, nodeAddress, nodeState.name, time.Since(startTime), err == nil)
304355
if err != nil {

core/services/gateway/connectionmanager_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,176 @@ func TestConnectionManager_CleanStartClose(t *testing.T) {
244244
require.NoError(t, err)
245245
}
246246

247+
func TestConnectionManager_ShardedDONs_CreatesPerShardManagers(t *testing.T) {
248+
t.Parallel()
249+
250+
tomlConfig := `
251+
[nodeServerConfig]
252+
Path = "/node"
253+
254+
[[shardedDONs]]
255+
DonName = "myDON"
256+
F = 1
257+
258+
[[shardedDONs.Shards]]
259+
[[shardedDONs.Shards.Nodes]]
260+
Name = "s0_n0"
261+
Address = "0x0001020304050607080900010203040506070809"
262+
[[shardedDONs.Shards.Nodes]]
263+
Name = "s0_n1"
264+
Address = "0x0002020304050607080900010203040506070809"
265+
[[shardedDONs.Shards.Nodes]]
266+
Name = "s0_n2"
267+
Address = "0x0003020304050607080900010203040506070809"
268+
[[shardedDONs.Shards.Nodes]]
269+
Name = "s0_n3"
270+
Address = "0x0004020304050607080900010203040506070809"
271+
272+
[[shardedDONs.Shards]]
273+
[[shardedDONs.Shards.Nodes]]
274+
Name = "s1_n0"
275+
Address = "0x0005020304050607080900010203040506070809"
276+
[[shardedDONs.Shards.Nodes]]
277+
Name = "s1_n1"
278+
Address = "0x0006020304050607080900010203040506070809"
279+
[[shardedDONs.Shards.Nodes]]
280+
Name = "s1_n2"
281+
Address = "0x0007020304050607080900010203040506070809"
282+
[[shardedDONs.Shards.Nodes]]
283+
Name = "s1_n3"
284+
Address = "0x0008020304050607080900010203040506070809"
285+
`
286+
287+
cfg := parseTOMLConfig(t, tomlConfig)
288+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
289+
290+
require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("myDON", 0)), "shard 0 connection manager should exist")
291+
require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("myDON", 1)), "shard 1 connection manager should exist")
292+
require.Nil(t, mgr.DONConnectionManager("myDON_2"), "shard 2 should not exist")
293+
}
294+
295+
func TestConnectionManager_ShardedDONs_MultipleDONs(t *testing.T) {
296+
t.Parallel()
297+
298+
tomlConfig := `
299+
[nodeServerConfig]
300+
Path = "/node"
301+
302+
[[shardedDONs]]
303+
DonName = "donA"
304+
F = 0
305+
306+
[[shardedDONs.Shards]]
307+
[[shardedDONs.Shards.Nodes]]
308+
Name = "a_n0"
309+
Address = "0x0001020304050607080900010203040506070809"
310+
311+
[[shardedDONs]]
312+
DonName = "donB"
313+
F = 0
314+
315+
[[shardedDONs.Shards]]
316+
[[shardedDONs.Shards.Nodes]]
317+
Name = "b_n0"
318+
Address = "0x0002020304050607080900010203040506070809"
319+
`
320+
321+
cfg := parseTOMLConfig(t, tomlConfig)
322+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
323+
324+
require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("donA", 0)))
325+
require.NotNil(t, mgr.DONConnectionManager(config.ShardDONID("donB", 0)))
326+
}
327+
328+
func TestConnectionManager_ShardedDONs_DuplicateNodeAddress(t *testing.T) {
329+
t.Parallel()
330+
331+
tomlConfig := `
332+
[nodeServerConfig]
333+
Path = "/node"
334+
335+
[[shardedDONs]]
336+
DonName = "myDON"
337+
F = 0
338+
339+
[[shardedDONs.Shards]]
340+
[[shardedDONs.Shards.Nodes]]
341+
Name = "n0"
342+
Address = "0x0001020304050607080900010203040506070809"
343+
[[shardedDONs.Shards.Nodes]]
344+
Name = "n1"
345+
Address = "0x0001020304050607080900010203040506070809"
346+
`
347+
348+
cfg := parseTOMLConfig(t, tomlConfig)
349+
lggr := logger.Test(t)
350+
gMetrics, err := monitoring.NewGatewayMetrics()
351+
require.NoError(t, err)
352+
_, err = gateway.NewConnectionManager(cfg, clockwork.NewFakeClock(), gMetrics, lggr, limits.Factory{Logger: lggr})
353+
require.Error(t, err)
354+
require.Contains(t, err.Error(), "duplicate node address")
355+
}
356+
357+
func TestConnectionManager_ShardedDONs_SendToNode(t *testing.T) {
358+
t.Parallel()
359+
360+
tomlConfig := `
361+
[nodeServerConfig]
362+
Path = "/node"
363+
364+
[[shardedDONs]]
365+
DonName = "myDON"
366+
F = 0
367+
368+
[[shardedDONs.Shards]]
369+
[[shardedDONs.Shards.Nodes]]
370+
Name = "n0"
371+
Address = "0x0001020304050607080900010203040506070809"
372+
`
373+
374+
cfg := parseTOMLConfig(t, tomlConfig)
375+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
376+
377+
donMgr := mgr.DONConnectionManager(config.ShardDONID("myDON", 0))
378+
require.NotNil(t, donMgr)
379+
380+
err := donMgr.SendToNode(testutils.Context(t), "0x0001020304050607080900010203040506070809", nil)
381+
require.Error(t, err, "nil request should fail")
382+
383+
message := &jsonrpc.Request[json.RawMessage]{}
384+
err = donMgr.SendToNode(testutils.Context(t), "0xdeadbeef", message)
385+
require.Error(t, err, "unknown node should fail")
386+
}
387+
388+
func TestConnectionManager_ShardedDONs_StartClose(t *testing.T) {
389+
t.Parallel()
390+
391+
tomlConfig := `
392+
[nodeServerConfig]
393+
Path = "/node"
394+
[connectionManagerConfig]
395+
HeartbeatIntervalSec = 1
396+
397+
[[shardedDONs]]
398+
DonName = "myDON"
399+
F = 0
400+
401+
[[shardedDONs.Shards]]
402+
[[shardedDONs.Shards.Nodes]]
403+
Name = "n0"
404+
Address = "0x0001020304050607080900010203040506070809"
405+
`
406+
407+
cfg := parseTOMLConfig(t, tomlConfig)
408+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
409+
410+
err := mgr.Start(testutils.Context(t))
411+
require.NoError(t, err)
412+
413+
err = mgr.Close()
414+
require.NoError(t, err)
415+
}
416+
247417
func newConnectionManager(t *testing.T, gwConfig *config.GatewayConfig, clock clockwork.Clock) gateway.ConnectionManager {
248418
lggr := logger.Test(t)
249419
gMetrics, err := monitoring.NewGatewayMetrics()

core/services/gateway/gateway.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func setupFromNewConfig(
141141

142142
var shardConnMgrs []handlers.DON
143143
for shardIdx := range donCfg.Shards {
144-
donID := fmt.Sprintf("%s_%d", donName, shardIdx)
144+
donID := config.ShardDONID(donName, shardIdx)
145145
donConnMgr := connMgr.DONConnectionManager(donID)
146146
if donConnMgr == nil {
147147
return nil, fmt.Errorf("connection manager for DON %s shard %d not found", donName, shardIdx)
@@ -158,15 +158,15 @@ func setupFromNewConfig(
158158

159159
serviceToMultiHandler[svc.ServiceName] = handler
160160

161-
// Set (multi)handler on all associated DON connection managers
161+
// Set (multi)handler on all associated DON connection managers, keyed by service name
162162
for i, donName := range svc.DONs {
163163
for shardIdx := range shardsConnMgrs[i] {
164-
donID := fmt.Sprintf("%s_%d", donName, shardIdx)
164+
donID := config.ShardDONID(donName, shardIdx)
165165
donConnMgr := connMgr.DONConnectionManager(donID)
166166
if donConnMgr == nil {
167167
return nil, fmt.Errorf("connection manager for DON %s shard %d not found", donName, shardIdx)
168168
}
169-
donConnMgr.SetHandler(handler)
169+
donConnMgr.SetHandler(svc.ServiceName, handler)
170170
}
171171
}
172172

@@ -235,7 +235,7 @@ func setupFromLegacyConfig(
235235
}
236236
}
237237

238-
donConnMgr.SetHandler(handler)
238+
donConnMgr.SetHandler("", handler)
239239
}
240240

241241
return handlerMap, serviceNameToDonID, nil

0 commit comments

Comments
 (0)