99	"encoding/json" 
1010	"fmt" 
1111	"net/http" 
12+ 	"net/url" 
1213	"strings" 
1314	"testing" 
1415	"time" 
@@ -19,117 +20,272 @@ import (
1920
2021	"github.com/matrix-org/complement/internal/b" 
2122	"github.com/matrix-org/complement/internal/client" 
23+ 	"github.com/matrix-org/complement/internal/docker" 
2224	"github.com/matrix-org/complement/internal/federation" 
2325	"github.com/matrix-org/complement/internal/match" 
26+ 	"github.com/matrix-org/complement/internal/must" 
2427)
2528
26- // TestSyncBlocksDuringPartialStateJoin tests that a regular /sync request 
27- // made during a partial-state /send_join request blocks until the state is 
28- // correctly synced. 
29- func  TestSyncBlocksDuringPartialStateJoin (t  * testing.T ) {
30- 	// We make a room on the Complement server, then have @alice:hs1 join it, 
31- 	// and make a sync request while the resync is in flight 
29+ func  TestPartialStateJoin (t  * testing.T ) {
30+ 	// test that a regular /sync request made during a partial-state /send_join 
31+ 	// request blocks until the state is correctly synced. 
32+ 	t .Run ("SyncBlocksDuringPartialStateJoin" , func (t  * testing.T ) {
33+ 		deployment  :=  Deploy (t , b .BlueprintAlice )
34+ 		defer  deployment .Destroy (t )
35+ 		alice  :=  deployment .Client (t , "hs1" , "@alice:hs1" )
3236
33- 	deployment  :=  Deploy (t , b . BlueprintAlice )
34- 	defer  deployment .Destroy (t )
37+ 		 psjResult  :=  beginPartialStateJoin (t , deployment ,  alice )
38+ 		 defer  psjResult .Destroy ()
3539
36- 	alice   :=   deployment . Client ( t ,  "hs1" ,  "@alice:hs1" ) 
40+ 		 // Alice has now joined the room, and the server is syncing the state in the background. 
3741
38- 	srv  :=  federation .NewServer (t , deployment ,
42+ 		// attempts to sync should now block. Fire off a goroutine to try it. 
43+ 		syncResponseChan  :=  make (chan  gjson.Result )
44+ 		defer  close (syncResponseChan )
45+ 		go  func () {
46+ 			response , _  :=  alice .MustSync (t , client.SyncReq {})
47+ 			syncResponseChan  <-  response 
48+ 		}()
49+ 
50+ 		// wait for the state_ids request to arrive 
51+ 		psjResult .AwaitStateIdsRequest (t )
52+ 
53+ 		// the client-side requests should still be waiting 
54+ 		select  {
55+ 		case  <- syncResponseChan :
56+ 			t .Fatalf ("Sync completed before state resync complete" )
57+ 		default :
58+ 		}
59+ 
60+ 		// release the federation /state response 
61+ 		psjResult .FinishStateRequest ()
62+ 
63+ 		// the /sync request should now complete, with the new room 
64+ 		var  syncRes  gjson.Result 
65+ 		select  {
66+ 		case  <- time .After (1  *  time .Second ):
67+ 			t .Fatalf ("/sync request request did not complete" )
68+ 		case  syncRes  =  <- syncResponseChan :
69+ 		}
70+ 
71+ 		roomRes  :=  syncRes .Get ("rooms.join."  +  client .GjsonEscape (psjResult .ServerRoom .RoomID ))
72+ 		if  ! roomRes .Exists () {
73+ 			t .Fatalf ("/sync completed without join to new room\n " )
74+ 		}
75+ 
76+ 		// check that the state includes both charlie and derek. 
77+ 		matcher  :=  match .JSONCheckOffAllowUnwanted ("state.events" ,
78+ 			[]interface {}{
79+ 				"m.room.member|"  +  psjResult .Server .UserID ("charlie" ),
80+ 				"m.room.member|"  +  psjResult .Server .UserID ("derek" ),
81+ 			}, func (result  gjson.Result ) interface {} {
82+ 				return  strings .Join ([]string {result .Map ()["type" ].Str , result .Map ()["state_key" ].Str }, "|" )
83+ 			}, nil ,
84+ 		)
85+ 		if  err  :=  matcher ([]byte (roomRes .Raw )); err  !=  nil  {
86+ 			t .Errorf ("Did not find expected state events in /sync response: %s" , err )
87+ 
88+ 		}
89+ 	})
90+ 
91+ 	// when Alice does a lazy-loading sync, she should see the room immediately 
92+ 	t .Run ("CanLazyLoadingSyncDuringPartialStateJoin" , func (t  * testing.T ) {
93+ 		deployment  :=  Deploy (t , b .BlueprintAlice )
94+ 		defer  deployment .Destroy (t )
95+ 		alice  :=  deployment .Client (t , "hs1" , "@alice:hs1" )
96+ 
97+ 		psjResult  :=  beginPartialStateJoin (t , deployment , alice )
98+ 		defer  psjResult .Destroy ()
99+ 
100+ 		alice .MustSyncUntil (t ,
101+ 			client.SyncReq {
102+ 				Filter : buildLazyLoadingSyncFilter (),
103+ 			},
104+ 			client .SyncJoinedTo (alice .UserID , psjResult .ServerRoom .RoomID ),
105+ 		)
106+ 		t .Logf ("Alice successfully synced" )
107+ 	})
108+ 
109+ 	// we should be able to send events in the room, during the resync 
110+ 	t .Run ("CanSendEventsDuringPartialStateJoin" , func (t  * testing.T ) {
111+ 		t .Skip ("Cannot yet send events during resync" )
112+ 		deployment  :=  Deploy (t , b .BlueprintAlice )
113+ 		defer  deployment .Destroy (t )
114+ 		alice  :=  deployment .Client (t , "hs1" , "@alice:hs1" )
115+ 
116+ 		psjResult  :=  beginPartialStateJoin (t , deployment , alice )
117+ 		defer  psjResult .Destroy ()
118+ 
119+ 		alice .Client .Timeout  =  2  *  time .Second 
120+ 		paths  :=  []string {"_matrix" , "client" , "r0" , "rooms" , psjResult .ServerRoom .RoomID , "send" , "m.room.message" , "0" }
121+ 		res  :=  alice .MustDoFunc (t , "PUT" , paths , client .WithJSONBody (t , map [string ]interface {}{
122+ 			"msgtype" : "m.text" ,
123+ 			"body" :    "Hello world!" ,
124+ 		}))
125+ 		body  :=  gjson .ParseBytes (client .ParseJSON (t , res ))
126+ 		eventID  :=  body .Get ("event_id" ).Str 
127+ 		t .Logf ("Alice sent event event ID %s" , eventID )
128+ 	})
129+ 
130+ 	// a request to (client-side) /members?at= should block until the (federation) /state request completes 
131+ 	// TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path 
132+ 	t .Run ("MembersRequestBlocksDuringPartialStateJoin" , func (t  * testing.T ) {
133+ 		deployment  :=  Deploy (t , b .BlueprintAlice )
134+ 		defer  deployment .Destroy (t )
135+ 		alice  :=  deployment .Client (t , "hs1" , "@alice:hs1" )
136+ 
137+ 		psjResult  :=  beginPartialStateJoin (t , deployment , alice )
138+ 		defer  psjResult .Destroy ()
139+ 
140+ 		// we need a sync token to pass to the `at` param. 
141+ 		syncToken  :=  alice .MustSyncUntil (t ,
142+ 			client.SyncReq {
143+ 				Filter : buildLazyLoadingSyncFilter (),
144+ 			},
145+ 			client .SyncJoinedTo (alice .UserID , psjResult .ServerRoom .RoomID ),
146+ 		)
147+ 		t .Logf ("Alice successfully synced" )
148+ 
149+ 		// Fire off a goroutine to send the request, and write the response back to a channel. 
150+ 		clientMembersRequestResponseChan  :=  make (chan  * http.Response )
151+ 		defer  close (clientMembersRequestResponseChan )
152+ 		go  func () {
153+ 			queryParams  :=  url.Values {}
154+ 			queryParams .Set ("at" , syncToken )
155+ 			clientMembersRequestResponseChan  <-  alice .MustDoFunc (
156+ 				t ,
157+ 				"GET" ,
158+ 				[]string {"_matrix" , "client" , "r0" , "rooms" , psjResult .ServerRoom .RoomID , "members" },
159+ 				client .WithQueries (queryParams ),
160+ 			)
161+ 		}()
162+ 
163+ 		// release the federation /state response 
164+ 		psjResult .FinishStateRequest ()
165+ 
166+ 		// the client-side /members request should now complete, with a response that includes charlie and derek. 
167+ 		select  {
168+ 		case  <- time .After (1  *  time .Second ):
169+ 			t .Fatalf ("client-side /members request did not complete" )
170+ 		case  res  :=  <- clientMembersRequestResponseChan :
171+ 			must .MatchResponse (t , res , match.HTTPResponse {
172+ 				JSON : []match.JSON {
173+ 					match .JSONCheckOff ("chunk" ,
174+ 						[]interface {}{
175+ 							"m.room.member|"  +  alice .UserID ,
176+ 							"m.room.member|"  +  psjResult .Server .UserID ("charlie" ),
177+ 							"m.room.member|"  +  psjResult .Server .UserID ("derek" ),
178+ 						}, func (result  gjson.Result ) interface {} {
179+ 							return  strings .Join ([]string {result .Map ()["type" ].Str , result .Map ()["state_key" ].Str }, "|" )
180+ 						}, nil ),
181+ 				},
182+ 			})
183+ 		}
184+ 	})
185+ }
186+ 
187+ // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq 
188+ func  buildLazyLoadingSyncFilter () string  {
189+ 	j , _  :=  json .Marshal (map [string ]interface {}{
190+ 		"room" : map [string ]interface {}{
191+ 			"timeline" : map [string ]interface {}{
192+ 				"lazy_load_members" : true ,
193+ 			},
194+ 			"state" : map [string ]interface {}{
195+ 				"lazy_load_members" : true ,
196+ 			},
197+ 		},
198+ 	})
199+ 	return  string (j )
200+ }
201+ 
202+ // partialStateJoinResult is the result of beginPartialStateJoin 
203+ type  partialStateJoinResult  struct  {
204+ 	cancelListener                    func ()
205+ 	Server                            * federation.Server 
206+ 	ServerRoom                        * federation.ServerRoom 
207+ 	fedStateIdsRequestReceivedWaiter  * Waiter 
208+ 	fedStateIdsSendResponseWaiter     * Waiter 
209+ }
210+ 
211+ // beginPartialStateJoin spins up a room on a complement server, 
212+ // then has a test user join it. It returns a partialStateJoinResult, 
213+ // which must be Destroy'd on completion. 
214+ // 
215+ // When this method completes, the /join request will have completed, but the 
216+ // state has not yet been re-synced. To allow the re-sync to proceed, call 
217+ // partialStateJoinResult.FinishStateRequest. 
218+ func  beginPartialStateJoin (t  * testing.T , deployment  * docker.Deployment , joiningUser  * client.CSAPI ) partialStateJoinResult  {
219+ 	result  :=  partialStateJoinResult {}
220+ 	success  :=  false 
221+ 	defer  func () {
222+ 		if  ! success  {
223+ 			result .Destroy ()
224+ 		}
225+ 	}()
226+ 
227+ 	result .Server  =  federation .NewServer (t , deployment ,
39228		federation .HandleKeyRequests (),
40229		federation .HandlePartialStateMakeSendJoinRequests (),
41230		federation .HandleEventRequests (),
42231	)
43- 	cancel  :=  srv .Listen ()
44- 	defer  cancel ()
232+ 	result .cancelListener  =  result .Server .Listen ()
45233
46234	// some things for orchestration 
47- 	fedStateIdsRequestReceivedWaiter  :=  NewWaiter ()
48- 	defer  fedStateIdsRequestReceivedWaiter .Finish ()
49- 	fedStateIdsSendResponseWaiter  :=  NewWaiter ()
50- 	defer  fedStateIdsSendResponseWaiter .Finish ()
235+ 	result .fedStateIdsRequestReceivedWaiter  =  NewWaiter ()
236+ 	result .fedStateIdsSendResponseWaiter  =  NewWaiter ()
51237
52238	// create the room on the complement server, with charlie and derek as members 
53- 	charlie  :=  srv .UserID ("charlie" )
54- 	derek  :=  srv .UserID ("derek" )
55- 	serverRoom  :=  makeTestRoom (t , srv , alice .GetDefaultRoomVersion (t ), charlie , derek )
239+ 	roomVer  :=  joiningUser .GetDefaultRoomVersion (t )
240+ 	result .ServerRoom  =  result .Server .MustMakeRoom (t , roomVer , federation .InitialRoomEvents (roomVer , result .Server .UserID ("charlie" )))
241+ 	result .ServerRoom .AddEvent (result .Server .MustCreateEvent (t , result .ServerRoom , b.Event {
242+ 		Type :     "m.room.member" ,
243+ 		StateKey : b .Ptr (result .Server .UserID ("derek" )),
244+ 		Sender :   result .Server .UserID ("derek" ),
245+ 		Content : map [string ]interface {}{
246+ 			"membership" : "join" ,
247+ 		},
248+ 	}))
56249
57250	// register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then 
58251	// waits for fedStateIdsSendResponseWaiter and sends a reply 
59- 	handleStateIdsRequests (t , srv ,  serverRoom ,  fedStateIdsRequestReceivedWaiter , fedStateIdsSendResponseWaiter )
252+ 	handleStateIdsRequests (t , result . Server ,  result . ServerRoom ,  result . fedStateIdsRequestReceivedWaiter , result . fedStateIdsSendResponseWaiter )
60253
61254	// a handler for /state requests, which sends a sensible response 
62- 	handleStateRequests (t , srv , serverRoom , nil , nil )
63- 
64- 	// have alice join the room by room ID. 
65- 	alice .JoinRoom (t , serverRoom .RoomID , []string {srv .ServerName ()})
66- 	t .Logf ("Join completed" )
255+ 	handleStateRequests (t , result .Server , result .ServerRoom , nil , nil )
67256
68- 	// Alice has now joined the room, and the server is syncing the state in the background. 
257+ 	// have joiningUser join the room by room ID. 
258+ 	joiningUser .JoinRoom (t , result .ServerRoom .RoomID , []string {result .Server .ServerName ()})
259+ 	t .Logf ("/join request completed" )
69260
70- 	// attempts to sync should now block. Fire off a goroutine to try it. 
71- 	syncResponseChan  :=  make (chan  gjson.Result )
72- 	defer  close (syncResponseChan )
73- 	go  func () {
74- 		response , _  :=  alice .MustSync (t , client.SyncReq {})
75- 		syncResponseChan  <-  response 
76- 	}()
77- 
78- 	// wait for the state_ids request to arrive 
79- 	fedStateIdsRequestReceivedWaiter .Waitf (t , 5 * time .Second , "Waiting for /state_ids request" )
261+ 	success  =  true 
262+ 	return  result 
263+ }
80264
81- 	 // the client-side requests should still be waiting  
82- 	 select  { 
83- 	 case   <- syncResponseChan : 
84- 		 t . Fatalf ( "Sync completed before state resync complete" ) 
85- 	default : 
265+ // Destroy cleans up  the resources associated with the join attempt. It must  
266+ // be called once the test is finished 
267+ func  ( psj   * partialStateJoinResult )  Destroy () { 
268+ 	if   psj . fedStateIdsSendResponseWaiter   !=   nil  { 
269+ 		 psj . fedStateIdsSendResponseWaiter . Finish () 
86270	}
87271
88- 	// release the federation /state response 
89- 	fedStateIdsSendResponseWaiter .Finish ()
90- 
91- 	// the /sync request should now complete, with the new room 
92- 	var  syncRes  gjson.Result 
93- 	select  {
94- 	case  <- time .After (1  *  time .Second ):
95- 		t .Fatalf ("/sync request request did not complete" )
96- 	case  syncRes  =  <- syncResponseChan :
272+ 	if  psj .fedStateIdsRequestReceivedWaiter  !=  nil  {
273+ 		psj .fedStateIdsRequestReceivedWaiter .Finish ()
97274	}
98275
99- 	roomRes  :=  syncRes .Get ("rooms.join."  +  client .GjsonEscape (serverRoom .RoomID ))
100- 	if  ! roomRes .Exists () {
101- 		t .Fatalf ("/sync completed without join to new room\n " )
276+ 	if  psj .cancelListener  !=  nil  {
277+ 		psj .cancelListener ()
102278	}
279+ }
103280
104- 	// check that the state includes both charlie and derek. 
105- 	matcher  :=  match .JSONCheckOffAllowUnwanted ("state.events" ,
106- 		[]interface {}{
107- 			"m.room.member|"  +  charlie ,
108- 			"m.room.member|"  +  derek ,
109- 		}, func (result  gjson.Result ) interface {} {
110- 			return  strings .Join ([]string {result .Map ()["type" ].Str , result .Map ()["state_key" ].Str }, "|" )
111- 		}, nil ,
112- 	)
113- 	if  err  :=  matcher ([]byte (roomRes .Raw )); err  !=  nil  {
114- 		t .Errorf ("Did not find expected state events in /sync response: %s" , err )
115- 	}
281+ // wait for a /state_ids request for the test room to arrive 
282+ func  (psj  * partialStateJoinResult ) AwaitStateIdsRequest (t  * testing.T ) {
283+ 	psj .fedStateIdsRequestReceivedWaiter .Waitf (t , 5 * time .Second , "Waiting for /state_ids request" )
116284}
117285
118- // makeTestRoom constructs a test room on the Complement server, and adds the given extra members 
119- func  makeTestRoom (t  * testing.T , srv  * federation.Server , roomVer  gomatrixserverlib.RoomVersion , creator  string , members  ... string ) * federation.ServerRoom  {
120- 	serverRoom  :=  srv .MustMakeRoom (t , roomVer , federation .InitialRoomEvents (roomVer , creator ))
121- 	for  _ , m  :=  range  members  {
122- 		serverRoom .AddEvent (srv .MustCreateEvent (t , serverRoom , b.Event {
123- 			Type :     "m.room.member" ,
124- 			StateKey : b .Ptr (m ),
125- 			Sender :   m ,
126- 			Content : map [string ]interface {}{
127- 				"membership" : "join" ,
128- 			},
129- 		}),
130- 		)
131- 	}
132- 	return  serverRoom 
286+ // allow the /state_ids request to complete, thus allowing the state re-sync to complete 
287+ func  (psj  * partialStateJoinResult ) FinishStateRequest () {
288+ 	psj .fedStateIdsSendResponseWaiter .Finish ()
133289}
134290
135291// handleStateIdsRequests registers a handler for /state_ids requests for serverRoom. 
0 commit comments