@@ -7,6 +7,10 @@ import (
77 "testing"
88 "time"
99
10+ "github.com/stretchr/testify/require"
11+
12+ streammock "github.com/onflow/flow-go/engine/access/state_stream/mock"
13+
1014 "github.com/google/uuid"
1115 "github.com/gorilla/websocket"
1216 "github.com/rs/zerolog"
@@ -18,6 +22,8 @@ import (
1822 dpmock "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/mock"
1923 connectionmock "github.com/onflow/flow-go/engine/access/rest/websockets/mock"
2024 "github.com/onflow/flow-go/engine/access/rest/websockets/models"
25+ "github.com/onflow/flow-go/engine/access/state_stream/backend"
26+ "github.com/onflow/flow-go/model/flow"
2127 "github.com/onflow/flow-go/utils/unittest"
2228)
2329
@@ -30,6 +36,9 @@ type ControllerSuite struct {
3036
3137 connection * connectionmock.WebsocketConnection
3238 dataProviderFactory * dpmock.DataProviderFactory
39+
40+ streamApi * streammock.API
41+ streamConfig backend.Config
3342}
3443
3544func TestControllerSuite (t * testing.T ) {
@@ -39,10 +48,232 @@ func TestControllerSuite(t *testing.T) {
3948// SetupTest initializes the test suite with required dependencies.
4049func (s * ControllerSuite ) SetupTest () {
4150 s .logger = unittest .Logger ()
42- s .config = Config {}
51+ s .config = NewDefaultWebsocketConfig ()
4352
4453 s .connection = connectionmock .NewWebsocketConnection (s .T ())
4554 s .dataProviderFactory = dpmock .NewDataProviderFactory (s .T ())
55+
56+ s .streamApi = streammock .NewAPI (s .T ())
57+ s .streamConfig = backend.Config {}
58+ }
59+
60+ // TestSubscribeRequest tests the subscribe to topic flow.
61+ // We emulate a request message from a client, and a response message from a controller.
62+ func (s * ControllerSuite ) TestSubscribeRequest () {
63+ s .T ().Run ("Happy path" , func (t * testing.T ) {
64+ conn , dataProviderFactory , dataProvider := newControllerMocks (t )
65+ controller := NewWebSocketController (s .logger , s .config , conn , dataProviderFactory )
66+
67+ dataProvider .
68+ On ("Run" ).
69+ Run (func (args mock.Arguments ) {}).
70+ Return (nil ).
71+ Once ()
72+
73+ subscribeRequest := models.SubscribeMessageRequest {
74+ BaseMessageRequest : models.BaseMessageRequest {Action : "subscribe" },
75+ Topic : dp .BlocksTopic ,
76+ Arguments : nil ,
77+ }
78+
79+ // Simulate receiving the subscription request from the client
80+ conn .
81+ On ("ReadJSON" , mock .Anything ).
82+ Run (func (args mock.Arguments ) {
83+ requestMsg , ok := args .Get (0 ).(* json.RawMessage )
84+ require .True (t , ok )
85+ subscribeRequestMessage , err := json .Marshal (subscribeRequest )
86+ require .NoError (t , err )
87+ * requestMsg = subscribeRequestMessage
88+ }).
89+ Return (nil ).
90+ Once ()
91+
92+ // Channel to signal the test flow completion
93+ done := make (chan struct {}, 1 )
94+
95+ // Simulate writing a successful subscription response back to the client
96+ conn .
97+ On ("WriteJSON" , mock .Anything ).
98+ Return (func (msg interface {}) error {
99+ response , ok := msg .(models.SubscribeMessageResponse )
100+ require .True (t , ok )
101+ require .True (t , response .Success )
102+ close (done ) // Signal that response has been sent
103+ return websocket .ErrCloseSent
104+ }).Once ()
105+
106+ // Simulate client closing connection after receiving the response
107+ conn .
108+ On ("ReadJSON" , mock .Anything ).
109+ Return (func (interface {}) error {
110+ <- done
111+ return websocket .ErrCloseSent
112+ }).Once ()
113+
114+ ctx , cancel := context .WithCancel (context .Background ())
115+ defer cancel ()
116+
117+ controller .HandleConnection (ctx )
118+ })
119+ }
120+
121+ // TestSubscribeBlocks tests the functionality for streaming blocks to a subscriber.
122+ func (s * ControllerSuite ) TestSubscribeBlocks () {
123+ s .T ().Run ("Stream one block" , func (t * testing.T ) {
124+ conn , dataProviderFactory , dataProvider := newControllerMocks (t )
125+ controller := NewWebSocketController (s .logger , s .config , conn , dataProviderFactory )
126+
127+ // Simulate data provider write a block to the controller
128+ expectedBlock := unittest .BlockFixture ()
129+ dataProvider .
130+ On ("Run" , mock .Anything ).
131+ Run (func (args mock.Arguments ) {
132+ controller .communicationChannel <- expectedBlock
133+ }).
134+ Return (nil ).
135+ Once ()
136+
137+ done := make (chan struct {}, 1 )
138+ s .expectSubscriptionRequest (conn , done )
139+ s .expectSubscriptionResponse (conn , true )
140+
141+ // Expect a valid block to be passed to WriteJSON.
142+ // If we got to this point, the controller executed all its logic properly
143+ var actualBlock flow.Block
144+ conn .
145+ On ("WriteJSON" , mock .Anything ).
146+ Return (func (msg interface {}) error {
147+ block , ok := msg .(flow.Block )
148+ require .True (t , ok )
149+ actualBlock = block
150+
151+ close (done )
152+ return websocket .ErrCloseSent
153+ }).Once ()
154+
155+ ctx , cancel := context .WithCancel (context .Background ())
156+ defer cancel ()
157+
158+ controller .HandleConnection (ctx )
159+ require .Equal (t , expectedBlock , actualBlock )
160+ })
161+
162+ s .T ().Run ("Stream many blocks" , func (t * testing.T ) {
163+ conn , dataProviderFactory , dataProvider := newControllerMocks (t )
164+ controller := NewWebSocketController (s .logger , s .config , conn , dataProviderFactory )
165+
166+ // Simulate data provider writes some blocks to the controller
167+ expectedBlocks := unittest .BlockFixtures (100 )
168+ dataProvider .
169+ On ("Run" , mock .Anything ).
170+ Run (func (args mock.Arguments ) {
171+ for _ , block := range expectedBlocks {
172+ controller .communicationChannel <- * block
173+ }
174+ }).
175+ Return (nil ).
176+ Once ()
177+
178+ done := make (chan struct {}, 1 )
179+ s .expectSubscriptionRequest (conn , done )
180+ s .expectSubscriptionResponse (conn , true )
181+
182+ i := 0
183+ actualBlocks := make ([]* flow.Block , len (expectedBlocks ))
184+
185+ // Expect valid blocks to be passed to WriteJSON.
186+ // If we got to this point, the controller executed all its logic properly
187+ conn .
188+ On ("WriteJSON" , mock .Anything ).
189+ Return (func (msg interface {}) error {
190+ block , ok := msg .(flow.Block )
191+ require .True (t , ok )
192+
193+ actualBlocks [i ] = & block
194+ i += 1
195+
196+ if i == len (expectedBlocks ) {
197+ close (done )
198+ return websocket .ErrCloseSent
199+ }
200+
201+ return nil
202+ }).
203+ Times (len (expectedBlocks ))
204+
205+ ctx , cancel := context .WithCancel (context .Background ())
206+ defer cancel ()
207+
208+ controller .HandleConnection (ctx )
209+ require .Equal (t , expectedBlocks , actualBlocks )
210+ })
211+ }
212+
213+ // newControllerMocks initializes mock WebSocket connection, data provider, and data provider factory.
214+ // The mocked functions are expected to be called in a case when a test is expected to reach WriteJSON function.
215+ func newControllerMocks (t * testing.T ) (* connectionmock.WebsocketConnection , * dpmock.DataProviderFactory , * dpmock.DataProvider ) {
216+ conn := connectionmock .NewWebsocketConnection (t )
217+ conn .On ("Close" ).Return (nil ).Once ()
218+ conn .On ("SetReadDeadline" , mock .Anything ).Return (nil ).Once ()
219+ conn .On ("SetWriteDeadline" , mock .Anything ).Return (nil )
220+ conn .On ("SetPongHandler" , mock .AnythingOfType ("func(string) error" )).Return (nil ).Once ()
221+
222+ id := uuid .New ()
223+ topic := dp .BlocksTopic
224+ dataProvider := dpmock .NewDataProvider (t )
225+ dataProvider .On ("ID" ).Return (id )
226+ dataProvider .On ("Close" ).Return (nil )
227+ dataProvider .On ("Topic" ).Return (topic )
228+
229+ factory := dpmock .NewDataProviderFactory (t )
230+ factory .
231+ On ("NewDataProvider" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).
232+ Return (dataProvider , nil ).
233+ Once ()
234+
235+ return conn , factory , dataProvider
236+ }
237+
238+ // expectSubscriptionRequest mocks the client's subscription request.
239+ func (s * ControllerSuite ) expectSubscriptionRequest (conn * connectionmock.WebsocketConnection , done <- chan struct {}) {
240+ requestMessage := models.SubscribeMessageRequest {
241+ BaseMessageRequest : models.BaseMessageRequest {Action : "subscribe" },
242+ Topic : dp .BlocksTopic ,
243+ }
244+
245+ // The very first message from a client is a request to subscribe to some topic
246+ conn .On ("ReadJSON" , mock .Anything ).
247+ Run (func (args mock.Arguments ) {
248+ reqMsg , ok := args .Get (0 ).(* json.RawMessage )
249+ require .True (s .T (), ok )
250+ msg , err := json .Marshal (requestMessage )
251+ require .NoError (s .T (), err )
252+ * reqMsg = msg
253+ }).
254+ Return (nil ).
255+ Once ()
256+
257+ // In the default case, no further communication is expected from the client.
258+ // We wait for the writer routine to signal completion, allowing us to close the connection gracefully
259+ conn .
260+ On ("ReadJSON" , mock .Anything ).
261+ Return (func (msg interface {}) error {
262+ <- done
263+ return websocket .ErrCloseSent
264+ })
265+ }
266+
267+ // expectSubscriptionResponse mocks the subscription response sent to the client.
268+ func (s * ControllerSuite ) expectSubscriptionResponse (conn * connectionmock.WebsocketConnection , success bool ) {
269+ conn .On ("WriteJSON" , mock .Anything ).
270+ Run (func (args mock.Arguments ) {
271+ response , ok := args .Get (0 ).(models.SubscribeMessageResponse )
272+ require .True (s .T (), ok )
273+ require .Equal (s .T (), success , response .Success )
274+ }).
275+ Return (nil ).
276+ Once ()
46277}
47278
48279// TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly.
@@ -259,8 +490,9 @@ func (s *ControllerSuite) initializeController() *Controller {
259490// mockDataProviderSetup is a helper which mocks a blocks data provider setup.
260491func (s * ControllerSuite ) mockBlockDataProviderSetup (id uuid.UUID ) * dpmock.DataProvider {
261492 dataProvider := dpmock .NewDataProvider (s .T ())
262- dataProvider .On ("ID" ).Return (id ).Once ()
493+ dataProvider .On ("ID" ).Return (id ).Twice ()
263494 dataProvider .On ("Close" ).Return (nil ).Once ()
495+ dataProvider .On ("Topic" ).Return (dp .BlocksTopic ).Once ()
264496 s .dataProviderFactory .On ("NewDataProvider" , mock .Anything , dp .BlocksTopic , mock .Anything , mock .Anything ).
265497 Return (dataProvider , nil ).Once ()
266498 dataProvider .On ("Run" ).Return (nil ).Once ()
0 commit comments