Skip to content

Commit 6e16d5e

Browse files
buffered provider (libp2p#1149)
* provider: ResettableKeyStore * keystore: remove mutex * use datastore namespace * don't sync to write to altDs * simplify put * deduplicate operation execution code * buffered provider * tests * removing redundant code * docs * wait on empty queue * fix flaky test
1 parent eade86d commit 6e16d5e

File tree

9 files changed

+661
-27
lines changed

9 files changed

+661
-27
lines changed

go.mod

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
module github.com/libp2p/go-libp2p-kad-dht
22

3-
go 1.24
3+
go 1.24.0
44

55
require (
6-
github.com/gammazero/deque v1.0.0
6+
github.com/gammazero/deque v1.1.0
77
github.com/google/gopacket v1.1.19
88
github.com/google/uuid v1.6.0
99
github.com/guillaumemichel/reservedpool v0.2.0
1010
github.com/hashicorp/golang-lru v1.0.2
1111
github.com/ipfs/boxo v0.33.1
1212
github.com/ipfs/go-cid v0.5.0
13-
github.com/ipfs/go-datastore v0.8.2
13+
github.com/ipfs/go-datastore v0.8.3
1414
github.com/ipfs/go-detect-race v0.0.1
15-
github.com/ipfs/go-log/v2 v2.8.0
15+
github.com/ipfs/go-dsqueue v0.0.4
16+
github.com/ipfs/go-log/v2 v2.8.1
1617
github.com/ipfs/go-test v0.2.3
1718
github.com/libp2p/go-libp2p v0.43.0
1819
github.com/libp2p/go-libp2p-kbucket v0.7.1-0.20250718125122-f77e735b68e8
@@ -53,6 +54,7 @@ require (
5354
github.com/go-logr/logr v1.4.3 // indirect
5455
github.com/go-logr/stdr v1.2.2 // indirect
5556
github.com/gorilla/websocket v1.5.3 // indirect
57+
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
5658
github.com/huin/goupnp v1.3.0 // indirect
5759
github.com/ipfs/go-block-format v0.2.2 // indirect
5860
github.com/ipld/go-ipld-prime v0.21.0 // indirect

go.sum

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
6363
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
6464
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
6565
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
66-
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
67-
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
66+
github.com/gammazero/deque v1.1.0 h1:OyiyReBbnEG2PP0Bnv1AASLIYvyKqIFN5xfl1t8oGLo=
67+
github.com/gammazero/deque v1.1.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg=
6868
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
6969
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
7070
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
@@ -117,6 +117,8 @@ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmv
117117
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
118118
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
119119
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
120+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
121+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
120122
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
121123
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
122124
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
@@ -131,17 +133,19 @@ github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg=
131133
github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk=
132134
github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
133135
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
134-
github.com/ipfs/go-datastore v0.8.2 h1:Jy3wjqQR6sg/LhyY0NIePZC3Vux19nLtg7dx0TVqr6U=
135-
github.com/ipfs/go-datastore v0.8.2/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0=
136+
github.com/ipfs/go-datastore v0.8.3 h1:z391GsQyGKUIUof2tPoaZVeDknbt7fNHs6Gqjcw5Jo4=
137+
github.com/ipfs/go-datastore v0.8.3/go.mod h1:raxQ/CreIy9L6MxT71ItfMX12/ASN6EhXJoUFjICQ2M=
136138
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
137139
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
138140
github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9YlmAvpQBk=
139141
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
142+
github.com/ipfs/go-dsqueue v0.0.4 h1:tesq26hKRYPG72Tu9kZKsbsLWp1KBfAxWNQlMyU17tk=
143+
github.com/ipfs/go-dsqueue v0.0.4/go.mod h1:K68ng9BVl+gLr8fqCJKaoXnXqo6MzQ6nV0MhZZFEwg4=
140144
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
141145
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
142146
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
143-
github.com/ipfs/go-log/v2 v2.8.0 h1:SptNTPJQV3s5EF4FdrTu/yVdOKfGbDgn1EBZx4til2o=
144-
github.com/ipfs/go-log/v2 v2.8.0/go.mod h1:2LEEhdv8BGubPeSFTyzbqhCqrwqxCbuTNTLWqgNAipo=
147+
github.com/ipfs/go-log/v2 v2.8.1 h1:Y/X36z7ASoLJaYIJAL4xITXgwf7RVeqb1+/25aq/Xk0=
148+
github.com/ipfs/go-log/v2 v2.8.1/go.mod h1:NyhTBcZmh2Y55eWVjOeKf8M7e4pnJYM3yDZNxQBWEEY=
145149
github.com/ipfs/go-test v0.2.3 h1:Z/jXNAReQFtCYyn7bsv/ZqUwS6E7iIcSpJ2CuzCvnrc=
146150
github.com/ipfs/go-test v0.2.3/go.mod h1:QW8vSKkwYvWFwIZQLGQXdkt9Ud76eQXRQ9Ao2H+cA1o=
147151
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=

provider/buffered/options.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Package buffered provides a buffered provider implementation that queues operations
2+
// and processes them in batches for improved performance.
3+
package buffered
4+
5+
import "time"
6+
7+
const (
8+
// DefaultDsName is the default datastore namespace for the buffered provider.
9+
DefaultDsName = "bprov" // for buffered provider
10+
// DefaultBatchSize is the default number of operations to process in a single batch.
11+
DefaultBatchSize = 1 << 10
12+
// DefaultIdleWriteTime is the default duration to wait before flushing pending operations.
13+
DefaultIdleWriteTime = time.Minute
14+
)
15+
16+
// config contains all options for the buffered provider.
17+
type config struct {
18+
dsName string
19+
batchSize int
20+
idleWriteTime time.Duration
21+
}
22+
23+
// Option is a function that configures the buffered provider.
24+
type Option func(*config)
25+
26+
// getOpts creates a config and applies Options to it.
27+
func getOpts(opts []Option) config {
28+
cfg := config{
29+
dsName: DefaultDsName,
30+
batchSize: DefaultBatchSize,
31+
idleWriteTime: DefaultIdleWriteTime,
32+
}
33+
34+
for _, opt := range opts {
35+
opt(&cfg)
36+
}
37+
return cfg
38+
}
39+
40+
// WithDsName sets the datastore namespace for the buffered provider.
41+
// If name is empty, the option is ignored.
42+
func WithDsName(name string) Option {
43+
return func(c *config) {
44+
if len(name) > 0 {
45+
c.dsName = name
46+
}
47+
}
48+
}
49+
50+
// WithBatchSize sets the number of operations to process in a single batch.
51+
// If n is zero or negative, the option is ignored.
52+
func WithBatchSize(n int) Option {
53+
return func(c *config) {
54+
if n > 0 {
55+
c.batchSize = n
56+
}
57+
}
58+
}
59+
60+
// WithIdleWriteTime sets the duration to wait before flushing pending operations.
61+
func WithIdleWriteTime(d time.Duration) Option {
62+
return func(c *config) {
63+
c.idleWriteTime = d
64+
}
65+
}

0 commit comments

Comments
 (0)