Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E

iterCh := make(chan struct{})
go func() {
block, status = cursor.Next()
block, status = cursor.Next(ctx)
close(iterCh)
}()

Expand Down
14 changes: 7 additions & 7 deletions common/deliver/deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ var _ = ginkgo.Describe("Deliver", func() {

ginkgo.Context("when multiple blocks are requested", func() {
ginkgo.BeforeEach(func() {
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
blk := &cb.Block{
Header: &cb.BlockHeader{Number: 994 + uint64(fakeBlockIterator.NextCallCount())},
}
Expand Down Expand Up @@ -440,7 +440,7 @@ var _ = ginkgo.Describe("Deliver", func() {
seekInfo = &ab.SeekInfo{Start: &ab.SeekPosition{}, Stop: seekNewest}

fakeBlockReader.HeightReturns(3)
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
blk := &cb.Block{
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount())},
}
Expand Down Expand Up @@ -475,7 +475,7 @@ var _ = ginkgo.Describe("Deliver", func() {
fakeBlockReader.IteratorReturns(fakeBlockIterator, 0)
fakeBlockReader.HeightReturns(2)
fakeChain.ReaderReturns(fakeBlockReader)
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
blk := &cb.Block{
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount() - 1)},
}
Expand Down Expand Up @@ -509,7 +509,7 @@ var _ = ginkgo.Describe("Deliver", func() {
}

fakeBlockReader.HeightReturns(3)
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
blk := &cb.Block{
Header: &cb.BlockHeader{Number: uint64(fakeBlockIterator.NextCallCount())},
Data: &cb.BlockData{Data: [][]byte{{1}, {2}}},
Expand Down Expand Up @@ -555,7 +555,7 @@ var _ = ginkgo.Describe("Deliver", func() {
ContentType: ab.SeekInfo_HEADER_WITH_SIG,
}
fakeBlockReader.HeightReturns(4)
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
nxtCallCount := fakeBlockIterator.NextCallCount()
block := &cb.Block{
Header: &cb.BlockHeader{Number: uint64(nxtCallCount)},
Expand Down Expand Up @@ -940,7 +940,7 @@ var _ = ginkgo.Describe("Deliver", func() {
done = make(chan struct{})
ctx, cancel = context.WithCancel(context.Background())
cancel()
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
<-done
return nil, cb.Status_BAD_REQUEST
}
Expand Down Expand Up @@ -993,7 +993,7 @@ var _ = ginkgo.Describe("Deliver", func() {

ginkgo.BeforeEach(func() {
doneCh = make(chan struct{})
fakeBlockIterator.NextStub = func() (*cb.Block, cb.Status) {
fakeBlockIterator.NextStub = func(context.Context) (*cb.Block, cb.Status) {
<-doneCh
return &cb.Block{}, cb.Status_INTERNAL_SERVER_ERROR
}
Expand Down
22 changes: 16 additions & 6 deletions common/deliver/mock/block_iterator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions common/ledger/blkstorage/blockfile_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ func TestBlockfileMgrBlockIterator(t *testing.T) {
func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
firstBlockNum, lastBlockNum int, expectedBlocks []*common.Block,
) {
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum))
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum)) //nolint:gosec // int > uint64
require.NoError(t, err, "Error while getting blocks iterator")
defer itr.Close()
numBlocksItrated := 0
for {
block, err := itr.Next()
block, err := itr.Next(t.Context())
require.NoError(t, err, "Error while getting block number [%d] from iterator", numBlocksItrated)
require.Equal(t, expectedBlocks[numBlocksItrated], block)
numBlocksItrated++
Expand Down
25 changes: 20 additions & 5 deletions common/ledger/blkstorage/blocks_itr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"context"
"sync"

"github.com/hyperledger/fabric-x-common/common/ledger"
Expand All @@ -25,13 +26,24 @@ type blocksItr struct {
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
mgr.blkfilesInfoCond.L.Lock()
defer mgr.blkfilesInfoCond.L.Unlock()
return &blocksItr{mgr, mgr.blockfilesInfo.lastPersistedBlock, startBlockNum, nil, false, &sync.Mutex{}}
return &blocksItr{
mgr: mgr,
maxBlockNumAvailable: mgr.blockfilesInfo.lastPersistedBlock,
blockNumToRetrieve: startBlockNum,
closeMarkerLock: &sync.Mutex{},
}
}

func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
func (itr *blocksItr) waitForBlock(ctx context.Context, blockNum uint64) uint64 {
itr.mgr.blkfilesInfoCond.L.Lock()
defer itr.mgr.blkfilesInfoCond.L.Unlock()
for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() {

stop := context.AfterFunc(ctx, func() {
itr.mgr.blkfilesInfoCond.Broadcast()
})
defer stop()

for itr.mgr.blockfilesInfo.lastPersistedBlock < blockNum && !itr.shouldClose() && ctx.Err() == nil {
logger.Debugf("Going to wait for newer blocks. maxAvailaBlockNumber=[%d], waitForBlockNum=[%d]",
itr.mgr.blockfilesInfo.lastPersistedBlock, blockNum)
itr.mgr.blkfilesInfoCond.Wait()
Expand Down Expand Up @@ -59,15 +71,18 @@ func (itr *blocksItr) shouldClose() bool {
}

// Next moves the cursor to next block and returns true iff the iterator is not exhausted
func (itr *blocksItr) Next() (ledger.QueryResult, error) {
func (itr *blocksItr) Next(ctx context.Context) (ledger.QueryResult, error) { //nolint:ireturn
if itr.maxBlockNumAvailable < itr.blockNumToRetrieve {
itr.maxBlockNumAvailable = itr.waitForBlock(itr.blockNumToRetrieve)
itr.maxBlockNumAvailable = itr.waitForBlock(ctx, itr.blockNumToRetrieve)
}
itr.closeMarkerLock.Lock()
defer itr.closeMarkerLock.Unlock()
if itr.closeMarker {
return nil, nil
}
if err := ctx.Err(); err != nil {
return nil, err
}

cachedBlock, existsInCache := itr.mgr.cache.get(itr.blockNumToRetrieve)
if existsInCache {
Expand Down
12 changes: 6 additions & 6 deletions common/ledger/blkstorage/blocks_itr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func TestBlockItrClose(t *testing.T) {
itr, err := blkfileMgr.retrieveBlocks(1)
require.NoError(t, err)

bh, _ := itr.Next()
bh, _ := itr.Next(t.Context())
require.NotNil(t, bh)
itr.Close()

bh, err = itr.Next()
bh, err = itr.Next(t.Context())
require.NoError(t, err)
require.Nil(t, bh)
}
Expand All @@ -79,7 +79,7 @@ func TestRaceToDeadlock(t *testing.T) {
panic(err)
}
go func() {
itr.Next()
_, _ = itr.Next(t.Context())
}()
itr.Close()
}
Expand All @@ -92,7 +92,7 @@ func TestRaceToDeadlock(t *testing.T) {
go func() {
itr.Close()
}()
itr.Next()
_, _ = itr.Next(t.Context())
}
}

Expand Down Expand Up @@ -144,7 +144,7 @@ func iterateInBackground(t *testing.T, itr *blocksItr, quitAfterBlkNum uint64, w
defer func() { require.Equal(t, expectedBlockNums, retrievedBlkNums) }()

for {
blk, err := itr.Next()
blk, err := itr.Next(t.Context())
require.NoError(t, err)
if blk == nil {
return
Expand All @@ -162,7 +162,7 @@ func testIterateAndVerify(t *testing.T, itr *blocksItr, blocks []*common.Block,
blocksIterated := 0
for {
t.Logf("blocksIterated: %v", blocksIterated)
block, err := itr.Next()
block, err := itr.Next(t.Context())
require.NoError(t, err)
require.Equal(t, blocks[blocksIterated], block)
blocksIterated++
Expand Down
45 changes: 44 additions & 1 deletion common/ledger/blkstorage/blockstore_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"context"
"fmt"
"testing"
"time"

"github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
"github.com/stretchr/testify/require"

"github.com/hyperledger/fabric-x-common/common/ledger"
"github.com/hyperledger/fabric-x-common/common/ledger/testutil"
"github.com/hyperledger/fabric-x-common/protoutil"
"github.com/hyperledger/fabric-x-common/tools/pkg/txflags"
Expand Down Expand Up @@ -101,7 +104,7 @@ func checkBlocks(t *testing.T, expectedBlocks []*common.Block, store *BlockStore

itr, _ := store.RetrieveBlocks(0)
for i := 0; i < len(expectedBlocks); i++ {
block, _ := itr.Next()
block, _ := itr.Next(t.Context())
require.Equal(t, expectedBlocks[i], block)
}

Expand Down Expand Up @@ -246,6 +249,46 @@ func TestDrop(t *testing.T) {
require.EqualError(t, provider.Drop("ledger2"), "internal leveldb error while obtaining db iterator: leveldb: closed")
}

// TestBlocksItrContextCancel verifies that a Next call blocked waiting for a
// block that has not yet been committed returns promptly with
// context.Canceled once the caller's context is cancelled.
func TestBlocksItrContextCancel(t *testing.T) {
	t.Parallel()
	env := newTestEnv(t, NewConf(t.TempDir(), 0))
	t.Cleanup(env.Cleanup)
	store, err := env.provider.Open("testledger-ctx")
	require.NoError(t, err)
	t.Cleanup(store.Shutdown)

	// Add one block so the store is initialized
	blocks := testutil.ConstructTestBlocks(t, 1)
	require.NoError(t, store.AddBlock(blocks[0]))

	// Create a cancellable context and an iterator starting at block 1 (not yet available)
	ctx, cancel := context.WithCancel(t.Context())
	itr, err := store.RetrieveBlocks(1)
	require.NoError(t, err)
	t.Cleanup(itr.Close)

	// Next() should block waiting for block 1. Cancel the context to unblock it.
	// chan struct{} is the idiomatic pure-signal channel type (consistent with
	// the other done/iter channels in this change set).
	done := make(chan struct{})
	var result ledger.QueryResult
	var nextErr error
	go func() {
		// Writes below happen-before the receive on done via close(done).
		result, nextErr = itr.Next(ctx)
		close(done)
	}()

	// Cancel the context — this should wake up waitForBlock
	cancel()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatal("Next() did not return after context cancellation")
	}

	require.Nil(t, result)
	require.ErrorIs(t, nextErr, context.Canceled)
}

// constructLedgerid returns the deterministic test ledger name for the given
// index, e.g. constructLedgerid(2) == "ledger_2".
func constructLedgerid(id int) string {
	name := fmt.Sprintf("ledger_%d", id)
	return name
}
2 changes: 1 addition & 1 deletion common/ledger/blkstorage/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func verifyQueriesOnBlocksAddedAfterBootstrapping(t *testing.T,

itr, err := bootstrappedBlockStore.RetrieveBlocks(b.Header.Number)
require.NoError(t, err)
blk, err := itr.Next()
blk, err := itr.Next(t.Context())
require.NoError(t, err)
require.Equal(t, b, blk)
itr.Close()
Expand Down
6 changes: 4 additions & 2 deletions common/ledger/blockledger/fileledger/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package fileledger

import (
"context"

cb "github.com/hyperledger/fabric-protos-go-apiv2/common"
ab "github.com/hyperledger/fabric-protos-go-apiv2/orderer"

Expand Down Expand Up @@ -48,8 +50,8 @@ type fileLedgerIterator struct {

// Next blocks until there is a new block available, or until Close is called.
// It returns an error if the next block is no longer retrievable.
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
result, err := i.commonIterator.Next()
func (i *fileLedgerIterator) Next(ctx context.Context) (*cb.Block, cb.Status) {
result, err := i.commonIterator.Next(ctx)
if err != nil {
logger.Error(err)
return nil, cb.Status_SERVICE_UNAVAILABLE
Expand Down
Loading