Skip to content

Commit c768396

Browse files
committed
feat: peer sharing protocol
This implementation isn't entirely finished, but there's not yet a released version of cardano-node supporting this protocol to test against Fixes #14
1 parent 645052f commit c768396

File tree

12 files changed

+577
-20
lines changed

12 files changed

+577
-20
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
/gouroboros
88
/tx-monitor
99
/block-fetch
10+
/peer-sharing
1011

1112
# Test binary, built with `go test -c`
1213
*.test

cmd/peer-sharing/main.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"fmt"
19+
"os"
20+
21+
ouroboros "github.com/blinklabs-io/gouroboros"
22+
"github.com/blinklabs-io/gouroboros/cmd/common"
23+
)
24+
25+
func main() {
26+
// Parse commandline
27+
f := common.NewGlobalFlags()
28+
f.Parse()
29+
// Create connection
30+
conn := common.CreateClientConnection(f)
31+
errorChan := make(chan error)
32+
go func() {
33+
for {
34+
err := <-errorChan
35+
fmt.Printf("ERROR(async): %s\n", err)
36+
os.Exit(1)
37+
}
38+
}()
39+
o, err := ouroboros.New(
40+
ouroboros.WithConnection(conn),
41+
ouroboros.WithNetworkMagic(uint32(f.NetworkMagic)),
42+
ouroboros.WithErrorChan(errorChan),
43+
ouroboros.WithNodeToNode(f.NtnProto),
44+
ouroboros.WithKeepAlive(true),
45+
)
46+
if err != nil {
47+
fmt.Printf("ERROR: %s\n", err)
48+
os.Exit(1)
49+
}
50+
o.PeerSharing().Client.Start()
51+
52+
peers, err := o.PeerSharing().Client.GetPeers(10)
53+
if err != nil {
54+
fmt.Printf("ERROR: %s\n", err)
55+
os.Exit(1)
56+
}
57+
58+
fmt.Printf("peers = %#v\n", peers)
59+
}

connection.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/blinklabs-io/gouroboros/protocol/localstatequery"
4040
"github.com/blinklabs-io/gouroboros/protocol/localtxmonitor"
4141
"github.com/blinklabs-io/gouroboros/protocol/localtxsubmission"
42+
"github.com/blinklabs-io/gouroboros/protocol/peersharing"
4243
"github.com/blinklabs-io/gouroboros/protocol/txsubmission"
4344
)
4445

@@ -59,19 +60,21 @@ type Connection struct {
5960
delayMuxerStart bool
6061
fullDuplex bool
6162
// Mini-protocols
62-
handshake *handshake.Handshake
63-
chainSync *chainsync.ChainSync
64-
chainSyncConfig *chainsync.Config
6563
blockFetch *blockfetch.BlockFetch
6664
blockFetchConfig *blockfetch.Config
65+
chainSync *chainsync.ChainSync
66+
chainSyncConfig *chainsync.Config
67+
handshake *handshake.Handshake
6768
keepAlive *keepalive.KeepAlive
6869
keepAliveConfig *keepalive.Config
70+
localStateQuery *localstatequery.LocalStateQuery
71+
localStateQueryConfig *localstatequery.Config
6972
localTxMonitor *localtxmonitor.LocalTxMonitor
7073
localTxMonitorConfig *localtxmonitor.Config
7174
localTxSubmission *localtxsubmission.LocalTxSubmission
7275
localTxSubmissionConfig *localtxsubmission.Config
73-
localStateQuery *localstatequery.LocalStateQuery
74-
localStateQueryConfig *localstatequery.Config
76+
peerSharing *peersharing.PeerSharing
77+
peerSharingConfig *peersharing.Config
7578
txSubmission *txsubmission.TxSubmission
7679
txSubmissionConfig *txsubmission.Config
7780
}
@@ -165,16 +168,16 @@ func (c *Connection) Close() error {
165168
return err
166169
}
167170

168-
// ChainSync returns the chain-sync protocol handler
169-
func (c *Connection) ChainSync() *chainsync.ChainSync {
170-
return c.chainSync
171-
}
172-
173171
// BlockFetch returns the block-fetch protocol handler
174172
func (c *Connection) BlockFetch() *blockfetch.BlockFetch {
175173
return c.blockFetch
176174
}
177175

176+
// ChainSync returns the chain-sync protocol handler
177+
func (c *Connection) ChainSync() *chainsync.ChainSync {
178+
return c.chainSync
179+
}
180+
178181
// Handshake returns the handshake protocol handler
179182
func (c *Connection) Handshake() *handshake.Handshake {
180183
return c.handshake
@@ -185,6 +188,11 @@ func (c *Connection) KeepAlive() *keepalive.KeepAlive {
185188
return c.keepAlive
186189
}
187190

191+
// LocalStateQuery returns the local-state-query protocol handler
192+
func (c *Connection) LocalStateQuery() *localstatequery.LocalStateQuery {
193+
return c.localStateQuery
194+
}
195+
188196
// LocalTxMonitor returns the local-tx-monitor protocol handler
189197
func (c *Connection) LocalTxMonitor() *localtxmonitor.LocalTxMonitor {
190198
return c.localTxMonitor
@@ -195,9 +203,9 @@ func (c *Connection) LocalTxSubmission() *localtxsubmission.LocalTxSubmission {
195203
return c.localTxSubmission
196204
}
197205

198-
// LocalStateQuery returns the local-state-query protocol handler
199-
func (c *Connection) LocalStateQuery() *localstatequery.LocalStateQuery {
200-
return c.localStateQuery
206+
// PeerSharing returns the peer-sharing protocol handler
207+
func (c *Connection) PeerSharing() *peersharing.PeerSharing {
208+
return c.peerSharing
201209
}
202210

203211
// TxSubmission returns the tx-submission protocol handler
@@ -320,6 +328,9 @@ func (c *Connection) setupConnection() error {
320328
c.keepAlive.Client.Start()
321329
}
322330
}
331+
if versionNtN.EnablePeerSharingProtocol {
332+
c.peerSharing = peersharing.New(protoOptions, c.peerSharingConfig)
333+
}
323334
} else {
324335
versionNtC := GetProtocolVersionNtC(handshakeVersion)
325336
protoOptions.Mode = protocol.ProtocolModeNodeToClient

connection_options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/blinklabs-io/gouroboros/protocol/keepalive"
2323
"github.com/blinklabs-io/gouroboros/protocol/localstatequery"
2424
"github.com/blinklabs-io/gouroboros/protocol/localtxsubmission"
25+
"github.com/blinklabs-io/gouroboros/protocol/peersharing"
2526
"github.com/blinklabs-io/gouroboros/protocol/txsubmission"
2627
)
2728

@@ -128,6 +129,13 @@ func WithLocalTxSubmissionConfig(cfg localtxsubmission.Config) ConnectionOptionF
128129
}
129130
}
130131

132+
// WithPeerSharingConfig specifies PeerSharing protocol config
133+
func WithPeerSharingConfig(cfg peersharing.Config) ConnectionOptionFunc {
134+
return func(c *Connection) {
135+
c.peerSharingConfig = &cfg
136+
}
137+
}
138+
131139
// WithTxSubmissionConfig specifies TxSubmission protocol config
132140
func WithTxSubmissionConfig(cfg txsubmission.Config) ConnectionOptionFunc {
133141
return func(c *Connection) {

protocol/handshake/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ func (c *Client) Start() {
7272
}
7373
for _, version := range c.config.ProtocolVersions {
7474
if c.Mode() == protocol.ProtocolModeNodeToNode {
75+
// NOTE: it seems that protocol version 11 is still in flux, so we disable for now
76+
/*
77+
if version >= 11 {
78+
// TODO: make peer sharing mode configurable
79+
versionMap[version] = []interface{}{c.config.NetworkMagic, diffusionMode, PeerSharingModePeerSharingPrivate}
80+
} else {
81+
*/
7582
versionMap[version] = []interface{}{c.config.NetworkMagic, diffusionMode}
7683
} else {
7784
versionMap[version] = c.config.NetworkMagic

protocol/handshake/handshake.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ const (
3333
DiffusionModeInitiatorAndResponder = true
3434
)
3535

36+
// Peer sharing modes
37+
const (
38+
PeerSharingModeNoPeerSharing = 0
39+
PeerSharingModePeerSharingPublic = 1
40+
PeerSharingModePeerSharingPrivate = 2
41+
)
42+
3643
var (
3744
statePropose = protocol.NewState(1, "Propose")
3845
stateConfirm = protocol.NewState(2, "Confirm")

protocol/peersharing/client.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package peersharing
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/blinklabs-io/gouroboros/protocol"
21+
)
22+
23+
// Client implements the PeerSharing client
24+
type Client struct {
25+
*protocol.Protocol
26+
config *Config
27+
sharePeersChan chan []interface{}
28+
}
29+
30+
// NewClient returns a new PeerSharing client object
31+
func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
32+
if cfg == nil {
33+
tmpCfg := NewConfig()
34+
cfg = &tmpCfg
35+
}
36+
c := &Client{
37+
config: cfg,
38+
sharePeersChan: make(chan []interface{}),
39+
}
40+
// Update state map with timeout
41+
stateMap := StateMap.Copy()
42+
if entry, ok := stateMap[stateBusy]; ok {
43+
entry.Timeout = c.config.Timeout
44+
stateMap[stateBusy] = entry
45+
}
46+
// Configure underlying Protocol
47+
protoConfig := protocol.ProtocolConfig{
48+
Name: ProtocolName,
49+
ProtocolId: ProtocolId,
50+
Muxer: protoOptions.Muxer,
51+
ErrorChan: protoOptions.ErrorChan,
52+
Mode: protoOptions.Mode,
53+
Role: protocol.ProtocolRoleClient,
54+
MessageHandlerFunc: c.handleMessage,
55+
MessageFromCborFunc: NewMsgFromCbor,
56+
StateMap: stateMap,
57+
InitialState: stateIdle,
58+
}
59+
c.Protocol = protocol.New(protoConfig)
60+
return c
61+
}
62+
63+
func (c *Client) GetPeers(amount uint8) ([]interface{}, error) {
64+
msg := NewMsgShareRequest(amount)
65+
if err := c.SendMessage(msg); err != nil {
66+
return nil, err
67+
}
68+
peers := <-c.sharePeersChan
69+
return peers, nil
70+
}
71+
72+
func (c *Client) handleMessage(msg protocol.Message, isResponse bool) error {
73+
var err error
74+
switch msg.Type() {
75+
case MessageTypeSharePeers:
76+
err = c.handleSharePeers(msg)
77+
default:
78+
err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type())
79+
}
80+
return err
81+
}
82+
83+
func (c *Client) handleSharePeers(msg protocol.Message) error {
84+
msgSharePeers := msg.(*MsgSharePeers)
85+
c.sharePeersChan <- msgSharePeers.PeerAddresses
86+
return nil
87+
}

0 commit comments

Comments
 (0)