Skip to content

Commit 8a4963d

Browse files
authored
Merge pull request #570 from libp2p/feat/dual
Dual DHT scaffold
2 parents 969e03e + ca09167 commit 8a4963d

File tree

9 files changed

+655
-41
lines changed

9 files changed

+655
-41
lines changed

dht.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ type IpfsDHT struct {
8686
// DHT protocols we can respond to.
8787
serverProtocols []protocol.ID
8888

89-
auto bool
89+
auto ModeOpt
9090
mode mode
9191
modeLk sync.Mutex
9292

@@ -159,15 +159,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
159159

160160
dht.Validator = cfg.validator
161161

162+
dht.auto = cfg.mode
162163
switch cfg.mode {
163-
case ModeAuto:
164-
dht.auto = true
164+
case ModeAuto, ModeClient:
165165
dht.mode = modeClient
166-
case ModeClient:
167-
dht.auto = false
168-
dht.mode = modeClient
169-
case ModeServer:
170-
dht.auto = false
166+
case ModeAutoServer, ModeServer:
171167
dht.mode = modeServer
172168
default:
173169
return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
@@ -312,6 +308,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
312308
return rt, err
313309
}
314310

311+
// Mode allows introspection of the operation mode of the DHT
312+
func (dht *IpfsDHT) Mode() ModeOpt {
313+
return dht.auto
314+
}
315+
315316
// fixLowPeers tries to get more peers into the routing table if we're below the threshold
316317
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
317318
for {

dht_options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ const (
2525
ModeClient
2626
// ModeServer operates the DHT as a server, it can both send and respond to queries
2727
ModeServer
28+
// ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown
29+
ModeAutoServer
2830
)
2931

3032
// DefaultPrefix is the application specific prefix attached to all DHT protocols by default.
@@ -256,6 +258,15 @@ func ProtocolPrefix(prefix protocol.ID) Option {
256258
}
257259
}
258260

261+
// ProtocolExtension adds an application specific protocol to the DHT protocol. For example,
262+
// /ipfs/lan/kad/1.0.0 instead of /ipfs/kad/1.0.0. extension should be of the form /lan.
263+
func ProtocolExtension(ext protocol.ID) Option {
264+
return func(c *config) error {
265+
c.protocolPrefix += ext
266+
return nil
267+
}
268+
}
269+
259270
// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
260271
//
261272
// The default value is 20.

dht_test.go

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/stretchr/testify/require"
2626

2727
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
28+
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
2829

2930
"github.com/ipfs/go-cid"
3031
u "github.com/ipfs/go-ipfs-util"
@@ -72,33 +73,8 @@ type blankValidator struct{}
7273
func (blankValidator) Validate(_ string, _ []byte) error { return nil }
7374
func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }
7475

75-
type testValidator struct{}
76-
77-
func (testValidator) Select(_ string, bs [][]byte) (int, error) {
78-
index := -1
79-
for i, b := range bs {
80-
if bytes.Equal(b, []byte("newer")) {
81-
index = i
82-
} else if bytes.Equal(b, []byte("valid")) {
83-
if index == -1 {
84-
index = i
85-
}
86-
}
87-
}
88-
if index == -1 {
89-
return -1, errors.New("no rec found")
90-
}
91-
return index, nil
92-
}
93-
func (testValidator) Validate(_ string, b []byte) error {
94-
if bytes.Equal(b, []byte("expired")) {
95-
return errors.New("expired")
96-
}
97-
return nil
98-
}
99-
10076
type testAtomicPutValidator struct {
101-
testValidator
77+
test.TestValidator
10278
}
10379

10480
// selects the entry with the 'highest' last byte
@@ -372,7 +348,7 @@ func TestValueSetInvalid(t *testing.T) {
372348
defer dhtA.host.Close()
373349
defer dhtB.host.Close()
374350

375-
dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
351+
dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
376352
dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
377353

378354
connect(t, ctx, dhtA, dhtB)
@@ -451,8 +427,8 @@ func TestSearchValue(t *testing.T) {
451427

452428
connect(t, ctx, dhtA, dhtB)
453429

454-
dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
455-
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
430+
dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
431+
dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
456432

457433
ctxT, cancel := context.WithTimeout(ctx, time.Second)
458434
defer cancel()
@@ -554,7 +530,7 @@ func TestValueGetInvalid(t *testing.T) {
554530
defer dhtB.host.Close()
555531

556532
dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
557-
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
533+
dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
558534

559535
connect(t, ctx, dhtA, dhtB)
560536

dual/dual.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Package dual provides an implementaiton of a split or "dual" dht, where two parallel instances
2+
// are maintained for the global internet and the local LAN respectively.
3+
package dual
4+
5+
import (
6+
"context"
7+
"sync"
8+
9+
"github.com/ipfs/go-cid"
10+
ci "github.com/libp2p/go-libp2p-core/crypto"
11+
"github.com/libp2p/go-libp2p-core/host"
12+
"github.com/libp2p/go-libp2p-core/peer"
13+
"github.com/libp2p/go-libp2p-core/protocol"
14+
"github.com/libp2p/go-libp2p-core/routing"
15+
dht "github.com/libp2p/go-libp2p-kad-dht"
16+
helper "github.com/libp2p/go-libp2p-routing-helpers"
17+
18+
"github.com/hashicorp/go-multierror"
19+
)
20+
21+
// DHT implements the routing interface to provide two concrete DHT implementationts for use
22+
// in IPFS that are used to support both global network users and disjoint LAN usecases.
23+
type DHT struct {
24+
WAN *dht.IpfsDHT
25+
LAN *dht.IpfsDHT
26+
}
27+
28+
// LanExtension is used to differentiate local protocol requests from those on the WAN DHT.
29+
const LanExtension protocol.ID = "/lan"
30+
31+
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
32+
// guarantee, but we can use them to aid refactoring.
33+
var (
34+
_ routing.ContentRouting = (*DHT)(nil)
35+
_ routing.Routing = (*DHT)(nil)
36+
_ routing.PeerRouting = (*DHT)(nil)
37+
_ routing.PubKeyFetcher = (*DHT)(nil)
38+
_ routing.ValueStore = (*DHT)(nil)
39+
)
40+
41+
// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
42+
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
43+
// the LAN-vs-WAN distinction.
44+
// Note: query or routing table functional options provided as arguments to this function
45+
// will be overriden by this constructor.
46+
func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) {
47+
wanOpts := append(options,
48+
dht.QueryFilter(dht.PublicQueryFilter),
49+
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
50+
)
51+
wan, err := dht.New(ctx, h, wanOpts...)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
// Unless overridden by user supplied options, the LAN DHT should default
57+
// to 'AutoServer' mode.
58+
lanOpts := append(options,
59+
dht.ProtocolExtension(LanExtension),
60+
dht.QueryFilter(dht.PrivateQueryFilter),
61+
dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
62+
)
63+
if wan.Mode() != dht.ModeClient {
64+
lanOpts = append(lanOpts, dht.Mode(dht.ModeServer))
65+
}
66+
lan, err := dht.New(ctx, h, lanOpts...)
67+
if err != nil {
68+
return nil, err
69+
}
70+
71+
impl := DHT{wan, lan}
72+
return &impl, nil
73+
}
74+
75+
// Close closes the DHT context.
76+
func (dht *DHT) Close() error {
77+
return multierror.Append(dht.WAN.Close(), dht.LAN.Close()).ErrorOrNil()
78+
}
79+
80+
func (dht *DHT) activeWAN() bool {
81+
return dht.WAN.RoutingTable().Size() > 0
82+
}
83+
84+
// Provide adds the given cid to the content routing system.
85+
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error {
86+
if dht.activeWAN() {
87+
return dht.WAN.Provide(ctx, key, announce)
88+
}
89+
return dht.LAN.Provide(ctx, key, announce)
90+
}
91+
92+
// FindProvidersAsync searches for peers who are able to provide a given key
93+
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
94+
reqCtx, cancel := context.WithCancel(ctx)
95+
outCh := make(chan peer.AddrInfo)
96+
wanCh := dht.WAN.FindProvidersAsync(reqCtx, key, count)
97+
lanCh := dht.LAN.FindProvidersAsync(reqCtx, key, count)
98+
zeroCount := (count == 0)
99+
go func() {
100+
defer cancel()
101+
defer close(outCh)
102+
103+
found := make(map[peer.ID]struct{}, count)
104+
var pi peer.AddrInfo
105+
for (zeroCount || count > 0) && (wanCh != nil || lanCh != nil) {
106+
var ok bool
107+
select {
108+
case pi, ok = <-wanCh:
109+
if !ok {
110+
wanCh = nil
111+
continue
112+
}
113+
case pi, ok = <-lanCh:
114+
if !ok {
115+
lanCh = nil
116+
continue
117+
}
118+
}
119+
// already found
120+
if _, ok = found[pi.ID]; ok {
121+
continue
122+
}
123+
124+
select {
125+
case outCh <- pi:
126+
found[pi.ID] = struct{}{}
127+
count--
128+
case <-ctx.Done():
129+
return
130+
}
131+
}
132+
}()
133+
return outCh
134+
}
135+
136+
// FindPeer searches for a peer with given ID
137+
// Note: with signed peer records, we can change this to short circuit once either DHT returns.
138+
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
139+
var wg sync.WaitGroup
140+
wg.Add(2)
141+
var wanInfo, lanInfo peer.AddrInfo
142+
var wanErr, lanErr error
143+
go func() {
144+
defer wg.Done()
145+
wanInfo, wanErr = dht.WAN.FindPeer(ctx, pid)
146+
}()
147+
go func() {
148+
defer wg.Done()
149+
lanInfo, lanErr = dht.LAN.FindPeer(ctx, pid)
150+
}()
151+
152+
wg.Wait()
153+
154+
return peer.AddrInfo{
155+
ID: pid,
156+
Addrs: append(wanInfo.Addrs, lanInfo.Addrs...),
157+
}, multierror.Append(wanErr, lanErr).ErrorOrNil()
158+
}
159+
160+
// Bootstrap allows callers to hint to the routing system to get into a
161+
// Boostrapped state and remain there.
162+
func (dht *DHT) Bootstrap(ctx context.Context) error {
163+
erra := dht.WAN.Bootstrap(ctx)
164+
errb := dht.LAN.Bootstrap(ctx)
165+
return multierror.Append(erra, errb).ErrorOrNil()
166+
}
167+
168+
// PutValue adds value corresponding to given Key.
169+
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
170+
if dht.activeWAN() {
171+
return dht.WAN.PutValue(ctx, key, val, opts...)
172+
}
173+
return dht.LAN.PutValue(ctx, key, val, opts...)
174+
}
175+
176+
// GetValue searches for the value corresponding to given Key.
177+
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
178+
lanCtx, cancelLan := context.WithCancel(ctx)
179+
defer cancelLan()
180+
181+
var (
182+
lanVal []byte
183+
lanErr error
184+
lanWaiter sync.WaitGroup
185+
)
186+
lanWaiter.Add(1)
187+
go func() {
188+
defer lanWaiter.Done()
189+
lanVal, lanErr = d.LAN.GetValue(lanCtx, key, opts...)
190+
}()
191+
192+
wanVal, wanErr := d.WAN.GetValue(ctx, key, opts...)
193+
if wanErr == nil {
194+
cancelLan()
195+
}
196+
lanWaiter.Wait()
197+
if wanErr != nil {
198+
if lanErr != nil {
199+
return nil, multierror.Append(wanErr, lanErr).ErrorOrNil()
200+
}
201+
return lanVal, nil
202+
}
203+
return wanVal, nil
204+
}
205+
206+
// SearchValue searches for better values from this value
207+
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
208+
p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
209+
return p.SearchValue(ctx, key, opts...)
210+
}
211+
212+
// GetPublicKey returns the public key for the given peer.
213+
func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
214+
p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
215+
return p.GetPublicKey(ctx, pid)
216+
}

0 commit comments

Comments
 (0)