Skip to content

Commit 2335031

Browse files
authored
Merge pull request #693 from lightninglabs/caretaker_stop_fixes
tapgarden: fix races and deadlocks in caretaker
2 parents 6b39ff8 + 05b82da commit 2335031

File tree

7 files changed

+476
-114
lines changed

7 files changed

+476
-114
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ COMMIT := $(shell git describe --tags --dirty)
1515

1616
GOBUILD := GOEXPERIMENT=loopvar GO111MODULE=on go build -v
1717
GOINSTALL := GOEXPERIMENT=loopvar GO111MODULE=on go install -v
18-
GOTEST := GOEXPERIMENT=loopvar GO111MODULE=on go test
18+
GOTEST := GOEXPERIMENT=loopvar GO111MODULE=on go test
1919
GOMOD := GO111MODULE=on go mod
2020

2121
GOLIST := go list -deps $(PKG)/... | grep '$(PKG)'
@@ -168,7 +168,7 @@ unit-cover: $(GOACC_BIN)
168168

169169
unit-race:
170170
@$(call print, "Running unit race tests.")
171-
env CGO_ENABLED=1 GORACE="history_size=7 halt_on_errors=1" $(GOLIST) | $(XARGS) env $(GOTEST) -race -test.timeout=20m
171+
env CGO_ENABLED=1 GORACE="history_size=7 halt_on_errors=1" $(UNIT_RACE)
172172

173173
itest: build-itest itest-only
174174

fn/func.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,20 @@ func First[T any](xs []*T, pred func(*T) bool) (*T, error) {
172172

173173
return nil, fmt.Errorf("no item found")
174174
}
175+
176+
// Last returns the last item in the slice that matches the predicate, or an
177+
// error if none matches.
178+
func Last[T any](xs []*T, pred func(*T) bool) (*T, error) {
179+
var matches []*T
180+
for i := range xs {
181+
if pred(xs[i]) {
182+
matches = append(matches, xs[i])
183+
}
184+
}
185+
186+
if len(matches) == 0 {
187+
return nil, fmt.Errorf("no item found")
188+
}
189+
190+
return matches[len(matches)-1], nil
191+
}

make/testing_flags.mk

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ LOG_TAGS =
55
TEST_FLAGS =
66
ITEST_FLAGS = -logoutput
77
COVER_PKG = $$(go list -deps -tags="$(DEV_TAGS)" ./... | grep '$(PKG)' | grep -v lnrpc)
8+
RACE_PKG = go list -deps -tags="$(DEV_TAGS)" ./... | grep '$(PKG)'
89
COVER_HTML = go tool cover -html=coverage.txt -o coverage.html
910
POSTGRES_START_DELAY = 5
1011

@@ -19,6 +20,7 @@ ifneq ($(pkg),)
1920
UNITPKG := $(PKG)/$(pkg)
2021
UNIT_TARGETED = yes
2122
COVER_PKG = $(PKG)/$(pkg)
23+
RACE_PKG = $(PKG)/$(pkg)
2224
endif
2325

2426
# If a specific unit test case is being target, construct test.run filter.
@@ -78,7 +80,7 @@ TEST_FLAGS += -test.timeout=$(timeout)
7880
else ifneq ($(optional),)
7981
TEST_FLAGS += -test.timeout=240m
8082
else
81-
TEST_FLAGS += -test.timeout=60m
83+
TEST_FLAGS += -test.timeout=20m
8284
endif
8385

8486
GOLIST := go list -tags="$(DEV_TAGS)" -deps $(PKG)/... | grep '$(PKG)'| grep -v '/vendor/'

tapgarden/caretaker.go

Lines changed: 122 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -166,62 +166,82 @@ func (b *BatchCaretaker) Start() error {
166166
func (b *BatchCaretaker) Stop() error {
167167
var stopErr error
168168
b.stopOnce.Do(func() {
169+
log.Infof("BatchCaretaker(%x): Stopping", b.batchKey[:])
170+
169171
close(b.Quit)
170172
b.Wg.Wait()
171173
})
172174

173175
return stopErr
174176
}
175177

176-
// Cancel signals for a batch caretaker to stop advancing a batch if possible.
177-
// A batch can only be cancelled if it has not reached BatchStateBroadcast yet.
178-
func (b *BatchCaretaker) Cancel() CancelResp {
178+
// Cancel signals for a batch caretaker to stop advancing a batch. A batch can
179+
// only be cancelled if it has not reached BatchStateBroadcast yet. If
180+
// cancellation succeeds, we forward the batch state after cancellation. If the
181+
// batch could not be cancelled, the planter will handle caretaker shutdown and
182+
// batch state.
183+
func (b *BatchCaretaker) Cancel() error {
179184
ctx, cancel := b.WithCtxQuit()
180185
defer cancel()
181186

182-
batchKey := b.cfg.Batch.BatchKey.PubKey.SerializeCompressed()
187+
batchKey := b.batchKey[:]
183188
batchState := b.cfg.Batch.State()
189+
var cancelResp CancelResp
190+
184191
// This function can only be called before the caretaker state stepping
185192
// function, so the batch state read is the next state that has not yet
186193
// been executed. Seedlings are converted to asset sprouts in the Frozen
187194
// state, and broadcast in the Broadast state.
195+
log.Debugf("BatchCaretaker(%x): Trying to cancel", batchKey)
188196
switch batchState {
189197
// In the pending state, the batch seedlings have not sprouted yet.
190198
case BatchStatePending, BatchStateFrozen:
191-
finalBatchState := BatchStateSeedlingCancelled
192199
err := b.cfg.Log.UpdateBatchState(
193200
ctx, b.cfg.Batch.BatchKey.PubKey,
194-
finalBatchState,
201+
BatchStateSeedlingCancelled,
195202
)
196203
if err != nil {
197204
err = fmt.Errorf("BatchCaretaker(%x), batch state(%v), "+
198205
"cancel failed: %w", batchKey, batchState, err)
199206
}
200207

201-
b.cfg.BroadcastErrChan <- fmt.Errorf("caretaker canceled")
202-
203-
return CancelResp{&finalBatchState, err}
208+
cancelResp = CancelResp{true, err}
204209

205210
case BatchStateCommitted:
206-
finalBatchState := BatchStateSproutCancelled
207211
err := b.cfg.Log.UpdateBatchState(
208212
ctx, b.cfg.Batch.BatchKey.PubKey,
209-
finalBatchState,
213+
BatchStateSproutCancelled,
210214
)
211215
if err != nil {
212216
err = fmt.Errorf("BatchCaretaker(%x), batch state(%v), "+
213217
"cancel failed: %w", batchKey, batchState, err)
214218
}
215219

216-
b.cfg.BroadcastErrChan <- fmt.Errorf("caretaker canceled")
217-
218-
return CancelResp{&finalBatchState, err}
220+
cancelResp = CancelResp{true, err}
219221

220222
default:
221223
err := fmt.Errorf("BatchCaretaker(%x), batch not cancellable",
222224
b.cfg.Batch.BatchKey.PubKey.SerializeCompressed())
223-
return CancelResp{nil, err}
225+
cancelResp = CancelResp{false, err}
226+
}
227+
228+
b.cfg.CancelRespChan <- cancelResp
229+
230+
// If the batch was cancellable, the final write of the cancelled batch
231+
// may still have failed. That error will be handled by the planter. At
232+
// this point, the caretaker should shut down gracefully if cancellation
233+
// was attempted.
234+
if cancelResp.cancelAttempted {
235+
log.Infof("BatchCaretaker(%x), attempted batch cancellation, "+
236+
"shutting down", b.batchKey[:])
237+
238+
return nil
224239
}
240+
241+
// If the cancellation failed, that error will be handled by the
242+
// planter.
243+
return fmt.Errorf("BatchCaretaker(%x) cancellation failed",
244+
b.batchKey[:])
225245
}
226246

227247
// advanceStateUntil attempts to advance the internal state machine until the
@@ -241,22 +261,20 @@ func (b *BatchCaretaker) advanceStateUntil(currentState,
241261
return 0, fmt.Errorf("BatchCaretaker(%x), shutting "+
242262
"down", b.batchKey[:])
243263

264+
// If the batch was cancellable, the finalState of the cancel
265+
// response will be non-nil. If the cancellation failed, that
266+
// error will be handled by the planter. At this point, the
267+
// caretaker should always shut down gracefully.
244268
case <-b.cfg.CancelReqChan:
245-
cancelResp := b.Cancel()
246-
b.cfg.CancelRespChan <- cancelResp
247-
248-
// TODO(jhb): Use concrete error types for caretaker
249-
// shutdown cases
250-
// If the batch was cancellable, the finalState of the
251-
// cancel response will be non-nil. If the cancellation
252-
// failed, that error will be handled by the planter.
253-
// At this point, the caretaker should always shut down
254-
// gracefully.
255-
if cancelResp.finalState != nil {
269+
cancelErr := b.Cancel()
270+
if cancelErr == nil {
256271
return 0, fmt.Errorf("BatchCaretaker(%x), "+
257272
"attempted batch cancellation, "+
258273
"shutting down", b.batchKey[:])
259274
}
275+
276+
log.Info(cancelErr)
277+
260278
default:
261279
}
262280

@@ -313,7 +331,7 @@ func (b *BatchCaretaker) assetCultivator() {
313331
currentBatchState, BatchStateBroadcast,
314332
)
315333
if err != nil {
316-
log.Errorf("unable to advance state machine: %v", err)
334+
log.Errorf("Unable to advance state machine: %v", err)
317335
b.cfg.BroadcastErrChan <- err
318336
return
319337
}
@@ -360,7 +378,12 @@ func (b *BatchCaretaker) assetCultivator() {
360378
return
361379

362380
case <-b.cfg.CancelReqChan:
363-
b.cfg.CancelRespChan <- b.Cancel()
381+
cancelErr := b.Cancel()
382+
if cancelErr == nil {
383+
return
384+
}
385+
386+
log.Error(cancelErr)
364387

365388
case <-b.Quit:
366389
return
@@ -740,7 +763,7 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
740763
b.cfg.Batch.GenesisPacket.ChainFees = chainFees
741764

742765
log.Infof("BatchCaretaker(%x): GenesisPacket absolute fee: "+
743-
"%d sats", chainFees)
766+
"%d sats", b.batchKey[:], chainFees)
744767
log.Infof("BatchCaretaker(%x): GenesisPacket finalized",
745768
b.batchKey[:])
746769
log.Tracef("GenesisPacket: %v", spew.Sdump(signedPkt))
@@ -848,48 +871,85 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
848871
defer confCancel()
849872
defer b.Wg.Done()
850873

851-
var confEvent *chainntnfs.TxConfirmation
852-
select {
853-
case confEvent = <-confNtfn.Confirmed:
854-
log.Debugf("Got chain confirmation: %v",
855-
confEvent.Tx.TxHash())
856-
857-
case err := <-errChan:
858-
b.cfg.ErrChan <- fmt.Errorf("error getting "+
859-
"confirmation: %w", err)
860-
return
861-
862-
case <-confCtx.Done():
863-
log.Debugf("Skipping TX confirmation, context " +
864-
"done")
865-
866-
case <-b.cfg.CancelReqChan:
867-
b.cfg.CancelRespChan <- b.Cancel()
874+
var (
875+
confEvent *chainntnfs.TxConfirmation
876+
confRecv bool
877+
)
868878

869-
case <-b.Quit:
870-
log.Debugf("Skipping TX confirmation, exiting")
871-
return
879+
for !confRecv {
880+
select {
881+
case confEvent = <-confNtfn.Confirmed:
882+
confRecv = true
883+
884+
case err := <-errChan:
885+
confErr := fmt.Errorf("error getting "+
886+
"confirmation: %w", err)
887+
log.Info(confErr)
888+
b.cfg.ErrChan <- confErr
889+
890+
return
891+
892+
case <-confCtx.Done():
893+
log.Debugf("Skipping TX confirmation, " +
894+
"context done")
895+
confRecv = true
896+
897+
case <-b.cfg.CancelReqChan:
898+
cancelErr := b.Cancel()
899+
if cancelErr == nil {
900+
return
901+
}
902+
903+
// Cancellation failed, continue to wait
904+
// for transaction confirmation.
905+
log.Info(cancelErr)
906+
907+
case <-b.Quit:
908+
log.Debugf("Skipping TX confirmation, " +
909+
"exiting")
910+
return
911+
}
872912
}
873913

874914
if confEvent == nil {
875-
b.cfg.ErrChan <- fmt.Errorf("got empty " +
915+
confErr := fmt.Errorf("got empty " +
876916
"confirmation event in batch")
917+
log.Info(confErr)
918+
b.cfg.ErrChan <- confErr
919+
877920
return
878921
}
879922

880-
select {
881-
case b.confEvent <- confEvent:
882-
883-
case <-confCtx.Done():
884-
log.Debugf("Skipping TX confirmation, context " +
885-
"done")
886-
887-
case <-b.cfg.CancelReqChan:
888-
b.cfg.CancelRespChan <- b.Cancel()
923+
if confEvent.Tx != nil {
924+
log.Debugf("Got chain confirmation: %v",
925+
confEvent.Tx.TxHash())
926+
}
889927

890-
case <-b.Quit:
891-
log.Debugf("Skipping TX confirmation, exiting")
892-
return
928+
for {
929+
select {
930+
case b.confEvent <- confEvent:
931+
return
932+
933+
case <-confCtx.Done():
934+
log.Debugf("Skipping TX confirmation, " +
935+
"context done")
936+
return
937+
938+
case <-b.cfg.CancelReqChan:
939+
cancelErr := b.Cancel()
940+
if cancelErr == nil {
941+
return
942+
}
943+
944+
// Cancellation failed, continue to try
945+
// and send the confirmation event.
946+
log.Info(cancelErr)
947+
948+
case <-b.Quit:
949+
log.Debugf("Skipping TX confirmation, " +
950+
"exiting")
951+
return
952+
}
893953
}
894954
}()
895955

0 commit comments

Comments
 (0)