Skip to content

Commit d27a11e

Browse files
cortzeMarcoPolo
andauthored
Add threadsafe dynamic direct peer handling to GossipSub (#673)
# Description The current direct peer handling is a bit clunky, it only allows direct peers to be set at the start of the service. This makes the application layer unable to modify the direct peer list, or to create new services as we want to add/remove peers to that list. The PR adds two new self-descriptive methods struct: - `AddDirectPeer()` - `RemoveDirectPeer()` Both handling the underlaying `Peerstore` and `TagTracer` changes as well. Ensuring that we have all in place to ensure the connection protected with the remote peer. --------- Co-authored-by: Mikel Cortes <crtz trvllpt> Co-authored-by: Marco Munizaga <git@marcopolo.io>
1 parent 63d7d8e commit d27a11e

File tree

5 files changed

+125
-19
lines changed

5 files changed

+125
-19
lines changed

gossipsub.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,10 @@ func WithDirectPeers(pis []peer.AddrInfo) Option {
533533
gs.direct = direct
534534

535535
if gs.tagTracer != nil {
536-
gs.tagTracer.direct = direct
536+
gs.tagTracer.isDirect = func(p peer.ID) bool {
537+
_, ok := gs.direct[p]
538+
return ok
539+
}
537540
}
538541

539542
return nil
@@ -827,6 +830,20 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
827830
return false
828831
}
829832

833+
func (gs *GossipSubRouter) AddDirectPeer(pi peer.AddrInfo) {
834+
if gs.direct == nil {
835+
gs.direct = make(map[peer.ID]struct{})
836+
}
837+
gs.direct[pi.ID] = struct{}{}
838+
gs.p.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
839+
gs.tagTracer.protectDirect(pi.ID)
840+
}
841+
842+
func (gs *GossipSubRouter) RemoveDirectPeer(p peer.ID) {
843+
delete(gs.direct, p)
844+
gs.tagTracer.unprotectDirect(p)
845+
}
846+
830847
func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
831848
_, direct := gs.direct[p]
832849
if direct {

gossipsub_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,6 +1340,50 @@ func TestGossipsubDirectPeers(t *testing.T) {
13401340
}
13411341
}
13421342

1343+
func TestGossipsubDynamicDirectPeers(t *testing.T) {
1344+
ctx, cancel := context.WithCancel(context.Background())
1345+
defer cancel()
1346+
1347+
h := getDefaultHosts(t, 3)
1348+
psubs := []*PubSub{
1349+
getGossipsub(ctx, h[0], WithDirectConnectTicks(2)),
1350+
getGossipsub(ctx, h[1], WithDirectConnectTicks(2)),
1351+
getGossipsub(ctx, h[2], WithDirectConnectTicks(2)),
1352+
}
1353+
1354+
listDirectPeers := func(psb *PubSub) int {
1355+
directPeers := 0
1356+
gspRt, _ := psb.rt.(*GossipSubRouter)
1357+
fn := func() {
1358+
directPeers = len(gspRt.direct)
1359+
}
1360+
psb.syncEval(fn)
1361+
return directPeers
1362+
}
1363+
1364+
// test dinamic addition of direct-peers to h[1] and h[2]
1365+
psubs[1].AddDirectPeer(peer.AddrInfo{ID: h[2].ID(), Addrs: h[2].Addrs()})
1366+
psubs[2].AddDirectPeer(peer.AddrInfo{ID: h[1].ID(), Addrs: h[1].Addrs()})
1367+
1368+
// give enough time to the state machine to process the direct additions
1369+
time.Sleep(time.Second)
1370+
1371+
if listDirectPeers(psubs[1]) < 1 || listDirectPeers(psubs[2]) < 1 {
1372+
t.Fatal("expected 1 direct peer at both gsp rts")
1373+
}
1374+
1375+
// remove peer from direct from directPeers
1376+
psubs[1].RemoveDirectPeer(h[2].ID())
1377+
psubs[2].RemoveDirectPeer(h[1].ID())
1378+
1379+
// give enough time to the state machine to process the direct additions
1380+
time.Sleep(time.Second)
1381+
1382+
if listDirectPeers(psubs[1]) > 0 || listDirectPeers(psubs[2]) > 0 {
1383+
t.Fatal("expected no direct peers both gsp rts")
1384+
}
1385+
}
1386+
13431387
func TestGossipSubPeerFilter(t *testing.T) {
13441388
ctx, cancel := context.WithCancel(context.Background())
13451389
defer cancel()

pubsub.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1909,3 +1909,44 @@ type addRelayReq struct {
19091909
topic string
19101910
resp chan RelayCancelFunc
19111911
}
1912+
1913+
func (p *PubSub) syncEval(f func()) error {
1914+
done := make(chan struct{})
1915+
syncFn := func() {
1916+
defer close(done)
1917+
f()
1918+
}
1919+
select {
1920+
case p.eval <- syncFn:
1921+
select {
1922+
case <-done:
1923+
case <-p.ctx.Done():
1924+
return p.ctx.Err()
1925+
}
1926+
case <-p.ctx.Done():
1927+
return p.ctx.Err()
1928+
}
1929+
return nil
1930+
}
1931+
1932+
// AddDirectPeer tags the peer as a direct peer at the internal router
1933+
func (p *PubSub) AddDirectPeer(pInfo peer.AddrInfo) error {
1934+
gs, ok := p.rt.(*GossipSubRouter)
1935+
if !ok {
1936+
return errors.New("add direct peer only supported by gossipsub")
1937+
}
1938+
return p.syncEval(func() {
1939+
gs.AddDirectPeer(pInfo)
1940+
})
1941+
}
1942+
1943+
// RemoveDirectPeer un-tags the peer from being direct peer at the internal router
1944+
func (p *PubSub) RemoveDirectPeer(pid peer.ID) error {
1945+
gs, ok := p.rt.(*GossipSubRouter)
1946+
if !ok {
1947+
return errors.New("remove direct peer only supported by gossipsub")
1948+
}
1949+
return p.syncEval(func() {
1950+
gs.RemoveDirectPeer(pid)
1951+
})
1952+
}

tag_tracer.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type tagTracer struct {
4949
idGen *msgIDGenerator
5050
decayer connmgr.Decayer
5151
decaying map[string]connmgr.DecayingTag
52-
direct map[peer.ID]struct{}
52+
isDirect func(p peer.ID) bool
5353

5454
// a map of message ids to the set of peers who delivered the message after the first delivery,
5555
// but before the message was finished validating
@@ -71,7 +71,7 @@ func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
7171
decayer: decayer,
7272
decaying: make(map[string]connmgr.DecayingTag),
7373
nearFirst: make(map[string]map[peer.ID]struct{}),
74-
direct: make(map[peer.ID]struct{}),
74+
isDirect: func(p peer.ID) bool { return false },
7575
logger: logger, // Overridden in Start
7676
}
7777
}
@@ -83,18 +83,9 @@ func (t *tagTracer) Start(gs *GossipSubRouter, logger *slog.Logger) {
8383
t.logger = logger
8484

8585
t.idGen = gs.p.idGen
86-
t.direct = gs.direct
87-
}
88-
89-
func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
90-
if t.direct == nil {
91-
return
92-
}
93-
94-
// tag peer if it is a direct peer
95-
_, direct := t.direct[p]
96-
if direct {
97-
t.cmgr.Protect(p, "pubsub:<direct>")
86+
t.isDirect = func(p peer.ID) bool {
87+
_, ok := gs.direct[p]
88+
return ok
9889
}
9990
}
10091

@@ -181,11 +172,21 @@ func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
181172
return peers
182173
}
183174

175+
func (t *tagTracer) protectDirect(p peer.ID) {
176+
t.cmgr.Protect(p, "pubsub:<direct>")
177+
}
178+
179+
func (t *tagTracer) unprotectDirect(p peer.ID) {
180+
t.cmgr.Unprotect(p, "pubsub:<direct>")
181+
}
182+
184183
// -- RawTracer interface methods
185184
var _ RawTracer = (*tagTracer)(nil)
186185

187186
func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
188-
t.tagPeerIfDirect(p)
187+
if t.isDirect(p) {
188+
t.protectDirect(p)
189+
}
189190
}
190191

191192
func (t *tagTracer) Join(topic string) {

tag_tracer_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ func TestTagTracerDirectPeerTags(t *testing.T) {
5151
p2 := peer.ID("2")
5252
p3 := peer.ID("3")
5353

54-
// in the real world, tagTracer.direct is set in the WithDirectPeers option function
55-
tt.direct = make(map[peer.ID]struct{})
56-
tt.direct[p1] = struct{}{}
54+
tt.protectDirect(p1)
5755

5856
tt.AddPeer(p1, GossipSubID_v10)
5957
tt.AddPeer(p2, GossipSubID_v10)
@@ -69,6 +67,11 @@ func TestTagTracerDirectPeerTags(t *testing.T) {
6967
t.Fatal("expected non-direct peer to be unprotected")
7068
}
7169
}
70+
71+
tt.unprotectDirect(p1)
72+
if cmgr.IsProtected(p1, tag) {
73+
t.Fatal("expected direct peer to not be protected")
74+
}
7275
}
7376

7477
func TestTagTracerDeliveryTags(t *testing.T) {

0 commit comments

Comments
 (0)