Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 88 additions & 21 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
infof("Liquidity manager stopped")
}()

initManagerTimeout := 10 * time.Second

// Start the reservation manager.
if d.reservationManager != nil {
d.wg.Add(1)
Expand All @@ -792,9 +794,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}
}()

// Wait for the reservation server to be ready before starting the
// grpc server.
timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second)
// Wait for the reservation server to be ready before starting
// the grpc server.
timeOutCtx, cancel := context.WithTimeout(
d.mainCtx, initManagerTimeout,
)
select {
case <-timeOutCtx.Done():
cancel()
Expand Down Expand Up @@ -822,9 +826,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}
}()

// Wait for the instantout server to be ready before starting the
// grpc server.
timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second)
// Wait for the instantout server to be ready before starting
// the grpc server.
timeOutCtx, cancel := context.WithTimeout(
d.mainCtx, initManagerTimeout,
)
select {
case <-timeOutCtx.Done():
cancel()
Expand All @@ -839,67 +845,128 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
// Start the static address manager.
if staticAddressManager != nil {
d.wg.Add(1)
initChan := make(chan struct{})
go func() {
defer d.wg.Done()

infof("Starting static address manager...")
err := staticAddressManager.Run(d.mainCtx)
defer infof("Static address manager stopped")

err := staticAddressManager.Run(d.mainCtx, initChan)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
infof("Static address manager stopped")
}()

// Wait for the static address manager to be ready before
// starting the grpc server.
timeOutCtx, cancel := context.WithTimeout(
d.mainCtx, initManagerTimeout,
)
select {
case <-timeOutCtx.Done():
cancel()
return fmt.Errorf("static address manager not "+
"ready: %v", timeOutCtx.Err())

case <-initChan:
cancel()
}
}

// Start the static address deposit manager.
if depositManager != nil {
d.wg.Add(1)
initChan := make(chan struct{})
go func() {
defer d.wg.Done()

infof("Starting static address deposit manager...")
err := depositManager.Run(d.mainCtx)
defer infof("Static address deposit manager stopped")

err := depositManager.Run(d.mainCtx, initChan)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
infof("Static address deposit manager stopped")
}()
depositManager.WaitInitComplete()

// Wait for the static address manager to be ready before
// starting the grpc server.
timeOutCtx, cancel := context.WithTimeout(
d.mainCtx, initManagerTimeout,
)
select {
case <-timeOutCtx.Done():
cancel()
return fmt.Errorf("static address deposit manager "+
"not ready: %v", timeOutCtx.Err())

case <-initChan:
cancel()
}
}

// Start the static address deposit withdrawal manager.
if withdrawalManager != nil {
d.wg.Add(1)
initChan := make(chan struct{})
go func() {
defer d.wg.Done()

infof("Starting static address deposit withdrawal " +
"manager...")
err := withdrawalManager.Run(d.mainCtx)
infof("Starting static address withdrawal manager...")
defer infof("Static address withdrawal manager stopped")

err := withdrawalManager.Run(d.mainCtx, initChan)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
infof("Static address deposit withdrawal manager " +
"stopped")
}()
withdrawalManager.WaitInitComplete()

// Wait for the static address withdrawal manager to be ready
// before starting the grpc server.
timeOutCtx, cancel := context.WithTimeout(
d.mainCtx, initManagerTimeout,
)
select {
case <-timeOutCtx.Done():
cancel()
return fmt.Errorf("static address withdrawal manager "+
"server not ready: %v", timeOutCtx.Err())

case <-initChan:
cancel()
}
}

// Start the static address loop-in manager.
if staticLoopInManager != nil {
d.wg.Add(1)
initChan := make(chan struct{})
go func() {
defer d.wg.Done()

infof("Starting static address loop-in manager...")
err := staticLoopInManager.Run(d.mainCtx)
defer infof("Static address loop-in manager stopped")
err := staticLoopInManager.Run(d.mainCtx, initChan)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
infof("Starting static address loop-in manager " +
"stopped")
}()
staticLoopInManager.WaitInitComplete()

// Wait for the static address loop-in manager to be ready before
// starting the grpc server.
timeOutCtx, cancel := context.WithTimeout(
d.mainCtx, initManagerTimeout,
)
select {
case <-timeOutCtx.Done():
cancel()
return fmt.Errorf("static address loop-in manager "+
"not ready: %v", timeOutCtx.Err())

case <-initChan:
cancel()
}
}

// Last, start our internal error handler. This will return exactly one
Expand Down
6 changes: 5 additions & 1 deletion staticaddr/address/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,18 @@ func NewManager(cfg *ManagerConfig, currentHeight int32) *Manager {
}

// Run runs the address manager.
func (m *Manager) Run(ctx context.Context) error {
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
newBlockChan, newBlockErrChan, err :=
m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx)

if err != nil {
return err
}

// Communicate to the caller that the address manager has completed its
// initialization.
close(initChan)

for {
select {
case currentHeight := <-newBlockChan:
Expand Down
5 changes: 4 additions & 1 deletion staticaddr/address/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,14 @@ func TestManager(t *testing.T) {
testContext := NewAddressManagerTestContext(t)

// Start the manager.
initChan := make(chan struct{})
go func() {
err := testContext.manager.Run(ctxb)
err := testContext.manager.Run(ctxb, initChan)
require.ErrorIs(t, err, context.Canceled)
}()

<-initChan

// Create the expected static address.
expectedAddress, err := GenerateExpectedTaprootAddress(testContext)
require.NoError(t, err)
Expand Down
15 changes: 2 additions & 13 deletions staticaddr/deposit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ type Manager struct {
// mu guards access to activeDeposits map.
mu sync.Mutex

// initChan signals the daemon that the address manager has completed
// its initialization.
initChan chan struct{}

// activeDeposits contains all the active static address outputs.
activeDeposits map[wire.OutPoint]*FSM

Expand All @@ -100,15 +96,14 @@ type Manager struct {
func NewManager(cfg *ManagerConfig) *Manager {
return &Manager{
cfg: cfg,
initChan: make(chan struct{}),
activeDeposits: make(map[wire.OutPoint]*FSM),
deposits: make(map[wire.OutPoint]*Deposit),
finalizedDepositChan: make(chan wire.OutPoint),
}
}

// Run runs the address manager.
func (m *Manager) Run(ctx context.Context) error {
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) //nolint:lll
if err != nil {
return err
Expand All @@ -125,7 +120,7 @@ func (m *Manager) Run(ctx context.Context) error {

// Communicate to the caller that the address manager has completed its
// initialization.
close(m.initChan)
close(initChan)

for {
select {
Expand Down Expand Up @@ -209,12 +204,6 @@ func (m *Manager) recoverDeposits(ctx context.Context) error {
return nil
}

// WaitInitComplete waits until the address manager has completed its setup.
func (m *Manager) WaitInitComplete() {
defer log.Debugf("Static address deposit manager initiation complete.")
<-m.initChan
}

// pollDeposits polls new deposits to our static address and notifies the
// manager's event loop about them.
func (m *Manager) pollDeposits(ctx context.Context) {
Expand Down
6 changes: 2 additions & 4 deletions staticaddr/deposit/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ var (

blockErrChan = make(chan error)

initChan = make(chan struct{})

finalizedDepositChan = make(chan wire.OutPoint)
)

Expand Down Expand Up @@ -218,8 +216,9 @@ func TestManager(t *testing.T) {
testContext := newManagerTestContext(t)

// Start the deposit manager.
initChan := make(chan struct{})
go func() {
require.NoError(t, testContext.manager.Run(ctx))
require.NoError(t, testContext.manager.Run(ctx, initChan))
}()

// Ensure that the manager has been initialized.
Expand Down Expand Up @@ -337,7 +336,6 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
}

manager := NewManager(cfg)
manager.initChan = initChan
manager.finalizedDepositChan = finalizedDepositChan

testContext := &ManagerTestContext{
Expand Down
16 changes: 2 additions & 14 deletions staticaddr/loopin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,6 @@ type newSwapResponse struct {
type Manager struct {
cfg *Config

// initChan signals the daemon that the address manager has completed
// its initialization.
initChan chan struct{}

// newLoopInChan receives swap requests from the server and initiates
// loop-in swaps.
newLoopInChan chan *newSwapRequest
Expand All @@ -141,7 +137,6 @@ type Manager struct {
func NewManager(cfg *Config, currentHeight uint32) *Manager {
m := &Manager{
cfg: cfg,
initChan: make(chan struct{}),
newLoopInChan: make(chan *newSwapRequest),
exitChan: make(chan struct{}),
errChan: make(chan error),
Expand All @@ -153,7 +148,7 @@ func NewManager(cfg *Config, currentHeight uint32) *Manager {
}

// Run runs the static address loop-in manager.
func (m *Manager) Run(ctx context.Context) error {
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
registerBlockNtfn := m.cfg.ChainNotifier.RegisterBlockEpochNtfn
newBlockChan, newBlockErrChan, err := registerBlockNtfn(ctx)
if err != nil {
Expand All @@ -175,7 +170,7 @@ func (m *Manager) Run(ctx context.Context) error {

// Communicate to the caller that the address manager has completed its
// initialization.
close(m.initChan)
close(initChan)

var loopIn *StaticAddressLoopIn
for {
Expand Down Expand Up @@ -493,13 +488,6 @@ func (m *Manager) recoverLoopIns(ctx context.Context) error {
return nil
}

// WaitInitComplete waits until the static address loop-in manager has completed
// its setup.
func (m *Manager) WaitInitComplete() {
defer log.Debugf("Static address loop-in manager initiation complete.")
<-m.initChan
}

// DeliverLoopInRequest forwards a loop-in request from the server to the
// manager run loop to initiate a new loop-in swap.
func (m *Manager) DeliverLoopInRequest(ctx context.Context,
Expand Down
17 changes: 2 additions & 15 deletions staticaddr/withdraw/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ type Manager struct {
// mu protects access to finalizedWithdrawalTxns.
mu sync.Mutex

// initChan signals the daemon that the withdrawal manager has completed
// its initialization.
initChan chan struct{}

// newWithdrawalRequestChan receives a list of outpoints that should be
// withdrawn. The request is forwarded to the managers main loop.
newWithdrawalRequestChan chan newWithdrawalRequest
Expand All @@ -139,7 +135,6 @@ type Manager struct {
func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager {
m := &Manager{
cfg: cfg,
initChan: make(chan struct{}),
finalizedWithdrawalTxns: make(map[chainhash.Hash]*wire.MsgTx),
exitChan: make(chan struct{}),
newWithdrawalRequestChan: make(chan newWithdrawalRequest),
Expand All @@ -151,7 +146,7 @@ func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager {
}

// Run runs the deposit withdrawal manager.
func (m *Manager) Run(ctx context.Context) error {
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
newBlockChan, newBlockErrChan, err :=
m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx)

Expand All @@ -166,7 +161,7 @@ func (m *Manager) Run(ctx context.Context) error {

// Communicate to the caller that the address manager has completed its
// initialization.
close(m.initChan)
close(initChan)

for {
select {
Expand Down Expand Up @@ -274,14 +269,6 @@ func (m *Manager) recoverWithdrawals(ctx context.Context) error {
return nil
}

// WaitInitComplete waits until the address manager has completed its setup.
func (m *Manager) WaitInitComplete() {
defer log.Debugf("Static address withdrawal manager initiation " +
"complete.")

<-m.initChan
}

// WithdrawDeposits starts a deposits withdrawal flow. If the amount is set to 0
// the full amount of the selected deposits will be withdrawn.
func (m *Manager) WithdrawDeposits(ctx context.Context,
Expand Down