Skip to content

Commit ae1be14

Browse files
committed
add ctx to blocksItr
Add context.Context to the block iterator so that waitForBlock respects context cancellation. This fixes goroutine leaks in callers like deliver.go where cursor.Next() blocks indefinitely even after the gRPC stream's context is cancelled. Signed-off-by: Senthilnathan <cendhu@gmail.com>
1 parent 26c5a49 commit ae1be14

File tree

15 files changed

+137
-62
lines changed

15 files changed

+137
-62
lines changed

common/deliver/deliver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
261261

262262
logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
263263

264-
cursor, number := chain.Reader().Iterator(seekInfo.Start)
264+
cursor, number := chain.Reader().Iterator(ctx, seekInfo.Start)
265265
defer cursor.Close()
266266
var stopNum uint64
267267
switch stop := seekInfo.Stop.Type.(type) {

common/deliver/deliver_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ var _ = ginkgo.Describe("Deliver", func() {
341341
gomega.Expect(err).NotTo(gomega.HaveOccurred())
342342

343343
gomega.Expect(fakeBlockReader.IteratorCallCount()).To(gomega.Equal(1))
344-
startPosition := fakeBlockReader.IteratorArgsForCall(0)
344+
_, startPosition := fakeBlockReader.IteratorArgsForCall(0)
345345
gomega.Expect(startPosition).To(test.ProtoEqual(seekInfo.Start))
346346
})
347347

@@ -423,7 +423,7 @@ var _ = ginkgo.Describe("Deliver", func() {
423423
gomega.Expect(err).NotTo(gomega.HaveOccurred())
424424

425425
gomega.Expect(fakeBlockReader.IteratorCallCount()).To(gomega.Equal(1))
426-
start := fakeBlockReader.IteratorArgsForCall(0)
426+
_, start := fakeBlockReader.IteratorArgsForCall(0)
427427
gomega.Expect(start).To(test.ProtoEqual(&ab.SeekPosition{}))
428428
gomega.Expect(fakeBlockIterator.NextCallCount()).To(gomega.Equal(1))
429429

@@ -453,7 +453,7 @@ var _ = ginkgo.Describe("Deliver", func() {
453453
gomega.Expect(err).NotTo(gomega.HaveOccurred())
454454

455455
gomega.Expect(fakeBlockReader.IteratorCallCount()).To(gomega.Equal(1))
456-
start := fakeBlockReader.IteratorArgsForCall(0)
456+
_, start := fakeBlockReader.IteratorArgsForCall(0)
457457
gomega.Expect(start).To(test.ProtoEqual(&ab.SeekPosition{}))
458458

459459
gomega.Expect(fakeBlockIterator.NextCallCount()).To(gomega.Equal(2))
@@ -525,7 +525,7 @@ var _ = ginkgo.Describe("Deliver", func() {
525525
gomega.Expect(err).NotTo(gomega.HaveOccurred())
526526

527527
gomega.Expect(fakeBlockReader.IteratorCallCount()).To(gomega.Equal(1))
528-
start := fakeBlockReader.IteratorArgsForCall(0)
528+
_, start := fakeBlockReader.IteratorArgsForCall(0)
529529
gomega.Expect(start).To(test.ProtoEqual(&ab.SeekPosition{}))
530530

531531
gomega.Expect(fakeBlockIterator.NextCallCount()).To(gomega.Equal(2))
@@ -592,7 +592,7 @@ var _ = ginkgo.Describe("Deliver", func() {
592592
err := handler.Handle(context.Background(), server)
593593
gomega.Expect(err).NotTo(gomega.HaveOccurred())
594594
gomega.Expect(fakeBlockReader.IteratorCallCount()).To(gomega.Equal(1))
595-
start := fakeBlockReader.IteratorArgsForCall(0)
595+
_, start := fakeBlockReader.IteratorArgsForCall(0)
596596
gomega.Expect(start).To(test.ProtoEqual(&ab.SeekPosition{}))
597597
gomega.Expect(fakeBlockIterator.NextCallCount()).To(gomega.Equal(3))
598598
gomega.Expect(fakeResponseSender.SendBlockResponseCallCount()).To(gomega.Equal(3))

common/deliver/mock/block_reader.go

Lines changed: 13 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/ledger/blkstorage/blockfile_mgr.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package blkstorage
88

99
import (
1010
"bytes"
11+
"context"
1112
"fmt"
1213
"math"
1314
"sync"
@@ -609,14 +610,14 @@ func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.B
609610
return info.blockHeader, nil
610611
}
611612

612-
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
613+
func (mgr *blockfileMgr) retrieveBlocks(ctx context.Context, startNum uint64) (*blocksItr, error) {
613614
if startNum < mgr.firstPossibleBlockNumberInBlockFiles() {
614615
return nil, errors.Errorf(
615616
"cannot serve block [%d]. The ledger is bootstrapped from a snapshot. First available block = [%d]",
616617
startNum, mgr.firstPossibleBlockNumberInBlockFiles(),
617618
)
618619
}
619-
return newBlockItr(mgr, startNum), nil
620+
return newBlockItr(ctx, mgr, startNum), nil
620621
}
621622

622623
func (mgr *blockfileMgr) txIDExists(txID string) (bool, error) {

common/ledger/blkstorage/blockfile_mgr_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestBlockfileMgrBlockIterator(t *testing.T) {
141141
func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
142142
firstBlockNum, lastBlockNum int, expectedBlocks []*common.Block,
143143
) {
144-
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum))
144+
itr, err := blockfileMgr.retrieveBlocks(t.Context(), uint64(firstBlockNum))
145145
require.NoError(t, err, "Error while getting blocks iterator")
146146
defer itr.Close()
147147
numBlocksItrated := 0

common/ledger/blkstorage/blocks_itr.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package blkstorage
88

99
import (
10+
"context"
1011
"sync"
1112

1213
"github.com/hyperledger/fabric-x-common/common/ledger"
@@ -15,23 +16,36 @@ import (
1516
// blocksItr - an iterator for iterating over a sequence of blocks
1617
type blocksItr struct {
1718
mgr *blockfileMgr
19+
ctx context.Context
1820
maxBlockNumAvailable uint64
1921
blockNumToRetrieve uint64
2022
stream *blockStream
2123
closeMarker bool
2224
closeMarkerLock *sync.Mutex
2325
}
2426

25-
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
27+
func newBlockItr(ctx context.Context, mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
2628
mgr.blkfilesInfoCond.L.Lock()
2729
defer mgr.blkfilesInfoCond.L.Unlock()
28-
return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}}
30+
return &blocksItr{
31+
mgr: mgr,
32+
ctx: ctx,
33+
maxBlockNumAvailable: mgr.blockfilesInfo.lastPersistedBlock,
34+
blockNumToRetrieve: startBlockNum,
35+
closeMarkerLock: &sync.Mutex{},
36+
}
2937
}
3038

3139
func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
3240
itr.mgr.blkfilesInfoCond.L.Lock()
3341
defer itr.mgr.blkfilesInfoCond.L.Unlock()
34-
for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() {
42+
43+
stop := context.AfterFunc(itr.ctx, func() {
44+
itr.mgr.blkfilesInfoCond.Broadcast()
45+
})
46+
defer stop()
47+
48+
for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() && itr.ctx.Err() == nil {
3549
logger.Debugf("Going to wait for newer blocks. maxAvailaBlockNumber=[%d], waitForBlockNum=[%d]",
3650
itr.mgr.blockfilesInfo.lastPersistedBlock, blockNum)
3751
itr.mgr.blkfilesInfoCond.Wait()
@@ -68,6 +82,9 @@ func (itr *blocksItr) Next() (ledger.QueryResult, error) {
6882
if itr.closeMarker {
6983
return nil, nil
7084
}
85+
if err := itr.ctx.Err(); err != nil {
86+
return nil, err
87+
}
7188

7289
cachedBlock, existsInCache := itr.mgr.cache.get(itr.blockNumToRetrieve)
7390
if existsInCache {

common/ledger/blkstorage/blocks_itr_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestBlocksItrBlockingNext(t *testing.T) {
2727
blocks := testutil.ConstructTestBlocks(t, 10)
2828
blkfileMgrWrapper.addBlocks(blocks[:5])
2929

30-
itr, err := blkfileMgr.retrieveBlocks(1)
30+
itr, err := blkfileMgr.retrieveBlocks(t.Context(), 1)
3131
require.NoError(t, err)
3232
defer itr.Close()
3333
readyChan := make(chan struct{})
@@ -51,7 +51,7 @@ func TestBlockItrClose(t *testing.T) {
5151
blocks := testutil.ConstructTestBlocks(t, 5)
5252
blkfileMgrWrapper.addBlocks(blocks)
5353

54-
itr, err := blkfileMgr.retrieveBlocks(1)
54+
itr, err := blkfileMgr.retrieveBlocks(t.Context(), 1)
5555
require.NoError(t, err)
5656

5757
bh, _ := itr.Next()
@@ -74,7 +74,7 @@ func TestRaceToDeadlock(t *testing.T) {
7474
blkfileMgrWrapper.addBlocks(blocks)
7575

7676
for i := 0; i < 1000; i++ {
77-
itr, err := blkfileMgr.retrieveBlocks(5)
77+
itr, err := blkfileMgr.retrieveBlocks(t.Context(), 5)
7878
if err != nil {
7979
panic(err)
8080
}
@@ -85,7 +85,7 @@ func TestRaceToDeadlock(t *testing.T) {
8585
}
8686

8787
for i := 0; i < 1000; i++ {
88-
itr, err := blkfileMgr.retrieveBlocks(5)
88+
itr, err := blkfileMgr.retrieveBlocks(t.Context(), 5)
8989
if err != nil {
9090
panic(err)
9191
}
@@ -105,7 +105,7 @@ func TestBlockItrCloseWithoutRetrieve(t *testing.T) {
105105
blocks := testutil.ConstructTestBlocks(t, 5)
106106
blkfileMgrWrapper.addBlocks(blocks)
107107

108-
itr, err := blkfileMgr.retrieveBlocks(2)
108+
itr, err := blkfileMgr.retrieveBlocks(t.Context(), 2)
109109
require.NoError(t, err)
110110
itr.Close()
111111
}
@@ -121,12 +121,12 @@ func TestCloseMultipleItrsWaitForFutureBlock(t *testing.T) {
121121

122122
wg := &sync.WaitGroup{}
123123
wg.Add(2)
124-
itr1, err := blkfileMgr.retrieveBlocks(7)
124+
itr1, err := blkfileMgr.retrieveBlocks(t.Context(), 7)
125125
require.NoError(t, err)
126126
// itr1 does not retrieve any block because it closes before new blocks are added
127127
go iterateInBackground(t, itr1, 9, wg, []uint64{})
128128

129-
itr2, err := blkfileMgr.retrieveBlocks(8)
129+
itr2, err := blkfileMgr.retrieveBlocks(t.Context(), 8)
130130
require.NoError(t, err)
131131
// itr2 retrieves two blocks 8 and 9. Because it started waiting for 8 and quits at 9
132132
go iterateInBackground(t, itr2, 9, wg, []uint64{8, 9})

common/ledger/blkstorage/blockstore.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package blkstorage
88

99
import (
10+
"context"
1011
"time"
1112

1213
"github.com/hyperledger/fabric-protos-go-apiv2/common"
@@ -77,8 +78,8 @@ func (store *BlockStore) GetBlockchainInfo() (*common.BlockchainInfo, error) {
7778
}
7879

7980
// RetrieveBlocks returns an iterator that can be used for iterating over a range of blocks
80-
func (store *BlockStore) RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error) {
81-
return store.fileMgr.retrieveBlocks(startNum)
81+
func (store *BlockStore) RetrieveBlocks(ctx context.Context, startNum uint64) (ledger.ResultsIterator, error) {
82+
return store.fileMgr.retrieveBlocks(ctx, startNum)
8283
}
8384

8485
// RetrieveBlockByHash returns the block for given block-hash

common/ledger/blkstorage/blockstore_provider_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ SPDX-License-Identifier: Apache-2.0
77
package blkstorage
88

99
import (
10+
"context"
1011
"fmt"
1112
"testing"
13+
"time"
1214

1315
"github.com/hyperledger/fabric-protos-go-apiv2/common"
1416
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
1517
"github.com/stretchr/testify/require"
1618

19+
"github.com/hyperledger/fabric-x-common/common/ledger"
1720
"github.com/hyperledger/fabric-x-common/common/ledger/testutil"
1821
"github.com/hyperledger/fabric-x-common/protoutil"
1922
"github.com/hyperledger/fabric-x-common/tools/pkg/txflags"
@@ -99,7 +102,7 @@ func checkBlocks(t *testing.T, expectedBlocks []*common.Block, store *BlockStore
99102
require.Equal(t, uint64(len(expectedBlocks)), bcInfo.Height)
100103
require.Equal(t, protoutil.BlockHeaderHash(expectedBlocks[len(expectedBlocks)-1].GetHeader()), bcInfo.CurrentBlockHash)
101104

102-
itr, _ := store.RetrieveBlocks(0)
105+
itr, _ := store.RetrieveBlocks(t.Context(), 0)
103106
for i := 0; i < len(expectedBlocks); i++ {
104107
block, _ := itr.Next()
105108
require.Equal(t, expectedBlocks[i], block)
@@ -246,6 +249,46 @@ func TestDrop(t *testing.T) {
246249
require.EqualError(t, provider.Drop("ledger2"), "internal leveldb error while obtaining db iterator: leveldb: closed")
247250
}
248251

252+
func TestBlocksItrContextCancel(t *testing.T) {
253+
env := newTestEnv(t, NewConf(t.TempDir(), 0))
254+
t.Cleanup(env.Cleanup)
255+
defer env.Cleanup()
256+
store, err := env.provider.Open("testledger-ctx")
257+
require.NoError(t, err)
258+
t.Cleanup(store.Shutdown)
259+
260+
// Add one block so the store is initialized
261+
blocks := testutil.ConstructTestBlocks(t, 1)
262+
require.NoError(t, store.AddBlock(blocks[0]))
263+
264+
// Create a cancellable context and an iterator starting at block 1 (not yet available)
265+
ctx, cancel := context.WithCancel(t.Context())
266+
itr, err := store.RetrieveBlocks(ctx, 1)
267+
require.NoError(t, err)
268+
t.Cleanup(itr.Close)
269+
270+
// Next() should block waiting for block 1. Cancel the context to unblock it.
271+
done := make(chan any)
272+
var result ledger.QueryResult
273+
var nextErr error
274+
go func() {
275+
result, nextErr = itr.Next()
276+
close(done)
277+
}()
278+
279+
// Cancel the context — this should wake up waitForBlock
280+
cancel()
281+
282+
select {
283+
case <-done:
284+
case <-time.After(5 * time.Second):
285+
t.Fatal("Next() did not return after context cancellation")
286+
}
287+
288+
require.Nil(t, result)
289+
require.ErrorIs(t, nextErr, context.Canceled)
290+
}
291+
249292
func constructLedgerid(id int) string {
250293
return fmt.Sprintf("ledger_%d", id)
251294
}

common/ledger/blkstorage/snapshot_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ func verifyQueriesOnBlocksPriorToSnapshot(
465465
_, err := bootstrappedBlockStore.RetrieveBlockByNumber(blockNum)
466466
require.EqualError(t, err, expectedErrStr)
467467

468-
_, err = bootstrappedBlockStore.RetrieveBlocks(blockNum)
468+
_, err = bootstrappedBlockStore.RetrieveBlocks(t.Context(), blockNum)
469469
require.EqualError(t, err, expectedErrStr)
470470

471471
_, err = bootstrappedBlockStore.RetrieveTxByBlockNumTranNum(blockNum, 0)
@@ -517,7 +517,7 @@ func verifyQueriesOnBlocksAddedAfterBootstrapping(t *testing.T,
517517
require.NoError(t, err)
518518
require.Equal(t, b, retrievedBlock)
519519

520-
itr, err := bootstrappedBlockStore.RetrieveBlocks(b.Header.Number)
520+
itr, err := bootstrappedBlockStore.RetrieveBlocks(t.Context(), b.Header.Number)
521521
require.NoError(t, err)
522522
blk, err := itr.Next()
523523
require.NoError(t, err)

0 commit comments

Comments
 (0)