Skip to content

Commit 4f87a45

Browse files
committed
[CRE][Gateway] Support for multi-handlers: part 2
1 parent be4b540 commit 4f87a45

File tree

6 files changed

+405
-33
lines changed

6 files changed

+405
-33
lines changed

core/services/gateway/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ type ShardedDONConfig struct {
6363
F int
6464
Shards []Shard
6565
}
66+
67+
// ShardDONID returns the connection-manager ID for a given shard index (e.g. "myDON_0").
68+
func ShardDONID(donName string, shardIdx int) string {
69+
return fmt.Sprintf("%s_%d", donName, shardIdx)
70+
}
6671
type Shard struct {
6772
Nodes []NodeConfig
6873
}

core/services/gateway/connectionmanager.go

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (m *connectionManager) Name() string { return m.lggr.Name() }
7272
type donConnectionManager struct {
7373
donConfig *config.DONConfig
7474
nodes map[string]*nodeState
75-
handler handlers.Handler
75+
handlers map[string]handlers.Handler // service name -> handler
7676
closeWait sync.WaitGroup
7777
shutdownCh services.StopChan
7878
gMetrics *monitoring.GatewayMetrics
@@ -102,30 +102,43 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
102102
if ok {
103103
return nil, fmt.Errorf("duplicate DON ID %s", donConfig.DonId)
104104
}
105-
nodes := make(map[string]*nodeState)
106-
for _, nodeConfig := range donConfig.Members {
107-
nodeAddress := strings.ToLower(nodeConfig.Address)
108-
_, ok := nodes[nodeAddress]
109-
if ok {
110-
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId)
111-
}
112-
connWrapper := network.NewWSConnectionWrapper(lggr)
113-
if connWrapper == nil {
114-
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
115-
}
116-
nodes[nodeAddress] = &nodeState{
117-
name: nodeConfig.Name,
118-
conn: connWrapper,
119-
}
105+
nodes, err := buildNodeStates(donConfig.Members, donConfig.DonId, lggr)
106+
if err != nil {
107+
return nil, err
120108
}
121109
dons[donConfig.DonId] = &donConnectionManager{
122110
donConfig: &donConfig,
123111
nodes: nodes,
112+
handlers: make(map[string]handlers.Handler),
124113
shutdownCh: make(chan struct{}),
125114
gMetrics: gMetrics,
126115
lggr: logger.Named(lggr, "DONConnectionManager."+donConfig.DonId),
127116
}
128117
}
118+
for _, shardedDON := range gwConfig.ShardedDONs {
119+
for shardIdx, shard := range shardedDON.Shards {
120+
donID := config.ShardDONID(shardedDON.DonName, shardIdx)
121+
if _, ok := dons[donID]; ok {
122+
return nil, fmt.Errorf("duplicate DON ID %s", donID)
123+
}
124+
nodes, err := buildNodeStates(shard.Nodes, donID, lggr)
125+
if err != nil {
126+
return nil, err
127+
}
128+
dons[donID] = &donConnectionManager{
129+
donConfig: &config.DONConfig{
130+
DonId: donID,
131+
F: shardedDON.F,
132+
Members: shard.Nodes,
133+
},
134+
nodes: nodes,
135+
handlers: make(map[string]handlers.Handler),
136+
shutdownCh: make(chan struct{}),
137+
gMetrics: gMetrics,
138+
lggr: logger.Named(lggr, "DONConnectionManager."+donID),
139+
}
140+
}
141+
}
129142
connMgr := &connectionManager{
130143
config: &gwConfig.ConnectionManagerConfig,
131144
dons: dons,
@@ -142,6 +155,25 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
142155
return connMgr, nil
143156
}
144157

158+
func buildNodeStates(members []config.NodeConfig, donID string, lggr logger.Logger) (map[string]*nodeState, error) {
159+
nodes := make(map[string]*nodeState)
160+
for _, nodeConfig := range members {
161+
nodeAddress := strings.ToLower(nodeConfig.Address)
162+
if _, ok := nodes[nodeAddress]; ok {
163+
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donID)
164+
}
165+
connWrapper := network.NewWSConnectionWrapper(lggr)
166+
if connWrapper == nil {
167+
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
168+
}
169+
nodes[nodeAddress] = &nodeState{
170+
name: nodeConfig.Name,
171+
conn: connWrapper,
172+
}
173+
}
174+
return nodes, nil
175+
}
176+
145177
func (m *connectionManager) DONConnectionManager(donId string) *donConnectionManager {
146178
return m.dons[donId]
147179
}
@@ -263,8 +295,22 @@ func (m *connectionManager) GetPort() int {
263295
return m.wsServer.GetPort()
264296
}
265297

266-
func (m *donConnectionManager) SetHandler(handler handlers.Handler) {
267-
m.handler = handler
298+
func (m *donConnectionManager) SetHandler(serviceName string, handler handlers.Handler) {
299+
m.handlers[serviceName] = handler
300+
}
301+
302+
func (m *donConnectionManager) getHandler(method string) (handlers.Handler, error) {
303+
if len(m.handlers) == 1 {
304+
for _, h := range m.handlers {
305+
return h, nil // supports legacy single-handler case
306+
}
307+
}
308+
serviceName := strings.Split(method, ".")[0]
309+
handler, ok := m.handlers[serviceName]
310+
if !ok {
311+
return nil, fmt.Errorf("no handler for service %q (method %q)", serviceName, method)
312+
}
313+
return handler, nil
268314
}
269315

270316
func (m *donConnectionManager) SendToNode(ctx context.Context, nodeAddress string, req *jsonrpc.Request[json.RawMessage]) error {
@@ -297,8 +343,13 @@ func (m *donConnectionManager) readLoop(nodeAddress string, nodeState *nodeState
297343
m.lggr.Errorw("parse error when reading from node", "nodeAddress", nodeAddress, "err", err)
298344
break
299345
}
346+
handler, err := m.getHandler(resp.Method)
347+
if err != nil {
348+
m.lggr.Errorw("no handler for node message", "nodeAddress", nodeAddress, "method", resp.Method, "err", err)
349+
break
350+
}
300351
startTime := time.Now()
301-
err = m.handler.HandleNodeMessage(ctx, &resp, nodeAddress)
352+
err = handler.HandleNodeMessage(ctx, &resp, nodeAddress)
302353
m.gMetrics.RecordNodeMsgHandlerInvocation(ctx, nodeAddress, nodeState.name, err == nil)
303354
m.gMetrics.RecordNodeMsgHandlerDuration(ctx, nodeAddress, nodeState.name, time.Since(startTime), err == nil)
304355
if err != nil {

core/services/gateway/connectionmanager_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,177 @@ func TestConnectionManager_CleanStartClose(t *testing.T) {
244244
require.NoError(t, err)
245245
}
246246

247+
func TestConnectionManager_ShardedDONs_CreatesPerShardManagers(t *testing.T) {
248+
t.Parallel()
249+
250+
tomlConfig := `
251+
[nodeServerConfig]
252+
Path = "/node"
253+
254+
[[shardedDONs]]
255+
DonName = "myDON"
256+
F = 1
257+
258+
[[shardedDONs.Shards]]
259+
[[shardedDONs.Shards.Nodes]]
260+
Name = "s0_n0"
261+
Address = "0x0001020304050607080900010203040506070809"
262+
[[shardedDONs.Shards.Nodes]]
263+
Name = "s0_n1"
264+
Address = "0x0002020304050607080900010203040506070809"
265+
[[shardedDONs.Shards.Nodes]]
266+
Name = "s0_n2"
267+
Address = "0x0003020304050607080900010203040506070809"
268+
[[shardedDONs.Shards.Nodes]]
269+
Name = "s0_n3"
270+
Address = "0x0004020304050607080900010203040506070809"
271+
272+
[[shardedDONs.Shards]]
273+
[[shardedDONs.Shards.Nodes]]
274+
Name = "s1_n0"
275+
Address = "0x0005020304050607080900010203040506070809"
276+
[[shardedDONs.Shards.Nodes]]
277+
Name = "s1_n1"
278+
Address = "0x0006020304050607080900010203040506070809"
279+
[[shardedDONs.Shards.Nodes]]
280+
Name = "s1_n2"
281+
Address = "0x0007020304050607080900010203040506070809"
282+
[[shardedDONs.Shards.Nodes]]
283+
Name = "s1_n3"
284+
Address = "0x0008020304050607080900010203040506070809"
285+
`
286+
287+
cfg := parseTOMLConfig(t, tomlConfig)
288+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
289+
290+
require.NotNil(t, mgr.DONConnectionManager("myDON_0"), "shard 0 connection manager should exist")
291+
require.NotNil(t, mgr.DONConnectionManager("myDON_1"), "shard 1 connection manager should exist")
292+
require.Nil(t, mgr.DONConnectionManager("myDON_2"), "shard 2 should not exist")
293+
require.Nil(t, mgr.DONConnectionManager("myDON"), "bare DON name should not exist")
294+
}
295+
296+
func TestConnectionManager_ShardedDONs_MultipleDONs(t *testing.T) {
297+
t.Parallel()
298+
299+
tomlConfig := `
300+
[nodeServerConfig]
301+
Path = "/node"
302+
303+
[[shardedDONs]]
304+
DonName = "donA"
305+
F = 0
306+
307+
[[shardedDONs.Shards]]
308+
[[shardedDONs.Shards.Nodes]]
309+
Name = "a_n0"
310+
Address = "0x0001020304050607080900010203040506070809"
311+
312+
[[shardedDONs]]
313+
DonName = "donB"
314+
F = 0
315+
316+
[[shardedDONs.Shards]]
317+
[[shardedDONs.Shards.Nodes]]
318+
Name = "b_n0"
319+
Address = "0x0002020304050607080900010203040506070809"
320+
`
321+
322+
cfg := parseTOMLConfig(t, tomlConfig)
323+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
324+
325+
require.NotNil(t, mgr.DONConnectionManager("donA_0"))
326+
require.NotNil(t, mgr.DONConnectionManager("donB_0"))
327+
}
328+
329+
func TestConnectionManager_ShardedDONs_DuplicateNodeAddress(t *testing.T) {
330+
t.Parallel()
331+
332+
tomlConfig := `
333+
[nodeServerConfig]
334+
Path = "/node"
335+
336+
[[shardedDONs]]
337+
DonName = "myDON"
338+
F = 0
339+
340+
[[shardedDONs.Shards]]
341+
[[shardedDONs.Shards.Nodes]]
342+
Name = "n0"
343+
Address = "0x0001020304050607080900010203040506070809"
344+
[[shardedDONs.Shards.Nodes]]
345+
Name = "n1"
346+
Address = "0x0001020304050607080900010203040506070809"
347+
`
348+
349+
cfg := parseTOMLConfig(t, tomlConfig)
350+
lggr := logger.Test(t)
351+
gMetrics, err := monitoring.NewGatewayMetrics()
352+
require.NoError(t, err)
353+
_, err = gateway.NewConnectionManager(cfg, clockwork.NewFakeClock(), gMetrics, lggr, limits.Factory{Logger: lggr})
354+
require.Error(t, err)
355+
require.Contains(t, err.Error(), "duplicate node address")
356+
}
357+
358+
func TestConnectionManager_ShardedDONs_SendToNode(t *testing.T) {
359+
t.Parallel()
360+
361+
tomlConfig := `
362+
[nodeServerConfig]
363+
Path = "/node"
364+
365+
[[shardedDONs]]
366+
DonName = "myDON"
367+
F = 0
368+
369+
[[shardedDONs.Shards]]
370+
[[shardedDONs.Shards.Nodes]]
371+
Name = "n0"
372+
Address = "0x0001020304050607080900010203040506070809"
373+
`
374+
375+
cfg := parseTOMLConfig(t, tomlConfig)
376+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
377+
378+
donMgr := mgr.DONConnectionManager("myDON_0")
379+
require.NotNil(t, donMgr)
380+
381+
err := donMgr.SendToNode(testutils.Context(t), "0x0001020304050607080900010203040506070809", nil)
382+
require.Error(t, err, "nil request should fail")
383+
384+
message := &jsonrpc.Request[json.RawMessage]{}
385+
err = donMgr.SendToNode(testutils.Context(t), "0xdeadbeef", message)
386+
require.Error(t, err, "unknown node should fail")
387+
}
388+
389+
func TestConnectionManager_ShardedDONs_StartClose(t *testing.T) {
390+
t.Parallel()
391+
392+
tomlConfig := `
393+
[nodeServerConfig]
394+
Path = "/node"
395+
[connectionManagerConfig]
396+
HeartbeatIntervalSec = 1
397+
398+
[[shardedDONs]]
399+
DonName = "myDON"
400+
F = 0
401+
402+
[[shardedDONs.Shards]]
403+
[[shardedDONs.Shards.Nodes]]
404+
Name = "n0"
405+
Address = "0x0001020304050607080900010203040506070809"
406+
`
407+
408+
cfg := parseTOMLConfig(t, tomlConfig)
409+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
410+
411+
err := mgr.Start(testutils.Context(t))
412+
require.NoError(t, err)
413+
414+
err = mgr.Close()
415+
require.NoError(t, err)
416+
}
417+
247418
func newConnectionManager(t *testing.T, gwConfig *config.GatewayConfig, clock clockwork.Clock) gateway.ConnectionManager {
248419
lggr := logger.Test(t)
249420
gMetrics, err := monitoring.NewGatewayMetrics()

core/services/gateway/gateway.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ func setupFromNewConfig(
120120
donNameToConfig[don.DonName] = don
121121
}
122122

123-
assignedDONs := make(map[string]struct{})
124123
// For each service, create a MultiHandler with its handlers and attached DONs
125124
for _, svc := range cfg.Services {
126125
var shardedDONs []config.ShardedDONConfig
@@ -131,17 +130,11 @@ func setupFromNewConfig(
131130
if !ok {
132131
return nil, fmt.Errorf("service %q references unknown DON: %s", svc.ServiceName, donName)
133132
}
134-
if _, assigned := assignedDONs[donName]; assigned {
135-
// NOTE: this check can be relaxed in the future once we clean up all "service.method" strings
136-
// and split them correctly in Multihandler
137-
return nil, fmt.Errorf("DON %q is assigned to multiple services", donName)
138-
}
139-
assignedDONs[donName] = struct{}{}
140133
shardedDONs = append(shardedDONs, donCfg)
141134

142135
var shardConnMgrs []handlers.DON
143136
for shardIdx := range donCfg.Shards {
144-
donID := fmt.Sprintf("%s_%d", donName, shardIdx)
137+
donID := config.ShardDONID(donName, shardIdx)
145138
donConnMgr := connMgr.DONConnectionManager(donID)
146139
if donConnMgr == nil {
147140
return nil, fmt.Errorf("connection manager for DON %s shard %d not found", donName, shardIdx)
@@ -158,15 +151,15 @@ func setupFromNewConfig(
158151

159152
serviceToMultiHandler[svc.ServiceName] = handler
160153

161-
// Set (multi)handler on all associated DON connection managers
154+
// Set (multi)handler on all associated DON connection managers, keyed by service name
162155
for i, donName := range svc.DONs {
163156
for shardIdx := range shardsConnMgrs[i] {
164-
donID := fmt.Sprintf("%s_%d", donName, shardIdx)
157+
donID := config.ShardDONID(donName, shardIdx)
165158
donConnMgr := connMgr.DONConnectionManager(donID)
166159
if donConnMgr == nil {
167160
return nil, fmt.Errorf("connection manager for DON %s shard %d not found", donName, shardIdx)
168161
}
169-
donConnMgr.SetHandler(handler)
162+
donConnMgr.SetHandler(svc.ServiceName, handler)
170163
}
171164
}
172165

@@ -235,7 +228,7 @@ func setupFromLegacyConfig(
235228
}
236229
}
237230

238-
donConnMgr.SetHandler(handler)
231+
donConnMgr.SetHandler("", handler)
239232
}
240233

241234
return handlerMap, serviceNameToDonID, nil

0 commit comments

Comments
 (0)