Skip to content

Commit afb194f

Browse files
committed
[CRE][Gateway] Support for multi-handlers: part 2
1 parent 70b1e2b commit afb194f

File tree

3 files changed

+368
-15
lines changed

3 files changed

+368
-15
lines changed

core/services/gateway/connectionmanager.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -102,21 +102,9 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
102102
if ok {
103103
return nil, fmt.Errorf("duplicate DON ID %s", donConfig.DonId)
104104
}
105-
nodes := make(map[string]*nodeState)
106-
for _, nodeConfig := range donConfig.Members {
107-
nodeAddress := strings.ToLower(nodeConfig.Address)
108-
_, ok := nodes[nodeAddress]
109-
if ok {
110-
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donConfig.DonId)
111-
}
112-
connWrapper := network.NewWSConnectionWrapper(lggr)
113-
if connWrapper == nil {
114-
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
115-
}
116-
nodes[nodeAddress] = &nodeState{
117-
name: nodeConfig.Name,
118-
conn: connWrapper,
119-
}
105+
nodes, err := buildNodeStates(donConfig.Members, donConfig.DonId, lggr)
106+
if err != nil {
107+
return nil, err
120108
}
121109
dons[donConfig.DonId] = &donConnectionManager{
122110
donConfig: &donConfig,
@@ -126,6 +114,29 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
126114
lggr: logger.Named(lggr, "DONConnectionManager."+donConfig.DonId),
127115
}
128116
}
117+
for _, shardedDON := range gwConfig.ShardedDONs {
118+
for shardIdx, shard := range shardedDON.Shards {
119+
donID := fmt.Sprintf("%s_%d", shardedDON.DonName, shardIdx)
120+
if _, ok := dons[donID]; ok {
121+
return nil, fmt.Errorf("duplicate DON ID %s", donID)
122+
}
123+
nodes, err := buildNodeStates(shard.Nodes, donID, lggr)
124+
if err != nil {
125+
return nil, err
126+
}
127+
dons[donID] = &donConnectionManager{
128+
donConfig: &config.DONConfig{
129+
DonId: donID,
130+
F: shardedDON.F,
131+
Members: shard.Nodes,
132+
},
133+
nodes: nodes,
134+
shutdownCh: make(chan struct{}),
135+
gMetrics: gMetrics,
136+
lggr: logger.Named(lggr, "DONConnectionManager."+donID),
137+
}
138+
}
139+
}
129140
connMgr := &connectionManager{
130141
config: &gwConfig.ConnectionManagerConfig,
131142
dons: dons,
@@ -142,6 +153,25 @@ func NewConnectionManager(gwConfig *config.GatewayConfig, clock clockwork.Clock,
142153
return connMgr, nil
143154
}
144155

156+
func buildNodeStates(members []config.NodeConfig, donID string, lggr logger.Logger) (map[string]*nodeState, error) {
157+
nodes := make(map[string]*nodeState)
158+
for _, nodeConfig := range members {
159+
nodeAddress := strings.ToLower(nodeConfig.Address)
160+
if _, ok := nodes[nodeAddress]; ok {
161+
return nil, fmt.Errorf("duplicate node address %s in DON %s", nodeAddress, donID)
162+
}
163+
connWrapper := network.NewWSConnectionWrapper(lggr)
164+
if connWrapper == nil {
165+
return nil, fmt.Errorf("error creating WSConnectionWrapper for node %s", nodeAddress)
166+
}
167+
nodes[nodeAddress] = &nodeState{
168+
name: nodeConfig.Name,
169+
conn: connWrapper,
170+
}
171+
}
172+
return nodes, nil
173+
}
174+
145175
func (m *connectionManager) DONConnectionManager(donId string) *donConnectionManager {
146176
return m.dons[donId]
147177
}

core/services/gateway/connectionmanager_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,177 @@ func TestConnectionManager_CleanStartClose(t *testing.T) {
244244
require.NoError(t, err)
245245
}
246246

247+
func TestConnectionManager_ShardedDONs_CreatesPerShardManagers(t *testing.T) {
248+
t.Parallel()
249+
250+
tomlConfig := `
251+
[nodeServerConfig]
252+
Path = "/node"
253+
254+
[[shardedDONs]]
255+
DonName = "myDON"
256+
F = 1
257+
258+
[[shardedDONs.Shards]]
259+
[[shardedDONs.Shards.Nodes]]
260+
Name = "s0_n0"
261+
Address = "0x0001020304050607080900010203040506070809"
262+
[[shardedDONs.Shards.Nodes]]
263+
Name = "s0_n1"
264+
Address = "0x0002020304050607080900010203040506070809"
265+
[[shardedDONs.Shards.Nodes]]
266+
Name = "s0_n2"
267+
Address = "0x0003020304050607080900010203040506070809"
268+
[[shardedDONs.Shards.Nodes]]
269+
Name = "s0_n3"
270+
Address = "0x0004020304050607080900010203040506070809"
271+
272+
[[shardedDONs.Shards]]
273+
[[shardedDONs.Shards.Nodes]]
274+
Name = "s1_n0"
275+
Address = "0x0005020304050607080900010203040506070809"
276+
[[shardedDONs.Shards.Nodes]]
277+
Name = "s1_n1"
278+
Address = "0x0006020304050607080900010203040506070809"
279+
[[shardedDONs.Shards.Nodes]]
280+
Name = "s1_n2"
281+
Address = "0x0007020304050607080900010203040506070809"
282+
[[shardedDONs.Shards.Nodes]]
283+
Name = "s1_n3"
284+
Address = "0x0008020304050607080900010203040506070809"
285+
`
286+
287+
cfg := parseTOMLConfig(t, tomlConfig)
288+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
289+
290+
require.NotNil(t, mgr.DONConnectionManager("myDON_0"), "shard 0 connection manager should exist")
291+
require.NotNil(t, mgr.DONConnectionManager("myDON_1"), "shard 1 connection manager should exist")
292+
require.Nil(t, mgr.DONConnectionManager("myDON_2"), "shard 2 should not exist")
293+
require.Nil(t, mgr.DONConnectionManager("myDON"), "bare DON name should not exist")
294+
}
295+
296+
func TestConnectionManager_ShardedDONs_MultipleDONs(t *testing.T) {
297+
t.Parallel()
298+
299+
tomlConfig := `
300+
[nodeServerConfig]
301+
Path = "/node"
302+
303+
[[shardedDONs]]
304+
DonName = "donA"
305+
F = 0
306+
307+
[[shardedDONs.Shards]]
308+
[[shardedDONs.Shards.Nodes]]
309+
Name = "a_n0"
310+
Address = "0x0001020304050607080900010203040506070809"
311+
312+
[[shardedDONs]]
313+
DonName = "donB"
314+
F = 0
315+
316+
[[shardedDONs.Shards]]
317+
[[shardedDONs.Shards.Nodes]]
318+
Name = "b_n0"
319+
Address = "0x0002020304050607080900010203040506070809"
320+
`
321+
322+
cfg := parseTOMLConfig(t, tomlConfig)
323+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
324+
325+
require.NotNil(t, mgr.DONConnectionManager("donA_0"))
326+
require.NotNil(t, mgr.DONConnectionManager("donB_0"))
327+
}
328+
329+
func TestConnectionManager_ShardedDONs_DuplicateNodeAddress(t *testing.T) {
330+
t.Parallel()
331+
332+
tomlConfig := `
333+
[nodeServerConfig]
334+
Path = "/node"
335+
336+
[[shardedDONs]]
337+
DonName = "myDON"
338+
F = 0
339+
340+
[[shardedDONs.Shards]]
341+
[[shardedDONs.Shards.Nodes]]
342+
Name = "n0"
343+
Address = "0x0001020304050607080900010203040506070809"
344+
[[shardedDONs.Shards.Nodes]]
345+
Name = "n1"
346+
Address = "0x0001020304050607080900010203040506070809"
347+
`
348+
349+
cfg := parseTOMLConfig(t, tomlConfig)
350+
lggr := logger.Test(t)
351+
gMetrics, err := monitoring.NewGatewayMetrics()
352+
require.NoError(t, err)
353+
_, err = gateway.NewConnectionManager(cfg, clockwork.NewFakeClock(), gMetrics, lggr, limits.Factory{Logger: lggr})
354+
require.Error(t, err)
355+
require.Contains(t, err.Error(), "duplicate node address")
356+
}
357+
358+
func TestConnectionManager_ShardedDONs_SendToNode(t *testing.T) {
359+
t.Parallel()
360+
361+
tomlConfig := `
362+
[nodeServerConfig]
363+
Path = "/node"
364+
365+
[[shardedDONs]]
366+
DonName = "myDON"
367+
F = 0
368+
369+
[[shardedDONs.Shards]]
370+
[[shardedDONs.Shards.Nodes]]
371+
Name = "n0"
372+
Address = "0x0001020304050607080900010203040506070809"
373+
`
374+
375+
cfg := parseTOMLConfig(t, tomlConfig)
376+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
377+
378+
donMgr := mgr.DONConnectionManager("myDON_0")
379+
require.NotNil(t, donMgr)
380+
381+
err := donMgr.SendToNode(testutils.Context(t), "0x0001020304050607080900010203040506070809", nil)
382+
require.Error(t, err, "nil request should fail")
383+
384+
message := &jsonrpc.Request[json.RawMessage]{}
385+
err = donMgr.SendToNode(testutils.Context(t), "0xdeadbeef", message)
386+
require.Error(t, err, "unknown node should fail")
387+
}
388+
389+
func TestConnectionManager_ShardedDONs_StartClose(t *testing.T) {
390+
t.Parallel()
391+
392+
tomlConfig := `
393+
[nodeServerConfig]
394+
Path = "/node"
395+
[connectionManagerConfig]
396+
HeartbeatIntervalSec = 1
397+
398+
[[shardedDONs]]
399+
DonName = "myDON"
400+
F = 0
401+
402+
[[shardedDONs.Shards]]
403+
[[shardedDONs.Shards.Nodes]]
404+
Name = "n0"
405+
Address = "0x0001020304050607080900010203040506070809"
406+
`
407+
408+
cfg := parseTOMLConfig(t, tomlConfig)
409+
mgr := newConnectionManager(t, cfg, clockwork.NewFakeClock())
410+
411+
err := mgr.Start(testutils.Context(t))
412+
require.NoError(t, err)
413+
414+
err = mgr.Close()
415+
require.NoError(t, err)
416+
}
417+
247418
func newConnectionManager(t *testing.T, gwConfig *config.GatewayConfig, clock clockwork.Clock) gateway.ConnectionManager {
248419
lggr := logger.Test(t)
249420
gMetrics, err := monitoring.NewGatewayMetrics()

0 commit comments

Comments
 (0)