buffered provider #1149
Merged
Commits
14 commits
9d30fd5 provider: ResettableKeyStore (guillaumemichel)
172e857 keystore: remove mutex (guillaumemichel)
5088386 use datastore namespace (guillaumemichel)
d154d71 don't sync to write to altDs (guillaumemichel)
38a707a simplify put (guillaumemichel)
2f182c7 deduplicate operation execution code (guillaumemichel)
54e8d3c buffered provider (guillaumemichel)
e10840a tests (guillaumemichel)
0551bf2 removing redundant code (guillaumemichel)
743cd49 docs (guillaumemichel)
339b0f0 wait on empty queue (guillaumemichel)
450daee fix flaky test (guillaumemichel)
bfc6664 Merge branch 'provider' into provider-accelerate-keystore (guillaumemichel)
0c86763 Merge branch 'provider-accelerate-keystore' into buffered-provider (guillaumemichel)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// Package buffered provides a buffered provider implementation that queues operations | ||
// and processes them in batches for improved performance. | ||
package buffered | ||
|
||
import "time" | ||
|
||
const ( | ||
// DefaultDsName is the default datastore namespace for the buffered provider. | ||
DefaultDsName = "bprov" // for buffered provider | ||
// DefaultBatchSize is the default number of operations to process in a single batch. | ||
DefaultBatchSize = 1 << 10 | ||
// DefaultIdleWriteTime is the default duration to wait before flushing pending operations. | ||
DefaultIdleWriteTime = time.Minute | ||
) | ||
|
||
// config contains all options for the buffered provider. | ||
type config struct { | ||
dsName string | ||
batchSize int | ||
idleWriteTime time.Duration | ||
} | ||
|
||
// Option is a function that configures the buffered provider. | ||
type Option func(*config) | ||
|
||
// getOpts creates a config and applies Options to it. | ||
func getOpts(opts []Option) config { | ||
cfg := config{ | ||
dsName: DefaultDsName, | ||
batchSize: DefaultBatchSize, | ||
idleWriteTime: DefaultIdleWriteTime, | ||
} | ||
|
||
for _, opt := range opts { | ||
opt(&cfg) | ||
} | ||
return cfg | ||
} | ||
|
||
// WithDsName sets the datastore namespace for the buffered provider. | ||
// If name is empty, the option is ignored. | ||
func WithDsName(name string) Option { | ||
return func(c *config) { | ||
if len(name) > 0 { | ||
c.dsName = name | ||
} | ||
} | ||
} | ||
|
||
// WithBatchSize sets the number of operations to process in a single batch. | ||
// If n is zero or negative, the option is ignored. | ||
func WithBatchSize(n int) Option { | ||
return func(c *config) { | ||
if n > 0 { | ||
c.batchSize = n | ||
} | ||
} | ||
} | ||
|
||
// WithIdleWriteTime sets the duration to wait before flushing pending operations. | ||
func WithIdleWriteTime(d time.Duration) Option { | ||
return func(c *config) { | ||
c.idleWriteTime = d | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,244 @@ | ||
package buffered | ||
|
||
import ( | ||
"errors" | ||
"sync" | ||
|
||
"github.com/ipfs/go-datastore" | ||
"github.com/ipfs/go-dsqueue" | ||
logging "github.com/ipfs/go-log/v2" | ||
"github.com/libp2p/go-libp2p-kad-dht/provider" | ||
"github.com/libp2p/go-libp2p-kad-dht/provider/internal" | ||
mh "github.com/multiformats/go-multihash" | ||
) | ||
|
||
var logger = logging.Logger(provider.LoggerName) | ||
|
||
const ( | ||
// provideOnceOp represents a one-time provide operation. | ||
provideOnceOp byte = iota | ||
// startProvidingOp represents starting continuous providing. | ||
startProvidingOp | ||
// forceStartProvidingOp represents forcefully starting providing (overrides existing). | ||
forceStartProvidingOp | ||
// stopProvidingOp represents stopping providing. | ||
stopProvidingOp | ||
// lastOp is used for array sizing. | ||
lastOp | ||
) | ||
|
||
var _ internal.Provider = (*SweepingProvider)(nil) | ||
|
||
// SweepingProvider implements a buffered provider that queues operations and | ||
// processes them asynchronously in batches. | ||
type SweepingProvider struct { | ||
closeOnce sync.Once | ||
done chan struct{} | ||
closed chan struct{} | ||
provider internal.Provider | ||
queue *dsqueue.DSQueue | ||
batchSize int | ||
} | ||
|
||
// New creates a new SweepingProvider that wraps the given provider with | ||
// buffering capabilities. Operations are queued and processed asynchronously | ||
// in batches for improved performance. | ||
func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *SweepingProvider { | ||
cfg := getOpts(opts) | ||
s := &SweepingProvider{ | ||
done: make(chan struct{}), | ||
closed: make(chan struct{}), | ||
|
||
provider: prov, | ||
queue: dsqueue.New(ds, cfg.dsName, | ||
dsqueue.WithDedupCacheSize(0), // disable deduplication | ||
dsqueue.WithIdleWriteTime(cfg.idleWriteTime), | ||
), | ||
batchSize: cfg.batchSize, | ||
} | ||
go s.worker() | ||
return s | ||
} | ||
|
||
// Close stops the provider and releases all resources. | ||
// | ||
// It waits for the worker goroutine to finish processing current operations | ||
// and closes the underlying provider. The queue's current state is persisted to | ||
// the datastore. | ||
func (s *SweepingProvider) Close() error { | ||
var err error | ||
s.closeOnce.Do(func() { | ||
close(s.closed) | ||
err = errors.Join(s.queue.Close(), s.provider.Close()) | ||
<-s.done | ||
}) | ||
return err | ||
} | ||
|
||
// toBytes serializes an operation and multihash into a byte slice for storage. | ||
func toBytes(op byte, key mh.Multihash) []byte { | ||
return append([]byte{op}, key...) | ||
} | ||
|
||
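// fromBytes deserializes a byte slice back into an operation and multihash. | ||
// It returns an error instead of panicking when data is empty. | ||
func fromBytes(data []byte) (byte, mh.Multihash, error) { | ||
if len(data) == 0 { | ||
return 0, nil, errors.New("empty queue entry") | ||
} | ||
op := data[0] | ||
h, err := mh.Cast(data[1:]) | ||
return op, h, err | ||
} | ||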
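The queue entries use a one-byte operation tag followed by the raw key bytes. A standalone sketch of that framing, using plain `[]byte` keys as a stand-in for `mh.Multihash`; the names `encode`, `decode`, and `roundTrips` are hypothetical, and the length check is one possible way to reject short records:

```go
package main

import (
	"bytes"
	"fmt"
)

// encode prefixes the key with a single operation byte.
func encode(op byte, key []byte) []byte {
	out := make([]byte, 0, 1+len(key))
	out = append(out, op)
	return append(out, key...)
}

// decode splits a record into its operation byte and key.
// Records shorter than two bytes cannot hold both, so they are rejected.
func decode(data []byte) (byte, []byte, error) {
	if len(data) < 2 {
		return 0, nil, fmt.Errorf("record too short: %d bytes", len(data))
	}
	return data[0], data[1:], nil
}

// roundTrips reports whether decode(encode(op, key)) returns the inputs.
func roundTrips(op byte, key []byte) bool {
	gotOp, gotKey, err := decode(encode(op, key))
	return err == nil && gotOp == op && bytes.Equal(gotKey, key)
}

func main() {
	fmt.Println(roundTrips(3, []byte{0x12, 0x20, 0xaa})) // prints "true"
	_, _, err := decode(nil)
	fmt.Println(err != nil) // prints "true"
}
```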
|
||
// getOperations processes a batch of dequeued operations and groups them by | ||
// type. | ||
// | ||
// It discards multihashes from the `StopProviding` operation if | ||
// `StartProviding` was called after `StopProviding` for the same multihash. | ||
func getOperations(dequeued [][]byte) ([][]mh.Multihash, error) { | ||
ops := [lastOp][]mh.Multihash{} | ||
stopProv := make(map[string]struct{}) | ||
|
||
for _, bs := range dequeued { | ||
op, h, err := fromBytes(bs) | ||
if err != nil { | ||
return nil, err | ||
} | ||
switch op { | ||
case provideOnceOp: | ||
ops[provideOnceOp] = append(ops[provideOnceOp], h) | ||
case startProvidingOp, forceStartProvidingOp: | ||
delete(stopProv, string(h)) | ||
ops[op] = append(ops[op], h) | ||
case stopProvidingOp: | ||
stopProv[string(h)] = struct{}{} | ||
} | ||
} | ||
for hstr := range stopProv { | ||
ops[stopProvidingOp] = append(ops[stopProvidingOp], mh.Multihash(hstr)) | ||
} | ||
return ops[:], nil | ||
} | ||
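The grouping rule in `getOperations` can be checked in isolation: a stop is kept only if no start for the same key follows it in the batch. A standalone sketch with string keys instead of multihashes; `entry`, `group`, and `summary` are hypothetical names, and the sort is added here only to make the output deterministic:

```go
package main

import (
	"fmt"
	"sort"
)

const (
	provideOnce = iota
	start
	forceStart
	stop
	numOps // used for array sizing
)

// entry is a dequeued (operation, key) pair.
type entry struct {
	op  int
	key string
}

// group buckets keys by operation. A stop is dropped when a start
// (forced or not) for the same key appears later in the batch.
func group(entries []entry) [numOps][]string {
	var ops [numOps][]string
	pendingStop := make(map[string]struct{})
	for _, e := range entries {
		switch e.op {
		case provideOnce:
			ops[provideOnce] = append(ops[provideOnce], e.key)
		case start, forceStart:
			delete(pendingStop, e.key)
			ops[e.op] = append(ops[e.op], e.key)
		case stop:
			pendingStop[e.key] = struct{}{}
		}
	}
	for k := range pendingStop {
		ops[stop] = append(ops[stop], k)
	}
	sort.Strings(ops[stop]) // map iteration order is not stable
	return ops
}

// summary renders the start and stop buckets for printing.
func summary(ops [numOps][]string) string {
	return fmt.Sprintf("start=%v stop=%v", ops[start], ops[stop])
}

func main() {
	ops := group([]entry{
		{stop, "a"}, {start, "a"}, // stop then start: the stop is discarded
		{start, "b"}, {stop, "b"}, // start then stop: both are kept
	})
	fmt.Println(summary(ops)) // prints "start=[a b] stop=[b]"
}
```

Note that a start followed by a stop keeps both entries, which is why the worker applies stops last.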
|
||
// worker processes operations from the queue in batches. | ||
// It runs in a separate goroutine and continues until the provider is closed. | ||
func (s *SweepingProvider) worker() { | ||
defer close(s.done) | ||
for { | ||
select { | ||
case <-s.closed: | ||
return | ||
default: | ||
} | ||
|
||
res, err := s.queue.GetN(s.batchSize) | ||
if err != nil { | ||
logger.Warnf("BufferedSweepingProvider unable to dequeue: %v", err) | ||
continue | ||
} | ||
ops, err := getOperations(res) | ||
if err != nil { | ||
logger.Warnf("BufferedSweepingProvider unable to parse dequeued item: %v", err) | ||
continue | ||
} | ||
|
||
// Process `StartProviding` (force=true) ops first, so that if | ||
// `StartProviding` (force=false) is called after, there is no need to | ||
// enqueue the multihash a second time to the provide queue. | ||
err = s.provider.StartProviding(true, ops[forceStartProvidingOp]...) | ||
if err != nil { | ||
logger.Warnf("BufferedSweepingProvider unable to start providing (force): %v", err) | ||
} | ||
err = s.provider.StartProviding(false, ops[startProvidingOp]...) | ||
if err != nil { | ||
logger.Warnf("BufferedSweepingProvider unable to start providing: %v", err) | ||
} | ||
err = s.provider.ProvideOnce(ops[provideOnceOp]...) | ||
if err != nil { | ||
logger.Warnf("BufferedSweepingProvider unable to provide once: %v", err) | ||
} | ||
// Process `StopProviding` last, so that multihashes that were provided and | ||
// then stopped in the same batch are still provided once. Don't call | ||
// `StopProviding` for multihashes for which `StartProviding` has been | ||
// called after `StopProviding`. | ||
err = s.provider.StopProviding(ops[stopProvidingOp]...) | ||
if err != nil { | ||
logger.Warnf("BufferedSweepingProvider unable to stop providing: %v", err) | ||
} | ||
} | ||
} | ||
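The `closed`/`done` channel pair together with `sync.Once` gives an idempotent `Close` that returns only after the worker has exited. A simplified stdlib-only sketch of that handshake; the `batcher` type and its `in` channel are hypothetical, and unlike the worker above (which polls `closed` and then blocks on the queue) this version selects on both channels:

```go
package main

import (
	"fmt"
	"sync"
)

// batcher is a hypothetical stand-in for the buffered provider.
type batcher struct {
	closeOnce sync.Once
	closed    chan struct{} // closed by Close to ask the worker to stop
	done      chan struct{} // closed by the worker when it has exited
	in        chan int
	processed []int // owned by the worker until done is closed
}

func newBatcher() *batcher {
	b := &batcher{
		closed: make(chan struct{}),
		done:   make(chan struct{}),
		in:     make(chan int),
	}
	go b.worker()
	return b
}

// worker handles items until Close is called.
func (b *batcher) worker() {
	defer close(b.done)
	for {
		select {
		case <-b.closed:
			return
		case v := <-b.in:
			b.processed = append(b.processed, v)
		}
	}
}

// Close signals the worker and waits for it to exit. Repeated calls are no-ops.
func (b *batcher) Close() {
	b.closeOnce.Do(func() {
		close(b.closed)
		<-b.done
	})
}

func main() {
	b := newBatcher()
	b.in <- 1
	b.in <- 2
	b.Close()
	b.Close() // safe: sync.Once guards the channel close
	fmt.Println(b.processed) // prints "[1 2]"
}
```

Reading `processed` after `Close` is safe here because the close of `done` happens after the worker's last write.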
|
||
// enqueue adds operations to the queue for asynchronous processing. | ||
func (s *SweepingProvider) enqueue(op byte, keys ...mh.Multihash) error { | ||
for _, h := range keys { | ||
if err := s.queue.Put(toBytes(op, h)); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// ProvideOnce enqueues multihashes for which the provider will send provider | ||
// records out only once to the DHT swarm. It does NOT take the responsibility | ||
// to reprovide these keys. | ||
// | ||
// Returns immediately after enqueuing the keys; the actual provide operation | ||
// happens asynchronously. Returns an error if the multihashes couldn't be | ||
// enqueued. | ||
func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error { | ||
return s.enqueue(provideOnceOp, keys...) | ||
} | ||
|
||
// StartProviding adds the supplied keys to the queue of keys that will be | ||
// provided to the DHT swarm unless they were already provided in the past. The | ||
// keys will be periodically reprovided until StopProviding is called for the | ||
// same keys or the keys are removed from the Keystore. | ||
// | ||
// If force is true, the keys are provided to the DHT swarm regardless of | ||
// whether they were already being reprovided in the past. | ||
// | ||
// Returns immediately after enqueuing the keys; the actual provide operation | ||
// happens asynchronously. Returns an error if the multihashes couldn't be | ||
// enqueued. | ||
func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error { | ||
op := startProvidingOp | ||
if force { | ||
op = forceStartProvidingOp | ||
} | ||
return s.enqueue(op, keys...) | ||
} | ||
|
||
// StopProviding adds the supplied multihashes to the BufferedSweepingProvider | ||
// queue, to stop reproviding the given keys to the DHT swarm. | ||
// | ||
// The node stops being referred to as a provider when the provider records in | ||
// the DHT swarm expire. | ||
// | ||
// Returns immediately after enqueuing the keys; the actual stop operation | ||
// happens asynchronously. Returns an error if the multihashes couldn't be | ||
// enqueued. | ||
func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error { | ||
return s.enqueue(stopProvidingOp, keys...) | ||
} | ||
|
||
// Clear clears all the keys from the provide queue and returns the number | ||
// of keys that were cleared. | ||
// | ||
// The keys are not deleted from the keystore, so they will continue to be | ||
// reprovided as scheduled. | ||
func (s *SweepingProvider) Clear() int { | ||
return s.provider.Clear() | ||
} | ||
|
||
// RefreshSchedule scans the KeyStore for any keys that are not currently | ||
// scheduled for reproviding. If such keys are found, it schedules their | ||
// associated keyspace region to be reprovided. | ||
// | ||
// This function doesn't remove prefixes that have no keys from the schedule. | ||
// This is done automatically during the reprovide operation if a region has no | ||
// keys. | ||
// | ||
// Returns an error if the provider is closed or if the node is currently | ||
// Offline (either never bootstrapped, or disconnected for more than | ||
// `OfflineDelay`). The schedule depends on the network size, hence recent | ||
// network connectivity is essential. | ||
func (s *SweepingProvider) RefreshSchedule() error { | ||
return s.provider.RefreshSchedule() | ||
} |