Skip to content

Commit 56a0794

Browse files
1. Expire non-provider records older than MaxAge
2. Original publisher should republish PutValue records
1 parent a12e621 commit 56a0794

File tree

8 files changed

+208
-13
lines changed

8 files changed

+208
-13
lines changed

dht.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ type IpfsDHT struct {
6868
stripedPutLocks [256]sync.Mutex
6969

7070
protocols []protocol.ID // DHT protocols
71+
72+
nonProvRecordsCleanupInterval time.Duration
7173
}
7274

7375
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@@ -100,11 +102,18 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
100102
dht.proc.AddChild(dht.providers.Process())
101103
dht.Validator = cfg.Validator
102104

105+
// proc to expire putValue records
106+
dht.nonProvRecordsCleanupInterval = nonProvRecordsCleanupInterval
107+
recordExpiryProc := goprocessctx.WithContext(ctx)
108+
recordExpiryProc.Go(dht.expireNonProviderRecords)
109+
dht.proc.AddChild(recordExpiryProc)
110+
103111
if !cfg.Client {
104112
for _, p := range cfg.Protocols {
105113
h.SetStreamHandler(p, dht.handleNewStream)
106114
}
107115
}
116+
108117
return dht, nil
109118
}
110119

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/mr-tron/base58 v1.1.2
2222
github.com/multiformats/go-multiaddr v0.0.4
2323
github.com/multiformats/go-multiaddr-dns v0.0.2
24+
github.com/multiformats/go-multihash v0.0.5
2425
github.com/multiformats/go-multistream v0.1.0
2526
github.com/stretchr/testify v1.3.0
2627
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc

handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
121121
recordIsBad = true
122122
}
123123

124-
if time.Since(recvtime) > MaxRecordAge {
124+
if time.Since(recvtime) > maxNonProviderRecordAge {
125125
logger.Debug("old record found, tossing.")
126126
recordIsBad = true
127127
}

providers/providers.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,11 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
7474
return pm
7575
}
7676

77-
const providersKeyPrefix = "/providers/"
77+
// ProvidersKeyPrefix is the prefix used for all provider record keys.
78+
const ProvidersKeyPrefix = "/providers/"
7879

7980
func mkProvKey(k cid.Cid) string {
80-
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
81+
return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
8182
}
8283

8384
func (pm *ProviderManager) Process() goprocess.Process {
@@ -284,7 +285,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
284285

285286
// Now, kick off a GC of the datastore.
286287
q, err := pm.dstore.Query(dsq.Query{
287-
Prefix: providersKeyPrefix,
288+
Prefix: ProvidersKeyPrefix,
288289
})
289290
if err != nil {
290291
log.Error("provider record GC query failed: ", err)

providers/providers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func TestProvidesExpire(t *testing.T) {
185185
t.Fatal("providers map not cleaned up")
186186
}
187187

188-
res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
188+
res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
189189
if err != nil {
190190
t.Fatal(err)
191191
}

records.go

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,30 @@ package dht
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"time"
78

9+
"github.com/gogo/protobuf/proto"
10+
ds "github.com/ipfs/go-datastore"
11+
"github.com/ipfs/go-datastore/query"
12+
u "github.com/ipfs/go-ipfs-util"
13+
"github.com/jbenet/goprocess"
14+
ci "github.com/libp2p/go-libp2p-core/crypto"
815
"github.com/libp2p/go-libp2p-core/peer"
916
"github.com/libp2p/go-libp2p-core/routing"
10-
11-
ci "github.com/libp2p/go-libp2p-core/crypto"
17+
"github.com/libp2p/go-libp2p-kad-dht/providers"
18+
recpb "github.com/libp2p/go-libp2p-record/pb"
1219
)
1320

14-
// MaxRecordAge specifies the maximum time that any node will hold onto a record
21+
// maxNonProviderRecordAge specifies the maximum time that any node will hold onto a record
1522
// from the time it's received. This does not apply to any other forms of validity that
1623
// the record may contain.
1724
// For example, a record may contain an ipns entry with an EOL saying it's valid
1825
// until the year 2020 (a great time in the future). For that record to stick around
19-
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
20-
const MaxRecordAge = time.Hour * 36
26+
// it must be rebroadcasted more frequently than once every 'maxNonProviderRecordAge'
27+
var maxNonProviderRecordAge = time.Hour * 12
28+
29+
var nonProvRecordsCleanupInterval = time.Hour * 1
2130

2231
type pubkrs struct {
2332
pubk ci.PubKey
@@ -135,3 +144,58 @@ func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.Pub
135144
logger.Debugf("Got public key from node %v itself", p)
136145
return pubk, nil
137146
}
147+
148+
func (dht *IpfsDHT) expireNonProviderRecords(proc goprocess.Process) {
149+
for {
150+
select {
151+
case <-proc.Closing():
152+
return
153+
case <-time.After(dht.nonProvRecordsCleanupInterval):
154+
}
155+
156+
res, err := dht.datastore.Query(query.Query{Filters: []query.Filter{&expireRecordFilter{}}})
157+
if err != nil {
158+
logger.Errorf("expire records proc: failed to run query against datastore, error is %+v", err)
159+
continue
160+
}
161+
162+
for {
163+
e, ok := res.NextSync()
164+
if !ok {
165+
break
166+
}
167+
if err := dht.datastore.Delete(ds.RawKey(e.Key)); err != nil {
168+
logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %+v", e.Key, err)
169+
}
170+
}
171+
}
172+
}
173+
174+
type expireRecordFilter struct{}
175+
176+
func (f *expireRecordFilter) Filter(e query.Entry) bool {
177+
// unmarshal record
178+
rec := new(recpb.Record)
179+
if err := proto.Unmarshal(e.Value, rec); err != nil {
180+
logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %+v", err)
181+
return false
182+
}
183+
184+
// should not be a provider record
185+
if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
186+
return false
187+
}
188+
189+
// age should be greater than maxNonProviderRecordAge
190+
t, err := u.ParseRFC3339(rec.TimeReceived)
191+
if err != nil {
192+
logger.Debugf("expire records filter: failed to parse time in DHT record, error is %+v", err)
193+
return false
194+
}
195+
196+
if time.Since(t) > maxNonProviderRecordAge {
197+
return true
198+
}
199+
200+
return false
201+
}

records_test.go

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@ package dht
33
import (
44
"context"
55
"crypto/rand"
6-
"github.com/libp2p/go-libp2p-core/test"
76
"testing"
87
"time"
98

9+
"github.com/ipfs/go-cid"
10+
"github.com/libp2p/go-libp2p-core/test"
11+
"github.com/multiformats/go-multihash"
12+
"github.com/stretchr/testify/assert"
13+
14+
ds "github.com/ipfs/go-datastore"
1015
u "github.com/ipfs/go-ipfs-util"
1116
ci "github.com/libp2p/go-libp2p-core/crypto"
1217
"github.com/libp2p/go-libp2p-core/peer"
1318
"github.com/libp2p/go-libp2p-core/routing"
14-
record "github.com/libp2p/go-libp2p-record"
15-
tnet "github.com/libp2p/go-libp2p-testing/net"
19+
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
20+
"github.com/libp2p/go-libp2p-record"
21+
"github.com/libp2p/go-libp2p-testing/net"
1622
)
1723

1824
// Check that GetPublicKey() correctly extracts a public key
@@ -305,3 +311,93 @@ func TestPubkeyGoodKeyFromDHTGoodKeyDirect(t *testing.T) {
305311
t.Fatal("got incorrect public key")
306312
}
307313
}
314+
315+
func TestExpireNonProviderRecords(t *testing.T) {
316+
// short sweep duration for testing
317+
sVal := nonProvRecordsCleanupInterval
318+
defer func() { nonProvRecordsCleanupInterval = sVal }()
319+
nonProvRecordsCleanupInterval = 10 * time.Millisecond
320+
321+
ctx, cancel := context.WithCancel(context.Background())
322+
defer cancel()
323+
324+
// helper functions
325+
putRecord := func(d *IpfsDHT, key string, value []byte) error {
326+
rec := record.MakePutRecord(key, value)
327+
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
328+
pmes.Record = rec
329+
_, err := d.handlePutValue(ctx, "testpeer", pmes)
330+
return err
331+
}
332+
333+
addProv := func(d *IpfsDHT, c cid.Cid) error {
334+
msg, err := d.makeProvRecord(c)
335+
pi := peer.AddrInfo{
336+
ID: "testpeer",
337+
Addrs: d.host.Addrs(),
338+
}
339+
msg.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
340+
assert.NoError(t, err)
341+
342+
_, err = d.handleAddProvider(ctx, "testpeer", msg)
343+
return err
344+
}
345+
346+
getProv := func(d *IpfsDHT, c cid.Cid) (*pb.Message, error) {
347+
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, c.Bytes(), 0)
348+
m, err := d.handleGetProviders(ctx, "test peer", pmes)
349+
return m, err
350+
}
351+
352+
// TEST expiry does not happen if age(record) < MaxAge
353+
d := setupDHT(ctx, t, false)
354+
355+
// put non-provider record with current time
356+
key1 := "/v/key1"
357+
value1 := []byte("v1")
358+
assert.NoError(t, putRecord(d, key1, value1))
359+
360+
// sweep will not delete it
361+
time.Sleep(100 * time.Millisecond)
362+
363+
// get & verify it's present
364+
365+
// we need to check the datastore for non-provider records to test the expiry Proc
366+
// because a side-effect of handle get value is also that it deletes records which are beyond MaxAge
367+
// & we do not want to hit that path
368+
_, err := d.datastore.Get(convertToDsKey([]byte(key1)))
369+
assert.NoError(t, err)
370+
d.Close()
371+
d.host.Close()
372+
373+
// TEST expiry happens if age(record) > MaxAge
374+
mVal := maxNonProviderRecordAge
375+
maxNonProviderRecordAge = 50 * time.Millisecond
376+
defer func() { maxNonProviderRecordAge = mVal }()
377+
378+
d = setupDHT(ctx, t, false)
379+
380+
// put non-provider record with current time
381+
assert.NoError(t, putRecord(d, key1, value1))
382+
383+
// add provider record with current time
384+
mh, err := multihash.Sum([]byte("data"), multihash.SHA2_256, -1)
385+
assert.NoError(t, err)
386+
c := cid.NewCidV0(mh)
387+
assert.NoError(t, addProv(d, c))
388+
389+
// sweep will remove non-provider record now
390+
time.Sleep(100 * time.Millisecond)
391+
392+
// verify non-provider record is absent
393+
_, err = d.datastore.Get(convertToDsKey([]byte(key1)))
394+
assert.Equal(t, ds.ErrNotFound, err)
395+
396+
// but.... provider record is still available
397+
m, err := getProv(d, c)
398+
assert.NoError(t, err)
399+
assert.NotEmpty(t, m.ProviderPeers)
400+
401+
d.Close()
402+
d.host.Close()
403+
}

routing.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
// results will wait for the channel to drain.
2828
var asyncQueryBuffer = 10
2929

30+
var putValueRepublishInterval = 6 * time.Hour
31+
3032
// This file implements the Routing interface for the IpfsDHT struct.
3133

3234
// Basic Put/Get
@@ -98,6 +100,28 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
98100
}(p)
99101
}
100102
wg.Wait()
103+
104+
// original publisher should keep re-publishing the record because the network isn't `steady`/`stable`
105+
// and the K closest peers we just published to can become unavailable / no longer be the K closest
106+
go func() {
107+
for {
108+
select {
109+
case <-dht.proc.Closing():
110+
return
111+
case <-time.After(putValueRepublishInterval):
112+
// TODO:We can not re-use the original context here as it may have expired
113+
// But, is it fair to use this one ?
114+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
115+
if err := dht.PutValue(ctx, key, value, opts...); err != nil {
116+
logger.Errorf("putValue republish proc: failed to republish key %s, error is %+v", key, err)
117+
} else {
118+
logger.Debugf("putValue republish proc: successfully republished key %s", key)
119+
}
120+
cancel()
121+
}
122+
}
123+
}()
124+
101125
return nil
102126
}
103127

0 commit comments

Comments
 (0)