Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion provider/buffered/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (

var _ internal.Provider = (*SweepingProvider)(nil)

// buffered.SweepingProvider is a wrapper around a SweepingProvider buffering
// SweepingProvider (buffered) is a wrapper around a SweepingProvider buffering
// requests, to allow core operations to return instantly. Operations are
// queued and processed asynchronously in batches for improved performance.
type SweepingProvider struct {
Expand Down
2 changes: 1 addition & 1 deletion provider/buffered/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestQueueingMechanism(t *testing.T) {
}

// Wait for operations to be processed by expecting 4 signals
for i := 0; i < 4; i++ {
for i := range 4 {
select {
case <-fake.processed:
case <-time.After(time.Second):
Expand Down
62 changes: 26 additions & 36 deletions provider/dual/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-kad-dht/amino"
"github.com/libp2p/go-libp2p-kad-dht/dual"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -38,53 +37,44 @@ type config struct {

type Option func(opt *config) error

func (cfg *config) apply(opts ...Option) error {
for i, o := range opts {
if err := o(cfg); err != nil {
return fmt.Errorf("dual dht provider option %d failed: %w", i, err)
// getOpts creates a config and applies Options to it.
func getOpts(opts []Option, d *dual.DHT) (config, error) {
cfg := config{
reprovideInterval: [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval},
maxReprovideDelay: [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay},

offlineDelay: [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay},
connectivityCheckOnlineInterval: [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval},

maxWorkers: [2]int{4, 4},
dedicatedPeriodicWorkers: [2]int{2, 2},
dedicatedBurstWorkers: [2]int{1, 1},
maxProvideConnsPerWorker: [2]int{20, 20},
}

// Apply options
for i, opt := range opts {
if err := opt(&cfg); err != nil {
return config{}, fmt.Errorf("dual dht provider option %d failed: %w", i, err)
}
}
return nil
}

func (cfg *config) resolveDefaults(d *dual.DHT) {
// Resolve defaults
if cfg.msgSenders[lanID] == nil {
cfg.msgSenders[lanID] = d.LAN.MessageSender()
}
if cfg.msgSenders[wanID] == nil {
cfg.msgSenders[wanID] = d.WAN.MessageSender()
}
}

func (c *config) validate() error {
if c.dedicatedPeriodicWorkers[lanID]+c.dedicatedBurstWorkers[lanID] > c.maxWorkers[lanID] {
return errors.New("provider config: total dedicated workers exceed max workers")
// Validate config
if cfg.dedicatedPeriodicWorkers[lanID]+cfg.dedicatedBurstWorkers[lanID] > cfg.maxWorkers[lanID] {
return config{}, errors.New("provider config: total dedicated workers exceed max workers")
}
if c.dedicatedPeriodicWorkers[wanID]+c.dedicatedBurstWorkers[wanID] > c.maxWorkers[wanID] {
return errors.New("provider config: total dedicated workers exceed max workers")
if cfg.dedicatedPeriodicWorkers[wanID]+cfg.dedicatedBurstWorkers[wanID] > cfg.maxWorkers[wanID] {
return config{}, errors.New("provider config: total dedicated workers exceed max workers")
}
return nil
}

var DefaultConfig = func(cfg *config) error {
var err error
cfg.keystore, err = keystore.NewKeystore(ds.NewMapDatastore())
if err != nil {
return err
}

cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval}
cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay}

cfg.offlineDelay = [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay}
cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval}

cfg.maxWorkers = [2]int{4, 4}
cfg.dedicatedPeriodicWorkers = [2]int{2, 2}
cfg.dedicatedBurstWorkers = [2]int{1, 1}
cfg.maxProvideConnsPerWorker = [2]int{20, 20}

return nil
return cfg, nil
}

func WithKeystore(ks keystore.Keystore) Option {
Expand Down
46 changes: 35 additions & 11 deletions provider/dual/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-kad-dht/provider"
Expand All @@ -23,6 +24,8 @@ type SweepingProvider struct {
LAN *provider.SweepingProvider
WAN *provider.SweepingProvider
keystore keystore.Keystore

cleanupFuncs []func() error
}

// New creates a new SweepingProvider that manages provides and reprovides for
Expand All @@ -32,15 +35,19 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
return nil, errors.New("cannot create sweeping provider for nil dual DHT")
}

var cfg config
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
cfg, err := getOpts(opts, d)
if err != nil {
return nil, err
}
cfg.resolveDefaults(d)
err = cfg.validate()
if err != nil {
return nil, err
var cleanupFuncs []func() error
if cfg.keystore == nil {
ds := datastore.NewMapDatastore()
cfg.keystore, err = keystore.NewKeystore(ds)
if err != nil {
ds.Close()
return nil, fmt.Errorf("couldn't create a keystore: %w", err)
}
cleanupFuncs = []func() error{ds.Close, cfg.keystore.Close, func() error { return cfg.keystore.Empty(context.Background()) }}
}

sweepingProviders := make([]*provider.SweepingProvider, 2)
Expand Down Expand Up @@ -74,10 +81,11 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
}

return &SweepingProvider{
dht: d,
LAN: sweepingProviders[0],
WAN: sweepingProviders[1],
keystore: cfg.keystore,
dht: d,
LAN: sweepingProviders[0],
WAN: sweepingProviders[1],
keystore: cfg.keystore,
cleanupFuncs: cleanupFuncs,
}, nil
}

Expand All @@ -102,9 +110,25 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) e

// Close stops both DHT providers and releases associated resources.
func (s *SweepingProvider) Close() error {
return s.runOnBoth(func(p *provider.SweepingProvider) error {
err := s.runOnBoth(func(p *provider.SweepingProvider) error {
return p.Close()
})

if s.cleanupFuncs != nil {
// Cleanup keystore and datastore if we created them
var errs []error
for i := len(s.cleanupFuncs) - 1; i >= 0; i-- { // LIFO: last-added is cleaned up first
if f := s.cleanupFuncs[i]; f != nil {
if err := f(); err != nil {
errs = append(errs, err)
}
}
}
if len(errs) > 0 {
err = errors.Join(append(errs, err)...)
}
}
return err
}

// ProvideOnce sends provider records for the specified keys to both DHT swarms
Expand Down
3 changes: 1 addition & 2 deletions provider/internal/connectivity/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ type ConnectivityChecker struct {

// New creates a new ConnectivityChecker instance.
func New(checkFunc func() bool, opts ...Option) (*ConnectivityChecker, error) {
var cfg config
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
cfg, err := getOpts(opts)
if err != nil {
return nil, err
}
Expand Down
26 changes: 13 additions & 13 deletions provider/internal/connectivity/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ type config struct {
onOnline func()
}

func (cfg *config) apply(opts ...Option) error {
for i, o := range opts {
if err := o(cfg); err != nil {
return fmt.Errorf("reprovider dht option %d failed: %w", i, err)
}
}
return nil
}

type Option func(opt *config) error

var DefaultConfig = func(cfg *config) error {
cfg.onlineCheckInterval = 1 * time.Minute
cfg.offlineDelay = 2 * time.Hour
return nil
// getOpts creates a config and applies Options to it.
func getOpts(opts []Option) (config, error) {
cfg := config{
onlineCheckInterval: 1 * time.Minute,
offlineDelay: 2 * time.Hour,
}

for i, opt := range opts {
if err := opt(&cfg); err != nil {
return config{}, fmt.Errorf("connectivity option %d error: %s", i, err)
}
}
return cfg, nil
}

// WithOnlineCheckInterval sets the minimum interval between online checks.
Expand Down
72 changes: 35 additions & 37 deletions provider/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,51 +53,49 @@ type config struct {
maxProvideConnsPerWorker int
}

func (cfg *config) apply(opts ...Option) error {
for i, o := range opts {
if err := o(cfg); err != nil {
return fmt.Errorf("reprovider dht option %d failed: %w", i, err)
type Option func(opt *config) error

// getOpts creates a config and applies Options to it.
func getOpts(opts []Option) (config, error) {
cfg := config{
replicationFactor: amino.DefaultBucketSize,
reprovideInterval: amino.DefaultReprovideInterval,
maxReprovideDelay: DefaultMaxReprovideDelay,
offlineDelay: DefaultOfflineDelay,
connectivityCheckOnlineInterval: DefaultConnectivityCheckOnlineInterval,

maxWorkers: 4,
dedicatedPeriodicWorkers: 2,
dedicatedBurstWorkers: 1,
maxProvideConnsPerWorker: 20,

addLocalRecord: func(mh mh.Multihash) error { return nil },
}

// Apply options
for i, opt := range opts {
if err := opt(&cfg); err != nil {
return config{}, fmt.Errorf("reprovider dht option %d error: %s", i, err)
}
}
return nil
}

func (c *config) validate() error {
if len(c.peerid) == 0 {
return errors.New("reprovider config: peer id is required")
// Validate config
if len(cfg.peerid) == 0 {
return config{}, errors.New("reprovider config: peer id is required")
}
if c.router == nil {
return errors.New("reprovider config: router is required")
if cfg.router == nil {
return config{}, errors.New("reprovider config: router is required")
}
if c.msgSender == nil {
return errors.New("reprovider config: message sender is required")
if cfg.msgSender == nil {
return config{}, errors.New("reprovider config: message sender is required")
}
if c.selfAddrs == nil {
return errors.New("reprovider config: self addrs func is required")
if cfg.selfAddrs == nil {
return config{}, errors.New("reprovider config: self addrs func is required")
}
if c.dedicatedPeriodicWorkers+c.dedicatedBurstWorkers > c.maxWorkers {
return errors.New("reprovider config: total dedicated workers exceed max workers")
if cfg.dedicatedPeriodicWorkers+cfg.dedicatedBurstWorkers > cfg.maxWorkers {
return config{}, errors.New("reprovider config: total dedicated workers exceed max workers")
}
return nil
}

type Option func(opt *config) error

var DefaultConfig = func(cfg *config) error {
cfg.replicationFactor = amino.DefaultBucketSize
cfg.reprovideInterval = amino.DefaultReprovideInterval
cfg.maxReprovideDelay = DefaultMaxReprovideDelay
cfg.offlineDelay = DefaultOfflineDelay
cfg.connectivityCheckOnlineInterval = DefaultConnectivityCheckOnlineInterval

cfg.maxWorkers = 4
cfg.dedicatedPeriodicWorkers = 2
cfg.dedicatedBurstWorkers = 1
cfg.maxProvideConnsPerWorker = 20

cfg.addLocalRecord = func(mh mh.Multihash) error { return nil }

return nil
return cfg, nil
}

// WithReplicationFactor sets the replication factor for provider records. It
Expand Down
9 changes: 2 additions & 7 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ type SweepingProvider struct {

// New creates a new SweepingProvider instance with the supplied options.
func New(opts ...Option) (*SweepingProvider, error) {
var cfg config
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
cfg, err := getOpts(opts)
if err != nil {
return nil, err
}
Expand All @@ -176,11 +175,7 @@ func New(opts ...Option) (*SweepingProvider, error) {
cleanup(cleanupFuncs)
return nil, err
}
}
cleanupFuncs = append(cleanupFuncs, cfg.keystore.Close)
if err := cfg.validate(); err != nil {
cleanup(cleanupFuncs)
return nil, err
cleanupFuncs = append(cleanupFuncs, cfg.keystore.Close, func() error { return cfg.keystore.Empty(context.Background()) })
}
meter := otel.Meter("github.com/libp2p/go-libp2p-kad-dht/provider")
providerCounter, err := meter.Int64Counter(
Expand Down
Loading