Skip to content

Commit 39ba051

Browse files
committed
add ctx to blocksItr
Add context.Context to the block iterator so that waitForBlock respects context cancellation. This fixes goroutine leaks in callers like deliver.go where cursor.Next() blocks indefinitely even after the gRPC stream's context is cancelled. Signed-off-by: Senthilnathan <cendhu@gmail.com>
1 parent 26c5a49 commit 39ba051

File tree

14 files changed

+157
-63
lines changed

14 files changed

+157
-63
lines changed

common/deliver/deliver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
298298

299299
iterCh := make(chan struct{})
300300
go func() {
301-
block, status = cursor.Next()
301+
block, status = cursor.Next(ctx)
302302
close(iterCh)
303303
}()
304304

common/deliver/deliver_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ var _ = ginkgo.Describe("Deliver", func() {
347347

348348
ginkgo.Context("when multiple blocks are requested", func() {
349349
ginkgo.BeforeEach(func() {
350-
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
350+
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
351351
blk := &cb.Block{
352352
Header: &cb.BlockHeader{Number: 994 + uint64(fakeBlockIterator.NextCallCount())},
353353
}
@@ -440,7 +440,7 @@ var _ = ginkgo.Describe("Deliver", func() {
440440
seekInfo = &ab.SeekInfo{Start: &ab.SeekPosition{}, Stop: seekNewest}
441441

442442
fakeBlockReader.HeightReturns(3)
443-
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
443+
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
444444
blk := &cb.Block{
445445
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount())},
446446
}
@@ -475,7 +475,7 @@ var _ = ginkgo.Describe("Deliver", func() {
475475
fakeBlockReader.IteratorReturns(fakeBlockIterator, 0)
476476
fakeBlockReader.HeightReturns(2)
477477
fakeChain.ReaderReturns(fakeBlockReader)
478-
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
478+
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
479479
blk := &cb.Block{
480480
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount() - 1)},
481481
}
@@ -509,7 +509,7 @@ var _ = ginkgo.Describe("Deliver", func() {
509509
}
510510

511511
fakeBlockReader.HeightReturns(3)
512-
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
512+
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
513513
blk := &cb.Block{
514514
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount())},
515515
Data: &cb.BlockData{Data: [][]byte{{1}, {2}}},
@@ -555,7 +555,7 @@ var _ = ginkgo.Describe("Deliver", func() {
555555
ContentType: ab.SeekInfo_HEADER_WITH_SIG,
556556
}
557557
fakeBlockReader.HeightReturns(4)
558-
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
558+
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
559559
nxtCallCount := fakeBlockIterator.NextCallCount()
560560
block := &cb.Block{
561561
Header: &cb.BlockHeader{Number: uint64(nxtCallCount)},
@@ -940,7 +940,7 @@ var _ = ginkgo.Describe("Deliver", func() {
940940
done = make(chan struct{})
941941
ctx, cancel = context.WithCancel(context.Background())
942942
cancel()
943-
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
943+
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
944944
<-done
945945
return nil, cb.Status_BAD_REQUEST
946946
}
@@ -993,7 +993,7 @@ var _ = ginkgo.Describe("Deliver", func() {
993993

994994
ginkgo.BeforeEach(func() {
995995
doneCh = make(chan struct{})
996-
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
996+
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
997997
<-doneCh
998998
return &cb.Block{}, cb.Status_INTERNAL_SERVER_ERROR
999999
}

common/deliver/mock/block_iterator.go

Lines changed: 16 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/ledger/blkstorage/blockfile_mgr_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ func TestBlockfileMgrBlockIterator(t *testing.T) {
141141
func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
142142
firstBlockNum, lastBlockNum int, expectedBlocks []*common.Block,
143143
) {
144-
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum))
144+
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum)) //nolint:gosec // int > uint64
145145
require.NoError(t, err, "Error while getting blocks iterator")
146146
defer itr.Close()
147147
numBlocksItrated := 0
148148
for {
149-
block, err := itr.Next()
149+
block, err := itr.Next(t.Context())
150150
require.NoError(t, err, "Error while getting block number [%d] from iterator", numBlocksItrated)
151151
require.Equal(t, expectedBlocks[numBlocksItrated], block)
152152
numBlocksItrated++

common/ledger/blkstorage/blocks_itr.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package blkstorage
88

99
import (
10+
"context"
1011
"sync"
1112

1213
"github.com/hyperledger/fabric-x-common/common/ledger"
@@ -25,13 +26,24 @@ type blocksItr struct {
2526
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
2627
mgr.blkfilesInfoCond.L.Lock()
2728
defer mgr.blkfilesInfoCond.L.Unlock()
28-
return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}}
29+
return &blocksItr{
30+
mgr: mgr,
31+
maxBlockNumAvailable: mgr.blockfilesInfo.lastPersistedBlock,
32+
blockNumToRetrieve: startBlockNum,
33+
closeMarkerLock: &sync.Mutex{},
34+
}
2935
}
3036

31-
func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
37+
func (itr *blocksItr) waitForBlock(ctx context.Context, blockNum uint64) uint64 {
3238
itr.mgr.blkfilesInfoCond.L.Lock()
3339
defer itr.mgr.blkfilesInfoCond.L.Unlock()
34-
for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() {
40+
41+
stop := context.AfterFunc(ctx, func() {
42+
itr.mgr.blkfilesInfoCond.Broadcast()
43+
})
44+
defer stop()
45+
46+
for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() && ctx.Err() == nil {
3547
logger.Debugf("Going to wait for newer blocks. maxAvailaBlockNumber=[%d], waitForBlockNum=[%d]",
3648
itr.mgr.blockfilesInfo.lastPersistedBlock, blockNum)
3749
itr.mgr.blkfilesInfoCond.Wait()
@@ -59,15 +71,18 @@ func (itr *blocksItr) shouldClose() bool {
5971
}
6072

6173
// Next moves the cursor to next block and returns true iff the iterator is not exhausted
62-
func (itr *blocksItr) Next() (ledger.QueryResult, error) {
74+
func (itr *blocksItr) Next(ctx context.Context) (ledger.QueryResult, error) { //nolint:ireturn
6375
if itr.maxBlockNumAvailable < itr.blockNumToRetrieve {
64-
itr.maxBlockNumAvailable = itr.waitForBlock(itr.blockNumToRetrieve)
76+
itr.maxBlockNumAvailable = itr.waitForBlock(ctx, itr.blockNumToRetrieve) //nolint:ireturn
6577
}
6678
itr.closeMarkerLock.Lock()
6779
defer itr.closeMarkerLock.Unlock()
6880
if itr.closeMarker {
6981
return nil, nil
7082
}
83+
if err := ctx.Err(); err != nil {
84+
return nil, err
85+
}
7186

7287
cachedBlock, existsInCache := itr.mgr.cache.get(itr.blockNumToRetrieve)
7388
if existsInCache {

common/ledger/blkstorage/blocks_itr_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ func TestBlockItrClose(t *testing.T) {
5454
itr, err := blkfileMgr.retrieveBlocks(1)
5555
require.NoError(t, err)
5656

57-
bh, _ := itr.Next()
57+
bh, _ := itr.Next(t.Context())
5858
require.NotNil(t, bh)
5959
itr.Close()
6060

61-
bh, err = itr.Next()
61+
bh, err = itr.Next(t.Context())
6262
require.NoError(t, err)
6363
require.Nil(t, bh)
6464
}
@@ -79,7 +79,7 @@ func TestRaceToDeadlock(t *testing.T) {
7979
panic(err)
8080
}
8181
go func() {
82-
itr.Next()
82+
_, _ = itr.Next(t.Context())
8383
}()
8484
itr.Close()
8585
}
@@ -92,7 +92,7 @@ func TestRaceToDeadlock(t *testing.T) {
9292
go func() {
9393
itr.Close()
9494
}()
95-
itr.Next()
95+
_, _ = itr.Next(t.Context())
9696
}
9797
}
9898

@@ -144,7 +144,7 @@ func iterateInBackground(t *testing.T, itr *blocksItr, quitAfterBlkNum uint64, w
144144
defer func() { require.Equal(t, expectedBlockNums, retrievedBlkNums) }()
145145

146146
for {
147-
blk, err := itr.Next()
147+
blk, err := itr.Next(t.Context())
148148
require.NoError(t, err)
149149
if blk == nil {
150150
return
@@ -162,7 +162,7 @@ func testIterateAndVerify(t *testing.T, itr *blocksItr, blocks []*common.Block,
162162
blocksIterated := 0
163163
for {
164164
t.Logf("blocksIterated: %v", blocksIterated)
165-
block, err := itr.Next()
165+
block, err := itr.Next(t.Context())
166166
require.NoError(t, err)
167167
require.Equal(t, blocks[blocksIterated], block)
168168
blocksIterated++

common/ledger/blkstorage/blockstore_provider_test.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ SPDX-License-Identifier: Apache-2.0
77
package blkstorage
88

99
import (
10+
"context"
1011
"fmt"
1112
"testing"
13+
"time"
1214

1315
"github.com/hyperledger/fabric-protos-go-apiv2/common"
1416
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
1517
"github.com/stretchr/testify/require"
1618

19+
"github.com/hyperledger/fabric-x-common/common/ledger"
1720
"github.com/hyperledger/fabric-x-common/common/ledger/testutil"
1821
"github.com/hyperledger/fabric-x-common/protoutil"
1922
"github.com/hyperledger/fabric-x-common/tools/pkg/txflags"
@@ -101,7 +104,7 @@ func checkBlocks(t *testing.T, expectedBlocks []*common.Block, store *BlockStore
101104

102105
itr, _ := store.RetrieveBlocks(0)
103106
for i := 0; i < len(expectedBlocks); i++ {
104-
block, _ := itr.Next()
107+
block, _ := itr.Next(t.Context())
105108
require.Equal(t, expectedBlocks[i], block)
106109
}
107110

@@ -246,6 +249,47 @@ func TestDrop(t *testing.T) {
246249
require.EqualError(t, provider.Drop("ledger2"), "internal leveldb error while obtaining db iterator: leveldb: closed")
247250
}
248251

252+
func TestBlocksItrContextCancel(t *testing.T) {
253+
t.Parallel()
254+
env := newTestEnv(t, NewConf(t.TempDir(), 0))
255+
t.Cleanup(env.Cleanup)
256+
defer env.Cleanup()
257+
store, err := env.provider.Open("testledger-ctx")
258+
require.NoError(t, err)
259+
t.Cleanup(store.Shutdown)
260+
261+
// Add one block so the store is initialized
262+
blocks := testutil.ConstructTestBlocks(t, 1)
263+
require.NoError(t, store.AddBlock(blocks[0]))
264+
265+
// Create a cancellable context and an iterator starting at block 1 (not yet available)
266+
ctx, cancel := context.WithCancel(t.Context())
267+
itr, err := store.RetrieveBlocks(1)
268+
require.NoError(t, err)
269+
t.Cleanup(itr.Close)
270+
271+
// Next() should block waiting for block 1. Cancel the context to unblock it.
272+
done := make(chan any)
273+
var result ledger.QueryResult
274+
var nextErr error
275+
go func() {
276+
result, nextErr = itr.Next(ctx)
277+
close(done)
278+
}()
279+
280+
// Cancel the context — this should wake up waitForBlock
281+
cancel()
282+
283+
select {
284+
case <-done:
285+
case <-time.After(5 * time.Second):
286+
t.Fatal("Next() did not return after context cancellation")
287+
}
288+
289+
require.Nil(t, result)
290+
require.ErrorIs(t, nextErr, context.Canceled)
291+
}
292+
249293
func constructLedgerid(id int) string {
250294
return fmt.Sprintf("ledger_%d", id)
251295
}

common/ledger/blkstorage/snapshot_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ func verifyQueriesOnBlocksAddedAfterBootstrapping(t *testing.T,
519519

520520
itr, err := bootstrappedBlockStore.RetrieveBlocks(b.Header.Number)
521521
require.NoError(t, err)
522-
blk, err := itr.Next()
522+
blk, err := itr.Next(t.Context())
523523
require.NoError(t, err)
524524
require.Equal(t, b, blk)
525525
itr.Close()

common/ledger/blockledger/fileledger/impl.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
77
package fileledger
88

99
import (
10+
"context"
11+
1012
cb "github.com/hyperledger/fabric-protos-go-apiv2/common"
1113
ab "github.com/hyperledger/fabric-protos-go-apiv2/orderer"
1214

@@ -48,8 +50,8 @@ type fileLedgerIterator struct {
4850

4951
// Next blocks until there is a new block available, or until Close is called.
5052
// It returns an error if the next block is no longer retrievable.
51-
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
52-
result, err := i.commonIterator.Next()
53+
func (i *fileLedgerIterator) Next(ctx context.Context) (*cb.Block, cb.Status) {
54+
result, err := i.commonIterator.Next(ctx)
5355
if err != nil {
5456
logger.Error(err)
5557
return nil, cb.Status_SERVICE_UNAVAILABLE

0 commit comments

Comments
 (0)