diff --git a/loopd/daemon.go b/loopd/daemon.go index 3b50a9130..9feca79e3 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -774,6 +774,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error { infof("Liquidity manager stopped") }() + initManagerTimeout := 10 * time.Second + // Start the reservation manager. if d.reservationManager != nil { d.wg.Add(1) @@ -792,9 +794,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } }() - // Wait for the reservation server to be ready before starting the - // grpc server. - timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second) + // Wait for the reservation server to be ready before starting + // the grpc server. + timeOutCtx, cancel := context.WithTimeout( + d.mainCtx, initManagerTimeout, + ) select { case <-timeOutCtx.Done(): cancel() @@ -822,9 +826,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } }() - // Wait for the instantout server to be ready before starting the - // grpc server. - timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second) + // Wait for the instantout server to be ready before starting + // the grpc server. + timeOutCtx, cancel := context.WithTimeout( + d.mainCtx, initManagerTimeout, + ) select { case <-timeOutCtx.Done(): cancel() @@ -839,67 +845,128 @@ func (d *Daemon) initialize(withMacaroonService bool) error { // Start the static address manager. if staticAddressManager != nil { d.wg.Add(1) + initChan := make(chan struct{}) go func() { defer d.wg.Done() infof("Starting static address manager...") - err := staticAddressManager.Run(d.mainCtx) + defer infof("Static address manager stopped") + + err := staticAddressManager.Run(d.mainCtx, initChan) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - infof("Static address manager stopped") }() + + // Wait for the static address manager to be ready before + // starting the grpc server. + timeOutCtx, cancel := context.WithTimeout( + d.mainCtx, initManagerTimeout, + ) + select { + case <-timeOutCtx.Done(): + cancel() + return fmt.Errorf("static address manager not "+ + "ready: %v", timeOutCtx.Err()) + + case <-initChan: + cancel() + } } // Start the static address deposit manager. if depositManager != nil { d.wg.Add(1) + initChan := make(chan struct{}) go func() { defer d.wg.Done() infof("Starting static address deposit manager...") - err := depositManager.Run(d.mainCtx) + defer infof("Static address deposit manager stopped") + + err := depositManager.Run(d.mainCtx, initChan) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - infof("Static address deposit manager stopped") }() - depositManager.WaitInitComplete() + + // Wait for the static address manager to be ready before + // starting the grpc server. + timeOutCtx, cancel := context.WithTimeout( + d.mainCtx, initManagerTimeout, + ) + select { + case <-timeOutCtx.Done(): + cancel() + return fmt.Errorf("static address deposit manager "+ + "not ready: %v", timeOutCtx.Err()) + + case <-initChan: + cancel() + } } // Start the static address deposit withdrawal manager. if withdrawalManager != nil { d.wg.Add(1) + initChan := make(chan struct{}) go func() { defer d.wg.Done() - infof("Starting static address deposit withdrawal " + - "manager...") - err := withdrawalManager.Run(d.mainCtx) + infof("Starting static address withdrawal manager...") + defer infof("Static address withdrawal manager stopped") + + err := withdrawalManager.Run(d.mainCtx, initChan) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - infof("Static address deposit withdrawal manager " + - "stopped") }() - withdrawalManager.WaitInitComplete() + + // Wait for the static address withdrawal manager to be ready + // before starting the grpc server. + timeOutCtx, cancel := context.WithTimeout( + d.mainCtx, initManagerTimeout, + ) + select { + case <-timeOutCtx.Done(): + cancel() + return fmt.Errorf("static address withdrawal manager "+ + "server not ready: %v", timeOutCtx.Err()) + + case <-initChan: + cancel() + } } // Start the static address loop-in manager. if staticLoopInManager != nil { d.wg.Add(1) + initChan := make(chan struct{}) go func() { defer d.wg.Done() infof("Starting static address loop-in manager...") - err := staticLoopInManager.Run(d.mainCtx) + defer infof("Static address loop-in manager stopped") + err := staticLoopInManager.Run(d.mainCtx, initChan) if err != nil && !errors.Is(context.Canceled, err) { d.internalErrChan <- err } - infof("Starting static address loop-in manager " + - "stopped") }() - staticLoopInManager.WaitInitComplete() + + // Wait for the static address loop-in manager to be ready before + // starting the grpc server. + timeOutCtx, cancel := context.WithTimeout( + d.mainCtx, initManagerTimeout, + ) + select { + case <-timeOutCtx.Done(): + cancel() + return fmt.Errorf("static address loop-in manager "+ + "not ready: %v", timeOutCtx.Err()) + + case <-initChan: + cancel() + } } // Last, start our internal error handler. This will return exactly one diff --git a/staticaddr/address/manager.go b/staticaddr/address/manager.go index 578904501..122c87420 100644 --- a/staticaddr/address/manager.go +++ b/staticaddr/address/manager.go @@ -67,7 +67,7 @@ func NewManager(cfg *ManagerConfig, currentHeight int32) *Manager { } // Run runs the address manager. -func (m *Manager) Run(ctx context.Context) error { +func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) @@ -75,6 +75,10 @@ func (m *Manager) Run(ctx context.Context) error { return err } + // Communicate to the caller that the address manager has completed its + // initialization. + close(initChan) + for { select { case currentHeight := <-newBlockChan: diff --git a/staticaddr/address/manager_test.go b/staticaddr/address/manager_test.go index f1eb0adaf..017ad4c85 100644 --- a/staticaddr/address/manager_test.go +++ b/staticaddr/address/manager_test.go @@ -95,11 +95,14 @@ func TestManager(t *testing.T) { testContext := NewAddressManagerTestContext(t) // Start the manager. + initChan := make(chan struct{}) go func() { - err := testContext.manager.Run(ctxb) + err := testContext.manager.Run(ctxb, initChan) require.ErrorIs(t, err, context.Canceled) }() + <-initChan + // Create the expected static address. expectedAddress, err := GenerateExpectedTaprootAddress(testContext) require.NoError(t, err) diff --git a/staticaddr/deposit/manager.go b/staticaddr/deposit/manager.go index 717e447a1..4b39690d9 100644 --- a/staticaddr/deposit/manager.go +++ b/staticaddr/deposit/manager.go @@ -77,10 +77,6 @@ type Manager struct { // mu guards access to activeDeposits map. mu sync.Mutex - // initChan signals the daemon that the address manager has completed - // its initialization. - initChan chan struct{} - // activeDeposits contains all the active static address outputs. activeDeposits map[wire.OutPoint]*FSM @@ -100,7 +96,6 @@ type Manager struct { func NewManager(cfg *ManagerConfig) *Manager { return &Manager{ cfg: cfg, - initChan: make(chan struct{}), activeDeposits: make(map[wire.OutPoint]*FSM), deposits: make(map[wire.OutPoint]*Deposit), finalizedDepositChan: make(chan wire.OutPoint), @@ -108,7 +103,7 @@ func NewManager(cfg *ManagerConfig) *Manager { } // Run runs the address manager. -func (m *Manager) Run(ctx context.Context) error { +func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) //nolint:lll if err != nil { return err @@ -125,7 +120,7 @@ func (m *Manager) Run(ctx context.Context) error { // Communicate to the caller that the address manager has completed its // initialization. - close(m.initChan) + close(initChan) for { select { @@ -209,12 +204,6 @@ func (m *Manager) recoverDeposits(ctx context.Context) error { return nil } -// WaitInitComplete waits until the address manager has completed its setup. -func (m *Manager) WaitInitComplete() { - defer log.Debugf("Static address deposit manager initiation complete.") - <-m.initChan -} - // pollDeposits polls new deposits to our static address and notifies the // manager's event loop about them. func (m *Manager) pollDeposits(ctx context.Context) { diff --git a/staticaddr/deposit/manager_test.go b/staticaddr/deposit/manager_test.go index dc67393b0..007414c2c 100644 --- a/staticaddr/deposit/manager_test.go +++ b/staticaddr/deposit/manager_test.go @@ -42,8 +42,6 @@ var ( blockErrChan = make(chan error) - initChan = make(chan struct{}) - finalizedDepositChan = make(chan wire.OutPoint) ) @@ -218,8 +216,9 @@ func TestManager(t *testing.T) { testContext := newManagerTestContext(t) // Start the deposit manager. + initChan := make(chan struct{}) go func() { - require.NoError(t, testContext.manager.Run(ctx)) + require.NoError(t, testContext.manager.Run(ctx, initChan)) }() // Ensure that the manager has been initialized. @@ -337,7 +336,6 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext { } manager := NewManager(cfg) - manager.initChan = initChan manager.finalizedDepositChan = finalizedDepositChan testContext := &ManagerTestContext{ diff --git a/staticaddr/loopin/manager.go b/staticaddr/loopin/manager.go index 1eb8b156f..106f2b3cf 100644 --- a/staticaddr/loopin/manager.go +++ b/staticaddr/loopin/manager.go @@ -116,10 +116,6 @@ type newSwapResponse struct { type Manager struct { cfg *Config - // initChan signals the daemon that the address manager has completed - // its initialization. - initChan chan struct{} - // newLoopInChan receives swap requests from the server and initiates // loop-in swaps. newLoopInChan chan *newSwapRequest @@ -141,7 +137,6 @@ type Manager struct { func NewManager(cfg *Config, currentHeight uint32) *Manager { m := &Manager{ cfg: cfg, - initChan: make(chan struct{}), newLoopInChan: make(chan *newSwapRequest), exitChan: make(chan struct{}), errChan: make(chan error), @@ -153,7 +148,7 @@ func NewManager(cfg *Config, currentHeight uint32) *Manager { } // Run runs the static address loop-in manager. -func (m *Manager) Run(ctx context.Context) error { +func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { registerBlockNtfn := m.cfg.ChainNotifier.RegisterBlockEpochNtfn newBlockChan, newBlockErrChan, err := registerBlockNtfn(ctx) if err != nil { @@ -175,7 +170,7 @@ func (m *Manager) Run(ctx context.Context) error { // Communicate to the caller that the address manager has completed its // initialization. - close(m.initChan) + close(initChan) var loopIn *StaticAddressLoopIn for { @@ -493,13 +488,6 @@ func (m *Manager) recoverLoopIns(ctx context.Context) error { return nil } -// WaitInitComplete waits until the static address loop-in manager has completed -// its setup. -func (m *Manager) WaitInitComplete() { - defer log.Debugf("Static address loop-in manager initiation complete.") - <-m.initChan -} - // DeliverLoopInRequest forwards a loop-in request from the server to the // manager run loop to initiate a new loop-in swap. func (m *Manager) DeliverLoopInRequest(ctx context.Context, diff --git a/staticaddr/withdraw/manager.go b/staticaddr/withdraw/manager.go index f8ff8a855..0cd4adade 100644 --- a/staticaddr/withdraw/manager.go +++ b/staticaddr/withdraw/manager.go @@ -113,10 +113,6 @@ type Manager struct { // mu protects access to finalizedWithdrawalTxns. mu sync.Mutex - // initChan signals the daemon that the withdrawal manager has completed - // its initialization. - initChan chan struct{} - // newWithdrawalRequestChan receives a list of outpoints that should be // withdrawn. The request is forwarded to the managers main loop. newWithdrawalRequestChan chan newWithdrawalRequest @@ -139,7 +135,6 @@ type Manager struct { func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager { m := &Manager{ cfg: cfg, - initChan: make(chan struct{}), finalizedWithdrawalTxns: make(map[chainhash.Hash]*wire.MsgTx), exitChan: make(chan struct{}), newWithdrawalRequestChan: make(chan newWithdrawalRequest), @@ -151,7 +146,7 @@ func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager { } // Run runs the deposit withdrawal manager. -func (m *Manager) Run(ctx context.Context) error { +func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) @@ -166,7 +161,7 @@ func (m *Manager) Run(ctx context.Context) error { // Communicate to the caller that the address manager has completed its // initialization. - close(m.initChan) + close(initChan) for { select { @@ -274,14 +269,6 @@ func (m *Manager) recoverWithdrawals(ctx context.Context) error { return nil } -// WaitInitComplete waits until the address manager has completed its setup. -func (m *Manager) WaitInitComplete() { - defer log.Debugf("Static address withdrawal manager initiation " + - "complete.") - - <-m.initChan -} - // WithdrawDeposits starts a deposits withdrawal flow. If the amount is set to 0 // the full amount of the selected deposits will be withdrawn. func (m *Manager) WithdrawDeposits(ctx context.Context,