Skip to content

Commit 1f2f0ae

Browse files
Added generic HandleResponse, updated providers
1 parent 437af4f commit 1f2f0ae

File tree

5 files changed

+47
-43
lines changed

5 files changed

+47
-43
lines changed

engine/access/rest/websockets/data_providers/block_digests_provider.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,14 @@ func NewBlockDigestsDataProvider(
5858
//
5959
// No errors are expected during normal operations.
6060
func (p *BlockDigestsDataProvider) Run() error {
61-
return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send))
61+
return subscription.HandleSubscription(
62+
p.subscription,
63+
subscription.HandleResponse(p.send, func(block *flow.BlockDigest) (interface{}, error) {
64+
return &models.BlockDigestMessageResponse{
65+
Block: block,
66+
}, nil
67+
}),
68+
)
6269
}
6370

6471
// createSubscription creates a new subscription using the specified input arguments.
@@ -73,16 +80,3 @@ func (p *BlockDigestsDataProvider) createSubscription(ctx context.Context, args
7380

7481
return p.api.SubscribeBlockDigestsFromLatest(ctx, args.BlockStatus)
7582
}
76-
77-
// handleResponse processes a block digest and sends the formatted response.
78-
//
79-
// No errors are expected during normal operations.
80-
func (p *BlockDigestsDataProvider) handleResponse(send chan<- interface{}) func(block *flow.BlockDigest) error {
81-
return func(block *flow.BlockDigest) error {
82-
send <- &models.BlockDigestMessageResponse{
83-
Block: block,
84-
}
85-
86-
return nil
87-
}
88-
}

engine/access/rest/websockets/data_providers/block_headers_provider.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,14 @@ func NewBlockHeadersDataProvider(
5858
//
5959
// No errors are expected during normal operations.
6060
func (p *BlockHeadersDataProvider) Run() error {
61-
return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send))
61+
return subscription.HandleSubscription(
62+
p.subscription,
63+
subscription.HandleResponse(p.send, func(header *flow.Header) (interface{}, error) {
64+
return &models.BlockHeaderMessageResponse{
65+
Header: header,
66+
}, nil
67+
}),
68+
)
6269
}
6370

6471
// createSubscription creates a new subscription using the specified input arguments.
@@ -73,16 +80,3 @@ func (p *BlockHeadersDataProvider) createSubscription(ctx context.Context, args
7380

7481
return p.api.SubscribeBlockHeadersFromLatest(ctx, args.BlockStatus)
7582
}
76-
77-
// handleResponse processes a block header and sends the formatted response.
78-
//
79-
// No errors are expected during normal operations.
80-
func (p *BlockHeadersDataProvider) handleResponse(send chan<- interface{}) func(header *flow.Header) error {
81-
return func(header *flow.Header) error {
82-
send <- &models.BlockHeaderMessageResponse{
83-
Header: header,
84-
}
85-
86-
return nil
87-
}
88-
}

engine/access/rest/websockets/data_providers/blocks_provider.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,14 @@ func NewBlocksDataProvider(
6767
//
6868
// No errors are expected during normal operations.
6969
func (p *BlocksDataProvider) Run() error {
70-
return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send))
70+
return subscription.HandleSubscription(
71+
p.subscription,
72+
subscription.HandleResponse(p.send, func(block *flow.Block) (interface{}, error) {
73+
return &models.BlockMessageResponse{
74+
Block: block,
75+
}, nil
76+
}),
77+
)
7178
}
7279

7380
// createSubscription creates a new subscription using the specified input arguments.
@@ -83,19 +90,6 @@ func (p *BlocksDataProvider) createSubscription(ctx context.Context, args Blocks
8390
return p.api.SubscribeBlocksFromLatest(ctx, args.BlockStatus)
8491
}
8592

86-
// handleResponse processes a block and sends the formatted response.
87-
//
88-
// No errors are expected during normal operations.
89-
func (p *BlocksDataProvider) handleResponse(send chan<- interface{}) func(*flow.Block) error {
90-
return func(block *flow.Block) error {
91-
send <- &models.BlockMessageResponse{
92-
Block: block,
93-
}
94-
95-
return nil
96-
}
97-
}
98-
9993
// ParseBlocksArguments validates and initializes the blocks arguments.
10094
func ParseBlocksArguments(arguments models.Arguments) (BlocksArguments, error) {
10195
var args BlocksArguments

engine/access/rest/websockets/data_providers/blocks_provider_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (s *BlocksProviderSuite) requireBlock(v interface{}, expectedBlock *flow.Bl
217217
// Arguments:
218218
// - topic: The topic associated with the data provider.
219219
// - tests: A slice of test cases to run, each specifying setup and validation logic.
220-
// - sendData: A function to simulate emitting data into the data channel.
220+
// - sendData: A function to simulate emitting data into the subscription's data channel.
221221
// - requireFn: A function to validate the output received in the send channel.
222222
func (s *BlocksProviderSuite) testHappyPath(
223223
topic string,

engine/access/subscription/util.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,25 @@ func HandleRPCSubscription[T any](sub Subscription, handleResponse func(resp T)
5454

5555
return nil
5656
}
57+
58+
// HandleResponse processes a generic response of type and sends it to the provided channel.
59+
//
60+
// Parameters:
61+
// - send: The channel to which the processed response is sent.
62+
// - transform: A function to transform the response into the expected interface{} type.
63+
//
64+
// No errors are expected during normal operations.
65+
func HandleResponse[T any](send chan<- interface{}, transform func(resp T) (interface{}, error)) func(resp T) error {
66+
return func(response T) error {
67+
// Transform the response
68+
resp, err := transform(response)
69+
if err != nil {
70+
return fmt.Errorf("failed to transform response: %w", err)
71+
}
72+
73+
// send to the channel
74+
send <- resp
75+
76+
return nil
77+
}
78+
}

0 commit comments

Comments
 (0)