Skip to content

Commit fd2a5b0

Browse files
Merge pull request #4536 from OffchainLabs/pmikolajczyk/nit-4684-stop-waiter-api
Automatic child lifecycle management in StopWaiter
2 parents 72adaee + 729bb1a commit fd2a5b0

File tree

28 files changed

+485
-186
lines changed

28 files changed

+485
-186
lines changed

arbnode/batch_poster.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,8 +2073,8 @@ func (b *BatchPoster) GetBacklogEstimate() uint64 {
20732073

20742074
func (b *BatchPoster) Start(ctxIn context.Context) {
20752075
b.StopWaiter.Start(ctxIn, b)
2076-
b.dataPoster.Start(b.GetContext())
2077-
b.redisLock.Start(b.GetContext())
2076+
b.StartAndTrackChild(b.dataPoster)
2077+
b.StartAndTrackChild(b.redisLock)
20782078
b.LaunchThread(b.pollForReverts)
20792079
b.LaunchThread(b.pollForL1PriceData)
20802080
commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0)
@@ -2157,12 +2157,6 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
21572157
})
21582158
}
21592159

2160-
func (b *BatchPoster) StopAndWait() {
2161-
b.redisLock.StopAndWait()
2162-
b.dataPoster.StopAndWait()
2163-
b.StopWaiter.StopAndWait()
2164-
}
2165-
21662160
type BoolRing struct {
21672161
buffer []bool
21682162
bufferPosition int

arbnode/inbox_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ func (w *execClientWrapper) ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIn
115115
return w.ExecutionEngine.ArbOSVersionForMessageIndex(msgIdx)
116116
}
117117

118-
func (w *execClientWrapper) StopAndWait() {
119-
}
118+
func (w *execClientWrapper) StopAndWait() {}
120119

121120
func NewTransactionStreamerForTest(t *testing.T, ctx context.Context, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
122121
chainConfig := chaininfo.ArbitrumDevTestChainConfig()

blocks_reexecutor/blocks_reexecutor.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,6 @@ func (s *BlocksReExecutor) wrapFatalErr(err error) error {
349349
return fmt.Errorf("shutting BlocksReExecutor down due to fatal error: %w", err)
350350
}
351351

352-
func (s *BlocksReExecutor) StopAndWait() {
353-
s.StopWaiter.StopAndWait()
354-
}
355-
356352
func (s *BlocksReExecutor) dereferenceRoot(root common.Hash) {
357353
s.mutex.Lock()
358354
defer s.mutex.Unlock()

bold/challenge/manager.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ type Opt = func(val *Manager)
4242
// AssertionManager works with the challenge manager suppplying information
4343
// about assertions.
4444
type AssertionManager interface {
45-
Start(context.Context)
46-
StopAndWait()
45+
stopwaiter.StoppableChild
4746
LatestAgreedAssertion() protocol.AssertionHash
4847
SetRivalHandler(types.RivalHandler)
4948
}
@@ -280,8 +279,7 @@ func (m *Manager) Start(ctx context.Context) {
280279
m.StopWaiter.Start(ctx, m)
281280
log.Info("Started challenge manager", "stakerAddress", m.chain.StakerAddress().Hex())
282281

283-
// Start the assertion manager on its own StopWaiter.
284-
m.assertionManager.Start(m.GetContext())
282+
m.StartAndTrackChild(m.assertionManager)
285283

286284
// Watcher tower and resolve modes don't monitor challenges.
287285
if m.mode == types.WatchTowerMode || m.mode == types.ResolveMode {
@@ -291,26 +289,13 @@ func (m *Manager) Start(ctx context.Context) {
291289
// Start watching for parent chain block events in the background.
292290
m.LaunchThread(m.listenForBlockEvents)
293291

294-
// Start watching for ongoing chain events on its own StopWaiter.
295-
m.watcher.Start(m.GetContext())
292+
m.StartAndTrackChild(m.watcher)
296293

297294
if m.api != nil {
298-
// Start the API server on its own StopWaiter.
299-
m.api.Start(m.GetContext())
295+
m.StartAndTrackChild(m.api)
300296
}
301297
}
302298

303-
func (m *Manager) StopAndWait() {
304-
// Stop children first so they can shut down gracefully before
305-
// the parent context is cancelled.
306-
if m.api != nil {
307-
m.api.StopAndWait()
308-
}
309-
m.watcher.StopAndWait()
310-
m.assertionManager.StopAndWait()
311-
m.StopWaiter.StopAndWait()
312-
}
313-
314299
func (m *Manager) listenForBlockEvents(ctx context.Context) {
315300
// If the chain watcher has not yet scraped and caught up all BoLD
316301
// events up to the latest head, then we fire "block notification" events

bold/testing/e2e/headers.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,6 @@ func (s *simpleHeaderProvider) listenToHeaders(ctx context.Context) {
4242
}
4343
}
4444

45-
func (s *simpleHeaderProvider) StopAndWait() {
46-
s.StopWaiter.StopAndWait()
47-
}
48-
4945
func (s *simpleHeaderProvider) Subscribe(requireBlockNrUpdates bool) (<-chan *types.Header, func()) {
5046
ch := make(chan *types.Header, 100)
5147
s.chs = append(s.chs, ch)

broadcastclients/broadcastclients.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,13 @@ func clearAndResetTicker(timer *time.Ticker, interval time.Duration) {
138138
func (bcs *BroadcastClients) Start(ctx context.Context) {
139139
bcs.StopWaiter.Start(ctx, bcs)
140140
bcs.primaryRouter.StopWaiter.Start(bcs.GetContext(), bcs.primaryRouter)
141+
bcs.TrackChild(bcs.primaryRouter)
141142
bcs.secondaryRouter.StopWaiter.Start(bcs.GetContext(), bcs.secondaryRouter)
143+
bcs.TrackChild(bcs.secondaryRouter)
142144

143145
for _, client := range bcs.primaryClients {
144146
client.Start(bcs.GetContext())
147+
bcs.TrackChild(client)
145148
}
146149

147150
var lastConfirmed arbutil.MessageIndex
@@ -272,6 +275,8 @@ func (bcs *BroadcastClients) startSecondaryFeed(ctx context.Context) {
272275
}
273276
bcs.secondaryClients = append(bcs.secondaryClients, client)
274277
client.Start(ctx)
278+
// Secondary clients are not tracked — they are managed
279+
// manually by startSecondaryFeed/stopSecondaryFeed.
275280
log.Info("secondary feed started", "url", url, "startingFromSeq", latestSeqNum)
276281
} else if len(bcs.secondaryURL) > 0 {
277282
log.Warn("failed to start a new secondary feed all available secondary feeds were started")
@@ -299,13 +304,10 @@ func (bcs *BroadcastClients) stopSecondaryFeed() {
299304
}
300305

301306
func (bcs *BroadcastClients) StopAndWait() {
307+
// Secondary clients are not tracked (they're managed dynamically),
308+
// so stop them explicitly for clean websocket shutdown.
302309
for _, client := range bcs.secondaryClients {
303310
client.StopAndWait()
304311
}
305-
for _, client := range bcs.primaryClients {
306-
client.StopAndWait()
307-
}
308-
bcs.secondaryRouter.StopWaiter.StopAndWait()
309-
bcs.primaryRouter.StopWaiter.StopAndWait()
310312
bcs.StopWaiter.StopAndWait()
311313
}

changelog/pmikolajczyk-nit-4684.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
### Internal
2+
- Add TrackChild/StartAndTrackChild to StopWaiter for automatic LIFO child lifecycle management

cmd/el-proxy/main.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,7 @@ func NewExpressLaneProxy(
136136

137137
func (p *ExpressLaneProxy) Start(ctx context.Context) {
138138
p.StopWaiter.Start(ctx, p)
139-
p.expressLaneTracker.Start(p.GetContext())
140-
}
141-
142-
func (p *ExpressLaneProxy) StopAndWait() {
143-
p.expressLaneTracker.StopAndWait()
144-
p.StopWaiter.StopAndWait()
139+
p.StartAndTrackChild(p.expressLaneTracker)
145140
}
146141

147142
var ErrorInternalConnectionError = errors.New("internal connection error")

execution/gethexec/addressfilter/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (s *FilterService) Start(ctx context.Context) {
8484
return s.config.PollInterval
8585
})
8686

87-
s.addressChecker.Start(s.GetContext())
87+
s.StartAndTrackChild(s.addressChecker)
8888

8989
log.Info("address-filter service started",
9090
"poll_interval", s.config.PollInterval,

execution/gethexec/blockmetadata.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,3 @@ func (b *BulkBlockMetadataFetcher) Start(ctx context.Context) {
111111
_ = stopwaiter.CallWhenTriggeredWith[struct{}](&b.StopWaiterSafe, b.ClearCache, b.reorgDetector)
112112
}
113113
}
114-
115-
func (b *BulkBlockMetadataFetcher) StopAndWait() {
116-
b.StopWaiter.StopAndWait()
117-
}

0 commit comments

Comments
 (0)