Skip to content

Commit 0793dcd

Browse files
committed
initial testing support
1 parent 5042d4b commit 0793dcd

File tree

3 files changed

+237
-2
lines changed

3 files changed

+237
-2
lines changed

dht.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
306306
return rt, err
307307
}
308308

309+
// Mode allows introspection of the operation mode of the DHT
310+
func (dht *IpfsDHT) Mode() ModeOpt {
311+
return dht.auto
312+
}
313+
309314
// fixLowPeers tries to get more peers into the routing table if we're below the threshold
310315
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
311316
for {

dual/dual.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,14 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error)
5353

5454
// Unless overridden by user supplied options, the LAN DHT should default
5555
// to 'AutoServer' mode.
56-
lanOpts := append([]dht.Option{dht.Mode(dht.ModeAutoServer)}, options...)
57-
lanOpts = append(lanOpts,
56+
lanOpts := append(options,
5857
dht.ProtocolExtension(DefaultLanExtension),
5958
dht.QueryFilter(dht.PrivateQueryFilter),
6059
dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
6160
)
61+
if wan.Mode() != dht.ModeClient {
62+
lanOpts = append(lanOpts, dht.Mode(dht.ModeServer))
63+
}
6264
lan, err := dht.New(ctx, h, lanOpts...)
6365
if err != nil {
6466
return nil, err
@@ -68,6 +70,11 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error)
6870
return &impl, nil
6971
}
7072

73+
// Close closes the DHT context.
74+
func (dht *DHT) Close() error {
75+
return mergeErrors(dht.WAN.Close(), dht.LAN.Close())
76+
}
77+
7178
func (dht *DHT) activeWAN() bool {
7279
return dht.WAN.RoutingTable().Size() > 0
7380
}

dual/dual_test.go

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
package dual
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"testing"
8+
"time"
9+
10+
"github.com/ipfs/go-cid"
11+
u "github.com/ipfs/go-ipfs-util"
12+
"github.com/libp2p/go-libp2p-core/network"
13+
"github.com/libp2p/go-libp2p-core/peer"
14+
dht "github.com/libp2p/go-libp2p-kad-dht"
15+
peerstore "github.com/libp2p/go-libp2p-peerstore"
16+
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
17+
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
18+
)
19+
20+
var wancid, lancid cid.Cid
21+
22+
func init() {
23+
wancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("wan cid -- value")))
24+
lancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("lan cid -- value")))
25+
}
26+
27+
type blankValidator struct{}
28+
29+
func (blankValidator) Validate(_ string, _ []byte) error { return nil }
30+
func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }
31+
32+
type customRtHelper struct {
33+
allow peer.ID
34+
}
35+
36+
func MkFilterForPeer() (func(d *dht.IpfsDHT, conns []network.Conn) bool, *customRtHelper) {
37+
helper := customRtHelper{}
38+
f := func(_ *dht.IpfsDHT, conns []network.Conn) bool {
39+
for _, c := range conns {
40+
if c.RemotePeer() == helper.allow {
41+
fmt.Fprintf(os.Stderr, "allowed conn per filter\n")
42+
return true
43+
}
44+
}
45+
fmt.Fprintf(os.Stderr, "rejected conn per rt filter\n")
46+
return false
47+
}
48+
return f, &helper
49+
}
50+
51+
func setupDHTWithFilters(ctx context.Context, t *testing.T, options ...dht.Option) (*DHT, []*customRtHelper) {
52+
h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
53+
54+
wanFilter, wanRef := MkFilterForPeer()
55+
wanOpts := []dht.Option{
56+
dht.NamespacedValidator("v", blankValidator{}),
57+
dht.ProtocolPrefix("/test"),
58+
dht.DisableAutoRefresh(),
59+
dht.RoutingTableFilter(wanFilter),
60+
}
61+
wan, err := dht.New(ctx, h, wanOpts...)
62+
if err != nil {
63+
t.Fatal(err)
64+
}
65+
66+
lanFilter, lanRef := MkFilterForPeer()
67+
lanOpts := []dht.Option{
68+
dht.NamespacedValidator("v", blankValidator{}),
69+
dht.ProtocolPrefix("/test"),
70+
dht.ProtocolExtension(DefaultLanExtension),
71+
dht.DisableAutoRefresh(),
72+
dht.RoutingTableFilter(lanFilter),
73+
dht.Mode(dht.ModeServer),
74+
}
75+
lan, err := dht.New(ctx, h, lanOpts...)
76+
if err != nil {
77+
t.Fatal(err)
78+
}
79+
80+
impl := DHT{wan, lan}
81+
return &impl, []*customRtHelper{wanRef, lanRef}
82+
}
83+
84+
func setupDHT(ctx context.Context, t *testing.T, options ...dht.Option) *DHT {
85+
t.Helper()
86+
baseOpts := []dht.Option{
87+
dht.NamespacedValidator("v", blankValidator{}),
88+
dht.ProtocolPrefix("/test"),
89+
dht.DisableAutoRefresh(),
90+
}
91+
92+
d, err := New(
93+
ctx,
94+
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
95+
append(baseOpts, options...)...,
96+
)
97+
if err != nil {
98+
t.Fatal(err)
99+
}
100+
return d
101+
}
102+
103+
func connect(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) {
104+
t.Helper()
105+
bid := b.PeerID()
106+
baddr := b.Host().Peerstore().Addrs(bid)
107+
if len(baddr) == 0 {
108+
t.Fatal("no addresses for connection.")
109+
}
110+
a.Host().Peerstore().AddAddrs(bid, baddr, peerstore.TempAddrTTL)
111+
fmt.Fprintf(os.Stderr, "gonna connect.\n")
112+
if err := a.Host().Connect(ctx, peer.AddrInfo{ID: bid}); err != nil {
113+
t.Fatal(err)
114+
}
115+
fmt.Fprintf(os.Stderr, "gonn wait\n")
116+
wait(ctx, t, a, b)
117+
}
118+
119+
func wait(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) {
120+
t.Helper()
121+
for a.RoutingTable().Find(b.PeerID()) == "" {
122+
//fmt.Fprintf(os.Stderr, "%v\n", a.RoutingTable().GetPeerInfos())
123+
select {
124+
case <-ctx.Done():
125+
t.Fatal(ctx.Err())
126+
case <-time.After(time.Millisecond * 5):
127+
}
128+
}
129+
}
130+
131+
func setupTier(ctx context.Context, t *testing.T) (*DHT, *dht.IpfsDHT, *dht.IpfsDHT) {
132+
t.Helper()
133+
baseOpts := []dht.Option{
134+
dht.NamespacedValidator("v", blankValidator{}),
135+
dht.ProtocolPrefix("/test"),
136+
dht.DisableAutoRefresh(),
137+
}
138+
139+
d, hlprs := setupDHTWithFilters(ctx, t)
140+
141+
wan, err := dht.New(
142+
ctx,
143+
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
144+
append(baseOpts, dht.Mode(dht.ModeServer))...,
145+
)
146+
if err != nil {
147+
t.Fatal(err)
148+
}
149+
hlprs[0].allow = wan.PeerID()
150+
connect(ctx, t, wan, d.WAN)
151+
152+
lan, err := dht.New(
153+
ctx,
154+
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
155+
append(baseOpts, dht.Mode(dht.ModeServer), dht.ProtocolExtension("/lan"))...,
156+
)
157+
if err != nil {
158+
t.Fatal(err)
159+
}
160+
hlprs[1].allow = lan.PeerID()
161+
connect(ctx, t, d.LAN, lan)
162+
163+
return d, wan, lan
164+
}
165+
166+
func TestDualModes(t *testing.T) {
167+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
168+
defer cancel()
169+
170+
d := setupDHT(ctx, t)
171+
defer d.Close()
172+
173+
if d.WAN.Mode() != dht.ModeAuto {
174+
t.Fatal("wrong default mode for wan")
175+
} else if d.LAN.Mode() != dht.ModeServer {
176+
t.Fatal("wrong default mode for lan")
177+
}
178+
179+
d2 := setupDHT(ctx, t, dht.Mode(dht.ModeClient))
180+
defer d2.Close()
181+
if d2.WAN.Mode() != dht.ModeClient ||
182+
d2.LAN.Mode() != dht.ModeClient {
183+
t.Fatal("wrong client mode operation")
184+
}
185+
}
186+
187+
func TestFindProviderAsync(t *testing.T) {
188+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
189+
defer cancel()
190+
191+
d, wan, lan := setupTier(ctx, t)
192+
defer d.Close()
193+
defer wan.Close()
194+
defer lan.Close()
195+
196+
if err := wan.Provide(ctx, wancid, true); err != nil {
197+
t.Fatal(err)
198+
}
199+
200+
if err := lan.Provide(ctx, lancid, true); err != nil {
201+
t.Fatal(err)
202+
}
203+
204+
wpc := d.FindProvidersAsync(ctx, wancid, 1)
205+
select {
206+
case p := <-wpc:
207+
if p.ID != wan.PeerID() {
208+
t.Fatal("wrong wan provider")
209+
}
210+
case <-ctx.Done():
211+
t.Fatal("find provider timeout.")
212+
}
213+
214+
lpc := d.FindProvidersAsync(ctx, lancid, 1)
215+
select {
216+
case p := <-lpc:
217+
if p.ID != lan.PeerID() {
218+
t.Fatal("wrong lan provider")
219+
}
220+
case <-ctx.Done():
221+
t.Fatal("find provider timeout.")
222+
}
223+
}

0 commit comments

Comments
 (0)