Skip to content

Commit 99f1a4b

Browse files
authored
Fix peer creation for many new peers (#27)
* Fix peer creation for many new peers * Old code blocked the channel. * We use the new peers count and not channel length for checking the number of peers. * We made the channel a bit longer than necessary to avoid overlooked race conditions. * Add better tests. One for reproduction of the error.
1 parent 8556535 commit 99f1a4b

File tree

3 files changed

+144
-23
lines changed

3 files changed

+144
-23
lines changed

.github/workflows/push-docker-custom.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Deploy dev images to GHCR
33
on:
44
push:
55
branches:
6-
- 'feature/public-peers'
6+
- 'bugfix/many-peers'
77

88
jobs:
99
push-store-image:
@@ -21,5 +21,5 @@ jobs:
2121

2222
- name: 'Build Inventory Image'
2323
run: |
24-
docker build . --tag ghcr.io/qubic/qubic-nodes:public-peers
25-
docker push ghcr.io/qubic/qubic-nodes:public-peers
24+
docker build . --tag ghcr.io/qubic/qubic-nodes:snapshot
25+
docker push ghcr.io/qubic/qubic-nodes:snapshot

node/peer_discovery.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"time"
99
)
1010

11-
const maxNewPeersPerUpdate = 50
11+
const approxMaxNewPeers = 50
1212

1313
type UpdatedPeerList struct {
1414
originalPeers []string
@@ -107,7 +107,9 @@ func (ppd *PublicPeerDiscovery) FindNewPeers(nodes []*Node, addresses []string)
107107
}
108108

109109
var waitGroup sync.WaitGroup
110-
nodesChannel := make(chan *Node, maxNewPeersPerUpdate)
110+
// channel must be large enough to not block.
111+
// the new peer size should be enough but better be safe in case of a race condition.
112+
nodesChannel := make(chan *Node, approxMaxNewPeers*2)
111113
for _, node := range nodes {
112114
ppd.lookupPeers(node.Peers, peers, nodesChannel, &waitGroup)
113115
}
@@ -126,10 +128,12 @@ func (ppd *PublicPeerDiscovery) FindNewPeers(nodes []*Node, addresses []string)
126128
// recursive
127129
func (ppd *PublicPeerDiscovery) lookupPeers(hosts []string, peers *UpdatedPeerList, channel chan *Node, waitGroup *sync.WaitGroup) {
128130
for _, host := range hosts {
129-
// abort if channel is filled with next peer
130-
if len(channel) < maxNewPeersPerUpdate-2 && peers.addIfNew(host) {
131+
// add new host and its new peers if we don't have enough peers yet (len-1 and -1 for current one)
132+
// attention: asynchronous recursive call
133+
if len(peers.newPeers) < approxMaxNewPeers && peers.addIfNew(host) {
134+
log.Printf("Found new host: [%s]. (%d)", host, len(peers.newPeers))
131135
waitGroup.Add(1)
132-
go ppd.lookupPeer(host, peers, channel, waitGroup)
136+
go ppd.lookupPeer(host, peers, channel, waitGroup) // async. This will get executed after this loop most probably
133137
}
134138
}
135139
}
@@ -139,7 +143,7 @@ func (ppd *PublicPeerDiscovery) lookupPeer(host string, peers *UpdatedPeerList,
139143
defer waitGroup.Done()
140144
node, err := ppd.createNodeFunction(host)
141145
if err == nil {
142-
channel <- node
146+
channel <- node // attention: we block here forever if the channel is full
143147
ppd.lookupPeers(node.Peers, peers, channel, waitGroup)
144148
}
145149
}

node/peer_discovery_test.go

Lines changed: 131 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package node
22

33
import (
4-
"github.com/pkg/errors"
5-
"github.com/stretchr/testify/assert"
4+
"context"
5+
"fmt"
66
"sync"
77
"testing"
88
"time"
9+
10+
"github.com/pkg/errors"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
913
)
1014

1115
func TestNoPeerDiscovery_UpdatePeers(t *testing.T) {
@@ -87,19 +91,132 @@ func TestPublicPeerDiscovery_UpdatePeers(t *testing.T) {
8791
assert.Contains(t, hosts, "6.7.8.9")
8892
}
8993

90-
func TestPublicPeerDiscovery_ExcludePeers(t *testing.T) {
91-
createNodeFunc := func(host string) (*Node, error) {
92-
return createTestNodeWithPeers(host, []string{"1.2.3.4", "6.6.6.6"}), nil // 6.6.6.6 excluded
93-
}
94-
discovery := newPublicPeerDiscovery(createNodeFunc, []string{" 6.6.6.6"}, time.Hour)
94+
func TestPublicPeerDiscovery_FindNewPeers(t *testing.T) {
95+
t.Run("returns empty when no new peers found", func(t *testing.T) {
96+
createNodeFunc := func(host string) (*Node, error) {
97+
return createTestNodeWithPeers(host, []string{}), nil
98+
}
99+
discovery := newPublicPeerDiscovery(createNodeFunc, []string{}, time.Hour)
95100

96-
discoveredPeers := discovery.FindNewPeers([]*Node{
97-
createTestNodeWithPeers("1.2.3.4", []string{"2.3.4.5", "3.4.5.6"}), // 3.4.5.6 new peer
98-
}, []string{"1.2.3.4", "2.3.4.5"})
101+
discoveredPeers := discovery.FindNewPeers([]*Node{
102+
createTestNodeWithPeers("1.2.3.4", []string{"2.3.4.5"}),
103+
}, []string{"1.2.3.4", "2.3.4.5"})
99104

100-
assert.Len(t, discoveredPeers, 1)
101-
hosts := getHosts(discoveredPeers)
102-
assert.Contains(t, hosts, "3.4.5.6")
105+
assert.Empty(t, discoveredPeers)
106+
})
107+
108+
t.Run("discovers new peers from existing nodes", func(t *testing.T) {
109+
createNodeFunc := func(host string) (*Node, error) {
110+
return createTestNodeWithPeers(host, []string{}), nil
111+
}
112+
discovery := newPublicPeerDiscovery(createNodeFunc, []string{}, time.Hour)
113+
114+
discoveredPeers := discovery.FindNewPeers([]*Node{
115+
createTestNodeWithPeers("1.2.3.4", []string{"2.3.4.5", "3.4.5.6"}),
116+
}, []string{"1.2.3.4"})
117+
118+
assert.Len(t, discoveredPeers, 2)
119+
hosts := getHosts(discoveredPeers)
120+
assert.Contains(t, hosts, "2.3.4.5")
121+
assert.Contains(t, hosts, "3.4.5.6")
122+
})
123+
124+
t.Run("recursively discovers peers from new peers", func(t *testing.T) {
125+
createNodeFunc := func(host string) (*Node, error) {
126+
switch host {
127+
case "2.3.4.5":
128+
return createTestNodeWithPeers(host, []string{"3.4.5.6"}), nil
129+
case "3.4.5.6":
130+
return createTestNodeWithPeers(host, []string{"4.5.6.7"}), nil
131+
default:
132+
return createTestNodeWithPeers(host, []string{}), nil
133+
}
134+
}
135+
discovery := newPublicPeerDiscovery(createNodeFunc, []string{}, time.Hour)
136+
137+
discoveredPeers := discovery.FindNewPeers([]*Node{
138+
createTestNodeWithPeers("1.2.3.4", []string{"2.3.4.5"}),
139+
}, []string{"1.2.3.4"})
140+
141+
assert.Len(t, discoveredPeers, 3)
142+
hosts := getHosts(discoveredPeers)
143+
assert.Contains(t, hosts, "2.3.4.5")
144+
assert.Contains(t, hosts, "3.4.5.6")
145+
assert.Contains(t, hosts, "4.5.6.7")
146+
})
147+
148+
t.Run("excludes peers from excluded list", func(t *testing.T) {
149+
createNodeFunc := func(host string) (*Node, error) {
150+
return createTestNodeWithPeers(host, []string{}), nil
151+
}
152+
discovery := newPublicPeerDiscovery(createNodeFunc, []string{"3.4.5.6"}, time.Hour)
153+
154+
discoveredPeers := discovery.FindNewPeers([]*Node{
155+
createTestNodeWithPeers("1.2.3.4", []string{"2.3.4.5", "3.4.5.6", "4.5.6.7"}),
156+
}, []string{"1.2.3.4"})
157+
158+
assert.Len(t, discoveredPeers, 2)
159+
hosts := getHosts(discoveredPeers)
160+
assert.Contains(t, hosts, "2.3.4.5")
161+
assert.Contains(t, hosts, "4.5.6.7")
162+
assert.NotContains(t, hosts, "3.4.5.6")
163+
})
164+
165+
t.Run("handles node creation errors gracefully", func(t *testing.T) {
166+
createNodeFunc := func(host string) (*Node, error) {
167+
if host == "3.4.5.6" {
168+
return nil, errors.Errorf("connection failed")
169+
}
170+
return createTestNodeWithPeers(host, []string{}), nil
171+
}
172+
discovery := newPublicPeerDiscovery(createNodeFunc, []string{}, time.Hour)
173+
174+
discoveredPeers := discovery.FindNewPeers([]*Node{
175+
createTestNodeWithPeers("1.2.3.4", []string{"2.3.4.5", "3.4.5.6", "4.5.6.7"}),
176+
}, []string{"1.2.3.4"})
177+
178+
assert.Len(t, discoveredPeers, 2)
179+
hosts := getHosts(discoveredPeers)
180+
assert.Contains(t, hosts, "2.3.4.5")
181+
assert.Contains(t, hosts, "4.5.6.7")
182+
assert.NotContains(t, hosts, "3.4.5.6")
183+
})
184+
}
185+
186+
func TestPublicPeerDiscovery_nodeWithManyPeers(t *testing.T) {
187+
188+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
189+
defer cancel()
190+
done := make(chan struct{})
191+
192+
go func() {
193+
// Generate many peers to test the limit
194+
numberOfPeers := 100
195+
manyPeers := make([]string, numberOfPeers)
196+
for i := 0; i < numberOfPeers; i++ {
197+
manyPeers[i] = fmt.Sprintf("10.0.0.%d", i)
198+
}
199+
200+
createNodeFunc := func(host string) (*Node, error) {
201+
return createTestNodeWithPeers(host, []string{}), nil
202+
}
203+
discovery := newPublicPeerDiscovery(createNodeFunc, []string{}, time.Hour)
204+
205+
discoveredPeers := discovery.FindNewPeers([]*Node{
206+
createTestNodeWithPeers("1.2.3.4", manyPeers),
207+
}, []string{"1.2.3.4"})
208+
209+
// Should not exceed maxNewPeersPerUpdate (50)
210+
require.LessOrEqual(t, len(discoveredPeers), approxMaxNewPeers)
211+
close(done)
212+
}()
213+
214+
select {
215+
case <-done:
216+
// Test passed
217+
case <-ctx.Done():
218+
t.Fatal("Test timed out")
219+
}
103220
}
104221

105222
func TestPublicPeerDiscovery_CleanupPeers(t *testing.T) {
@@ -121,7 +238,7 @@ func TestPublicPeerDiscovery_CleanupPeers(t *testing.T) {
121238
assert.Contains(t, unhealthy, "3.4.5.6")
122239

123240
time.Sleep(5 * time.Millisecond)
124-
// clean up one
241+
// clean up
125242
unhealthy = discovery.CleanupPeers([]*Node{createTestNode("2.3.4.5")}, []string{"2.3.4.5", "3.4.5.6"})
126243
assert.Len(t, unhealthy, 1)
127244
assert.Contains(t, unhealthy, "3.4.5.6")

0 commit comments

Comments
 (0)