From 2249c41d2f7dcca6f12df37ab9ae96f749da3170 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 10 Mar 2025 18:00:07 -0300 Subject: [PATCH 1/8] loopdb: switch to go.etcd.io/bbolt github.com/coreos/bbolt has race conditions: https://github.com/golang/go/issues/54690 --- go.mod | 3 +-- go.sum | 2 -- loopd/daemon.go | 2 +- loopdb/loopin.go | 2 +- loopdb/meta.go | 2 +- loopdb/migration_01_costs.go | 2 +- loopdb/migration_02_swap_publication_deadline.go | 2 +- loopdb/migration_03_last_hop.go | 2 +- loopdb/migration_04_updates.go | 2 +- loopdb/migration_04_updates_test.go | 2 +- loopdb/raw_db_test.go | 2 +- loopdb/store.go | 2 +- loopdb/store_test.go | 2 +- 13 files changed, 12 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 0315bea20..f9898df04 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c // indirect github.com/btcsuite/btcwallet v0.16.10-0.20241127094224-93c858b2ad63 github.com/btcsuite/btcwallet/wtxmgr v1.5.4 - github.com/coreos/bbolt v1.3.3 github.com/davecgh/go-spew v1.1.1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 github.com/fortytw2/leaktest v1.3.0 @@ -35,6 +34,7 @@ require ( github.com/ory/dockertest/v3 v3.10.0 github.com/stretchr/testify v1.9.0 github.com/urfave/cli v1.22.9 + go.etcd.io/bbolt v1.3.11 golang.org/x/net v0.36.0 google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 @@ -164,7 +164,6 @@ require ( github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect gitlab.com/yawning/bsaes.git v0.0.0-20190805113838-0a714cd429ec // indirect - go.etcd.io/bbolt v1.3.11 // indirect go.etcd.io/etcd/api/v3 v3.5.12 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.12 // indirect go.etcd.io/etcd/client/v2 v2.305.12 // indirect diff --git a/go.sum b/go.sum index 8eeb2e01b..58d7fd052 100644 --- a/go.sum +++ b/go.sum @@ -732,8 +732,6 @@ github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD9 github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= -github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY= -github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= diff --git a/loopd/daemon.go b/loopd/daemon.go index bdd2c951f..01449ab52 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -12,7 +12,6 @@ import ( "sync/atomic" "time" - "github.com/coreos/bbolt" proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" @@ -33,6 +32,7 @@ import ( "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/macaroons" + "go.etcd.io/bbolt" "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" "gopkg.in/macaroon-bakery.v2/bakery" diff --git a/loopdb/loopin.go b/loopdb/loopin.go index e10e898fa..8801b1b4b 100644 --- a/loopdb/loopin.go +++ b/loopdb/loopin.go @@ -6,9 +6,9 @@ import ( "fmt" "time" - "github.com/coreos/bbolt" "github.com/lightninglabs/loop/labels" "github.com/lightningnetwork/lnd/routing/route" + "go.etcd.io/bbolt" ) // LoopInContract contains the data that is serialized to persistent storage for diff --git a/loopdb/meta.go b/loopdb/meta.go index 1c2732302..d445a021a 100644 --- a/loopdb/meta.go +++ b/loopdb/meta.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/btcsuite/btcd/chaincfg" - "github.com/coreos/bbolt" + "go.etcd.io/bbolt" ) var ( diff --git a/loopdb/migration_01_costs.go b/loopdb/migration_01_costs.go index b56053e4a..93ba0442f 100644 --- a/loopdb/migration_01_costs.go +++ b/loopdb/migration_01_costs.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/btcsuite/btcd/chaincfg" - "github.com/coreos/bbolt" + "go.etcd.io/bbolt" ) // noMigrationAvailable is the fall back migration in case there is no migration diff --git a/loopdb/migration_02_swap_publication_deadline.go b/loopdb/migration_02_swap_publication_deadline.go index b6879cd0f..5347993f9 100644 --- a/loopdb/migration_02_swap_publication_deadline.go +++ b/loopdb/migration_02_swap_publication_deadline.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/btcsuite/btcd/chaincfg" - "github.com/coreos/bbolt" + "go.etcd.io/bbolt" ) // migrateSwapPublicationDeadline migrates the database to v02, by adding the diff --git a/loopdb/migration_03_last_hop.go b/loopdb/migration_03_last_hop.go index f297e66e8..340f650d0 100644 --- a/loopdb/migration_03_last_hop.go +++ b/loopdb/migration_03_last_hop.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/btcsuite/btcd/chaincfg" - "github.com/coreos/bbolt" + "go.etcd.io/bbolt" ) // migrateLastHop migrates the database to v03, replacing the never used loop in diff --git a/loopdb/migration_04_updates.go b/loopdb/migration_04_updates.go index 9ad3a29f2..cb870a054 100644 --- a/loopdb/migration_04_updates.go +++ b/loopdb/migration_04_updates.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/btcsuite/btcd/chaincfg" - "github.com/coreos/bbolt" + "go.etcd.io/bbolt" ) // migrateUpdates migrates the swap updates to add an additional level of diff --git a/loopdb/migration_04_updates_test.go b/loopdb/migration_04_updates_test.go index 0c206efea..ad06f91e1 100644 --- a/loopdb/migration_04_updates_test.go +++ b/loopdb/migration_04_updates_test.go @@ -8,8 +8,8 @@ import ( "testing" "github.com/btcsuite/btcd/chaincfg" - "github.com/coreos/bbolt" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" ) // TestMigrationUpdates asserts that the swap updates migration is carried out diff --git a/loopdb/raw_db_test.go b/loopdb/raw_db_test.go index 6a5e705c8..26db043ce 100644 --- a/loopdb/raw_db_test.go +++ b/loopdb/raw_db_test.go @@ -5,7 +5,7 @@ import ( "fmt" "strings" - "github.com/coreos/bbolt" + "go.etcd.io/bbolt" ) // DumpDB dumps go code describing the contents of the database to stdout. This diff --git a/loopdb/store.go b/loopdb/store.go index 60cce0fb1..2087c76c8 100644 --- a/loopdb/store.go +++ b/loopdb/store.go @@ -13,8 +13,8 @@ import ( "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/lntypes" + "go.etcd.io/bbolt" ) var ( diff --git a/loopdb/store_test.go b/loopdb/store_test.go index 2a5a61a19..43dd0e75b 100644 --- a/loopdb/store_test.go +++ b/loopdb/store_test.go @@ -11,12 +11,12 @@ import ( "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/coreos/bbolt" "github.com/lightninglabs/loop/test" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/routing/route" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" ) var ( From e0e66a9b38d20a8a015aeb81ab88c0a2f129420d Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 10 Mar 2025 19:20:20 -0300 Subject: [PATCH 2/8] loopd: fix data races of logger --- loopd/config.go | 6 +-- loopd/daemon.go | 80 ++++++++++++++++----------------- loopd/log.go | 40 +++++++++++++++-- loopd/migration.go | 2 +- loopd/run.go | 4 +- loopd/swapclient_server.go | 54 +++++++++++----------- loopd/swapclient_server_test.go | 3 +- loopd/utils.go | 8 ++-- 8 files changed, 116 insertions(+), 81 deletions(-) diff --git a/loopd/config.go b/loopd/config.go index 7c06e3bb6..3a0a78e3e 100644 --- a/loopd/config.go +++ b/loopd/config.go @@ -423,7 +423,7 @@ func getTLSConfig(cfg *Config) (*tls.Config, *credentials.TransportCredentials, // If the certificate expired or it was outdated, delete it and the TLS // key and generate a new pair. if time.Now().After(parsedCert.NotAfter) { - log.Info("TLS certificate is expired or outdated, " + + infof("TLS certificate is expired or outdated, " + "removing old file then generating a new one") err := os.Remove(cfg.TLSCertPath) @@ -464,7 +464,7 @@ func loadCertWithCreate(cfg *Config) (tls.Certificate, *x509.Certificate, if !lnrpc.FileExists(cfg.TLSCertPath) && !lnrpc.FileExists(cfg.TLSKeyPath) { - log.Infof("Generating TLS certificates...") + infof("Generating TLS certificates...") certBytes, keyBytes, err := cert.GenCertPair( defaultSelfSignedOrganization, cfg.TLSExtraIPs, cfg.TLSExtraDomains, cfg.TLSDisableAutofill, @@ -481,7 +481,7 @@ func loadCertWithCreate(cfg *Config) (tls.Certificate, *x509.Certificate, return tls.Certificate{}, nil, err } - log.Infof("Done generating TLS certificates") + infof("Done generating TLS certificates") } return cert.LoadCert(cfg.TLSCertPath, cfg.TLSKeyPath) diff --git a/loopd/daemon.go b/loopd/daemon.go index 01449ab52..9bb5e6225 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -169,11 +169,11 @@ func (d *Daemon) Start() error { // anything goes wrong now, we need to cleanly shut down again. startErr := d.startWebServers() if startErr != nil { - log.Errorf("Error while starting daemon: %v", err) + errorf("Error while starting daemon: %v", err) d.Stop() stopErr := <-d.ErrChan if stopErr != nil { - log.Errorf("Error while stopping daemon: %v", stopErr) + errorf("Error while stopping daemon: %v", stopErr) } return startErr } @@ -253,7 +253,7 @@ func (d *Daemon) startWebServers() error { d.registerDebugServer() // Next, start the gRPC server listening for HTTP/2 connections. - log.Infof("Starting gRPC listener") + infof("Starting gRPC listener") serverTLSCfg, restClientCreds, err := getTLSConfig(d.cfg) if err != nil { return fmt.Errorf("could not create gRPC server options: %v", @@ -322,7 +322,7 @@ func (d *Daemon) startWebServers() error { // A nil listener indicates REST is disabled. if d.restListener != nil { - log.Infof("Starting REST proxy listener") + infof("Starting REST proxy listener") d.restServer = &http.Server{ Handler: restHandler, @@ -333,7 +333,7 @@ func (d *Daemon) startWebServers() error { go func() { defer d.wg.Done() - log.Infof("REST proxy listening on %s", + infof("REST proxy listening on %s", d.restListener.Addr()) err := d.restServer.Serve(d.restListener) // ErrServerClosed is always returned when the proxy is @@ -347,7 +347,7 @@ func (d *Daemon) startWebServers() error { } }() } else { - log.Infof("REST proxy disabled") + infof("REST proxy disabled") } // Start the grpc server. @@ -355,7 +355,7 @@ func (d *Daemon) startWebServers() error { go func() { defer d.wg.Done() - log.Infof("RPC server listening on %s", d.grpcListener.Addr()) + infof("RPC server listening on %s", d.grpcListener.Addr()) err = d.grpcServer.Serve(d.grpcListener) if err != nil && !errors.Is(err, grpc.ErrServerStopped) { // Notify the main error handler goroutine that @@ -378,7 +378,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { loopdb.EnableExperimentalProtocol() } - log.Infof("Protocol version: %v", loopdb.CurrentProtocolVersion()) + infof("Protocol version: %v", loopdb.CurrentProtocolVersion()) // If no swap server is specified, use the default addresses for mainnet // and testnet. @@ -404,18 +404,18 @@ func (d *Daemon) initialize(withMacaroonService bool) error { // on main context cancel. So we create it early and pass it down. d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background()) - log.Infof("Swap server address: %v", d.cfg.Server.Host) + infof("Swap server address: %v", d.cfg.Server.Host) // Check if we need to migrate the database. if needSqlMigration(d.cfg) { - log.Infof("Boltdb found, running migration") + infof("Boltdb found, running migration") err := migrateBoltdb(d.mainCtx, d.cfg) if err != nil { return fmt.Errorf("unable to migrate boltdb: %v", err) } - log.Infof("Successfully migrated boltdb") + infof("Successfully migrated boltdb") } // Now that we know where the database will live, we'll go ahead and @@ -436,7 +436,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { swapDb, ) if err != nil { - log.Errorf("Cost migration failed: %v", err) + errorf("Cost migration failed: %v", err) return err } @@ -460,7 +460,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { d.lnd.NodePubkey) } - log.Infof("Using asset client with version %v", getInfo.Version) + infof("Using asset client with version %v", getInfo.Version) } // Create an instance of the loop client library. @@ -507,7 +507,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { cleanupMacaroonStore := func() { err := db.Close() if err != nil { - log.Errorf("Error closing macaroon store: %v", err) + errorf("Error closing macaroon store: %v", err) } } @@ -555,11 +555,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - log.Info("Starting notification manager") + infof("Starting notification manager") err := notificationManager.Run(d.mainCtx) if err != nil { d.internalErrChan <- err - log.Errorf("Notification manager stopped: %v", err) + errorf("Notification manager stopped: %v", err) } }() @@ -711,7 +711,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { // started yet, so if we clean that up now, nothing else needs // to be shut down at this point. if err := d.macaroonService.Stop(); err != nil { - log.Errorf("Error shutting down macaroon service: %v", + errorf("Error shutting down macaroon service: %v", err) } cleanupMacaroonStore() @@ -728,7 +728,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - log.Infof("Starting swap client") + infof("Starting swap client") err := d.impl.Run(d.mainCtx, d.statusChan) if err != nil { // Notify the main error handler goroutine that @@ -737,7 +737,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { // channel is sufficiently buffered. d.internalErrChan <- err } - log.Infof("Swap client stopped") + infof("Swap client stopped") }() // Start a goroutine that broadcasts swap updates to clients. @@ -745,7 +745,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - log.Infof("Waiting for updates") + infof("Waiting for updates") d.processStatusUpdates(d.mainCtx) }() @@ -753,13 +753,13 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - log.Info("Starting liquidity manager") + infof("Starting liquidity manager") err := d.liquidityMgr.Run(d.mainCtx) if err != nil && !errors.Is(err, context.Canceled) { d.internalErrChan <- err } - log.Info("Liquidity manager stopped") + infof("Liquidity manager stopped") }() // Start the reservation manager. @@ -777,8 +777,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error { return } - log.Info("Starting reservation manager") - defer log.Info("Reservation manager stopped") + infof("Starting reservation manager") + defer infof("Reservation manager stopped") err = d.reservationManager.Run( d.mainCtx, int32(getInfo.BlockHeight), initChan, @@ -815,8 +815,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error { return } - log.Info("Starting instantout manager") - defer log.Info("Instantout manager stopped") + infof("Starting instantout manager") + defer infof("Instantout manager stopped") err = d.instantOutManager.Run( d.mainCtx, initChan, int32(getInfo.BlockHeight), @@ -846,12 +846,12 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - log.Info("Starting static address manager...") + infof("Starting static address manager...") err = staticAddressManager.Run(d.mainCtx) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - log.Info("Static address manager stopped") + infof("Static address manager stopped") }() } @@ -869,12 +869,12 @@ func (d *Daemon) initialize(withMacaroonService bool) error { return } - log.Info("Starting static address deposit manager...") + infof("Starting static address deposit manager...") err = depositManager.Run(d.mainCtx, info.BlockHeight) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - log.Info("Static address deposit manager stopped") + infof("Static address deposit manager stopped") }() depositManager.WaitInitComplete() } @@ -893,13 +893,13 @@ func (d *Daemon) initialize(withMacaroonService bool) error { return } - log.Info("Starting static address deposit withdrawal " + + infof("Starting static address deposit withdrawal " + "manager...") err = withdrawalManager.Run(d.mainCtx, info.BlockHeight) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - log.Info("Static address deposit withdrawal manager " + + infof("Static address deposit withdrawal manager " + "stopped") }() withdrawalManager.WaitInitComplete() @@ -920,14 +920,14 @@ func (d *Daemon) initialize(withMacaroonService bool) error { return } - log.Info("Starting static address loop-in manager...") + infof("Starting static address loop-in manager...") err = staticLoopInManager.Run( d.mainCtx, info.BlockHeight, ) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - log.Info("Starting static address loop-in manager " + + infof("Starting static address loop-in manager " + "stopped") }() staticLoopInManager.WaitInitComplete() @@ -948,7 +948,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { // signal the caller that we're done. select { case runtimeErr = <-d.internalErrChan: - log.Errorf("Runtime error in daemon, shutting down: "+ + errorf("Runtime error in daemon, shutting down: "+ "%v", runtimeErr) case <-d.quit: @@ -958,7 +958,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { // otherwise a caller might exit the process too early. d.stop() cleanupMacaroonStore() - log.Info("Daemon exited") + infof("Daemon exited") // The caller expects exactly one message. So we send the error // even if it's nil because we cleanly shut down. @@ -987,17 +987,17 @@ func (d *Daemon) stop() { // As there is no swap activity anymore, we can forcefully shut down the // gRPC and HTTP servers now. - log.Infof("Stopping gRPC server") + infof("Stopping gRPC server") if d.grpcServer != nil { d.grpcServer.Stop() } - log.Infof("Stopping REST server") + infof("Stopping REST server") if d.restServer != nil { // Don't return the error here, we first want to give everything // else a chance to shut down cleanly. err := d.restServer.Close() if err != nil { - log.Errorf("Error stopping REST server: %v", err) + errorf("Error stopping REST server: %v", err) } } if d.restCtxCancel != nil { @@ -1007,7 +1007,7 @@ func (d *Daemon) stop() { if d.macaroonService != nil { err := d.macaroonService.Stop() if err != nil { - log.Errorf("Error stopping macaroon service: %v", err) + errorf("Error stopping macaroon service: %v", err) } } diff --git a/loopd/log.go b/loopd/log.go index 2260d7c06..0b29d4b32 100644 --- a/loopd/log.go +++ b/loopd/log.go @@ -1,6 +1,8 @@ package loopd import ( + "sync/atomic" + "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/aperture/l402" "github.com/lightninglabs/lndclient" @@ -22,18 +24,50 @@ import ( const Subsystem = "LOOPD" var ( - log btclog.Logger + log_ atomic.Pointer[btclog.Logger] interceptor signal.Interceptor ) +// log returns active logger. +func log() btclog.Logger { + return *log_.Load() +} + +// setLogger uses a specified Logger to output package logging info. +func setLogger(logger btclog.Logger) { + log_.Store(&logger) +} + +// tracef logs a message with level TRACE. +func tracef(format string, params ...interface{}) { + log().Tracef(format, params...) +} + +// infof logs a message with level INFO. +func infof(format string, params ...interface{}) { + log().Infof(format, params...) +} + +// warnf logs a message with level WARN. +func warnf(format string, params ...interface{}) { + log().Warnf(format, params...) +} + +// errorf logs a message with level ERROR. +func errorf(format string, params ...interface{}) { + log().Errorf(format, params...) +} + // SetupLoggers initializes all package-global logger variables. func SetupLoggers(root *build.SubLoggerManager, intercept signal.Interceptor) { genLogger := genSubLogger(root, intercept) - log = build.NewSubLogger(Subsystem, genLogger) + logger := build.NewSubLogger(Subsystem, genLogger) + setLogger(logger) + interceptor = intercept - lnd.SetSubLogger(root, Subsystem, log) + lnd.SetSubLogger(root, Subsystem, logger) lnd.AddSubLogger(root, "LOOP", intercept, loop.UseLogger) lnd.AddSubLogger(root, "SWEEP", intercept, sweepbatcher.UseLogger) lnd.AddSubLogger(root, "LNDC", intercept, lndclient.UseLogger) diff --git a/loopd/migration.go b/loopd/migration.go index 027ccd9bd..871674556 100644 --- a/loopd/migration.go +++ b/loopd/migration.go @@ -66,7 +66,7 @@ func needSqlMigration(cfg *Config) bool { // any deleted files occasionally (reboot, etc). sqliteDBPath := filepath.Join(cfg.DataDir, "loop_sqlite.db") if lnrpc.FileExists(sqliteDBPath) { - log.Infof("Found sqlite db at %v, skipping migration", + infof("Found sqlite db at %v, skipping migration", sqliteDBPath) return false diff --git a/loopd/run.go b/loopd/run.go index 3e71afb6b..00914d9eb 100644 --- a/loopd/run.go +++ b/loopd/run.go @@ -222,7 +222,7 @@ func Run(rpcCfg RPCConfig) error { } // Print the version before executing either primary directive. - log.Infof("Version: %v", loop.Version()) + infof("Version: %v", loop.Version()) lisCfg := NewListenerConfig(&config, rpcCfg) @@ -235,7 +235,7 @@ func Run(rpcCfg RPCConfig) error { select { case <-interceptor.ShutdownChannel(): - log.Infof("Received SIGINT (Ctrl+C).") + infof("Received SIGINT (Ctrl+C).") daemon.Stop() // The above stop will return immediately. But we'll be diff --git a/loopd/swapclient_server.go b/loopd/swapclient_server.go index bd037825b..1fdb9a15d 100644 --- a/loopd/swapclient_server.go +++ b/loopd/swapclient_server.go @@ -112,7 +112,7 @@ func (s *swapClientServer) LoopOut(ctx context.Context, in *looprpc.LoopOutRequest) ( *looprpc.SwapResponse, error) { - log.Infof("Loop out request received") + infof("Loop out request received") // Note that LoopOutRequest.PaymentTimeout is unsigned and therefore // cannot be negative. @@ -257,7 +257,7 @@ func (s *swapClientServer) LoopOut(ctx context.Context, info, err := s.impl.LoopOut(ctx, req) if err != nil { - log.Errorf("LoopOut: %v", err) + errorf("LoopOut: %v", err) return nil, err } @@ -461,7 +461,7 @@ func (s *swapClientServer) marshallSwap(ctx context.Context, func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest, server looprpc.SwapClient_MonitorServer) error { - log.Infof("Monitor request received") + infof("Monitor request received") send := func(info loop.SwapInfo) error { rpcSwap, err := s.marshallSwap(server.Context(), &info) @@ -732,11 +732,11 @@ func (s *swapClientServer) AbandonSwap(ctx context.Context, func (s *swapClientServer) LoopOutTerms(ctx context.Context, _ *looprpc.TermsRequest) (*looprpc.OutTermsResponse, error) { - log.Infof("Loop out terms request received") + infof("Loop out terms request received") terms, err := s.impl.LoopOutTerms(ctx, defaultLoopdInitiator) if err != nil { - log.Errorf("Terms request: %v", err) + errorf("Terms request: %v", err) return nil, err } @@ -822,11 +822,11 @@ func (s *swapClientServer) LoopOutQuote(ctx context.Context, func (s *swapClientServer) GetLoopInTerms(ctx context.Context, _ *looprpc.TermsRequest) (*looprpc.InTermsResponse, error) { - log.Infof("Loop in terms request received") + infof("Loop in terms request received") terms, err := s.impl.LoopInTerms(ctx, defaultLoopdInitiator) if err != nil { - log.Errorf("Terms request: %v", err) + errorf("Terms request: %v", err) return nil, err } @@ -840,7 +840,7 @@ func (s *swapClientServer) GetLoopInTerms(ctx context.Context, func (s *swapClientServer) GetLoopInQuote(ctx context.Context, req *looprpc.QuoteRequest) (*looprpc.InQuoteResponse, error) { - log.Infof("Loop in quote request received") + infof("Loop in quote request received") var ( numDeposits = uint32(len(req.DepositOutpoints)) @@ -981,7 +981,7 @@ func unmarshallHopHint(rpcHint *swapserverrpc.HopHint) (zpay32.HopHint, error) { func (s *swapClientServer) Probe(ctx context.Context, req *looprpc.ProbeRequest) (*looprpc.ProbeResponse, error) { - log.Infof("Probe request received") + infof("Probe request received") var lastHop *route.Vertex if req.LastHop != nil { @@ -1013,7 +1013,7 @@ func (s *swapClientServer) Probe(ctx context.Context, func (s *swapClientServer) LoopIn(ctx context.Context, in *looprpc.LoopInRequest) (*looprpc.SwapResponse, error) { - log.Infof("Loop in request received") + infof("Loop in request received") htlcConfTarget, err := validateLoopInRequest( in.HtlcConfTarget, in.ExternalHtlc, 0, in.Amt, @@ -1052,7 +1052,7 @@ func (s *swapClientServer) LoopIn(ctx context.Context, } swapInfo, err := s.impl.LoopIn(ctx, req) if err != nil { - log.Errorf("Loop in: %v", err) + errorf("Loop in: %v", err) return nil, err } @@ -1079,7 +1079,7 @@ func (s *swapClientServer) LoopIn(ctx context.Context, func (s *swapClientServer) GetL402Tokens(ctx context.Context, _ *looprpc.TokensRequest) (*looprpc.TokensResponse, error) { - log.Infof("Get L402 tokens request received") + infof("Get L402 tokens request received") tokens, err := s.impl.L402Store.AllTokens() if err != nil { @@ -1128,7 +1128,7 @@ func (s *swapClientServer) GetL402Tokens(ctx context.Context, func (s *swapClientServer) GetLsatTokens(ctx context.Context, req *looprpc.TokensRequest) (*looprpc.TokensResponse, error) { - log.Warnf("Received deprecated call GetLsatTokens. Please update the " + + warnf("Received deprecated call GetLsatTokens. Please update the " + "client software. Calling GetL402Tokens now.") return s.GetL402Tokens(ctx, req) @@ -1754,7 +1754,7 @@ func (s *swapClientServer) StaticAddressLoopIn(ctx context.Context, in *looprpc.StaticAddressLoopInRequest) ( *looprpc.StaticAddressLoopInResponse, error) { - log.Infof("Static loop-in request received") + infof("Static loop-in request received") routeHints, err := unmarshallRouteHints(in.RouteHints) if err != nil { @@ -2187,52 +2187,52 @@ func validateLoopOutRequest(ctx context.Context, lnd lndclient.LightningClient, func hasBandwidth(channels []lndclient.ChannelInfo, amt btcutil.Amount, maxParts int) (bool, int) { - log.Tracef("Checking if %v sats can be routed with %v parts over "+ + tracef("Checking if %v sats can be routed with %v parts over "+ "channel set of length %v", amt, maxParts, len(channels)) localBalances := make([]btcutil.Amount, len(channels)) var totalBandwidth btcutil.Amount for i, channel := range channels { - log.Tracef("Channel %v: local=%v remote=%v", channel.ChannelID, + tracef("Channel %v: local=%v remote=%v", channel.ChannelID, channel.LocalBalance, channel.RemoteBalance) localBalances[i] = channel.LocalBalance totalBandwidth += channel.LocalBalance } - log.Tracef("Total bandwidth: %v", totalBandwidth) + tracef("Total bandwidth: %v", totalBandwidth) if totalBandwidth < amt { return false, 0 } logLocalBalances := func(shard int) { - log.Tracef("Local balances for %v shards:", shard) + tracef("Local balances for %v shards:", shard) for i, balance := range localBalances { - log.Tracef("Channel %v: localBalances[%v]=%v", + tracef("Channel %v: localBalances[%v]=%v", channels[i].ChannelID, i, balance) } } split := amt for shard := 0; shard <= maxParts; { - log.Tracef("Trying to split %v sats into %v parts", amt, shard) + tracef("Trying to split %v sats into %v parts", amt, shard) paid := false for i := 0; i < len(localBalances); i++ { // TODO(hieblmi): Consider channel reserves because the // channel can't send its full local balance. if localBalances[i] >= split { - log.Tracef("len(shards)=%v: Local channel "+ + tracef("len(shards)=%v: Local channel "+ "balance %v can pay %v sats", shard, localBalances[i], split) localBalances[i] -= split - log.Tracef("len(shards)=%v: Subtracted "+ + tracef("len(shards)=%v: Subtracted "+ "%v sats from localBalance[%v]=%v", shard, split, i, localBalances[i]) amt -= split - log.Tracef("len(shards)=%v: Remaining total "+ + tracef("len(shards)=%v: Remaining total "+ "amount amt=%v", shard, amt) paid = true @@ -2245,26 +2245,26 @@ func hasBandwidth(channels []lndclient.ChannelInfo, amt btcutil.Amount, logLocalBalances(shard) if amt == 0 { - log.Tracef("Payment is routable with %v part(s)", shard) + tracef("Payment is routable with %v part(s)", shard) return true, shard } if !paid { - log.Tracef("len(shards)=%v: No channel could pay %v "+ + tracef("len(shards)=%v: No channel could pay %v "+ "sats, halving payment to %v and trying again", split/2) split /= 2 } else { - log.Tracef("len(shards)=%v: Payment was made, trying "+ + tracef("len(shards)=%v: Payment was made, trying "+ "to pay remaining sats %v", shard, amt) split = amt } } - log.Tracef("Payment is not routable, remaining amount that can't be "+ + tracef("Payment is not routable, remaining amount that can't be "+ "sent: %v sats", amt) logLocalBalances(maxParts) diff --git a/loopd/swapclient_server_test.go b/loopd/swapclient_server_test.go index b279ca909..2de779343 100644 --- a/loopd/swapclient_server_test.go +++ b/loopd/swapclient_server_test.go @@ -491,7 +491,8 @@ func TestValidateLoopOutRequest(t *testing.T) { logger := btclog.NewSLogger( btclog.NewDefaultHandler(os.Stdout), ) - log = logger.SubSystem(Subsystem) + setLogger(logger.SubSystem(Subsystem)) + conf, err := validateLoopOutRequest( ctx, lnd.Client, &test.chain, req, test.destAddr, test.maxParts, diff --git a/loopd/utils.go b/loopd/utils.go index 6be649db4..6261b6675 100644 --- a/loopd/utils.go +++ b/loopd/utils.go @@ -57,12 +57,12 @@ func getClient(cfg *Config, swapDb loopdb.SwapStore, } if cfg.MaxL402Cost == defaultCost && cfg.MaxLSATCost != 0 { - log.Warnf("Option maxlsatcost is deprecated and will be " + + warnf("Option maxlsatcost is deprecated and will be " + "removed. Switch to maxl402cost.") clientConfig.MaxL402Cost = btcutil.Amount(cfg.MaxLSATCost) } if cfg.MaxL402Fee == defaultFee && cfg.MaxLSATFee != 0 { - log.Warnf("Option maxlsatfee is deprecated and will be " + + warnf("Option maxlsatfee is deprecated and will be " + "removed. Switch to maxl402fee.") clientConfig.MaxL402Fee = btcutil.Amount(cfg.MaxLSATFee) } @@ -87,7 +87,7 @@ func openDatabase(cfg *Config, chainParams *chaincfg.Params) (loopdb.SwapStore, ) switch cfg.DatabaseBackend { case DatabaseBackendSqlite: - log.Infof("Opening sqlite3 database at: %v", + infof("Opening sqlite3 database at: %v", cfg.Sqlite.DatabaseFileName) db, err = loopdb.NewSqliteStore(cfg.Sqlite, chainParams) @@ -97,7 +97,7 @@ func openDatabase(cfg *Config, chainParams *chaincfg.Params) (loopdb.SwapStore, baseDb = *db.(*loopdb.SqliteSwapStore).BaseDB case DatabaseBackendPostgres: - log.Infof("Opening postgres database at: %v", + infof("Opening postgres database at: %v", cfg.Postgres.DSN(true)) db, err = loopdb.NewPostgresStore(cfg.Postgres, chainParams) From 4c292f179faf4ccbb4902e57c73405889426024e Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 10 Mar 2025 19:36:54 -0300 Subject: [PATCH 3/8] staticaddr: fix race conditions in current height Make the current height an atomic variable in staticaddr/address/manager.go and in staticaddr/withdraw/manager.go. Removed the initiation height from staticaddr/deposit/manager.go (not needed). --- loopd/daemon.go | 10 +--------- staticaddr/address/manager.go | 9 +++++---- staticaddr/deposit/manager.go | 7 +------ staticaddr/deposit/manager_test.go | 5 +---- staticaddr/withdraw/manager.go | 7 ++++--- 5 files changed, 12 insertions(+), 26 deletions(-) diff --git a/loopd/daemon.go b/loopd/daemon.go index 9bb5e6225..d35d5fc50 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -861,16 +861,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - // Lnd's GetInfo call supplies us with the current block - // height. - info, err := d.lnd.Client.GetInfo(d.mainCtx) - if err != nil { - d.internalErrChan <- err - return - } - infof("Starting static address deposit manager...") - err = depositManager.Run(d.mainCtx, info.BlockHeight) + err = depositManager.Run(d.mainCtx) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } diff --git a/staticaddr/address/manager.go b/staticaddr/address/manager.go index 4cb5391c6..96e9cdbf5 100644 --- a/staticaddr/address/manager.go +++ b/staticaddr/address/manager.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2/schnorr" @@ -52,7 +53,7 @@ type Manager struct { sync.Mutex - currentHeight int32 + currentHeight atomic.Int32 } // NewManager creates a new address manager. @@ -74,7 +75,7 @@ func (m *Manager) Run(ctx context.Context) error { for { select { case currentHeight := <-newBlockChan: - m.currentHeight = currentHeight + m.currentHeight.Store(currentHeight) case err = <-newBlockErrChan: return err @@ -111,7 +112,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, m.Unlock() // Ensure that we have that we have a sane current block height. - if m.currentHeight == 0 { + if m.currentHeight.Load() == 0 { return nil, fmt.Errorf("current block height is unknown") } @@ -176,7 +177,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, ProtocolVersion: version.AddressProtocolVersion( protocolVersion, ), - InitiationHeight: m.currentHeight, + InitiationHeight: m.currentHeight.Load(), } err = m.cfg.Store.CreateStaticAddress(ctx, addrParams) if err != nil { diff --git a/staticaddr/deposit/manager.go b/staticaddr/deposit/manager.go index efaef97aa..c6c342347 100644 --- a/staticaddr/deposit/manager.go +++ b/staticaddr/deposit/manager.go @@ -84,9 +84,6 @@ type Manager struct { // activeDeposits contains all the active static address outputs. activeDeposits map[wire.OutPoint]*FSM - // initiationHeight stores the currently best known block height. - initiationHeight uint32 - // deposits contains all the deposits that have ever been made to the // static address. This field is used to store and recover deposits. It // also serves as basis for reconciliation of newly detected deposits by @@ -111,9 +108,7 @@ func NewManager(cfg *ManagerConfig) *Manager { } // Run runs the address manager. -func (m *Manager) Run(ctx context.Context, currentHeight uint32) error { - m.initiationHeight = currentHeight - +func (m *Manager) Run(ctx context.Context) error { newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) //nolint:lll if err != nil { return err diff --git a/staticaddr/deposit/manager_test.go b/staticaddr/deposit/manager_test.go index d943a32f6..dc67393b0 100644 --- a/staticaddr/deposit/manager_test.go +++ b/staticaddr/deposit/manager_test.go @@ -219,10 +219,7 @@ func TestManager(t *testing.T) { // Start the deposit manager. go func() { - err := testContext.manager.Run( - ctx, uint32(testContext.mockLnd.Height), - ) - require.NoError(t, err) + require.NoError(t, testContext.manager.Run(ctx)) }() // Ensure that the manager has been initialized. diff --git a/staticaddr/withdraw/manager.go b/staticaddr/withdraw/manager.go index 1a60b37b4..2de212141 100644 --- a/staticaddr/withdraw/manager.go +++ b/staticaddr/withdraw/manager.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "strings" + "sync/atomic" "github.com/btcsuite/btcd/btcec/v2/schnorr" "github.com/btcsuite/btcd/btcec/v2/schnorr/musig2" @@ -106,7 +107,7 @@ type Manager struct { errChan chan error // initiationHeight stores the currently best known block height. - initiationHeight uint32 + initiationHeight atomic.Uint32 // finalizedWithdrawalTx are the finalized withdrawal transactions that // are published to the network and re-published on block arrivals. @@ -127,7 +128,7 @@ func NewManager(cfg *ManagerConfig) *Manager { // Run runs the deposit withdrawal manager. func (m *Manager) Run(ctx context.Context, currentHeight uint32) error { - m.initiationHeight = currentHeight + m.initiationHeight.Store(currentHeight) newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) @@ -479,7 +480,7 @@ func (m *Manager) handleWithdrawal(ctx context.Context, confChan, errChan, err := m.cfg.ChainNotifier.RegisterConfirmationsNtfn( ctx, &txHash, withdrawalPkScript, MinConfs, - int32(m.initiationHeight), + int32(m.initiationHeight.Load()), ) if err != nil { return err From ce77fcc12cc3de8c31ad25e7a3d8fa0c2c828649 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 10 Mar 2025 20:00:25 -0300 Subject: [PATCH 4/8] instantout/reservation: fix data race in test --- instantout/reservation/manager_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/instantout/reservation/manager_test.go b/instantout/reservation/manager_test.go index 226ffb172..b5cd3df08 100644 --- a/instantout/reservation/manager_test.go +++ b/instantout/reservation/manager_test.go @@ -3,6 +3,7 @@ package reservation import ( "context" "encoding/hex" + "sync/atomic" "testing" "time" @@ -72,13 +73,14 @@ func TestManager(t *testing.T) { require.NoError(t, err) // We'll now expect a spend registration. - spendReg := <-testContext.mockLnd.RegisterSpendChannel - require.Equal(t, spendReg.PkScript, pkScript) + var spendReg atomic.Pointer[test.SpendRegistration] + spendReg.Store(<-testContext.mockLnd.RegisterSpendChannel) + require.Equal(t, spendReg.Load().PkScript, pkScript) go func() { // We'll expect a second spend registration. - spendReg = <-testContext.mockLnd.RegisterSpendChannel - require.Equal(t, spendReg.PkScript, pkScript) + spendReg.Store(<-testContext.mockLnd.RegisterSpendChannel) + require.Equal(t, spendReg.Load().PkScript, pkScript) }() // We'll now try to lock the reservation. @@ -90,7 +92,7 @@ func TestManager(t *testing.T) { require.Error(t, err) testContext.mockLnd.SpendChannel <- &chainntnfs.SpendDetail{ - SpentOutPoint: spendReg.Outpoint, + SpentOutPoint: spendReg.Load().Outpoint, } // We'll now expect the reservation to be expired. From 3cbfadd27af6ce4ff5f59c1fa56f700f00b39915 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 12 Mar 2025 21:36:33 -0300 Subject: [PATCH 5/8] loop: fix race in loopOutSwap.payInvoiceAsync The function used to call the method swapKit.swapInfo() which accessed many fields of the swapKit which may change in parallel by handlePaymentResult called by executeSwap (payInvoiceAsync is called in a goroutine). The fields are: cost, state, and update time. --- loopout.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loopout.go b/loopout.go index 26dfd1420..99a7b7371 100644 --- a/loopout.go +++ b/loopout.go @@ -873,7 +873,7 @@ func (s *loopOutSwap) payInvoiceAsync(ctx context.Context, } if err := s.swapKit.server.ReportRoutingResult( - ctx, s.swapInfo().SwapHash, s.swapInvoicePaymentAddr, + ctx, s.hash, s.swapInvoicePaymentAddr, reportType, paymentSuccess, int32(attempts), dt.Milliseconds(), ); err != nil { From dc4a5641acd85f5f0a39b79c1b4901d0867240e9 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Fri, 14 Mar 2025 14:16:07 -0300 Subject: [PATCH 6/8] multi: initialize block height in New, not in Run If Run sets the current height field, it is technically a race between Run and other methods reading the field. Setting in New is a safer option. Removed a check that height is not 0 from static address manager. --- instantout/manager.go | 8 ++-- loopd/daemon.go | 65 +++++++++--------------------- staticaddr/address/manager.go | 12 +++--- staticaddr/address/manager_test.go | 8 +++- staticaddr/loopin/manager.go | 11 ++--- staticaddr/withdraw/manager.go | 11 ++--- 6 files changed, 46 insertions(+), 69 deletions(-) diff --git a/instantout/manager.go b/instantout/manager.go index 38207c446..905874fc4 100644 --- a/instantout/manager.go +++ b/instantout/manager.go @@ -41,18 +41,17 @@ type Manager struct { } // NewInstantOutManager creates a new instantout manager. -func NewInstantOutManager(cfg *Config) *Manager { +func NewInstantOutManager(cfg *Config, height int32) *Manager { return &Manager{ cfg: cfg, activeInstantOuts: make(map[lntypes.Hash]*FSM), blockEpochChan: make(chan int32), + currentHeight: height, } } // Run runs the instantout manager. -func (m *Manager) Run(ctx context.Context, initChan chan struct{}, - height int32) error { - +func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { log.Debugf("Starting instantout manager") defer func() { log.Debugf("Stopping instantout manager") @@ -62,7 +61,6 @@ func (m *Manager) Run(ctx context.Context, initChan chan struct{}, defer cancel() m.runCtx = runCtx - m.currentHeight = height err := m.recoverInstantOuts(runCtx) if err != nil { diff --git a/loopd/daemon.go b/loopd/daemon.go index d35d5fc50..3c52c8168 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -446,6 +446,14 @@ func (d *Daemon) initialize(withMacaroonService bool) error { chainParams, ) + // We need to know the current block height to properly initialize + // managers. + getInfo, err := d.lnd.Client.GetInfo(d.mainCtx) + if err != nil { + return fmt.Errorf("failed to get current block height: %w", err) + } + blockHeight := getInfo.BlockHeight + // If we're running an asset client, we'll log something here. if d.assetClient != nil { getInfo, err := d.assetClient.GetInfo( @@ -580,7 +588,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { ChainParams: d.lnd.ChainParams, ChainNotifier: d.lnd.ChainNotifier, } - staticAddressManager = address.NewManager(addrCfg) + staticAddressManager = address.NewManager(addrCfg, int32(blockHeight)) // Static address deposit manager setup. depositStore := deposit.NewSqlStore(baseDb) @@ -606,7 +614,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { ChainNotifier: d.lnd.ChainNotifier, Signer: d.lnd.Signer, } - withdrawalManager = withdraw.NewManager(withdrawalCfg) + withdrawalManager = withdraw.NewManager(withdrawalCfg, blockHeight) // Static address loop-in manager setup. staticAddressLoopInStore := loopin.NewSqlStore( @@ -631,7 +639,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { ValidateLoopInContract: loop.ValidateLoopInContract, MaxStaticAddrHtlcFeePercentage: d.cfg.MaxStaticAddrHtlcFeePercentage, MaxStaticAddrHtlcBackupFeePercentage: d.cfg.MaxStaticAddrHtlcBackupFeePercentage, - }) + }, blockHeight) var ( reservationManager *reservation.Manager @@ -674,7 +682,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } instantOutManager = instantout.NewInstantOutManager( - instantOutConfig, + instantOutConfig, int32(blockHeight), ) } @@ -769,19 +777,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - // We need to know the current block height to properly - // initialize the reservation manager. - getInfo, err := d.lnd.Client.GetInfo(d.mainCtx) - if err != nil { - d.internalErrChan <- err - return - } - infof("Starting reservation manager") defer infof("Reservation manager stopped") - err = d.reservationManager.Run( - d.mainCtx, int32(getInfo.BlockHeight), initChan, + err := d.reservationManager.Run( + d.mainCtx, int32(blockHeight), initChan, ) if err != nil && !errors.Is(err, context.Canceled) { d.internalErrChan <- err @@ -809,18 +809,10 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - getInfo, err := d.lnd.Client.GetInfo(d.mainCtx) - if err != nil { - d.internalErrChan <- err - return - } - infof("Starting instantout manager") defer infof("Instantout manager stopped") - err = d.instantOutManager.Run( - d.mainCtx, initChan, int32(getInfo.BlockHeight), - ) + err := d.instantOutManager.Run(d.mainCtx, initChan) if err != nil && !errors.Is(err, context.Canceled) { d.internalErrChan <- err } @@ -847,7 +839,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { defer d.wg.Done() infof("Starting static address manager...") - err = staticAddressManager.Run(d.mainCtx) + err := staticAddressManager.Run(d.mainCtx) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } @@ -862,7 +854,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { defer d.wg.Done() infof("Starting static address deposit manager...") - err = depositManager.Run(d.mainCtx) + err := depositManager.Run(d.mainCtx) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } @@ -877,17 +869,9 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - // Lnd's GetInfo call supplies us with the current block - // height. - info, err := d.lnd.Client.GetInfo(d.mainCtx) - if err != nil { - d.internalErrChan <- err - return - } - infof("Starting static address deposit withdrawal " + "manager...") - err = withdrawalManager.Run(d.mainCtx, info.BlockHeight) + err := withdrawalManager.Run(d.mainCtx) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } @@ -903,19 +887,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error { go func() { defer d.wg.Done() - // Lnd's GetInfo call supplies us with the current block - // height. - info, err := d.lnd.Client.GetInfo(d.mainCtx) - if err != nil { - d.internalErrChan <- err - - return - } - infof("Starting static address loop-in manager...") - err = staticLoopInManager.Run( - d.mainCtx, info.BlockHeight, - ) + err := staticLoopInManager.Run(d.mainCtx) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } diff --git a/staticaddr/address/manager.go b/staticaddr/address/manager.go index 96e9cdbf5..cc6012738 100644 --- a/staticaddr/address/manager.go +++ b/staticaddr/address/manager.go @@ -57,10 +57,13 @@ type Manager struct { } // NewManager creates a new address manager. -func NewManager(cfg *ManagerConfig) *Manager { - return &Manager{ +func NewManager(cfg *ManagerConfig, currentHeight int32) *Manager { + m := &Manager{ cfg: cfg, } + m.currentHeight.Store(currentHeight) + + return m } // Run runs the address manager. @@ -111,11 +114,6 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, } m.Unlock() - // Ensure that we have that we have a sane current block height. - if m.currentHeight.Load() == 0 { - return nil, fmt.Errorf("current block height is unknown") - } - // We are fetching a new L402 token from the server. There is one static // address per L402 token allowed. err = m.cfg.FetchL402(ctx) diff --git a/staticaddr/address/manager_test.go b/staticaddr/address/manager_test.go index 35a6d0069..37ba83c9b 100644 --- a/staticaddr/address/manager_test.go +++ b/staticaddr/address/manager_test.go @@ -154,6 +154,9 @@ type ManagerTestContext struct { // NewAddressManagerTestContext creates a new test context for the static // address manager. func NewAddressManagerTestContext(t *testing.T) *ManagerTestContext { + ctxb, cancel := context.WithCancel(context.Background()) + defer cancel() + mockLnd := test.NewMockLnd() lndContext := test.NewContext(t, mockLnd) @@ -183,7 +186,10 @@ func NewAddressManagerTestContext(t *testing.T) *ManagerTestContext { FetchL402: func(context.Context) error { return nil }, } - manager := NewManager(cfg) + getInfo, err := mockLnd.Client.GetInfo(ctxb) + require.NoError(t, err) + + manager := NewManager(cfg, int32(getInfo.BlockHeight)) return &ManagerTestContext{ manager: manager, diff --git a/staticaddr/loopin/manager.go b/staticaddr/loopin/manager.go index cbe6d9cec..9f31b29ee 100644 --- a/staticaddr/loopin/manager.go +++ b/staticaddr/loopin/manager.go @@ -139,8 +139,8 @@ type Manager struct { } // NewManager creates a new deposit withdrawal manager. -func NewManager(cfg *Config) *Manager { - return &Manager{ +func NewManager(cfg *Config, currentHeight uint32) *Manager { + m := &Manager{ cfg: cfg, initChan: make(chan struct{}), newLoopInChan: make(chan *newSwapRequest), @@ -148,12 +148,13 @@ func NewManager(cfg *Config) *Manager { errChan: make(chan error), activeLoopIns: make(map[lntypes.Hash]*FSM), } + m.currentHeight.Store(currentHeight) + + return m } // Run runs the static address loop-in manager. -func (m *Manager) Run(ctx context.Context, currentHeight uint32) error { - m.currentHeight.Store(currentHeight) - +func (m *Manager) Run(ctx context.Context) error { registerBlockNtfn := m.cfg.ChainNotifier.RegisterBlockEpochNtfn newBlockChan, newBlockErrChan, err := registerBlockNtfn(ctx) if err != nil { diff --git a/staticaddr/withdraw/manager.go b/staticaddr/withdraw/manager.go index 2de212141..3a446ca1f 100644 --- a/staticaddr/withdraw/manager.go +++ b/staticaddr/withdraw/manager.go @@ -115,8 +115,8 @@ type Manager struct { } // NewManager creates a new deposit withdrawal manager. -func NewManager(cfg *ManagerConfig) *Manager { - return &Manager{ +func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager { + m := &Manager{ cfg: cfg, initChan: make(chan struct{}), finalizedWithdrawalTxns: make(map[chainhash.Hash]*wire.MsgTx), @@ -124,12 +124,13 @@ func NewManager(cfg *ManagerConfig) *Manager { newWithdrawalRequestChan: make(chan newWithdrawalRequest), errChan: make(chan error), } + m.initiationHeight.Store(currentHeight) + + return m } // Run runs the deposit withdrawal manager. -func (m *Manager) Run(ctx context.Context, currentHeight uint32) error { - m.initiationHeight.Store(currentHeight) - +func (m *Manager) Run(ctx context.Context) error { newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) From 3abd2d2235dcfdff50369f0da5e63b86afa34ed3 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 26 Feb 2025 11:30:29 -0300 Subject: [PATCH 7/8] make: add targets unit-race and unit-postgres-race --- Makefile | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 552ce0c57..25febbb5c 100644 --- a/Makefile +++ b/Makefile @@ -95,14 +95,22 @@ clean: # TESTING # ======= -unit: +unit: @$(call print, "Running unit tests.") $(UNIT) +unit-race: + @$(call print, "Running unit race tests.") + $(UNIT) -race + unit-postgres: @$(call print, "Running unit tests with postgres.") $(UNIT) -tags=test_db_postgres +unit-postgres-race: + @$(call print, "Running unit race tests with postgres.") + $(UNIT) -race -tags=test_db_postgres + # ========= # UTILITIES # ========= From 2f2c5a34e720f9ab4f99ff3b6361c3da4026e71b Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 26 Feb 2025 11:31:43 -0300 Subject: [PATCH 8/8] ci: run unit tests with -race --- .github/workflows/main.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6ffe7c574..d84591cef 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -101,7 +101,7 @@ jobs: go-version: '~${{ env.GO_VERSION }}' - name: run unit tests - run: make unit + run: make unit-race - name: run unit test with postgres - run: make unit-postgres + run: make unit-postgres-race