Skip to content

Commit 54e8d3c

Browse files
buffered provider
1 parent 2f182c7 commit 54e8d3c

File tree

8 files changed

+354
-27
lines changed

8 files changed

+354
-27
lines changed

go.mod

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
module github.com/libp2p/go-libp2p-kad-dht
22

3-
go 1.24
3+
go 1.24.0
44

55
require (
6-
github.com/gammazero/deque v1.0.0
6+
github.com/gammazero/deque v1.1.0
77
github.com/google/gopacket v1.1.19
88
github.com/google/uuid v1.6.0
99
github.com/guillaumemichel/reservedpool v0.2.0
1010
github.com/hashicorp/golang-lru v1.0.2
1111
github.com/ipfs/boxo v0.33.1
1212
github.com/ipfs/go-cid v0.5.0
13-
github.com/ipfs/go-datastore v0.8.2
13+
github.com/ipfs/go-datastore v0.8.3
1414
github.com/ipfs/go-detect-race v0.0.1
15-
github.com/ipfs/go-log/v2 v2.8.0
15+
github.com/ipfs/go-dsqueue v0.0.4
16+
github.com/ipfs/go-log/v2 v2.8.1
1617
github.com/ipfs/go-test v0.2.3
1718
github.com/libp2p/go-libp2p v0.43.0
1819
github.com/libp2p/go-libp2p-kbucket v0.8.0
@@ -53,6 +54,7 @@ require (
5354
github.com/go-logr/logr v1.4.3 // indirect
5455
github.com/go-logr/stdr v1.2.2 // indirect
5556
github.com/gorilla/websocket v1.5.3 // indirect
57+
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
5658
github.com/huin/goupnp v1.3.0 // indirect
5759
github.com/ipfs/go-block-format v0.2.2 // indirect
5860
github.com/ipld/go-ipld-prime v0.21.0 // indirect

go.sum

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
6363
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
6464
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
6565
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
66-
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
67-
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
66+
github.com/gammazero/deque v1.1.0 h1:OyiyReBbnEG2PP0Bnv1AASLIYvyKqIFN5xfl1t8oGLo=
67+
github.com/gammazero/deque v1.1.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg=
6868
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
6969
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
7070
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
@@ -117,6 +117,8 @@ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmv
117117
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
118118
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
119119
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
120+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
121+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
120122
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
121123
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
122124
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
@@ -131,17 +133,19 @@ github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg=
131133
github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk=
132134
github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
133135
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
134-
github.com/ipfs/go-datastore v0.8.2 h1:Jy3wjqQR6sg/LhyY0NIePZC3Vux19nLtg7dx0TVqr6U=
135-
github.com/ipfs/go-datastore v0.8.2/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
136+
github.com/ipfs/go-datastore v0.8.3 h1:z391GsQyGKUIUof2tPoaZVeDknbt7fNHs6Gqjcw5Jo4=
137+
github.com/ipfs/go-datastore v0.8.3/go.mod h1:raxQ/CreIy9L6MxT71ItfMX12/ASN6EhXJoUFjICQ2M=
136138
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
137139
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
138140
github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9YlmAvpQBk=
139141
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
142+
github.com/ipfs/go-dsqueue v0.0.4 h1:tesq26hKRYPG72Tu9kZKsbsLWp1KBfAxWNQlMyU17tk=
143+
github.com/ipfs/go-dsqueue v0.0.4/go.mod h1:K68ng9BVl+gLr8fqCJKaoXnXqo6MzQ6nV0MhZZFEwg4=
140144
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
141145
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
142146
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
143-
github.com/ipfs/go-log/v2 v2.8.0 h1:SptNTPJQV3s5EF4FdrTu/yVdOKfGbDgn1EBZx4til2o=
144-
github.com/ipfs/go-log/v2 v2.8.0/go.mod h1:2LEEhdv8BGubPeSFTyzbqhCqrwqxCbuTNTLWqgNAipo=
147+
github.com/ipfs/go-log/v2 v2.8.1 h1:Y/X36z7ASoLJaYIJAL4xITXgwf7RVeqb1+/25aq/Xk0=
148+
github.com/ipfs/go-log/v2 v2.8.1/go.mod h1:NyhTBcZmh2Y55eWVjOeKf8M7e4pnJYM3yDZNxQBWEEY=
145149
github.com/ipfs/go-test v0.2.3 h1:Z/jXNAReQFtCYyn7bsv/ZqUwS6E7iIcSpJ2CuzCvnrc=
146150
github.com/ipfs/go-test v0.2.3/go.mod h1:QW8vSKkwYvWFwIZQLGQXdkt9Ud76eQXRQ9Ao2H+cA1o=
147151
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=

provider/buffered/options.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Package buffered provides a buffered provider implementation that queues operations
2+
// and processes them in batches for improved performance.
3+
package buffered
4+
5+
import "time"
6+
7+
const (
8+
// DefaultDsName is the default datastore namespace for the buffered provider.
9+
DefaultDsName = "bprov" // for buffered provider
10+
// DefaultBatchSize is the default number of operations to process in a single batch.
11+
DefaultBatchSize = 1 << 10
12+
// DefaultIdleWriteTime is the default duration to wait before flushing pending operations.
13+
DefaultIdleWriteTime = time.Minute
14+
)
15+
16+
// config contains all options for the buffered provider.
17+
type config struct {
18+
dsName string
19+
batchSize int
20+
idleWriteTime time.Duration
21+
}
22+
23+
// Option is a function that configures the buffered provider.
24+
type Option func(*config)
25+
26+
// getOpts creates a config and applies Options to it.
27+
func getOpts(opts []Option) config {
28+
cfg := config{
29+
dsName: DefaultDsName,
30+
batchSize: DefaultBatchSize,
31+
idleWriteTime: DefaultIdleWriteTime,
32+
}
33+
34+
for _, opt := range opts {
35+
opt(&cfg)
36+
}
37+
return cfg
38+
}
39+
40+
// WithDsName sets the datastore namespace for the buffered provider.
41+
// If name is empty, the option is ignored.
42+
func WithDsName(name string) Option {
43+
return func(c *config) {
44+
if len(name) > 0 {
45+
c.dsName = name
46+
}
47+
}
48+
}
49+
50+
// WithBatchSize sets the number of operations to process in a single batch.
51+
// If n is zero or negative, the option is ignored.
52+
func WithBatchSize(n int) Option {
53+
return func(c *config) {
54+
if n > 0 {
55+
c.batchSize = n
56+
}
57+
}
58+
}
59+
60+
// WithIdleWriteTime sets the duration to wait before flushing pending operations.
61+
func WithIdleWriteTime(d time.Duration) Option {
62+
return func(c *config) {
63+
c.idleWriteTime = d
64+
}
65+
}

provider/buffered/provider.go

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package buffered
2+
3+
import (
4+
"errors"
5+
"sync"
6+
7+
"github.com/ipfs/go-datastore"
8+
"github.com/ipfs/go-dsqueue"
9+
logging "github.com/ipfs/go-log/v2"
10+
"github.com/libp2p/go-libp2p-kad-dht/provider"
11+
"github.com/libp2p/go-libp2p-kad-dht/provider/internal"
12+
mh "github.com/multiformats/go-multihash"
13+
)
14+
15+
var logger = logging.Logger(provider.LoggerName)
16+
17+
const (
18+
// provideOnceOp represents a one-time provide operation.
19+
provideOnceOp byte = iota
20+
// startProvidingOp represents starting continuous providing.
21+
startProvidingOp
22+
// forceStartProvidingOp represents forcefully starting providing (overrides existing).
23+
forceStartProvidingOp
24+
// stopProvidingOp represents stopping providing.
25+
stopProvidingOp
26+
// lastOp is used for array sizing.
27+
lastOp
28+
)
29+
30+
var _ internal.Provider = (*SweepingProvider)(nil)
31+
32+
// SweepingProvider implements a buffered provider that queues operations and
33+
// processes them asynchronously in batches.
34+
type SweepingProvider struct {
35+
closeOnce sync.Once
36+
done chan struct{}
37+
closed chan struct{}
38+
provider internal.Provider
39+
queue *dsqueue.DSQueue
40+
batchSize int
41+
}
42+
43+
// New creates a new SweepingProvider that wraps the given provider with
44+
// buffering capabilities. Operations are queued and processed asynchronously
45+
// in batches for improved performance.
46+
func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *SweepingProvider {
47+
cfg := getOpts(opts)
48+
s := &SweepingProvider{
49+
done: make(chan struct{}),
50+
closed: make(chan struct{}),
51+
52+
provider: prov,
53+
queue: dsqueue.New(ds, cfg.dsName,
54+
dsqueue.WithDedupCacheSize(0), // disable deduplication
55+
dsqueue.WithIdleWriteTime(cfg.idleWriteTime),
56+
),
57+
batchSize: cfg.batchSize,
58+
}
59+
go s.worker()
60+
return s
61+
}
62+
63+
// Close stops the provider and releases all resources.
64+
//
65+
// It waits for the worker goroutine to finish processing current operations
66+
// and closes the underneath provider. The queue current state is persisted on
67+
// the datastore.
68+
func (s *SweepingProvider) Close() error {
69+
var err error
70+
s.closeOnce.Do(func() {
71+
close(s.closed)
72+
err = errors.Join(s.queue.Close(), s.provider.Close())
73+
<-s.done
74+
})
75+
return err
76+
}
77+
78+
// toBytes serializes an operation and multihash into a byte slice for storage.
79+
func toBytes(op byte, key mh.Multihash) []byte {
80+
return append([]byte{op}, key...)
81+
}
82+
83+
// fromBytes deserializes a byte slice back into an operation and multihash.
84+
func fromBytes(data []byte) (byte, mh.Multihash, error) {
85+
op := data[0]
86+
h, err := mh.Cast(data[1:])
87+
return op, h, err
88+
}
89+
90+
// getOperations processes a batch of dequeued operations and groups them by
91+
// type.
92+
//
93+
// It discards multihashes from the `StopProviding` operation if
94+
// `StartProviding` was called after `StopProviding` for the same multihash.
95+
func getOperations(dequeued [][]byte) ([][]mh.Multihash, error) {
96+
ops := [lastOp][]mh.Multihash{}
97+
stopProv := make(map[string]struct{})
98+
99+
for _, bs := range dequeued {
100+
op, h, err := fromBytes(bs)
101+
if err != nil {
102+
return nil, err
103+
}
104+
switch op {
105+
case provideOnceOp:
106+
ops[provideOnceOp] = append(ops[provideOnceOp], h)
107+
case startProvidingOp, forceStartProvidingOp:
108+
delete(stopProv, string(h))
109+
ops[op] = append(ops[op], h)
110+
case stopProvidingOp:
111+
stopProv[string(h)] = struct{}{}
112+
}
113+
}
114+
for hstr := range stopProv {
115+
ops[stopProvidingOp] = append(ops[stopProvidingOp], mh.Multihash(hstr))
116+
}
117+
return ops[:], nil
118+
}
119+
120+
// worker processes operations from the queue in batches.
121+
// It runs in a separate goroutine and continues until the provider is closed.
122+
func (s *SweepingProvider) worker() {
123+
defer close(s.done)
124+
for {
125+
select {
126+
case <-s.closed:
127+
return
128+
default:
129+
}
130+
131+
res, err := s.queue.GetN(s.batchSize)
132+
if err != nil {
133+
logger.Warnf("BufferedSweepingProvider unable to dequeue: %v", err)
134+
continue
135+
}
136+
ops, err := getOperations(res)
137+
if err != nil {
138+
logger.Warnf("BufferedSweepingProvider unable to parse dequeued item: %v", err)
139+
continue
140+
}
141+
142+
// Process `StartProviding` (force=true) ops first, so that if
143+
// `StartProviding` (force=false) is called after, there is no need to
144+
// enqueue the multihash a second time to the provide queue.
145+
err = s.provider.StartProviding(true, ops[forceStartProvidingOp]...)
146+
if err != nil {
147+
logger.Warnf("BufferedSweepingProvider unable to start providing (force): %v", err)
148+
}
149+
err = s.provider.StartProviding(false, ops[startProvidingOp]...)
150+
if err != nil {
151+
logger.Warnf("BufferedSweepingProvider unable to start providing: %v", err)
152+
}
153+
err = s.provider.ProvideOnce(ops[provideOnceOp]...)
154+
if err != nil {
155+
logger.Warnf("BufferedSweepingProvider unable to provide once: %v", err)
156+
}
157+
// Process `StopProviding` last, so that multihashes that should have been
158+
// provided, and then stopped provided in the same batch are provided only
159+
// once. Don't `StopProviding` multihashes, for which `StartProviding` has
160+
// been called after `StopProviding`.
161+
err = s.provider.StopProviding(ops[stopProvidingOp]...)
162+
if err != nil {
163+
logger.Warnf("BufferedSweepingProvider unable to stop providing: %v", err)
164+
}
165+
}
166+
}
167+
168+
// enqueue adds operations to the queue for asynchronous processing.
169+
func (s *SweepingProvider) enqueue(op byte, keys ...mh.Multihash) error {
170+
for _, h := range keys {
171+
if err := s.queue.Put(toBytes(op, h)); err != nil {
172+
return err
173+
}
174+
}
175+
return nil
176+
}
177+
178+
// ProvideOnce enqueues multihashes for which the provider will send provider
179+
// records out only once to the DHT swarm. It does NOT take the responsibility
180+
// to reprovide these keys.
181+
//
182+
// Returns immediately after enqueuing the keys, the actual provide operation
183+
// happens asynchronously. Returns an error if the multihashes couldn't be
184+
// enqueued.
185+
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error {
186+
return s.enqueue(provideOnceOp, keys...)
187+
}
188+
189+
// StartProviding adds the supplied keys to the queue of keys that will be
190+
// provided to the DHT swarm unless they were already provided in the past. The
191+
// keys will be periodically reprovided until StopProviding is called for the
192+
// same keys or the keys are removed from the Keystore.
193+
//
194+
// If force is true, the keys are provided to the DHT swarm regardless of
195+
// whether they were already being reprovided in the past.
196+
//
197+
// Returns immediately after enqueuing the keys, the actual provide operation
198+
// happens asynchronously. Returns an error if the multihashes couldn't be
199+
// enqueued.
200+
func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error {
201+
op := startProvidingOp
202+
if force {
203+
op = forceStartProvidingOp
204+
}
205+
return s.enqueue(op, keys...)
206+
}
207+
208+
// StopProviding adds the supplied multihashes to the BufferedSweepingProvider
209+
// queue, to stop reproviding the given keys to the DHT swarm.
210+
//
211+
// The node stops being referred as a provider when the provider records in the
212+
// DHT swarm expire.
213+
//
214+
// Returns immediately after enqueuing the keys, the actual provide operation
215+
// happens asynchronously. Returns an error if the multihashes couldn't be
216+
// enqueued.
217+
func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error {
218+
return s.enqueue(stopProvidingOp, keys...)
219+
}
220+
221+
// Clear clears the all the keys from the provide queue and returns the number
222+
// of keys that were cleared.
223+
//
224+
// The keys are not deleted from the keystore, so they will continue to be
225+
// reprovided as scheduled.
226+
func (s *SweepingProvider) Clear() int {
227+
return s.provider.Clear()
228+
}
229+
230+
// RefreshSchedule scans the KeyStore for any keys that are not currently
231+
// scheduled for reproviding. If such keys are found, it schedules their
232+
// associated keyspace region to be reprovided.
233+
//
234+
// This function doesn't remove prefixes that have no keys from the schedule.
235+
// This is done automatically during the reprovide operation if a region has no
236+
// keys.
237+
//
238+
// Returns an error if the provider is closed or if the node is currently
239+
// Offline (either never bootstrapped, or disconnected since more than
240+
// `OfflineDelay`). The schedule depends on the network size, hence recent
241+
// network connectivity is essential.
242+
func (s *SweepingProvider) RefreshSchedule() error {
243+
return s.provider.RefreshSchedule()
244+
}

dual/provider/options.go renamed to provider/dual/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package provider
1+
package dual
22

33
import (
44
"errors"

0 commit comments

Comments
 (0)