Skip to content

Commit 5042d4b

Browse files
committed
address comments, scaffold parallelism
1 parent 7ac1f6b commit 5042d4b

File tree

4 files changed

+179
-42
lines changed

4 files changed

+179
-42
lines changed

dht.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ type IpfsDHT struct {
8484
// DHT protocols we can respond to.
8585
serverProtocols []protocol.ID
8686

87-
auto bool
87+
auto ModeOpt
8888
mode mode
8989
modeLk sync.Mutex
9090

@@ -157,15 +157,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
157157

158158
dht.Validator = cfg.validator
159159

160+
dht.auto = cfg.mode
160161
switch cfg.mode {
161-
case ModeAuto:
162-
dht.auto = true
162+
case ModeAuto, ModeClient:
163163
dht.mode = modeClient
164-
case ModeClient:
165-
dht.auto = false
166-
dht.mode = modeClient
167-
case ModeServer:
168-
dht.auto = false
164+
case ModeAutoServer, ModeServer:
169165
dht.mode = modeServer
170166
default:
171167
return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)

dht_options.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package dht
22

33
import (
44
"fmt"
5-
"github.com/ipfs/go-ipns"
65
"time"
76

7+
"github.com/ipfs/go-ipns"
8+
89
ds "github.com/ipfs/go-datastore"
910
dssync "github.com/ipfs/go-datastore/sync"
1011
"github.com/libp2p/go-libp2p-core/host"
@@ -25,6 +26,8 @@ const (
2526
ModeClient
2627
// ModeServer operates the DHT as a server, it can both send and respond to queries
2728
ModeServer
29+
// ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown
30+
ModeAutoServer
2831
)
2932

3033
const DefaultPrefix protocol.ID = "/ipfs"
@@ -255,6 +258,15 @@ func ProtocolPrefix(prefix protocol.ID) Option {
255258
}
256259
}
257260

261+
// ProtocolExtension adds an application specific protocol to the DHT protocol. For example,
262+
// /ipfs/lan/kad/1.0.0 instead of /ipfs/kad/1.0.0. extension should be of the form /lan.
263+
func ProtocolExtension(ext protocol.ID) Option {
264+
return func(c *config) error {
265+
c.protocolPrefix += ext
266+
return nil
267+
}
268+
}
269+
258270
// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
259271
//
260272
// The default value is 20.

dual/dual.go

Lines changed: 153 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
ci "github.com/libp2p/go-libp2p-core/crypto"
1212
"github.com/libp2p/go-libp2p-core/host"
1313
"github.com/libp2p/go-libp2p-core/peer"
14+
"github.com/libp2p/go-libp2p-core/protocol"
1415
"github.com/libp2p/go-libp2p-core/routing"
1516
dht "github.com/libp2p/go-libp2p-kad-dht"
1617
)
@@ -22,6 +23,9 @@ type DHT struct {
2223
LAN *dht.IpfsDHT
2324
}
2425

26+
// DefaultLanExtension is used to differentiate local protocol requests from those on the WAN DHT.
27+
const DefaultLanExtension protocol.ID = "/lan"
28+
2529
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
2630
// guarantee, but we can use them to aid refactoring.
2731
var (
@@ -32,12 +36,13 @@ var (
3236
_ routing.ValueStore = (*DHT)(nil)
3337
)
3438

35-
// NewDHT creates a new DualDHT instance. Options provided are forwarded on to the two concrete
39+
// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
3640
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
3741
// the LAN-vs-WAN distinction.
38-
func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) {
42+
// Note: query or routing table functional options provided as arguments to this function
43+
// will be overriden by this constructor.
44+
func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) {
3945
wanOpts := append(options,
40-
dht.ProtocolPrefix(dht.DefaultPrefix),
4146
dht.QueryFilter(dht.PublicQueryFilter),
4247
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
4348
)
@@ -46,8 +51,11 @@ func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, erro
4651
return nil, err
4752
}
4853

49-
lanOpts := append(options,
50-
dht.ProtocolPrefix(dht.DefaultPrefix+"/lan"),
54+
// Unless overridden by user supplied options, the LAN DHT should default
55+
// to 'AutoServer' mode.
56+
lanOpts := append([]dht.Option{dht.Mode(dht.ModeAutoServer)}, options...)
57+
lanOpts = append(lanOpts,
58+
dht.ProtocolExtension(DefaultLanExtension),
5159
dht.QueryFilter(dht.PrivateQueryFilter),
5260
dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
5361
)
@@ -61,8 +69,7 @@ func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, erro
6169
}
6270

6371
func (dht *DHT) activeWAN() bool {
64-
wanPeers := dht.WAN.RoutingTable().ListPeers()
65-
return len(wanPeers) > 0
72+
return dht.WAN.RoutingTable().Size() > 0
6673
}
6774

6875
// Provide adds the given cid to the content routing system.
@@ -75,21 +82,72 @@ func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error {
7582

7683
// FindProvidersAsync searches for peers who are able to provide a given key
7784
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
78-
if dht.activeWAN() {
79-
return dht.WAN.FindProvidersAsync(ctx, key, count)
80-
}
81-
return dht.LAN.FindProvidersAsync(ctx, key, count)
85+
reqCtx, cancel := context.WithCancel(ctx)
86+
outCh := make(chan peer.AddrInfo)
87+
wanCh := dht.WAN.FindProvidersAsync(reqCtx, key, count)
88+
lanCh := dht.LAN.FindProvidersAsync(reqCtx, key, count)
89+
go func() {
90+
defer cancel()
91+
defer close(outCh)
92+
93+
found := make(map[peer.ID]struct{}, count)
94+
nch := 2
95+
var pi peer.AddrInfo
96+
for nch > 0 && count > 0 {
97+
var ok bool
98+
select {
99+
case pi, ok = <-wanCh:
100+
if !ok {
101+
wanCh = nil
102+
nch--
103+
continue
104+
}
105+
case pi, ok = <-lanCh:
106+
if !ok {
107+
lanCh = nil
108+
nch--
109+
continue
110+
}
111+
}
112+
// already found
113+
if _, ok = found[pi.ID]; ok {
114+
continue
115+
}
116+
117+
select {
118+
case outCh <- pi:
119+
found[pi.ID] = struct{}{}
120+
count--
121+
case <-ctx.Done():
122+
return
123+
}
124+
}
125+
}()
126+
return outCh
82127
}
83128

84129
// FindPeer searches for a peer with given ID
130+
// Note: with signed peer records, we can change this to short circuit once either DHT returns.
85131
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
86-
// TODO: should these be run in parallel?
87-
infoa, erra := dht.WAN.FindPeer(ctx, pid)
88-
infob, errb := dht.LAN.FindPeer(ctx, pid)
132+
var wg sync.WaitGroup
133+
wg.Add(2)
134+
var wanInfo, lanInfo peer.AddrInfo
135+
var wanErr, lanErr error
136+
go func() {
137+
defer wg.Done()
138+
wanInfo, wanErr = dht.WAN.FindPeer(ctx, pid)
139+
}()
140+
go func() {
141+
defer wg.Done()
142+
lanInfo, lanErr = dht.LAN.FindPeer(ctx, pid)
143+
}()
144+
145+
wg.Wait()
146+
89147
return peer.AddrInfo{
90148
ID: pid,
91-
Addrs: append(infoa.Addrs, infob.Addrs...),
92-
}, mergeErrors(erra, errb)
149+
Addrs: append(wanInfo.Addrs, lanInfo.Addrs...),
150+
}, mergeErrors(wanErr, lanErr)
93151
}
94152

95153
func mergeErrors(a, b error) error {
@@ -120,13 +178,47 @@ func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...ro
120178
}
121179

122180
// GetValue searches for the value corresponding to given Key.
123-
func (dht *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
124-
vala, erra := dht.WAN.GetValue(ctx, key, opts...)
125-
if vala != nil {
126-
return vala, erra
181+
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
182+
reqCtx, cncl := context.WithCancel(ctx)
183+
defer cncl()
184+
185+
resChan := make(chan []byte)
186+
defer close(resChan)
187+
errChan := make(chan error)
188+
defer close(errChan)
189+
runner := func(impl *dht.IpfsDHT, valCh chan []byte, errCh chan error) {
190+
val, err := impl.GetValue(reqCtx, key, opts...)
191+
if err != nil {
192+
errCh <- err
193+
return
194+
}
195+
valCh <- val
196+
}
197+
go runner(d.WAN, resChan, errChan)
198+
go runner(d.LAN, resChan, errChan)
199+
200+
var err error
201+
var val []byte
202+
select {
203+
case val = <-resChan:
204+
cncl()
205+
case err = <-errChan:
206+
}
207+
208+
// Drain or wait for the slower runner
209+
select {
210+
case secondVal := <-resChan:
211+
if val == nil {
212+
val = secondVal
213+
}
214+
case secondErr := <-errChan:
215+
if err != nil {
216+
err = mergeErrors(err, secondErr)
217+
} else if val == nil {
218+
err = secondErr
219+
}
127220
}
128-
valb, errb := dht.LAN.GetValue(ctx, key, opts...)
129-
return valb, mergeErrors(erra, errb)
221+
return val, err
130222
}
131223

132224
// SearchValue searches for better values from this value
@@ -163,14 +255,45 @@ func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Opt
163255
}
164256

165257
// GetPublicKey returns the public key for the given peer.
166-
func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
167-
pka, erra := dht.WAN.GetPublicKey(ctx, pid)
168-
if erra == nil {
169-
return pka, nil
258+
func (d *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
259+
reqCtx, cncl := context.WithCancel(ctx)
260+
defer cncl()
261+
262+
resChan := make(chan ci.PubKey)
263+
defer close(resChan)
264+
errChan := make(chan error)
265+
defer close(errChan)
266+
runner := func(impl *dht.IpfsDHT, valCh chan ci.PubKey, errCh chan error) {
267+
val, err := impl.GetPublicKey(reqCtx, pid)
268+
if err != nil {
269+
errCh <- err
270+
return
271+
}
272+
valCh <- val
170273
}
171-
pkb, errb := dht.LAN.GetPublicKey(ctx, pid)
172-
if errb == nil {
173-
return pkb, nil
274+
go runner(d.WAN, resChan, errChan)
275+
go runner(d.LAN, resChan, errChan)
276+
277+
var err error
278+
var val ci.PubKey
279+
select {
280+
case val = <-resChan:
281+
cncl()
282+
case err = <-errChan:
174283
}
175-
return nil, mergeErrors(erra, errb)
284+
285+
// Drain or wait for the slower runner
286+
select {
287+
case secondVal := <-resChan:
288+
if val == nil {
289+
val = secondVal
290+
}
291+
case secondErr := <-errChan:
292+
if err != nil {
293+
err = mergeErrors(err, secondErr)
294+
} else if val == nil {
295+
err = secondErr
296+
}
297+
}
298+
return val, err
176299
}

subscriber_notifee.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
4141

4242
// register for event bus local routability changes in order to trigger switching between client and server modes
4343
// only register for events if the DHT is operating in ModeAuto
44-
if dht.auto {
44+
if dht.auto == ModeAuto || dht.auto == ModeAutoServer {
4545
evts = append(evts, new(event.EvtLocalReachabilityChanged))
4646
}
4747

@@ -96,7 +96,7 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
9696
case event.EvtPeerIdentificationCompleted:
9797
handlePeerIdentificationCompletedEvent(dht, evt)
9898
case event.EvtLocalReachabilityChanged:
99-
if dht.auto {
99+
if dht.auto == ModeAuto || dht.auto == ModeAutoServer {
100100
handleLocalReachabilityChangedEvent(dht, evt)
101101
} else {
102102
// something has gone really wrong if we get an event we did not subscribe to
@@ -150,8 +150,14 @@ func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabil
150150
var target mode
151151

152152
switch e.Reachability {
153-
case network.ReachabilityPrivate, network.ReachabilityUnknown:
153+
case network.ReachabilityPrivate:
154154
target = modeClient
155+
case network.ReachabilityUnknown:
156+
if dht.auto == ModeAutoServer {
157+
target = modeServer
158+
} else {
159+
target = modeClient
160+
}
155161
case network.ReachabilityPublic:
156162
target = modeServer
157163
}

0 commit comments

Comments
 (0)