@@ -3,6 +3,7 @@ package shell
3
3
4
4
import (
5
5
"bytes"
6
+ "context"
6
7
"encoding/json"
7
8
"errors"
8
9
"fmt"
@@ -13,6 +14,8 @@ import (
13
14
"strings"
14
15
"time"
15
16
17
+ pstore "github.com/libp2p/go-libp2p-peerstore"
18
+ notif "github.com/libp2p/go-libp2p-routing/notifications"
16
19
ma "github.com/multiformats/go-multiaddr"
17
20
manet "github.com/multiformats/go-multiaddr-net"
18
21
files "github.com/whyrusleeping/go-multipart-files"
@@ -371,6 +374,53 @@ func (s *Shell) FindPeer(peer string) (*PeerInfo, error) {
371
374
return & str .Responses [0 ], nil
372
375
}
373
376
377
+ func (s * Shell ) FindProvs (ctx context.Context , cid string ) (<- chan pstore.PeerInfo , error ) {
378
+ ctx , cancel := context .WithCancel (ctx )
379
+
380
+ resp , err := s .newRequest ("dht/findprovs" , cid ).Send (s .httpcli )
381
+ if err != nil {
382
+ return nil , err
383
+ }
384
+
385
+ if resp .Error != nil {
386
+ return nil , resp .Error
387
+ }
388
+
389
+ // 4 is arbitrary here just to make the channel buffered
390
+ outchan := make (chan pstore.PeerInfo , 4 )
391
+
392
+ go func () {
393
+ defer close (outchan )
394
+ defer cancel ()
395
+
396
+ var n notif.QueryEvent
397
+ decoder := json .NewDecoder (resp .Output )
398
+ for {
399
+ err := decoder .Decode (& n )
400
+ if err != nil {
401
+ return
402
+ }
403
+
404
+ if n .Type == notif .Provider {
405
+ for _ , p := range n .Responses {
406
+ select {
407
+ case outchan <- * p :
408
+ case <- ctx .Done ():
409
+ return
410
+ }
411
+ }
412
+ }
413
+ }
414
+ }()
415
+
416
+ go func () {
417
+ <- ctx .Done ()
418
+ resp .Close ()
419
+ }()
420
+
421
+ return outchan , nil
422
+ }
423
+
374
424
func (s * Shell ) Refs (hash string , recursive bool ) (<- chan string , error ) {
375
425
req := s .newRequest ("refs" , hash )
376
426
if recursive {
0 commit comments