Skip to content

Commit 941178d

Browse files
Merge pull request #6766 from The-K-R-O-K/AndriiDiachuk/6588-events-data-provider
[Access] Add implementation for events data providers and account statuses data providers
2 parents 66e6db0 + f158e6f commit 941178d

20 files changed

+1196
-125
lines changed

engine/access/rest/http/request/event_type.go renamed to engine/access/rest/common/parser/event_type.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package request
1+
package parser
22

33
import (
44
"fmt"

engine/access/rest/http/request/get_events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo
7171
if rawType == "" {
7272
return fmt.Errorf("event type must be provided")
7373
}
74-
var eventType EventType
74+
var eventType parser.EventType
7575
err = eventType.Parse(rawType)
7676
if err != nil {
7777
return err

engine/access/rest/server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,13 @@ func NewServer(serverAPI access.API,
5151
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
5252
}
5353

54-
dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI)
54+
dataProviderFactory := dp.NewDataProviderFactory(
55+
logger,
56+
stateStreamApi,
57+
serverAPI,
58+
chain,
59+
stateStreamConfig.EventFilterConfig,
60+
stateStreamConfig.HeartbeatInterval)
5561
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)
5662

5763
c := cors.New(cors.Options{
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package data_providers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
8+
"github.com/rs/zerolog"
9+
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/status"
11+
12+
"github.com/onflow/flow-go/engine/access/rest/common/parser"
13+
"github.com/onflow/flow-go/engine/access/rest/http/request"
14+
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
15+
"github.com/onflow/flow-go/engine/access/state_stream"
16+
"github.com/onflow/flow-go/engine/access/state_stream/backend"
17+
"github.com/onflow/flow-go/engine/access/subscription"
18+
"github.com/onflow/flow-go/model/flow"
19+
"github.com/onflow/flow-go/module/counters"
20+
)
21+
22+
// accountStatusesArguments contains the arguments required for subscribing to account statuses
23+
type accountStatusesArguments struct {
24+
StartBlockID flow.Identifier // ID of the block to start subscription from
25+
StartBlockHeight uint64 // Height of the block to start subscription from
26+
Filter state_stream.AccountStatusFilter // Filter applied to events for a given subscription
27+
}
28+
29+
type AccountStatusesDataProvider struct {
30+
*baseDataProvider
31+
32+
logger zerolog.Logger
33+
stateStreamApi state_stream.API
34+
35+
heartbeatInterval uint64
36+
}
37+
38+
var _ DataProvider = (*AccountStatusesDataProvider)(nil)
39+
40+
// NewAccountStatusesDataProvider creates a new instance of AccountStatusesDataProvider.
41+
func NewAccountStatusesDataProvider(
42+
ctx context.Context,
43+
logger zerolog.Logger,
44+
stateStreamApi state_stream.API,
45+
topic string,
46+
arguments models.Arguments,
47+
send chan<- interface{},
48+
chain flow.Chain,
49+
eventFilterConfig state_stream.EventFilterConfig,
50+
heartbeatInterval uint64,
51+
) (*AccountStatusesDataProvider, error) {
52+
p := &AccountStatusesDataProvider{
53+
logger: logger.With().Str("component", "account-statuses-data-provider").Logger(),
54+
stateStreamApi: stateStreamApi,
55+
heartbeatInterval: heartbeatInterval,
56+
}
57+
58+
// Initialize arguments passed to the provider.
59+
accountStatusesArgs, err := parseAccountStatusesArguments(arguments, chain, eventFilterConfig)
60+
if err != nil {
61+
return nil, fmt.Errorf("invalid arguments for account statuses data provider: %w", err)
62+
}
63+
64+
subCtx, cancel := context.WithCancel(ctx)
65+
66+
p.baseDataProvider = newBaseDataProvider(
67+
topic,
68+
cancel,
69+
send,
70+
p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments.
71+
)
72+
73+
return p, nil
74+
}
75+
76+
// Run starts processing the subscription for events and handles responses.
77+
//
78+
// No errors are expected during normal operations.
79+
func (p *AccountStatusesDataProvider) Run() error {
80+
return subscription.HandleSubscription(p.subscription, p.handleResponse())
81+
}
82+
83+
// createSubscription creates a new subscription using the specified input arguments.
84+
func (p *AccountStatusesDataProvider) createSubscription(ctx context.Context, args accountStatusesArguments) subscription.Subscription {
85+
if args.StartBlockID != flow.ZeroID {
86+
return p.stateStreamApi.SubscribeAccountStatusesFromStartBlockID(ctx, args.StartBlockID, args.Filter)
87+
}
88+
89+
if args.StartBlockHeight != request.EmptyHeight {
90+
return p.stateStreamApi.SubscribeAccountStatusesFromStartHeight(ctx, args.StartBlockHeight, args.Filter)
91+
}
92+
93+
return p.stateStreamApi.SubscribeAccountStatusesFromLatestBlock(ctx, args.Filter)
94+
}
95+
96+
// handleResponse processes an account statuses and sends the formatted response.
97+
//
98+
// No errors are expected during normal operations.
99+
func (p *AccountStatusesDataProvider) handleResponse() func(accountStatusesResponse *backend.AccountStatusesResponse) error {
100+
blocksSinceLastMessage := uint64(0)
101+
messageIndex := counters.NewMonotonousCounter(0)
102+
103+
return func(accountStatusesResponse *backend.AccountStatusesResponse) error {
104+
// check if there are any events in the response. if not, do not send a message unless the last
105+
// response was more than HeartbeatInterval blocks ago
106+
if len(accountStatusesResponse.AccountEvents) == 0 {
107+
blocksSinceLastMessage++
108+
if blocksSinceLastMessage < p.heartbeatInterval {
109+
return nil
110+
}
111+
blocksSinceLastMessage = 0
112+
}
113+
114+
index := messageIndex.Value()
115+
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
116+
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
117+
}
118+
119+
p.send <- &models.AccountStatusesResponse{
120+
BlockID: accountStatusesResponse.BlockID.String(),
121+
Height: strconv.FormatUint(accountStatusesResponse.Height, 10),
122+
AccountEvents: accountStatusesResponse.AccountEvents,
123+
MessageIndex: index,
124+
}
125+
126+
return nil
127+
}
128+
}
129+
130+
// parseAccountStatusesArguments validates and initializes the account statuses arguments.
131+
func parseAccountStatusesArguments(
132+
arguments models.Arguments,
133+
chain flow.Chain,
134+
eventFilterConfig state_stream.EventFilterConfig,
135+
) (accountStatusesArguments, error) {
136+
var args accountStatusesArguments
137+
138+
// Parse block arguments
139+
startBlockID, startBlockHeight, err := ParseStartBlock(arguments)
140+
if err != nil {
141+
return args, err
142+
}
143+
args.StartBlockID = startBlockID
144+
args.StartBlockHeight = startBlockHeight
145+
146+
// Parse 'event_types' as a JSON array
147+
var eventTypes parser.EventTypes
148+
if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" {
149+
result, ok := eventTypesIn.([]string)
150+
if !ok {
151+
return args, fmt.Errorf("'event_types' must be an array of string")
152+
}
153+
154+
err := eventTypes.Parse(result)
155+
if err != nil {
156+
return args, fmt.Errorf("invalid 'event_types': %w", err)
157+
}
158+
}
159+
160+
// Parse 'accountAddresses' as []string{}
161+
var accountAddresses []string
162+
if accountAddressesIn, ok := arguments["account_addresses"]; ok && accountAddressesIn != "" {
163+
accountAddresses, ok = accountAddressesIn.([]string)
164+
if !ok {
165+
return args, fmt.Errorf("'account_addresses' must be an array of string")
166+
}
167+
}
168+
169+
// Initialize the event filter with the parsed arguments
170+
args.Filter, err = state_stream.NewAccountStatusFilter(eventFilterConfig, chain, eventTypes.Flow(), accountAddresses)
171+
if err != nil {
172+
return args, fmt.Errorf("failed to create event filter: %w", err)
173+
}
174+
175+
return args, nil
176+
}

0 commit comments

Comments
 (0)