Skip to content

Commit 7ac1f6b

Browse files
committed
Dual DHT scaffold
1 parent 6c82b40 commit 7ac1f6b

File tree

1 file changed

+176
-0
lines changed

1 file changed

+176
-0
lines changed

dual/dual.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Package dual provides an implementaiton of a split or "dual" dht, where two parallel instances
2+
// are maintained for the global internet and the local LAN respectively.
3+
package dual
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"sync"
9+
10+
"github.com/ipfs/go-cid"
11+
ci "github.com/libp2p/go-libp2p-core/crypto"
12+
"github.com/libp2p/go-libp2p-core/host"
13+
"github.com/libp2p/go-libp2p-core/peer"
14+
"github.com/libp2p/go-libp2p-core/routing"
15+
dht "github.com/libp2p/go-libp2p-kad-dht"
16+
)
17+
18+
// DHT implements the routing interface to provide two concrete DHT implementationts for use
19+
// in IPFS that are used to support both global network users and disjoint LAN usecases.
20+
type DHT struct {
21+
WAN *dht.IpfsDHT
22+
LAN *dht.IpfsDHT
23+
}
24+
25+
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
26+
// guarantee, but we can use them to aid refactoring.
27+
var (
28+
_ routing.ContentRouting = (*DHT)(nil)
29+
_ routing.Routing = (*DHT)(nil)
30+
_ routing.PeerRouting = (*DHT)(nil)
31+
_ routing.PubKeyFetcher = (*DHT)(nil)
32+
_ routing.ValueStore = (*DHT)(nil)
33+
)
34+
35+
// NewDHT creates a new DualDHT instance. Options provided are forwarded on to the two concrete
36+
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
37+
// the LAN-vs-WAN distinction.
38+
func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) {
39+
wanOpts := append(options,
40+
dht.ProtocolPrefix(dht.DefaultPrefix),
41+
dht.QueryFilter(dht.PublicQueryFilter),
42+
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
43+
)
44+
wan, err := dht.New(ctx, h, wanOpts...)
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
lanOpts := append(options,
50+
dht.ProtocolPrefix(dht.DefaultPrefix+"/lan"),
51+
dht.QueryFilter(dht.PrivateQueryFilter),
52+
dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
53+
)
54+
lan, err := dht.New(ctx, h, lanOpts...)
55+
if err != nil {
56+
return nil, err
57+
}
58+
59+
impl := DHT{wan, lan}
60+
return &impl, nil
61+
}
62+
63+
func (dht *DHT) activeWAN() bool {
64+
wanPeers := dht.WAN.RoutingTable().ListPeers()
65+
return len(wanPeers) > 0
66+
}
67+
68+
// Provide adds the given cid to the content routing system.
69+
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error {
70+
if dht.activeWAN() {
71+
return dht.WAN.Provide(ctx, key, announce)
72+
}
73+
return dht.LAN.Provide(ctx, key, announce)
74+
}
75+
76+
// FindProvidersAsync searches for peers who are able to provide a given key
77+
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
78+
if dht.activeWAN() {
79+
return dht.WAN.FindProvidersAsync(ctx, key, count)
80+
}
81+
return dht.LAN.FindProvidersAsync(ctx, key, count)
82+
}
83+
84+
// FindPeer searches for a peer with given ID
85+
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
86+
// TODO: should these be run in parallel?
87+
infoa, erra := dht.WAN.FindPeer(ctx, pid)
88+
infob, errb := dht.LAN.FindPeer(ctx, pid)
89+
return peer.AddrInfo{
90+
ID: pid,
91+
Addrs: append(infoa.Addrs, infob.Addrs...),
92+
}, mergeErrors(erra, errb)
93+
}
94+
95+
func mergeErrors(a, b error) error {
96+
if a == nil && b == nil {
97+
return nil
98+
} else if a != nil && b != nil {
99+
return fmt.Errorf("%v, %v", a, b)
100+
} else if a != nil {
101+
return a
102+
}
103+
return b
104+
}
105+
106+
// Bootstrap allows callers to hint to the routing system to get into a
107+
// Boostrapped state and remain there.
108+
func (dht *DHT) Bootstrap(ctx context.Context) error {
109+
erra := dht.WAN.Bootstrap(ctx)
110+
errb := dht.LAN.Bootstrap(ctx)
111+
return mergeErrors(erra, errb)
112+
}
113+
114+
// PutValue adds value corresponding to given Key.
115+
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
116+
if dht.activeWAN() {
117+
return dht.WAN.PutValue(ctx, key, val, opts...)
118+
}
119+
return dht.LAN.PutValue(ctx, key, val, opts...)
120+
}
121+
122+
// GetValue searches for the value corresponding to given Key.
123+
func (dht *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
124+
vala, erra := dht.WAN.GetValue(ctx, key, opts...)
125+
if vala != nil {
126+
return vala, erra
127+
}
128+
valb, errb := dht.LAN.GetValue(ctx, key, opts...)
129+
return valb, mergeErrors(erra, errb)
130+
}
131+
132+
// SearchValue searches for better values from this value
133+
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
134+
streama, erra := dht.WAN.SearchValue(ctx, key, opts...)
135+
streamb, errb := dht.WAN.SearchValue(ctx, key, opts...)
136+
if erra == nil && errb == nil {
137+
combinedStream := make(chan []byte)
138+
var combinedWg sync.WaitGroup
139+
combinedWg.Add(2)
140+
go func(out chan []byte) {
141+
for itm := range streama {
142+
out <- itm
143+
}
144+
combinedWg.Done()
145+
}(combinedStream)
146+
go func(out chan []byte) {
147+
for itm := range streamb {
148+
out <- itm
149+
}
150+
combinedWg.Done()
151+
}(combinedStream)
152+
go func() {
153+
combinedWg.Wait()
154+
close(combinedStream)
155+
}()
156+
return combinedStream, nil
157+
} else if erra == nil {
158+
return streama, nil
159+
} else if errb == nil {
160+
return streamb, nil
161+
}
162+
return nil, mergeErrors(erra, errb)
163+
}
164+
165+
// GetPublicKey returns the public key for the given peer.
166+
func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
167+
pka, erra := dht.WAN.GetPublicKey(ctx, pid)
168+
if erra == nil {
169+
return pka, nil
170+
}
171+
pkb, errb := dht.LAN.GetPublicKey(ctx, pid)
172+
if errb == nil {
173+
return pkb, nil
174+
}
175+
return nil, mergeErrors(erra, errb)
176+
}

0 commit comments

Comments
 (0)