Skip to content

Commit 27a6da4

Browse files
authored
Merge pull request #433 from blinklabs-io/feat/connection-manager
feat: connection manager
2 parents 03f0b9e + 6aa0054 commit 27a6da4

File tree

11 files changed

+360
-14
lines changed

11 files changed

+360
-14
lines changed

connection_manager.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Copyright 2023 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ouroboros
16+
17+
// ConnectionManagerErrorFunc is a function that takes a connection ID and an error
18+
type ConnectionManagerErrorFunc func(int, error)
19+
20+
// ConnectionManagerTag represents the various tags that can be associated with a host or connection
21+
type ConnectionManagerTag uint16
22+
23+
const (
24+
ConnectionManagerTagNone ConnectionManagerTag = iota
25+
26+
ConnectionManagerTagHostProducer
27+
ConnectionManagerTagHostLocalRoot
28+
ConnectionManagerTagHostPublicRoot
29+
ConnectionManagerTagHostP2PLedger
30+
ConnectionManagerTagHostP2PGossip
31+
32+
ConnectionManagerTagRoleInitiator
33+
ConnectionManagerTagRoleResponder
34+
// TODO: add more tags
35+
)
36+
37+
func (c ConnectionManagerTag) String() string {
38+
tmp := map[ConnectionManagerTag]string{
39+
ConnectionManagerTagHostProducer: "HostProducer",
40+
ConnectionManagerTagHostLocalRoot: "HostLocalRoot",
41+
ConnectionManagerTagHostPublicRoot: "HostPublicRoot",
42+
ConnectionManagerTagHostP2PLedger: "HostP2PLedger",
43+
ConnectionManagerTagHostP2PGossip: "HostP2PGossip",
44+
ConnectionManagerTagRoleInitiator: "RoleInitiator",
45+
ConnectionManagerTagRoleResponder: "RoleResponder",
46+
// TODO: add more tags to match those added above
47+
}
48+
ret, ok := tmp[c]
49+
if !ok {
50+
return "Unknown"
51+
}
52+
return ret
53+
}
54+
55+
type ConnectionManager struct {
56+
config ConnectionManagerConfig
57+
hosts []ConnectionManagerHost
58+
connections []*ConnectionManagerConnection
59+
}
60+
61+
type ConnectionManagerConfig struct {
62+
ErrorFunc ConnectionManagerErrorFunc
63+
}
64+
65+
type ConnectionManagerHost struct {
66+
Address string
67+
Port uint
68+
Tags map[ConnectionManagerTag]bool
69+
}
70+
71+
func NewConnectionManager(cfg ConnectionManagerConfig) *ConnectionManager {
72+
return &ConnectionManager{
73+
config: cfg,
74+
}
75+
}
76+
77+
func (c *ConnectionManager) AddHost(address string, port uint, tags ...ConnectionManagerTag) {
78+
tmpTags := map[ConnectionManagerTag]bool{}
79+
for _, tag := range tags {
80+
tmpTags[tag] = true
81+
}
82+
c.hosts = append(
83+
c.hosts,
84+
ConnectionManagerHost{
85+
Address: address,
86+
Port: port,
87+
Tags: tmpTags,
88+
},
89+
)
90+
}
91+
92+
func (c *ConnectionManager) AddHostsFromTopology(topology *TopologyConfig) {
93+
for _, host := range topology.Producers {
94+
c.AddHost(host.Address, host.Port, ConnectionManagerTagHostProducer)
95+
}
96+
for _, localRoot := range topology.LocalRoots {
97+
for _, host := range localRoot.AccessPoints {
98+
c.AddHost(host.Address, host.Port, ConnectionManagerTagHostLocalRoot)
99+
}
100+
}
101+
for _, publicRoot := range topology.PublicRoots {
102+
for _, host := range publicRoot.AccessPoints {
103+
c.AddHost(host.Address, host.Port, ConnectionManagerTagHostPublicRoot)
104+
}
105+
}
106+
}
107+
108+
func (c *ConnectionManager) AddConnection(connId int, conn *Connection) {
109+
c.connections = append(
110+
c.connections,
111+
&ConnectionManagerConnection{
112+
Id: connId,
113+
Conn: conn,
114+
},
115+
)
116+
go func() {
117+
err, ok := <-conn.ErrorChan()
118+
if !ok {
119+
// Connection has closed normally
120+
return
121+
}
122+
// Call configured error callback func
123+
c.config.ErrorFunc(connId, err)
124+
}()
125+
}
126+
127+
func (c *ConnectionManager) GetConnectionById(id int) *ConnectionManagerConnection {
128+
for _, conn := range c.connections {
129+
if conn.Id == id {
130+
return conn
131+
}
132+
}
133+
return nil
134+
}
135+
136+
func (c *ConnectionManager) GetConnectionsByTags(tags ...ConnectionManagerTag) []*ConnectionManagerConnection {
137+
var ret []*ConnectionManagerConnection
138+
for _, conn := range c.connections {
139+
skipConn := false
140+
for _, tag := range tags {
141+
if _, ok := conn.Tags[tag]; !ok {
142+
skipConn = true
143+
break
144+
}
145+
}
146+
if !skipConn {
147+
ret = append(ret, conn)
148+
}
149+
}
150+
return ret
151+
}
152+
153+
type ConnectionManagerConnection struct {
154+
Id int
155+
Conn *Connection
156+
Tags map[ConnectionManagerTag]bool
157+
}
158+
159+
func (c *ConnectionManagerConnection) AddTags(tags ...ConnectionManagerTag) {
160+
for _, tag := range tags {
161+
c.Tags[tag] = true
162+
}
163+
}
164+
165+
func (c *ConnectionManagerConnection) RemoveTags(tags ...ConnectionManagerTag) {
166+
for _, tag := range tags {
167+
delete(c.Tags, tag)
168+
}
169+
}

connection_manager_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright 2023 Blink Labs Software
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ouroboros_test
16+
17+
import (
18+
"io"
19+
"testing"
20+
"time"
21+
22+
ouroboros "github.com/blinklabs-io/gouroboros"
23+
"github.com/blinklabs-io/gouroboros/internal/test/ouroboros_mock"
24+
"github.com/blinklabs-io/gouroboros/protocol/keepalive"
25+
)
26+
27+
func TestConnectionManagerTagString(t *testing.T) {
28+
testDefs := map[ouroboros.ConnectionManagerTag]string{
29+
ouroboros.ConnectionManagerTagHostP2PLedger: "HostP2PLedger",
30+
ouroboros.ConnectionManagerTagHostP2PGossip: "HostP2PGossip",
31+
ouroboros.ConnectionManagerTagRoleInitiator: "RoleInitiator",
32+
ouroboros.ConnectionManagerTagRoleResponder: "RoleResponder",
33+
ouroboros.ConnectionManagerTagNone: "Unknown",
34+
ouroboros.ConnectionManagerTag(9999): "Unknown",
35+
}
36+
for k, v := range testDefs {
37+
if k.String() != v {
38+
t.Fatalf(
39+
"did not get expected string for ID %d: got %s, expected %s",
40+
k,
41+
k.String(),
42+
v,
43+
)
44+
}
45+
}
46+
}
47+
48+
func TestConnectionManagerConnError(t *testing.T) {
49+
expectedConnId := 2
50+
expectedErr := io.EOF
51+
doneChan := make(chan any)
52+
connManager := ouroboros.NewConnectionManager(
53+
ouroboros.ConnectionManagerConfig{
54+
ErrorFunc: func(connId int, err error) {
55+
if connId != expectedConnId {
56+
t.Fatalf("did not receive error from expected connection: got %d, wanted %d", connId, expectedConnId)
57+
}
58+
if err != expectedErr {
59+
t.Fatalf("did not receive expected error: got: %s, expected: %s", err, expectedErr)
60+
}
61+
close(doneChan)
62+
},
63+
},
64+
)
65+
for i := 0; i < 3; i++ {
66+
mockConversation := ouroboros_mock.ConversationKeepAlive
67+
if i == expectedConnId {
68+
mockConversation = ouroboros_mock.ConversationKeepAliveClose
69+
}
70+
mockConn := ouroboros_mock.NewConnection(
71+
ouroboros_mock.ProtocolRoleClient,
72+
mockConversation,
73+
)
74+
oConn, err := ouroboros.New(
75+
ouroboros.WithConnection(mockConn),
76+
ouroboros.WithNetworkMagic(ouroboros_mock.MockNetworkMagic),
77+
ouroboros.WithNodeToNode(true),
78+
ouroboros.WithKeepAlive(true),
79+
ouroboros.WithKeepAliveConfig(
80+
keepalive.NewConfig(
81+
keepalive.WithCookie(ouroboros_mock.MockKeepAliveCookie),
82+
keepalive.WithPeriod(2*time.Second),
83+
keepalive.WithTimeout(1*time.Second),
84+
),
85+
),
86+
)
87+
if err != nil {
88+
t.Fatalf("unexpected error when creating Ouroboros object: %s", err)
89+
}
90+
connManager.AddConnection(i, oConn)
91+
}
92+
select {
93+
case <-doneChan:
94+
return
95+
case <-time.After(10 * time.Second):
96+
t.Fatalf("did not receive error within timeout")
97+
}
98+
}

connection_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestDoubleClose(t *testing.T) {
4141
ouroboros_mock.ProtocolRoleClient,
4242
[]ouroboros_mock.ConversationEntry{
4343
ouroboros_mock.ConversationEntryHandshakeRequestGeneric,
44-
ouroboros_mock.ConversationEntryHandshakeResponse,
44+
ouroboros_mock.ConversationEntryHandshakeNtCResponse,
4545
},
4646
)
4747
oConn, err := ouroboros.New(

internal/test/ouroboros_mock/connection.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ func (c *Connection) asyncLoop() {
139139
}
140140
case EntryTypeClose:
141141
c.Close()
142+
case EntryTypeSleep:
143+
time.Sleep(entry.Duration)
142144
default:
143145
panic(
144146
fmt.Sprintf(

0 commit comments

Comments
 (0)