Skip to content

Commit 101ff68

Browse files
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
2 parents 907c708 + 45950a7 commit 101ff68

File tree

16 files changed

+1136
-124
lines changed

16 files changed

+1136
-124
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ generate-mocks: install-mock-generators
216216
mockery --name 'Storage' --dir=module/executiondatasync/tracker --case=underscore --output="module/executiondatasync/tracker/mock" --outpkg="mocktracker"
217217
mockery --name 'ScriptExecutor' --dir=module/execution --case=underscore --output="module/execution/mock" --outpkg="mock"
218218
mockery --name 'StorageSnapshot' --dir=fvm/storage/snapshot --case=underscore --output="fvm/storage/snapshot/mock" --outpkg="mock"
219+
mockery --name 'WebsocketConnection' --dir=engine/access/rest/websockets --case=underscore --output="engine/access/rest/websockets/mock" --outpkg="mock"
219220

220221
#temporarily make insecure/ a non-module to allow mockery to create mocks
221222
mv insecure/go.mod insecure/go2.mod

engine/access/rest/websockets/controller.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ func (c *Controller) HandleConnection(ctx context.Context) {
6666
g, gCtx := errgroup.WithContext(ctx)
6767

6868
g.Go(func() error {
69-
return c.readMessagesFromClient(gCtx)
69+
return c.readMessages(gCtx)
7070
})
7171

7272
g.Go(func() error {
7373
return c.keepalive(gCtx)
7474
})
7575

7676
g.Go(func() error {
77-
return c.writeMessagesToClient(gCtx)
77+
return c.writeMessages(gCtx)
7878
})
7979

8080
if err = g.Wait(); err != nil {
@@ -111,12 +111,12 @@ func (c *Controller) configureKeepalive() error {
111111
return nil
112112
}
113113

114-
// writeMessagesToClient reads a messages from communication channel and passes them on to a client WebSocket connection.
114+
// writeMessages reads a messages from communication channel and passes them on to a client WebSocket connection.
115115
// The communication channel is filled by data providers. Besides, the response limit tracker is involved in
116116
// write message regulation
117117
//
118118
// No errors are expected during normal operation. All errors are considered benign.
119-
func (c *Controller) writeMessagesToClient(ctx context.Context) error {
119+
func (c *Controller) writeMessages(ctx context.Context) error {
120120
for {
121121
select {
122122
case <-ctx.Done():
@@ -143,11 +143,11 @@ func (c *Controller) writeMessagesToClient(ctx context.Context) error {
143143
}
144144
}
145145

146-
// readMessagesFromClient continuously reads messages from a client WebSocket connection,
146+
// readMessages continuously reads messages from a client WebSocket connection,
147147
// processes each message, and handles actions based on the message type.
148148
//
149149
// No errors are expected during normal operation. All errors are considered benign.
150-
func (c *Controller) readMessagesFromClient(ctx context.Context) error {
150+
func (c *Controller) readMessages(ctx context.Context) error {
151151
for {
152152
select {
153153
case <-ctx.Done():
@@ -163,10 +163,12 @@ func (c *Controller) readMessagesFromClient(ctx context.Context) error {
163163

164164
_, validatedMsg, err := c.parseAndValidateMessage(msg)
165165
if err != nil {
166+
//TODO: write error to error channel
166167
return fmt.Errorf("failed to parse and validate client message: %w", err)
167168
}
168169

169170
if err := c.handleAction(ctx, validatedMsg); err != nil {
171+
//TODO: write error to error channel
170172
return fmt.Errorf("failed to handle message action: %w", err)
171173
}
172174
}
@@ -241,8 +243,16 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
241243

242244
c.dataProviders.Add(dp.ID(), dp)
243245

244-
//TODO: return OK response to client
245-
c.communicationChannel <- msg
246+
//TODO: return correct OK response to client
247+
response := models.SubscribeMessageResponse{
248+
BaseMessageResponse: models.BaseMessageResponse{
249+
Success: true,
250+
},
251+
Topic: dp.Topic(),
252+
ID: dp.ID().String(),
253+
}
254+
255+
c.communicationChannel <- response
246256

247257
go func() {
248258
err := dp.Run()

engine/access/rest/websockets/controller_test.go

Lines changed: 234 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/stretchr/testify/require"
11+
12+
streammock "github.com/onflow/flow-go/engine/access/state_stream/mock"
13+
1014
"github.com/google/uuid"
1115
"github.com/gorilla/websocket"
1216
"github.com/rs/zerolog"
@@ -18,6 +22,8 @@ import (
1822
dpmock "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/mock"
1923
connectionmock "github.com/onflow/flow-go/engine/access/rest/websockets/mock"
2024
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
25+
"github.com/onflow/flow-go/engine/access/state_stream/backend"
26+
"github.com/onflow/flow-go/model/flow"
2127
"github.com/onflow/flow-go/utils/unittest"
2228
)
2329

@@ -30,6 +36,9 @@ type ControllerSuite struct {
3036

3137
connection *connectionmock.WebsocketConnection
3238
dataProviderFactory *dpmock.DataProviderFactory
39+
40+
streamApi *streammock.API
41+
streamConfig backend.Config
3342
}
3443

3544
func TestControllerSuite(t *testing.T) {
@@ -39,10 +48,232 @@ func TestControllerSuite(t *testing.T) {
3948
// SetupTest initializes the test suite with required dependencies.
4049
func (s *ControllerSuite) SetupTest() {
4150
s.logger = unittest.Logger()
42-
s.config = Config{}
51+
s.config = NewDefaultWebsocketConfig()
4352

4453
s.connection = connectionmock.NewWebsocketConnection(s.T())
4554
s.dataProviderFactory = dpmock.NewDataProviderFactory(s.T())
55+
56+
s.streamApi = streammock.NewAPI(s.T())
57+
s.streamConfig = backend.Config{}
58+
}
59+
60+
// TestSubscribeRequest tests the subscribe to topic flow.
61+
// We emulate a request message from a client, and a response message from a controller.
62+
func (s *ControllerSuite) TestSubscribeRequest() {
63+
s.T().Run("Happy path", func(t *testing.T) {
64+
conn, dataProviderFactory, dataProvider := newControllerMocks(t)
65+
controller := NewWebSocketController(s.logger, s.config, conn, dataProviderFactory)
66+
67+
dataProvider.
68+
On("Run").
69+
Run(func(args mock.Arguments) {}).
70+
Return(nil).
71+
Once()
72+
73+
subscribeRequest := models.SubscribeMessageRequest{
74+
BaseMessageRequest: models.BaseMessageRequest{Action: "subscribe"},
75+
Topic: dp.BlocksTopic,
76+
Arguments: nil,
77+
}
78+
79+
// Simulate receiving the subscription request from the client
80+
conn.
81+
On("ReadJSON", mock.Anything).
82+
Run(func(args mock.Arguments) {
83+
requestMsg, ok := args.Get(0).(*json.RawMessage)
84+
require.True(t, ok)
85+
subscribeRequestMessage, err := json.Marshal(subscribeRequest)
86+
require.NoError(t, err)
87+
*requestMsg = subscribeRequestMessage
88+
}).
89+
Return(nil).
90+
Once()
91+
92+
// Channel to signal the test flow completion
93+
done := make(chan struct{}, 1)
94+
95+
// Simulate writing a successful subscription response back to the client
96+
conn.
97+
On("WriteJSON", mock.Anything).
98+
Return(func(msg interface{}) error {
99+
response, ok := msg.(models.SubscribeMessageResponse)
100+
require.True(t, ok)
101+
require.True(t, response.Success)
102+
close(done) // Signal that response has been sent
103+
return websocket.ErrCloseSent
104+
}).Once()
105+
106+
// Simulate client closing connection after receiving the response
107+
conn.
108+
On("ReadJSON", mock.Anything).
109+
Return(func(interface{}) error {
110+
<-done
111+
return websocket.ErrCloseSent
112+
}).Once()
113+
114+
ctx, cancel := context.WithCancel(context.Background())
115+
defer cancel()
116+
117+
controller.HandleConnection(ctx)
118+
})
119+
}
120+
121+
// TestSubscribeBlocks tests the functionality for streaming blocks to a subscriber.
122+
func (s *ControllerSuite) TestSubscribeBlocks() {
123+
s.T().Run("Stream one block", func(t *testing.T) {
124+
conn, dataProviderFactory, dataProvider := newControllerMocks(t)
125+
controller := NewWebSocketController(s.logger, s.config, conn, dataProviderFactory)
126+
127+
// Simulate data provider write a block to the controller
128+
expectedBlock := unittest.BlockFixture()
129+
dataProvider.
130+
On("Run", mock.Anything).
131+
Run(func(args mock.Arguments) {
132+
controller.communicationChannel <- expectedBlock
133+
}).
134+
Return(nil).
135+
Once()
136+
137+
done := make(chan struct{}, 1)
138+
s.expectSubscriptionRequest(conn, done)
139+
s.expectSubscriptionResponse(conn, true)
140+
141+
// Expect a valid block to be passed to WriteJSON.
142+
// If we got to this point, the controller executed all its logic properly
143+
var actualBlock flow.Block
144+
conn.
145+
On("WriteJSON", mock.Anything).
146+
Return(func(msg interface{}) error {
147+
block, ok := msg.(flow.Block)
148+
require.True(t, ok)
149+
actualBlock = block
150+
151+
close(done)
152+
return websocket.ErrCloseSent
153+
}).Once()
154+
155+
ctx, cancel := context.WithCancel(context.Background())
156+
defer cancel()
157+
158+
controller.HandleConnection(ctx)
159+
require.Equal(t, expectedBlock, actualBlock)
160+
})
161+
162+
s.T().Run("Stream many blocks", func(t *testing.T) {
163+
conn, dataProviderFactory, dataProvider := newControllerMocks(t)
164+
controller := NewWebSocketController(s.logger, s.config, conn, dataProviderFactory)
165+
166+
// Simulate data provider writes some blocks to the controller
167+
expectedBlocks := unittest.BlockFixtures(100)
168+
dataProvider.
169+
On("Run", mock.Anything).
170+
Run(func(args mock.Arguments) {
171+
for _, block := range expectedBlocks {
172+
controller.communicationChannel <- *block
173+
}
174+
}).
175+
Return(nil).
176+
Once()
177+
178+
done := make(chan struct{}, 1)
179+
s.expectSubscriptionRequest(conn, done)
180+
s.expectSubscriptionResponse(conn, true)
181+
182+
i := 0
183+
actualBlocks := make([]*flow.Block, len(expectedBlocks))
184+
185+
// Expect valid blocks to be passed to WriteJSON.
186+
// If we got to this point, the controller executed all its logic properly
187+
conn.
188+
On("WriteJSON", mock.Anything).
189+
Return(func(msg interface{}) error {
190+
block, ok := msg.(flow.Block)
191+
require.True(t, ok)
192+
193+
actualBlocks[i] = &block
194+
i += 1
195+
196+
if i == len(expectedBlocks) {
197+
close(done)
198+
return websocket.ErrCloseSent
199+
}
200+
201+
return nil
202+
}).
203+
Times(len(expectedBlocks))
204+
205+
ctx, cancel := context.WithCancel(context.Background())
206+
defer cancel()
207+
208+
controller.HandleConnection(ctx)
209+
require.Equal(t, expectedBlocks, actualBlocks)
210+
})
211+
}
212+
213+
// newControllerMocks initializes mock WebSocket connection, data provider, and data provider factory.
214+
// The mocked functions are expected to be called in a case when a test is expected to reach WriteJSON function.
215+
func newControllerMocks(t *testing.T) (*connectionmock.WebsocketConnection, *dpmock.DataProviderFactory, *dpmock.DataProvider) {
216+
conn := connectionmock.NewWebsocketConnection(t)
217+
conn.On("Close").Return(nil).Once()
218+
conn.On("SetReadDeadline", mock.Anything).Return(nil).Once()
219+
conn.On("SetWriteDeadline", mock.Anything).Return(nil)
220+
conn.On("SetPongHandler", mock.AnythingOfType("func(string) error")).Return(nil).Once()
221+
222+
id := uuid.New()
223+
topic := dp.BlocksTopic
224+
dataProvider := dpmock.NewDataProvider(t)
225+
dataProvider.On("ID").Return(id)
226+
dataProvider.On("Close").Return(nil)
227+
dataProvider.On("Topic").Return(topic)
228+
229+
factory := dpmock.NewDataProviderFactory(t)
230+
factory.
231+
On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
232+
Return(dataProvider, nil).
233+
Once()
234+
235+
return conn, factory, dataProvider
236+
}
237+
238+
// expectSubscriptionRequest mocks the client's subscription request.
239+
func (s *ControllerSuite) expectSubscriptionRequest(conn *connectionmock.WebsocketConnection, done <-chan struct{}) {
240+
requestMessage := models.SubscribeMessageRequest{
241+
BaseMessageRequest: models.BaseMessageRequest{Action: "subscribe"},
242+
Topic: dp.BlocksTopic,
243+
}
244+
245+
// The very first message from a client is a request to subscribe to some topic
246+
conn.On("ReadJSON", mock.Anything).
247+
Run(func(args mock.Arguments) {
248+
reqMsg, ok := args.Get(0).(*json.RawMessage)
249+
require.True(s.T(), ok)
250+
msg, err := json.Marshal(requestMessage)
251+
require.NoError(s.T(), err)
252+
*reqMsg = msg
253+
}).
254+
Return(nil).
255+
Once()
256+
257+
// In the default case, no further communication is expected from the client.
258+
// We wait for the writer routine to signal completion, allowing us to close the connection gracefully
259+
conn.
260+
On("ReadJSON", mock.Anything).
261+
Return(func(msg interface{}) error {
262+
<-done
263+
return websocket.ErrCloseSent
264+
})
265+
}
266+
267+
// expectSubscriptionResponse mocks the subscription response sent to the client.
268+
func (s *ControllerSuite) expectSubscriptionResponse(conn *connectionmock.WebsocketConnection, success bool) {
269+
conn.On("WriteJSON", mock.Anything).
270+
Run(func(args mock.Arguments) {
271+
response, ok := args.Get(0).(models.SubscribeMessageResponse)
272+
require.True(s.T(), ok)
273+
require.Equal(s.T(), success, response.Success)
274+
}).
275+
Return(nil).
276+
Once()
46277
}
47278

48279
// TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly.
@@ -259,8 +490,9 @@ func (s *ControllerSuite) initializeController() *Controller {
259490
// mockDataProviderSetup is a helper which mocks a blocks data provider setup.
260491
func (s *ControllerSuite) mockBlockDataProviderSetup(id uuid.UUID) *dpmock.DataProvider {
261492
dataProvider := dpmock.NewDataProvider(s.T())
262-
dataProvider.On("ID").Return(id).Once()
493+
dataProvider.On("ID").Return(id).Twice()
263494
dataProvider.On("Close").Return(nil).Once()
495+
dataProvider.On("Topic").Return(dp.BlocksTopic).Once()
264496
s.dataProviderFactory.On("NewDataProvider", mock.Anything, dp.BlocksTopic, mock.Anything, mock.Anything).
265497
Return(dataProvider, nil).Once()
266498
dataProvider.On("Run").Return(nil).Once()

0 commit comments

Comments
 (0)