Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Commit 9fbedbe

Browse files
authored
Merge pull request #1816 from OpenBazaar/1800-ipns-improvement
IPNS improvements to prevent panic
2 parents 3fd0924 + 209d6cb commit 9fbedbe

File tree

15 files changed

+326
-128
lines changed

15 files changed

+326
-128
lines changed

api/jsonapi.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ import (
2424
ipnspath "gx/ipfs/QmQAgv6Gaoe2tQpcabqwKXKChp2MZ7i3UXv9DqTTaxCaTR/go-path"
2525
files "gx/ipfs/QmQmhotPUzVrMEWNK3x1R5jQ5ZHWyL7tVUrmRPjrBrvyCb/go-ipfs-files"
2626
cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
27-
datastore "gx/ipfs/QmUadX5EcvrBmxAV9sE7wUWtWSqxns5K84qKJBixmcT1w9/go-datastore"
2827
ipns "gx/ipfs/QmUwMnKKjH3JwGKNVZ3TcP37W93xzqNA4ECFFiMo6sXkkc/go-ipns"
2928
iface "gx/ipfs/QmXLwxifxwfc2bAwq6rdjbYqAsGzWsDE9RM5TWMGtykyj6/interface-go-ipfs-core"
3029
peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
3130
routing "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing"
3231
ps "gx/ipfs/QmaCTz9RkrU13bm9kMB54f7atgqM4qkjDZpRwRoJiWXEqs/go-libp2p-peerstore"
32+
ggproto "gx/ipfs/QmddjPSGZb3ieihSseFeCfVRpZzcqczPNsD2DvarSwnjJB/gogo-protobuf/proto"
3333
mh "gx/ipfs/QmerPMzPk1mJVowm8KgmoknWa4yCYvvugMPsgWmDNUvDLW/go-multihash"
3434

3535
"github.com/OpenBazaar/jsonpb"
@@ -45,7 +45,6 @@ import (
4545
"github.com/golang/protobuf/proto"
4646
"github.com/golang/protobuf/ptypes"
4747
"github.com/ipfs/go-ipfs/core/coreapi"
48-
"github.com/ipfs/go-ipfs/namesys"
4948
"github.com/ipfs/go-ipfs/repo/fsrepo"
5049
)
5150

@@ -3778,6 +3777,7 @@ func (i *jsonAPIHandler) GETWalletStatus(w http.ResponseWriter, r *http.Request)
37783777
}
37793778

37803779
func (i *jsonAPIHandler) GETIPNS(w http.ResponseWriter, r *http.Request) {
3780+
ipfsStore := i.node.IpfsNode.Repo.Datastore()
37813781
_, peerID := path.Split(r.URL.Path)
37823782

37833783
pid, err := peer.IDB58Decode(peerID)
@@ -3786,7 +3786,7 @@ func (i *jsonAPIHandler) GETIPNS(w http.ResponseWriter, r *http.Request) {
37863786
return
37873787
}
37883788

3789-
val, err := i.node.IpfsNode.Repo.Datastore().Get(namesys.IpnsDsKey(pid))
3789+
peerIPNSRecord, err := ipfs.GetCachedIPNSRecord(ipfsStore, pid)
37903790
if err != nil { // No record in datastore
37913791
ErrorResponse(w, http.StatusNotFound, err.Error())
37923792
return
@@ -3795,7 +3795,7 @@ func (i *jsonAPIHandler) GETIPNS(w http.ResponseWriter, r *http.Request) {
37953795
var keyBytes []byte
37963796
pubkey := i.node.IpfsNode.Peerstore.PubKey(pid)
37973797
if pubkey == nil || !pid.MatchesPublicKey(pubkey) {
3798-
keyval, err := i.node.IpfsNode.Repo.Datastore().Get(datastore.NewKey(core.KeyCachePrefix + peerID))
3798+
keyval, err := ipfs.GetCachedPubkey(ipfsStore, peerID)
37993799
if err != nil {
38003800
ErrorResponse(w, http.StatusNotFound, err.Error())
38013801
return
@@ -3813,8 +3813,13 @@ func (i *jsonAPIHandler) GETIPNS(w http.ResponseWriter, r *http.Request) {
38133813
Pubkey string `json:"pubkey"`
38143814
Record string `json:"record"`
38153815
}
3816+
peerIPNSBytes, err := ggproto.Marshal(peerIPNSRecord)
3817+
if err != nil {
3818+
ErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("marshaling IPNS record: %s", err.Error()))
3819+
return
3820+
}
38163821

3817-
ret := KeyAndRecord{hex.EncodeToString(keyBytes), string(val)}
3822+
ret := KeyAndRecord{hex.EncodeToString(keyBytes), string(peerIPNSBytes)}
38183823
retBytes, err := json.MarshalIndent(ret, "", " ")
38193824
if err != nil {
38203825
ErrorResponse(w, http.StatusInternalServerError, err.Error())
@@ -3839,9 +3844,14 @@ func (i *jsonAPIHandler) GETResolveIPNS(w http.ResponseWriter, r *http.Request)
38393844
var response = respType{PeerID: peerID}
38403845

38413846
if i.node.IpfsNode.Identity.Pretty() == peerID {
3842-
ipnsBytes, err := i.node.IpfsNode.Repo.Datastore().Get(namesys.IpnsDsKey(i.node.IpfsNode.Identity))
3847+
rec, err := ipfs.GetCachedIPNSRecord(i.node.IpfsNode.Repo.Datastore(), i.node.IpfsNode.Identity)
3848+
if err != nil {
3849+
ErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("retrieving self: %s", err))
3850+
return
3851+
}
3852+
ipnsBytes, err := proto.Marshal(rec)
38433853
if err != nil {
3844-
ErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("retrieving self from datastore: %s", err))
3854+
ErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("marshaling self: %s", err))
38453855
return
38463856
}
38473857
response.Record.Hex = hex.EncodeToString(ipnsBytes)

cmd/start.go

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@ import (
2323
dht "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht"
2424
ma "gx/ipfs/QmTZBfrPJmjWsCvHEtX5FE6KimVJhsJg5sBbqEFYf4UZtL/go-multiaddr"
2525
config "gx/ipfs/QmUAuYuiafnJRZxDDX7MuruMNsicYNuyub5vUeAcupUBNs/go-ipfs-config"
26-
ipnspb "gx/ipfs/QmUwMnKKjH3JwGKNVZ3TcP37W93xzqNA4ECFFiMo6sXkkc/go-ipns/pb"
2726
peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
2827
oniontp "gx/ipfs/QmYv2MbwHn7qcvAPFisZ94w85crQVpwUuv8G7TuUeBnfPb/go-onion-transport"
2928
ipfslogging "gx/ipfs/QmbkT7eMTyXfpeyB3ZMxxcxg7XH8t6uXp49jqzz4HB7BGF/go-log/writer"
3029
manet "gx/ipfs/Qmc85NSvmSG4Frn9Vb2cBc1rMyULH6D3TNVEfCzSKoUpip/go-multiaddr-net"
31-
proto "gx/ipfs/QmddjPSGZb3ieihSseFeCfVRpZzcqczPNsD2DvarSwnjJB/gogo-protobuf/proto"
3230

3331
"github.com/OpenBazaar/openbazaar-go/api"
3432
"github.com/OpenBazaar/openbazaar-go/core"
@@ -53,7 +51,6 @@ import (
5351
"github.com/ipfs/go-ipfs/commands"
5452
ipfscore "github.com/ipfs/go-ipfs/core"
5553
"github.com/ipfs/go-ipfs/core/corehttp"
56-
"github.com/ipfs/go-ipfs/namesys"
5754
"github.com/ipfs/go-ipfs/repo/fsrepo"
5855
"github.com/natefinch/lumberjack"
5956
"github.com/op/go-logging"
@@ -407,34 +404,28 @@ func (x *Start) Execute(args []string) error {
407404
}
408405
var dhtRouting *dht.IpfsDHT
409406
for _, router := range tiered.Routers {
410-
if r, ok := router.(*ipfs.CachingRouter); ok {
411-
r.APIRouter().Start(torDialer)
412-
dhtRouting, err = r.DHT()
413-
if err != nil {
414-
return err
415-
}
407+
if r, ok := router.(*dht.IpfsDHT); ok {
408+
dhtRouting = r
416409
}
417410
}
418411
if dhtRouting == nil {
419412
return errors.New("IPFS DHT routing is not configured")
420413
}
421414

422415
// Get current directory root hash
423-
ipnskey := namesys.IpnsDsKey(nd.Identity)
424-
ival, hasherr := nd.Repo.Datastore().Get(ipnskey)
416+
cachedIPNSRecord, hasherr := ipfs.GetCachedIPNSRecord(nd.Repo.Datastore(), nd.Identity)
425417
if hasherr != nil {
426-
log.Error("get ipns key:", hasherr)
427-
}
428-
ourIpnsRecord := new(ipnspb.IpnsEntry)
429-
err = proto.Unmarshal(ival, ourIpnsRecord)
430-
if err != nil {
431-
log.Error("unmarshal record value", err)
432-
nd.Repo.Datastore().Delete(ipnskey)
418+
log.Warning(err)
419+
if err := ipfs.DeleteCachedIPNSRecord(nd.Repo.Datastore(), nd.Identity); err != nil {
420+
log.Errorf("deleting invalid IPNS record: %s", err.Error())
421+
}
433422
}
434423

435424
if x.ForceKeyCachePurge {
436425
log.Infof("forcing key purge from namesys cache...")
437-
nd.Repo.Datastore().Delete(ipnskey)
426+
if err := ipfs.DeleteCachedIPNSRecord(nd.Repo.Datastore(), nd.Identity); err != nil {
427+
log.Errorf("force-purging IPNS record: %s", err.Error())
428+
}
438429
}
439430

440431
// Wallet
@@ -587,6 +578,11 @@ func (x *Start) Execute(args []string) error {
587578
subscriber := ipfs.NewPubsubSubscriber(context.Background(), nd.PeerHost, nd.Routing, nd.Repo.Datastore(), nd.PubSub)
588579
ps := ipfs.Pubsub{Publisher: publisher, Subscriber: subscriber}
589580

581+
var rootHash string
582+
if cachedIPNSRecord != nil {
583+
rootHash = string(cachedIPNSRecord.Value)
584+
}
585+
590586
// OpenBazaar node setup
591587
core.Node = &core.OpenBazaarNode{
592588
AcceptStoreRequests: dataSharing.AcceptStoreRequests,
@@ -601,7 +597,7 @@ func (x *Start) Execute(args []string) error {
601597
PushNodes: pushNodes,
602598
RegressionTestEnable: x.Regtest,
603599
RepoPath: repoPath,
604-
RootHash: string(ourIpnsRecord.Value),
600+
RootHash: rootHash,
605601
TestnetEnable: x.Testnet,
606602
TorDialer: torDialer,
607603
UserAgent: core.USERAGENT,

core/core.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto"
1111
ma "gx/ipfs/QmTZBfrPJmjWsCvHEtX5FE6KimVJhsJg5sBbqEFYf4UZtL/go-multiaddr"
1212
cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
13-
"gx/ipfs/QmUadX5EcvrBmxAV9sE7wUWtWSqxns5K84qKJBixmcT1w9/go-datastore"
1413
peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
1514
routing "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing"
1615

@@ -332,8 +331,11 @@ func (n *OpenBazaarNode) EncryptMessage(peerID peer.ID, peerKey *libp2p.PubKey,
332331
ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
333332
defer cancel()
334333
if peerKey == nil {
335-
var pubKey libp2p.PubKey
336-
keyval, err := n.IpfsNode.Repo.Datastore().Get(datastore.NewKey(KeyCachePrefix + peerID.Pretty()))
334+
var (
335+
pubKey libp2p.PubKey
336+
store = n.IpfsNode.Repo.Datastore()
337+
)
338+
keyval, err := ipfs.GetCachedPubkey(store, peerID.Pretty())
337339
if err != nil {
338340
pubKey, err = routing.GetPublicKey(n.IpfsNode.Routing, ctx, peerID)
339341
if err != nil {

core/net.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (n *OpenBazaarNode) SendOfflineMessage(p peer.ID, k *libp2p.PubKey, m *pb.M
122122
go func() {
123123
ctx, cancel := context.WithCancel(context.Background())
124124
defer cancel()
125-
err := n.Pubsub.Publisher.Publish(ctx, ipfs.MessageTopicPrefix+pointer.Cid.String(), ciphertext)
125+
err := n.Pubsub.Publisher.Publish(ctx, pointer.Cid.String(), ciphertext)
126126
if err != nil {
127127
log.Error(err)
128128
}

core/profile.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ import (
2424
"github.com/imdario/mergo"
2525
)
2626

27-
// KeyCachePrefix - cache prefix for public key
28-
const KeyCachePrefix = "/pubkey/"
29-
3027
// ErrorProfileNotFound - profile not found error
3128
var ErrorProfileNotFound = errors.New("profile not found")
3229

ipfs/config.go

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func PrepareIPFSConfig(r repo.Repo, routerAPIEndpoint string, testEnable, regtes
5555
}
5656

5757
func constructRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
58-
dhtRouting, err := dht.New(
58+
return dht.New(
5959
ctx, host,
6060
dhtopts.Datastore(dstore),
6161
dhtopts.Validator(validator),
@@ -64,12 +64,6 @@ func constructRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching
6464
IPFSProtocolDHTMainnetLegacy,
6565
),
6666
)
67-
if err != nil {
68-
return nil, err
69-
}
70-
apiRouter := NewAPIRouter(routerCacheURI, dhtRouting.Validator)
71-
cachingRouter := NewCachingRouter(dhtRouting, &apiRouter)
72-
return cachingRouter, nil
7367
}
7468

7569
func constructRegtestRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
@@ -85,21 +79,13 @@ func constructRegtestRouting(ctx context.Context, host p2phost.Host, dstore ds.B
8579
}
8680

8781
func constructTestnetRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
88-
var (
89-
dhtRouting, err = dht.New(
90-
ctx, host,
91-
dhtopts.Datastore(dstore),
92-
dhtopts.Validator(validator),
93-
dhtopts.Protocols(
94-
IPFSProtocolKademliaTestnetOne,
95-
IPFSProtocolAppTestnetOne,
96-
),
97-
)
82+
return dht.New(
83+
ctx, host,
84+
dhtopts.Datastore(dstore),
85+
dhtopts.Validator(validator),
86+
dhtopts.Protocols(
87+
IPFSProtocolKademliaTestnetOne,
88+
IPFSProtocolAppTestnetOne,
89+
),
9890
)
99-
if err != nil {
100-
return nil, err
101-
}
102-
apiRouter := NewAPIRouter(routerCacheURI, dhtRouting.Validator)
103-
cachingRouter := NewCachingRouter(dhtRouting, &apiRouter)
104-
return cachingRouter, nil
10591
}

ipfs/pubsub.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,7 @@ import (
1616
"time"
1717
)
1818

19-
const (
20-
MessageTopicPrefix = "/offlinemessage/"
21-
GlobalIPNSTopic = "IPNS"
22-
GlobalBlockTopic = "BLOCK"
23-
GlobalCIDTopic = "CID"
24-
)
19+
const messageTopicPrefix = "/offlinemessage/"
2520

2621
type Pubsub struct {
2722
Subscriber *PubsubSubscriber
@@ -79,40 +74,42 @@ func NewPubsubSubscriber(ctx context.Context, host p2phost.Host, cr routing.Cont
7974

8075
func (p *PubsubPublisher) Publish(ctx context.Context, topic string, data []byte) error {
8176
p.mx.Lock()
82-
_, ok := p.subs[topic]
77+
id := messageTopicPrefix + topic
78+
_, ok := p.subs[id]
8379

8480
if !ok {
85-
p.subs[topic] = struct{}{}
81+
p.subs[id] = struct{}{}
8682
p.mx.Unlock()
8783

88-
bootstrapPubsub(p.ctx, p.cr, p.host, topic)
84+
bootstrapPubsub(p.ctx, p.cr, p.host, id)
8985
} else {
9086
p.mx.Unlock()
9187
}
9288

93-
log.Debugf("PubsubPublish: publish data for %s", topic)
94-
return p.ps.Publish(topic, data)
89+
log.Debugf("PubsubPublish: publish data for %s", id)
90+
return p.ps.Publish(id, data)
9591
}
9692

9793
func (r *PubsubSubscriber) Subscribe(ctx context.Context, topic string) (chan []byte, error) {
9894
r.mx.Lock()
9995
// see if we already have a pubsub subscription; if not, subscribe
100-
_, ok := r.subs[topic]
96+
id := messageTopicPrefix + topic
97+
_, ok := r.subs[id]
10198
resp := make(chan []byte)
10299
if !ok {
103-
sub, err := r.ps.Subscribe(topic)
100+
sub, err := r.ps.Subscribe(id)
104101
if err != nil {
105102
r.mx.Unlock()
106103
return nil, err
107104
}
108105

109-
log.Debugf("PubsubSubscribe: subscribed to %s", topic)
106+
log.Debugf("PubsubSubscribe: subscribed to %s", id)
110107

111-
r.subs[topic] = sub
108+
r.subs[id] = sub
112109

113110
ctx, cancel := context.WithCancel(r.ctx)
114-
go r.handleSubscription(sub, topic, resp, cancel)
115-
go bootstrapPubsub(ctx, r.cr, r.host, topic)
111+
go r.handleSubscription(sub, id, resp, cancel)
112+
go bootstrapPubsub(ctx, r.cr, r.host, id)
116113
}
117114
r.mx.Unlock()
118115
return resp, nil

0 commit comments

Comments
 (0)