Skip to content

Commit 369d736

Browse files
committed
[Access] Allow streaming from the spork root block
1 parent 7874657 commit 369d736

File tree

6 files changed

+418
-115
lines changed

6 files changed

+418
-115
lines changed

engine/access/state_stream/backend/backend.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ type StateStreamBackend struct {
7878
execDataCache *cache.ExecutionDataCache
7979
registers *execution.RegistersAsyncStore
8080
registerRequestLimit int
81+
sporkRootBlockHeight uint64
8182
}
8283

8384
func New(
@@ -108,6 +109,7 @@ func New(
108109
execDataCache: execDataCache,
109110
registers: registers,
110111
registerRequestLimit: registerIDsRequestLimit,
112+
sporkRootBlockHeight: state.Params().SporkRootBlockHeight(),
111113
}
112114

113115
b.ExecutionDataBackend = ExecutionDataBackend{
@@ -155,6 +157,26 @@ func (b *StateStreamBackend) getExecutionData(ctx context.Context, height uint64
155157
return nil, fmt.Errorf("execution data for block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
156158
}
157159

160+
// the spork root block will never have execution data available. If requested, return an empty result.
161+
if height == b.sporkRootBlockHeight {
162+
sporkRootBlockID, err := b.headers.BlockIDByHeight(b.sporkRootBlockHeight)
163+
if err != nil {
164+
if !errors.Is(err, storage.ErrNotFound) {
165+
return nil, fmt.Errorf("could not get block ID for root block height %d: %w", b.sporkRootBlockHeight, err)
166+
}
167+
// the spork root block will not exist locally if the node was bootstrapped with a newer
168+
// root block. In this case, just use a placeholder. This blockID is only used when serving
169+
// data for the root block, which will never happen if it does not exist locally.
170+
sporkRootBlockID = flow.ZeroID
171+
}
172+
173+
return &execution_data.BlockExecutionDataEntity{
174+
BlockExecutionData: &execution_data.BlockExecutionData{
175+
BlockID: sporkRootBlockID,
176+
},
177+
}, nil
178+
}
179+
158180
execData, err := b.execDataCache.ByHeight(ctx, height)
159181
if err != nil {
160182
if errors.Is(err, storage.ErrNotFound) ||

engine/access/state_stream/backend/backend_account_statuses_test.go

Lines changed: 124 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (s *BackendAccountStatusesSuite) subscribeFromStartBlockIdTestCases() []tes
128128
{
129129
name: "happy path - all new blocks",
130130
highestBackfill: -1, // no backfill
131-
startValue: s.rootBlock.ID(),
131+
startValue: s.blocks[0].ID(),
132132
},
133133
{
134134
name: "happy path - partial backfill",
@@ -140,11 +140,6 @@ func (s *BackendAccountStatusesSuite) subscribeFromStartBlockIdTestCases() []tes
140140
highestBackfill: len(s.blocks) - 1, // backfill all blocks
141141
startValue: s.blocks[0].ID(),
142142
},
143-
{
144-
name: "happy path - start from root block by id",
145-
highestBackfill: len(s.blocks) - 1, // backfill all blocks
146-
startValue: s.rootBlock.ID(), // start from root block
147-
},
148143
}
149144

150145
return s.generateFiltersForTestCases(baseTests)
@@ -156,7 +151,7 @@ func (s *BackendAccountStatusesSuite) subscribeFromStartHeightTestCases() []test
156151
{
157152
name: "happy path - all new blocks",
158153
highestBackfill: -1, // no backfill
159-
startValue: s.rootBlock.Height,
154+
startValue: s.blocks[0].Height,
160155
},
161156
{
162157
name: "happy path - partial backfill",
@@ -168,11 +163,6 @@ func (s *BackendAccountStatusesSuite) subscribeFromStartHeightTestCases() []test
168163
highestBackfill: len(s.blocks) - 1, // backfill all blocks
169164
startValue: s.blocks[0].Height,
170165
},
171-
{
172-
name: "happy path - start from root block by id",
173-
highestBackfill: len(s.blocks) - 1, // backfill all blocks
174-
startValue: s.rootBlock.Height, // start from root block
175-
},
176166
}
177167

178168
return s.generateFiltersForTestCases(baseTests)
@@ -311,33 +301,20 @@ func (s *BackendAccountStatusesSuite) subscribeToAccountStatuses(
311301
s.broadcaster.Publish()
312302
}
313303

314-
expectedEvents := map[string]flow.EventsList{}
315-
for _, event := range s.blockEvents[b.ID()] {
316-
if test.filters.Match(event) {
317-
var address string
318-
switch event.Type {
319-
case state_stream.CoreEventAccountCreated:
320-
address = s.accountCreatedAddress.HexWithPrefix()
321-
case state_stream.CoreEventAccountContractAdded:
322-
address = s.accountContractAdded.HexWithPrefix()
323-
case state_stream.CoreEventAccountContractUpdated:
324-
address = s.accountContractUpdated.HexWithPrefix()
325-
}
326-
expectedEvents[address] = append(expectedEvents[address], event)
327-
}
328-
}
304+
expectedEvents := s.expectedAccountStatuses(b.ID(), test.filters)
329305

330306
// Consume execution data from subscription
331307
unittest.RequireReturnsBefore(s.T(), func() {
332308
v, ok := <-sub.Channel()
333309
require.True(s.T(), ok, "channel closed while waiting for exec data for block %d %v: err: %v", b.Height, b.ID(), sub.Err())
334310

335-
resp, ok := v.(*AccountStatusesResponse)
336-
require.True(s.T(), ok, "unexpected response type: %T", v)
311+
expected := &AccountStatusesResponse{
312+
BlockID: b.ID(),
313+
Height: b.Height,
314+
AccountEvents: expectedEvents,
315+
}
316+
s.requireEventsResponse(v, expected)
337317

338-
assert.Equal(s.T(), b.ID(), resp.BlockID)
339-
assert.Equal(s.T(), b.Height, resp.Height)
340-
assert.Equal(s.T(), expectedEvents, resp.AccountEvents)
341318
}, 60*time.Second, fmt.Sprintf("timed out waiting for exec data for block %d %v", b.Height, b.ID()))
342319
}
343320

@@ -408,6 +385,102 @@ func (s *BackendAccountStatusesSuite) TestSubscribeAccountStatusesFromLatestBloc
408385
s.subscribeToAccountStatuses(call, s.subscribeFromLatestTestCases())
409386
}
410387

388+
// requireEventsResponse ensures that the received event information matches the expected data.
389+
func (s *BackendAccountStatusesSuite) requireEventsResponse(v interface{}, expected *AccountStatusesResponse) {
390+
actual, ok := v.(*AccountStatusesResponse)
391+
require.True(s.T(), ok, "unexpected response type: %T", v)
392+
393+
assert.Equal(s.T(), expected.BlockID, actual.BlockID)
394+
assert.Equal(s.T(), expected.Height, actual.Height)
395+
assert.Equal(s.T(), expected.AccountEvents, actual.AccountEvents)
396+
}
397+
398+
// TestSubscribeAccountStatusesFromSporkRootBlock tests that events subscriptions starting from the spork
399+
// root block return an empty result for the root block.
400+
func (s *BackendAccountStatusesSuite) TestSubscribeAccountStatusesFromSporkRootBlock() {
401+
ctx, cancel := context.WithCancel(context.Background())
402+
defer cancel()
403+
404+
// setup the backend to have 1 available block
405+
s.highestBlockHeader = s.blocks[0].ToHeader()
406+
407+
rootEventResponse := &AccountStatusesResponse{
408+
BlockID: s.rootBlock.ID(),
409+
Height: s.rootBlock.Height,
410+
AccountEvents: map[string]flow.EventsList{},
411+
}
412+
413+
filter, err := state_stream.NewAccountStatusFilter(state_stream.DefaultEventFilterConfig, chainID.Chain(), []string{}, []string{})
414+
require.NoError(s.T(), err)
415+
416+
expectedEvents := s.expectedAccountStatuses(s.blocks[0].ID(), filter)
417+
firstEventResponse := &AccountStatusesResponse{
418+
BlockID: s.blocks[0].ID(),
419+
Height: s.blocks[0].Height,
420+
AccountEvents: expectedEvents,
421+
}
422+
423+
assertSubscriptionResponses := func(sub subscription.Subscription, cancel context.CancelFunc) {
424+
// the first response should have details from the root block and no events
425+
resp := <-sub.Channel()
426+
s.requireEventsResponse(resp, rootEventResponse)
427+
428+
// the second response should have details from the first block and its events
429+
resp = <-sub.Channel()
430+
s.requireEventsResponse(resp, firstEventResponse)
431+
432+
cancel()
433+
resp, ok := <-sub.Channel()
434+
assert.False(s.T(), ok)
435+
assert.Nil(s.T(), resp)
436+
assert.ErrorIs(s.T(), sub.Err(), context.Canceled)
437+
}
438+
439+
s.Run("by height", func() {
440+
subCtx, subCancel := context.WithCancel(ctx)
441+
defer subCancel()
442+
443+
s.executionDataTracker.On("GetStartHeightFromHeight", s.rootBlock.Height).
444+
Return(func(startHeight uint64) (uint64, error) {
445+
return s.executionDataTrackerReal.GetStartHeightFromHeight(startHeight)
446+
})
447+
448+
sub := s.backend.SubscribeAccountStatusesFromStartHeight(subCtx, s.rootBlock.Height, filter)
449+
assertSubscriptionResponses(sub, subCancel)
450+
})
451+
452+
s.Run("by ID", func() {
453+
subCtx, subCancel := context.WithCancel(ctx)
454+
defer subCancel()
455+
456+
s.executionDataTracker.On("GetStartHeightFromBlockID", s.rootBlock.ID()).
457+
Return(func(startBlockID flow.Identifier) (uint64, error) {
458+
return s.executionDataTrackerReal.GetStartHeightFromBlockID(startBlockID)
459+
})
460+
461+
sub := s.backend.SubscribeAccountStatusesFromStartBlockID(subCtx, s.rootBlock.ID(), filter)
462+
assertSubscriptionResponses(sub, subCancel)
463+
})
464+
465+
s.Run("by latest", func() {
466+
subCtx, subCancel := context.WithCancel(ctx)
467+
defer subCancel()
468+
469+
// simulate the case where the latest block is also the root block
470+
s.snapshot.On("Head").Unset()
471+
s.snapshot.On("Head").Return(s.rootBlock.ToHeader(), nil).Once()
472+
473+
s.executionDataTracker.On("GetStartHeightFromLatest", mock.Anything).
474+
Return(func(ctx context.Context) (uint64, error) {
475+
return s.executionDataTrackerReal.GetStartHeightFromLatest(ctx)
476+
})
477+
478+
sub := s.backend.SubscribeAccountStatusesFromLatestBlock(subCtx, filter)
479+
assertSubscriptionResponses(sub, subCancel)
480+
})
481+
482+
}
483+
411484
// TestSubscribeAccountStatusesHandlesErrors tests handling of expected errors in the SubscribeAccountStatuses.
412485
func (s *BackendExecutionDataSuite) TestSubscribeAccountStatusesHandlesErrors() {
413486
ctx, cancel := context.WithCancel(context.Background())
@@ -455,3 +528,22 @@ func (s *BackendExecutionDataSuite) TestSubscribeAccountStatusesHandlesErrors()
455528
assert.Equal(s.T(), codes.NotFound, status.Code(sub.Err()), "expected NotFound, got %v: %v", status.Code(sub.Err()).String(), sub.Err())
456529
})
457530
}
531+
532+
func (s *BackendAccountStatusesSuite) expectedAccountStatuses(blockID flow.Identifier, filter state_stream.AccountStatusFilter) map[string]flow.EventsList {
533+
expectedEvents := map[string]flow.EventsList{}
534+
for _, event := range s.blockEvents[blockID] {
535+
if filter.Match(event) {
536+
var address string
537+
switch event.Type {
538+
case state_stream.CoreEventAccountCreated:
539+
address = s.accountCreatedAddress.HexWithPrefix()
540+
case state_stream.CoreEventAccountContractAdded:
541+
address = s.accountContractAdded.HexWithPrefix()
542+
case state_stream.CoreEventAccountContractUpdated:
543+
address = s.accountContractUpdated.HexWithPrefix()
544+
}
545+
expectedEvents[address] = append(expectedEvents[address], event)
546+
}
547+
}
548+
return expectedEvents
549+
}

0 commit comments

Comments
 (0)