diff --git a/provider/buffered/provider.go b/provider/buffered/provider.go index 5f335a03a..a1710d60f 100644 --- a/provider/buffered/provider.go +++ b/provider/buffered/provider.go @@ -29,7 +29,7 @@ const ( var _ internal.Provider = (*SweepingProvider)(nil) -// buffered.SweepingProvider is a wrapper around a SweepingProvider buffering +// SweepingProvider (buffered) is a wrapper around a SweepingProvider buffering // requests, to allow core operations to return instantly. Operations are // queued and processed asynchronously in batches for improved performance. type SweepingProvider struct { diff --git a/provider/buffered/provider_test.go b/provider/buffered/provider_test.go index 3c66ae56b..b3552197d 100644 --- a/provider/buffered/provider_test.go +++ b/provider/buffered/provider_test.go @@ -129,7 +129,7 @@ func TestQueueingMechanism(t *testing.T) { } // Wait for operations to be processed by expecting 4 signals - for i := 0; i < 4; i++ { + for i := range 4 { select { case <-fake.processed: case <-time.After(time.Second): diff --git a/provider/dual/options.go b/provider/dual/options.go index e657aeacb..e2e48baac 100644 --- a/provider/dual/options.go +++ b/provider/dual/options.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p-kad-dht/amino" "github.com/libp2p/go-libp2p-kad-dht/dual" pb "github.com/libp2p/go-libp2p-kad-dht/pb" @@ -38,53 +37,44 @@ type config struct { type Option func(opt *config) error -func (cfg *config) apply(opts ...Option) error { - for i, o := range opts { - if err := o(cfg); err != nil { - return fmt.Errorf("dual dht provider option %d failed: %w", i, err) +// getOpts creates a config and applies Options to it. +func getOpts(opts []Option, d *dual.DHT) (config, error) { + cfg := config{ + reprovideInterval: [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval}, + maxReprovideDelay: [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay}, + + offlineDelay: [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay}, + connectivityCheckOnlineInterval: [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval}, + + maxWorkers: [2]int{4, 4}, + dedicatedPeriodicWorkers: [2]int{2, 2}, + dedicatedBurstWorkers: [2]int{1, 1}, + maxProvideConnsPerWorker: [2]int{20, 20}, + } + + // Apply options + for i, opt := range opts { + if err := opt(&cfg); err != nil { + return config{}, fmt.Errorf("dual dht provider option %d failed: %w", i, err) } } - return nil -} -func (cfg *config) resolveDefaults(d *dual.DHT) { + // Resolve defaults if cfg.msgSenders[lanID] == nil { cfg.msgSenders[lanID] = d.LAN.MessageSender() } if cfg.msgSenders[wanID] == nil { cfg.msgSenders[wanID] = d.WAN.MessageSender() } -} -func (c *config) validate() error { - if c.dedicatedPeriodicWorkers[lanID]+c.dedicatedBurstWorkers[lanID] > c.maxWorkers[lanID] { - return errors.New("provider config: total dedicated workers exceed max workers") + // Validate config + if cfg.dedicatedPeriodicWorkers[lanID]+cfg.dedicatedBurstWorkers[lanID] > cfg.maxWorkers[lanID] { + return config{}, errors.New("provider config: total dedicated workers exceed max workers") } - if c.dedicatedPeriodicWorkers[wanID]+c.dedicatedBurstWorkers[wanID] > c.maxWorkers[wanID] { - return errors.New("provider config: total dedicated workers exceed max workers") + if cfg.dedicatedPeriodicWorkers[wanID]+cfg.dedicatedBurstWorkers[wanID] > cfg.maxWorkers[wanID] { + return config{}, errors.New("provider config: total dedicated workers exceed max workers") } - return nil -} - -var DefaultConfig = func(cfg *config) error { - var err error - cfg.keystore, err = keystore.NewKeystore(ds.NewMapDatastore()) - if err != nil { - return err - } - - cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval} - cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay} - - cfg.offlineDelay = [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay} - cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval} - - cfg.maxWorkers = [2]int{4, 4} - cfg.dedicatedPeriodicWorkers = [2]int{2, 2} - cfg.dedicatedBurstWorkers = [2]int{1, 1} - cfg.maxProvideConnsPerWorker = [2]int{20, 20} - - return nil + return cfg, nil } func WithKeystore(ks keystore.Keystore) Option { diff --git a/provider/dual/provider.go b/provider/dual/provider.go index 6a1ac463c..ef2e568ea 100644 --- a/provider/dual/provider.go +++ b/provider/dual/provider.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p-kad-dht/dual" "github.com/libp2p/go-libp2p-kad-dht/provider" @@ -23,6 +24,8 @@ type SweepingProvider struct { LAN *provider.SweepingProvider WAN *provider.SweepingProvider keystore keystore.Keystore + + cleanupFuncs []func() error } // New creates a new SweepingProvider that manages provides and reprovides for @@ -32,15 +35,19 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) { return nil, errors.New("cannot create sweeping provider for nil dual DHT") } - var cfg config - err := cfg.apply(append([]Option{DefaultConfig}, opts...)...) + cfg, err := getOpts(opts, d) if err != nil { return nil, err } - cfg.resolveDefaults(d) - err = cfg.validate() - if err != nil { - return nil, err + var cleanupFuncs []func() error + if cfg.keystore == nil { + ds := datastore.NewMapDatastore() + cfg.keystore, err = keystore.NewKeystore(ds) + if err != nil { + ds.Close() + return nil, fmt.Errorf("couldn't create a keystore: %w", err) + } + cleanupFuncs = []func() error{ds.Close, cfg.keystore.Close, func() error { return cfg.keystore.Empty(context.Background()) }} } sweepingProviders := make([]*provider.SweepingProvider, 2) @@ -74,10 +81,11 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) { } return &SweepingProvider{ - dht: d, - LAN: sweepingProviders[0], - WAN: sweepingProviders[1], - keystore: cfg.keystore, + dht: d, + LAN: sweepingProviders[0], + WAN: sweepingProviders[1], + keystore: cfg.keystore, + cleanupFuncs: cleanupFuncs, }, nil } @@ -102,9 +110,25 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) e // Close stops both DHT providers and releases associated resources. func (s *SweepingProvider) Close() error { - return s.runOnBoth(func(p *provider.SweepingProvider) error { + err := s.runOnBoth(func(p *provider.SweepingProvider) error { return p.Close() }) + + if s.cleanupFuncs != nil { + // Cleanup keystore and datastore if we created them + var errs []error + for i := len(s.cleanupFuncs) - 1; i >= 0; i-- { // LIFO: last-added is cleaned up first + if f := s.cleanupFuncs[i]; f != nil { + if err := f(); err != nil { + errs = append(errs, err) + } + } + } + if len(errs) > 0 { + err = errors.Join(append(errs, err)...) + } + } + return err } // ProvideOnce sends provider records for the specified keys to both DHT swarms diff --git a/provider/internal/connectivity/connectivity.go b/provider/internal/connectivity/connectivity.go index 495b9a32a..8565d253a 100644 --- a/provider/internal/connectivity/connectivity.go +++ b/provider/internal/connectivity/connectivity.go @@ -53,8 +53,7 @@ type ConnectivityChecker struct { // New creates a new ConnectivityChecker instance. func New(checkFunc func() bool, opts ...Option) (*ConnectivityChecker, error) { - var cfg config - err := cfg.apply(append([]Option{DefaultConfig}, opts...)...) + cfg, err := getOpts(opts) if err != nil { return nil, err } diff --git a/provider/internal/connectivity/options.go b/provider/internal/connectivity/options.go index a67fe2a6a..5ba2ea43d 100644 --- a/provider/internal/connectivity/options.go +++ b/provider/internal/connectivity/options.go @@ -14,21 +14,21 @@ type config struct { onOnline func() } -func (cfg *config) apply(opts ...Option) error { - for i, o := range opts { - if err := o(cfg); err != nil { - return fmt.Errorf("reprovider dht option %d failed: %w", i, err) - } - } - return nil -} - type Option func(opt *config) error -var DefaultConfig = func(cfg *config) error { - cfg.onlineCheckInterval = 1 * time.Minute - cfg.offlineDelay = 2 * time.Hour - return nil +// getOpts creates a config and applies Options to it. +func getOpts(opts []Option) (config, error) { + cfg := config{ + onlineCheckInterval: 1 * time.Minute, + offlineDelay: 2 * time.Hour, + } + + for i, opt := range opts { + if err := opt(&cfg); err != nil { + return config{}, fmt.Errorf("connectivity option %d error: %s", i, err) + } + } + return cfg, nil } // WithOnlineCheckInterval sets the minimum interval between online checks. diff --git a/provider/options.go b/provider/options.go index 5d3ffda99..8e109cf01 100644 --- a/provider/options.go +++ b/provider/options.go @@ -53,51 +53,49 @@ type config struct { maxProvideConnsPerWorker int } -func (cfg *config) apply(opts ...Option) error { - for i, o := range opts { - if err := o(cfg); err != nil { - return fmt.Errorf("reprovider dht option %d failed: %w", i, err) +type Option func(opt *config) error + +// getOpts creates a config and applies Options to it. +func getOpts(opts []Option) (config, error) { + cfg := config{ + replicationFactor: amino.DefaultBucketSize, + reprovideInterval: amino.DefaultReprovideInterval, + maxReprovideDelay: DefaultMaxReprovideDelay, + offlineDelay: DefaultOfflineDelay, + connectivityCheckOnlineInterval: DefaultConnectivityCheckOnlineInterval, + + maxWorkers: 4, + dedicatedPeriodicWorkers: 2, + dedicatedBurstWorkers: 1, + maxProvideConnsPerWorker: 20, + + addLocalRecord: func(mh mh.Multihash) error { return nil }, + } + + // Apply options + for i, opt := range opts { + if err := opt(&cfg); err != nil { + return config{}, fmt.Errorf("reprovider dht option %d error: %s", i, err) } } - return nil -} -func (c *config) validate() error { - if len(c.peerid) == 0 { - return errors.New("reprovider config: peer id is required") + // Validate config + if len(cfg.peerid) == 0 { + return config{}, errors.New("reprovider config: peer id is required") } - if c.router == nil { - return errors.New("reprovider config: router is required") + if cfg.router == nil { + return config{}, errors.New("reprovider config: router is required") } - if c.msgSender == nil { - return errors.New("reprovider config: message sender is required") + if cfg.msgSender == nil { + return config{}, errors.New("reprovider config: message sender is required") } - if c.selfAddrs == nil { - return errors.New("reprovider config: self addrs func is required") + if cfg.selfAddrs == nil { + return config{}, errors.New("reprovider config: self addrs func is required") } - if c.dedicatedPeriodicWorkers+c.dedicatedBurstWorkers > c.maxWorkers { - return errors.New("reprovider config: total dedicated workers exceed max workers") + if cfg.dedicatedPeriodicWorkers+cfg.dedicatedBurstWorkers > cfg.maxWorkers { + return config{}, errors.New("reprovider config: total dedicated workers exceed max workers") } - return nil -} - -type Option func(opt *config) error - -var DefaultConfig = func(cfg *config) error { - cfg.replicationFactor = amino.DefaultBucketSize - cfg.reprovideInterval = amino.DefaultReprovideInterval - cfg.maxReprovideDelay = DefaultMaxReprovideDelay - cfg.offlineDelay = DefaultOfflineDelay - cfg.connectivityCheckOnlineInterval = DefaultConnectivityCheckOnlineInterval - - cfg.maxWorkers = 4 - cfg.dedicatedPeriodicWorkers = 2 - cfg.dedicatedBurstWorkers = 1 - cfg.maxProvideConnsPerWorker = 20 - - cfg.addLocalRecord = func(mh mh.Multihash) error { return nil } - - return nil + return cfg, nil } // WithReplicationFactor sets the replication factor for provider records. It diff --git a/provider/provider.go b/provider/provider.go index 423f1fe47..c5d24605f 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -160,8 +160,7 @@ type SweepingProvider struct { // New creates a new SweepingProvider instance with the supplied options. func New(opts ...Option) (*SweepingProvider, error) { - var cfg config - err := cfg.apply(append([]Option{DefaultConfig}, opts...)...) + cfg, err := getOpts(opts) if err != nil { return nil, err } @@ -176,11 +175,7 @@ func New(opts ...Option) (*SweepingProvider, error) { cleanup(cleanupFuncs) return nil, err } - } - cleanupFuncs = append(cleanupFuncs, cfg.keystore.Close) - if err := cfg.validate(); err != nil { - cleanup(cleanupFuncs) - return nil, err + cleanupFuncs = append(cleanupFuncs, cfg.keystore.Close, func() error { return cfg.keystore.Empty(context.Background()) }) } meter := otel.Meter("github.com/libp2p/go-libp2p-kad-dht/provider") providerCounter, err := meter.Int64Counter(