Skip to content

Commit 0d90799

Browse files
committed
cleaner parallelism
1 parent 02d310f commit 0d90799

File tree

2 files changed

+28
-98
lines changed

2 files changed

+28
-98
lines changed

dual/dual.go

Lines changed: 27 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package dual
44

55
import (
66
"context"
7-
"fmt"
87
"sync"
98

109
"github.com/ipfs/go-cid"
@@ -15,6 +14,8 @@ import (
1514
"github.com/libp2p/go-libp2p-core/routing"
1615
dht "github.com/libp2p/go-libp2p-kad-dht"
1716
helper "github.com/libp2p/go-libp2p-routing-helpers"
17+
18+
"github.com/hashicorp/go-multierror"
1819
)
1920

2021
// DHT implements the routing interface to provide two concrete DHT implementationts for use
@@ -24,8 +25,8 @@ type DHT struct {
2425
LAN *dht.IpfsDHT
2526
}
2627

27-
// DefaultLanExtension is used to differentiate local protocol requests from those on the WAN DHT.
28-
const DefaultLanExtension protocol.ID = "/lan"
28+
// LanExtension is used to differentiate local protocol requests from those on the WAN DHT.
29+
const LanExtension protocol.ID = "/lan"
2930

3031
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
3132
// guarantee, but we can use them to aid refactoring.
@@ -55,7 +56,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error)
5556
// Unless overridden by user supplied options, the LAN DHT should default
5657
// to 'AutoServer' mode.
5758
lanOpts := append(options,
58-
dht.ProtocolExtension(DefaultLanExtension),
59+
dht.ProtocolExtension(LanExtension),
5960
dht.QueryFilter(dht.PrivateQueryFilter),
6061
dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
6162
)
@@ -73,7 +74,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error)
7374

7475
// Close closes the DHT context.
7576
func (dht *DHT) Close() error {
76-
return mergeErrors(dht.WAN.Close(), dht.LAN.Close())
77+
return multierror.Append(dht.WAN.Close(), dht.LAN.Close()).ErrorOrNil()
7778
}
7879

7980
func (dht *DHT) activeWAN() bool {
@@ -99,21 +100,18 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int)
99100
defer close(outCh)
100101

101102
found := make(map[peer.ID]struct{}, count)
102-
nch := 2
103103
var pi peer.AddrInfo
104-
for nch > 0 && count > 0 {
104+
for count > 0 && (wanCh != nil || lanCh != nil) {
105105
var ok bool
106106
select {
107107
case pi, ok = <-wanCh:
108108
if !ok {
109109
wanCh = nil
110-
nch--
111110
continue
112111
}
113112
case pi, ok = <-lanCh:
114113
if !ok {
115114
lanCh = nil
116-
nch--
117115
continue
118116
}
119117
}
@@ -155,26 +153,15 @@ func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error
155153
return peer.AddrInfo{
156154
ID: pid,
157155
Addrs: append(wanInfo.Addrs, lanInfo.Addrs...),
158-
}, mergeErrors(wanErr, lanErr)
159-
}
160-
161-
func mergeErrors(a, b error) error {
162-
if a == nil && b == nil {
163-
return nil
164-
} else if a != nil && b != nil {
165-
return fmt.Errorf("%v, %v", a, b)
166-
} else if a != nil {
167-
return a
168-
}
169-
return b
156+
}, multierror.Append(wanErr, lanErr).ErrorOrNil()
170157
}
171158

172159
// Bootstrap allows callers to hint to the routing system to get into a
173160
// Boostrapped state and remain there.
174161
func (dht *DHT) Bootstrap(ctx context.Context) error {
175162
erra := dht.WAN.Bootstrap(ctx)
176163
errb := dht.LAN.Bootstrap(ctx)
177-
return mergeErrors(erra, errb)
164+
return multierror.Append(erra, errb).ErrorOrNil()
178165
}
179166

180167
// PutValue adds value corresponding to given Key.
@@ -190,43 +177,24 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option)
190177
reqCtx, cncl := context.WithCancel(ctx)
191178
defer cncl()
192179

193-
resChan := make(chan []byte)
194-
defer close(resChan)
195-
errChan := make(chan error)
196-
defer close(errChan)
197-
runner := func(impl *dht.IpfsDHT, valCh chan []byte, errCh chan error) {
198-
val, err := impl.GetValue(reqCtx, key, opts...)
199-
if err != nil {
200-
errCh <- err
201-
return
202-
}
203-
valCh <- val
204-
}
205-
go runner(d.WAN, resChan, errChan)
206-
go runner(d.LAN, resChan, errChan)
207-
208-
var err error
209-
var val []byte
210-
select {
211-
case val = <-resChan:
212-
cncl()
213-
case err = <-errChan:
214-
}
180+
var lanVal []byte
181+
var lanErr error
182+
var lanWaiter sync.WaitGroup
183+
lanWaiter.Add(1)
184+
go func() {
185+
defer lanWaiter.Done()
186+
lanVal, lanErr = d.LAN.GetValue(reqCtx, key, opts...)
187+
}()
215188

216-
// Drain or wait for the slower runner
217-
select {
218-
case secondVal := <-resChan:
219-
if val == nil {
220-
val = secondVal
221-
}
222-
case secondErr := <-errChan:
223-
if err != nil {
224-
err = mergeErrors(err, secondErr)
225-
} else if val == nil {
226-
err = secondErr
189+
wanVal, wanErr := d.WAN.GetValue(ctx, key, opts...)
190+
if wanErr != nil {
191+
lanWaiter.Wait()
192+
if lanErr != nil {
193+
return nil, multierror.Append(wanErr, lanErr).ErrorOrNil()
227194
}
195+
return lanVal, nil
228196
}
229-
return val, err
197+
return wanVal, nil
230198
}
231199

232200
// SearchValue searches for better values from this value
@@ -236,45 +204,7 @@ func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Opt
236204
}
237205

238206
// GetPublicKey returns the public key for the given peer.
239-
func (d *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
240-
reqCtx, cncl := context.WithCancel(ctx)
241-
defer cncl()
242-
243-
resChan := make(chan ci.PubKey)
244-
defer close(resChan)
245-
errChan := make(chan error)
246-
defer close(errChan)
247-
runner := func(impl *dht.IpfsDHT, valCh chan ci.PubKey, errCh chan error) {
248-
val, err := impl.GetPublicKey(reqCtx, pid)
249-
if err != nil {
250-
errCh <- err
251-
return
252-
}
253-
valCh <- val
254-
}
255-
go runner(d.WAN, resChan, errChan)
256-
go runner(d.LAN, resChan, errChan)
257-
258-
var err error
259-
var val ci.PubKey
260-
select {
261-
case val = <-resChan:
262-
cncl()
263-
case err = <-errChan:
264-
}
265-
266-
// Drain or wait for the slower runner
267-
select {
268-
case secondVal := <-resChan:
269-
if val == nil {
270-
val = secondVal
271-
}
272-
case secondErr := <-errChan:
273-
if err != nil {
274-
err = mergeErrors(err, secondErr)
275-
} else if val == nil {
276-
err = secondErr
277-
}
278-
}
279-
return val, err
207+
func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
208+
p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
209+
return p.GetPublicKey(ctx, pid)
280210
}

dual/dual_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func setupDHTWithFilters(ctx context.Context, t *testing.T, options ...dht.Optio
6565
lanOpts := []dht.Option{
6666
dht.NamespacedValidator("v", blankValidator{}),
6767
dht.ProtocolPrefix("/test"),
68-
dht.ProtocolExtension(DefaultLanExtension),
68+
dht.ProtocolExtension(LanExtension),
6969
dht.DisableAutoRefresh(),
7070
dht.RoutingTableFilter(lanFilter),
7171
dht.Mode(dht.ModeServer),

0 commit comments

Comments
 (0)