Skip to content

Commit 53d2093

Browse files
authored
Merge pull request #7913 from onflow/peter/fix-streaming-start-height
[Access] Allow streaming from the spork root block
2 parents 112fea4 + f28ef82 commit 53d2093

File tree

16 files changed

+626
-360
lines changed

16 files changed

+626
-360
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2060,7 +2060,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
20602060
// handles block-related operations.
20612061
blockTracker, err := subscriptiontracker.NewBlockTracker(
20622062
node.State,
2063-
builder.FinalizedRootBlock.Height,
2063+
builder.SealedRootBlock.Height,
20642064
node.Storage.Headers,
20652065
broadcaster,
20662066
)

cmd/observer/node_builder/observer_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1921,7 +1921,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
19211921
// handles block-related operations.
19221922
blockTracker, err := subscriptiontracker.NewBlockTracker(
19231923
node.State,
1924-
builder.FinalizedRootBlock.Height,
1924+
builder.SealedRootBlock.Height,
19251925
node.Storage.Headers,
19261926
broadcaster,
19271927
)

engine/access/rpc/backend/backend_stream_block_digests_test.go

Lines changed: 6 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"testing"
66

77
"github.com/stretchr/testify/assert"
8-
"github.com/stretchr/testify/mock"
98
"github.com/stretchr/testify/require"
109
"github.com/stretchr/testify/suite"
1110
"google.golang.org/grpc/codes"
1211
"google.golang.org/grpc/status"
1312

13+
"github.com/onflow/flow-go/engine"
1414
"github.com/onflow/flow-go/engine/access/subscription"
1515
"github.com/onflow/flow-go/model/flow"
1616
"github.com/onflow/flow-go/utils/unittest"
@@ -31,13 +31,6 @@ func (s *BackendBlockDigestSuite) SetupTest() {
3131

3232
// TestSubscribeBlockDigestsFromStartBlockID tests the SubscribeBlockDigestsFromStartBlockID method.
3333
func (s *BackendBlockDigestSuite) TestSubscribeBlockDigestsFromStartBlockID() {
34-
s.blockTracker.On(
35-
"GetStartHeightFromBlockID",
36-
mock.AnythingOfType("flow.Identifier"),
37-
).Return(func(startBlockID flow.Identifier) (uint64, error) {
38-
return s.blockTrackerReal.GetStartHeightFromBlockID(startBlockID)
39-
}, nil)
40-
4134
call := func(ctx context.Context, startValue interface{}, blockStatus flow.BlockStatus) subscription.Subscription {
4235
return s.backend.SubscribeBlockDigestsFromStartBlockID(ctx, startValue.(flow.Identifier), blockStatus)
4336
}
@@ -47,13 +40,6 @@ func (s *BackendBlockDigestSuite) TestSubscribeBlockDigestsFromStartBlockID() {
4740

4841
// TestSubscribeBlockDigestsFromStartHeight tests the SubscribeBlockDigestsFromStartHeight method.
4942
func (s *BackendBlockDigestSuite) TestSubscribeBlockDigestsFromStartHeight() {
50-
s.blockTracker.On(
51-
"GetStartHeightFromHeight",
52-
mock.AnythingOfType("uint64"),
53-
).Return(func(startHeight uint64) (uint64, error) {
54-
return s.blockTrackerReal.GetStartHeightFromHeight(startHeight)
55-
}, nil)
56-
5743
call := func(ctx context.Context, startValue interface{}, blockStatus flow.BlockStatus) subscription.Subscription {
5844
return s.backend.SubscribeBlockDigestsFromStartHeight(ctx, startValue.(uint64), blockStatus)
5945
}
@@ -63,13 +49,6 @@ func (s *BackendBlockDigestSuite) TestSubscribeBlockDigestsFromStartHeight() {
6349

6450
// TestSubscribeBlockDigestsFromLatest tests the SubscribeBlockDigestsFromLatest method.
6551
func (s *BackendBlockDigestSuite) TestSubscribeBlockDigestsFromLatest() {
66-
s.blockTracker.On(
67-
"GetStartHeightFromLatest",
68-
mock.Anything,
69-
).Return(func(ctx context.Context) (uint64, error) {
70-
return s.blockTrackerReal.GetStartHeightFromLatest(ctx)
71-
}, nil)
72-
7352
call := func(ctx context.Context, startValue interface{}, blockStatus flow.BlockStatus) subscription.Subscription {
7453
return s.backend.SubscribeBlockDigestsFromLatest(ctx, blockStatus)
7554
}
@@ -106,43 +85,30 @@ func (s *BackendBlockDigestSuite) TestSubscribeBlockDigestsHandlesErrors() {
10685
ctx, cancel := context.WithCancel(context.Background())
10786
defer cancel()
10887

109-
// mock block tracker for GetStartHeightFromBlockID
110-
s.blockTracker.On(
111-
"GetStartHeightFromBlockID",
112-
mock.AnythingOfType("flow.Identifier"),
113-
).Return(func(startBlockID flow.Identifier) (uint64, error) {
114-
return s.blockTrackerReal.GetStartHeightFromBlockID(startBlockID)
115-
}, nil)
88+
backend, err := New(s.backendParams(engine.NewBroadcaster()))
89+
s.Require().NoError(err)
11690

11791
s.Run("returns error if unknown start block id is provided", func() {
11892
subCtx, subCancel := context.WithCancel(ctx)
11993
defer subCancel()
12094

121-
sub := s.backend.SubscribeBlockDigestsFromStartBlockID(subCtx, unittest.IdentifierFixture(), flow.BlockStatusFinalized)
95+
sub := backend.SubscribeBlockDigestsFromStartBlockID(subCtx, unittest.IdentifierFixture(), flow.BlockStatusFinalized)
12296
assert.Equal(s.T(), codes.NotFound, status.Code(sub.Err()), "expected %s, got %v: %v", codes.NotFound, status.Code(sub.Err()).String(), sub.Err())
12397
})
12498

125-
// mock block tracker for GetStartHeightFromHeight
126-
s.blockTracker.On(
127-
"GetStartHeightFromHeight",
128-
mock.AnythingOfType("uint64"),
129-
).Return(func(startHeight uint64) (uint64, error) {
130-
return s.blockTrackerReal.GetStartHeightFromHeight(startHeight)
131-
}, nil)
132-
13399
s.Run("returns error for start height before root height", func() {
134100
subCtx, subCancel := context.WithCancel(ctx)
135101
defer subCancel()
136102

137-
sub := s.backend.SubscribeBlockDigestsFromStartHeight(subCtx, s.rootBlock.Height-1, flow.BlockStatusFinalized)
103+
sub := backend.SubscribeBlockDigestsFromStartHeight(subCtx, s.rootBlock.Height-1, flow.BlockStatusFinalized)
138104
assert.Equal(s.T(), codes.InvalidArgument, status.Code(sub.Err()), "expected %s, got %v: %v", codes.InvalidArgument, status.Code(sub.Err()).String(), sub.Err())
139105
})
140106

141107
s.Run("returns error if unknown start height is provided", func() {
142108
subCtx, subCancel := context.WithCancel(ctx)
143109
defer subCancel()
144110

145-
sub := s.backend.SubscribeBlockDigestsFromStartHeight(subCtx, s.blocksArray[len(s.blocksArray)-1].Height+10, flow.BlockStatusFinalized)
111+
sub := backend.SubscribeBlockDigestsFromStartHeight(subCtx, s.blocksArray[len(s.blocksArray)-1].Height+10, flow.BlockStatusFinalized)
146112
assert.Equal(s.T(), codes.NotFound, status.Code(sub.Err()), "expected %s, got %v: %v", codes.NotFound, status.Code(sub.Err()).String(), sub.Err())
147113
})
148114
}

engine/access/rpc/backend/backend_stream_block_headers_test.go

Lines changed: 6 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"testing"
66

77
"github.com/stretchr/testify/assert"
8-
"github.com/stretchr/testify/mock"
98
"github.com/stretchr/testify/require"
109
"github.com/stretchr/testify/suite"
1110
"google.golang.org/grpc/codes"
1211
"google.golang.org/grpc/status"
1312

13+
"github.com/onflow/flow-go/engine"
1414
"github.com/onflow/flow-go/engine/access/subscription"
1515
"github.com/onflow/flow-go/model/flow"
1616
"github.com/onflow/flow-go/utils/unittest"
@@ -31,13 +31,6 @@ func (s *BackendBlockHeadersSuite) SetupTest() {
3131

3232
// TestSubscribeBlockHeadersFromStartBlockID tests the SubscribeBlockHeadersFromStartBlockID method.
3333
func (s *BackendBlockHeadersSuite) TestSubscribeBlockHeadersFromStartBlockID() {
34-
s.blockTracker.On(
35-
"GetStartHeightFromBlockID",
36-
mock.AnythingOfType("flow.Identifier"),
37-
).Return(func(startBlockID flow.Identifier) (uint64, error) {
38-
return s.blockTrackerReal.GetStartHeightFromBlockID(startBlockID)
39-
}, nil)
40-
4134
call := func(ctx context.Context, startValue interface{}, blockStatus flow.BlockStatus) subscription.Subscription {
4235
return s.backend.SubscribeBlockHeadersFromStartBlockID(ctx, startValue.(flow.Identifier), blockStatus)
4336
}
@@ -47,13 +40,6 @@ func (s *BackendBlockHeadersSuite) TestSubscribeBlockHeadersFromStartBlockID() {
4740

4841
// TestSubscribeBlockHeadersFromStartHeight tests the SubscribeBlockHeadersFromStartHeight method.
4942
func (s *BackendBlockHeadersSuite) TestSubscribeBlockHeadersFromStartHeight() {
50-
s.blockTracker.On(
51-
"GetStartHeightFromHeight",
52-
mock.AnythingOfType("uint64"),
53-
).Return(func(startHeight uint64) (uint64, error) {
54-
return s.blockTrackerReal.GetStartHeightFromHeight(startHeight)
55-
}, nil)
56-
5743
call := func(ctx context.Context, startValue interface{}, blockStatus flow.BlockStatus) subscription.Subscription {
5844
return s.backend.SubscribeBlockHeadersFromStartHeight(ctx, startValue.(uint64), blockStatus)
5945
}
@@ -63,13 +49,6 @@ func (s *BackendBlockHeadersSuite) TestSubscribeBlockHeadersFromStartHeight() {
6349

6450
// TestSubscribeBlockHeadersFromLatest tests the SubscribeBlockHeadersFromLatest method.
6551
func (s *BackendBlockHeadersSuite) TestSubscribeBlockHeadersFromLatest() {
66-
s.blockTracker.On(
67-
"GetStartHeightFromLatest",
68-
mock.Anything,
69-
).Return(func(ctx context.Context) (uint64, error) {
70-
return s.blockTrackerReal.GetStartHeightFromLatest(ctx)
71-
}, nil)
72-
7352
call := func(ctx context.Context, startValue interface{}, blockStatus flow.BlockStatus) subscription.Subscription {
7453
return s.backend.SubscribeBlockHeadersFromLatest(ctx, blockStatus)
7554
}
@@ -106,43 +85,30 @@ func (s *BackendBlockHeadersSuite) TestSubscribeBlockHeadersHandlesErrors() {
10685
ctx, cancel := context.WithCancel(context.Background())
10786
defer cancel()
10887

109-
// mock block tracker for GetStartHeightFromBlockID
110-
s.blockTracker.On(
111-
"GetStartHeightFromBlockID",
112-
mock.AnythingOfType("flow.Identifier"),
113-
).Return(func(startBlockID flow.Identifier) (uint64, error) {
114-
return s.blockTrackerReal.GetStartHeightFromBlockID(startBlockID)
115-
}, nil)
88+
backend, err := New(s.backendParams(engine.NewBroadcaster()))
89+
s.Require().NoError(err)
11690

11791
s.Run("returns error for unknown start block id is provided", func() {
11892
subCtx, subCancel := context.WithCancel(ctx)
11993
defer subCancel()
12094

121-
sub := s.backend.SubscribeBlockHeadersFromStartBlockID(subCtx, unittest.IdentifierFixture(), flow.BlockStatusFinalized)
95+
sub := backend.SubscribeBlockHeadersFromStartBlockID(subCtx, unittest.IdentifierFixture(), flow.BlockStatusFinalized)
12296
assert.Equal(s.T(), codes.NotFound, status.Code(sub.Err()), "expected %s, got %v: %v", codes.NotFound, status.Code(sub.Err()).String(), sub.Err())
12397
})
12498

125-
// mock block tracker for GetStartHeightFromHeight
126-
s.blockTracker.On(
127-
"GetStartHeightFromHeight",
128-
mock.AnythingOfType("uint64"),
129-
).Return(func(startHeight uint64) (uint64, error) {
130-
return s.blockTrackerReal.GetStartHeightFromHeight(startHeight)
131-
}, nil)
132-
13399
s.Run("returns error if start height before root height", func() {
134100
subCtx, subCancel := context.WithCancel(ctx)
135101
defer subCancel()
136102

137-
sub := s.backend.SubscribeBlockHeadersFromStartHeight(subCtx, s.rootBlock.Height-1, flow.BlockStatusFinalized)
103+
sub := backend.SubscribeBlockHeadersFromStartHeight(subCtx, s.rootBlock.Height-1, flow.BlockStatusFinalized)
138104
assert.Equal(s.T(), codes.InvalidArgument, status.Code(sub.Err()), "expected %s, got %v: %v", codes.InvalidArgument, status.Code(sub.Err()).String(), sub.Err())
139105
})
140106

141107
s.Run("returns error for unknown start height is provided", func() {
142108
subCtx, subCancel := context.WithCancel(ctx)
143109
defer subCancel()
144110

145-
sub := s.backend.SubscribeBlockHeadersFromStartHeight(subCtx, s.blocksArray[len(s.blocksArray)-1].Height+10, flow.BlockStatusFinalized)
111+
sub := backend.SubscribeBlockHeadersFromStartHeight(subCtx, s.blocksArray[len(s.blocksArray)-1].Height+10, flow.BlockStatusFinalized)
146112
assert.Equal(s.T(), codes.NotFound, status.Code(sub.Err()), "expected %s, got %v: %v", codes.NotFound, status.Code(sub.Err()).String(), sub.Err())
147113
})
148114
}

0 commit comments

Comments
 (0)