Skip to content

Commit 501589c

Browse files
kegsayDavid RobertsonMadLittleMods
authored
Re-design client.SyncUntil (#272)
* Initial API design * Remove SyncUntilInvited and SyncUntilJoined; add more docs * Better docs and error messages * Remove SyncUntilTimelineHas * Log the failing array * Convert ignored users tests * Remove SyncUntil and fixup MSCs * Update internal/client/client.go Co-authored-by: David Robertson <[email protected]> * Update internal/client/client.go Co-authored-by: Eric Eastwood <[email protected]> * Update internal/client/client.go Co-authored-by: Eric Eastwood <[email protected]> Co-authored-by: David Robertson <[email protected]> Co-authored-by: Eric Eastwood <[email protected]>
1 parent e7f6921 commit 501589c

11 files changed

+299
-236
lines changed

internal/client/client.go

Lines changed: 221 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package client
33
import (
44
"bytes"
55
"encoding/json"
6+
"fmt"
67
"io/ioutil"
78
"net/http"
89
"net/http/httputil"
@@ -22,12 +23,50 @@ import (
2223
// See functions starting with `With...` in this package for more info.
2324
type RequestOpt func(req *http.Request)
2425

26+
// SyncCheckOpt is a functional option for use with MustSyncUntil which should return <nil> if
27+
// the response satisfies the check, else return a human friendly error.
28+
// The result object is the entire /sync response from this request.
29+
type SyncCheckOpt func(clientUserID string, topLevelSyncJSON gjson.Result) error
30+
31+
// SyncReq contains all the /sync request configuration options. The empty struct `SyncReq{}` is valid
32+
// which will do a full /sync due to lack of a since token.
33+
type SyncReq struct {
34+
// A point in time to continue a sync from. This should be the next_batch token returned by an
35+
// earlier call to this endpoint.
36+
Since string
37+
// The ID of a filter created using the filter API or a filter JSON object encoded as a string.
38+
// The server will detect whether it is an ID or a JSON object by whether the first character is
39+
// a "{" open brace. Passing the JSON inline is best suited to one off requests. Creating a
40+
// filter using the filter API is recommended for clients that reuse the same filter multiple
41+
// times, for example in long poll requests.
42+
Filter string
43+
// Controls whether to include the full state for all rooms the user is a member of.
44+
// If this is set to true, then all state events will be returned, even if since is non-empty.
45+
// The timeline will still be limited by the since parameter. In this case, the timeout parameter
46+
// will be ignored and the query will return immediately, possibly with an empty timeline.
47+
// If false, and since is non-empty, only state which has changed since the point indicated by
48+
// since will be returned.
49+
// By default, this is false.
50+
FullState bool
51+
// Controls whether the client is automatically marked as online by polling this API. If this
52+
// parameter is omitted then the client is automatically marked as online when it uses this API.
53+
// Otherwise if the parameter is set to “offline” then the client is not marked as being online
54+
// when it uses this API. When set to “unavailable”, the client is marked as being idle.
55+
// One of: [offline online unavailable].
56+
SetPresence string
57+
// The maximum time to wait, in milliseconds, before returning this request. If no events
58+
// (or other data) become available before this time elapses, the server will return a response
59+
// with empty fields.
60+
// By default, this is 1000 for Complement testing.
61+
TimeoutMillis string // string for easier conversion to query params
62+
}
63+
2564
type CSAPI struct {
2665
UserID string
2766
AccessToken string
2867
BaseURL string
2968
Client *http.Client
30-
// how long are we willing to wait for SyncUntil.... calls
69+
// how long are we willing to wait for MustSyncUntil.... calls
3170
SyncUntilTimeout time.Duration
3271
// True to enable verbose logging
3372
Debug bool
@@ -122,124 +161,124 @@ func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string {
122161
body := ParseJSON(t, res)
123162
eventID := GetJSONFieldStr(t, body, "event_id")
124163
t.Logf("SendEventSynced waiting for event ID %s", eventID)
125-
c.SyncUntilTimelineHas(t, roomID, func(r gjson.Result) bool {
164+
c.MustSyncUntil(t, SyncReq{}, SyncTimelineHas(roomID, func(r gjson.Result) bool {
126165
return r.Get("event_id").Str == eventID
127-
})
166+
}))
128167
return eventID
129168
}
130169

131-
// SyncUntilTimelineHas is a wrapper around `SyncUntil`.
132-
// It blocks and continually calls `/sync` until
133-
// - we have joined the given room
134-
// - we see an event in the room for which the `check` function returns True
135-
// If the `check` function fails the test, the failing event will be automatically logged.
136-
// Will time out after CSAPI.SyncUntilTimeout.
170+
// Perform a single /sync request with the given request options. To sync until something happens,
171+
// see `MustSyncUntil`.
137172
//
138-
// Returns the `next_batch` token from the last /sync response. This can be passed as
139-
// `since` to sync from this point forward only.
140-
func (c *CSAPI) SyncUntilTimelineHas(t *testing.T, roomID string, check func(gjson.Result) bool) string {
173+
// Fails the test if the /sync request does not return 200 OK.
174+
// Returns the top-level parsed /sync response JSON as well as the next_batch token from the response.
175+
func (c *CSAPI) MustSync(t *testing.T, syncReq SyncReq) (gjson.Result, string) {
141176
t.Helper()
142-
return c.SyncUntil(t, "", "", "rooms.join."+GjsonEscape(roomID)+".timeline.events", check)
177+
query := url.Values{
178+
"timeout": []string{"1000"},
179+
}
180+
// configure the HTTP request based on SyncReq
181+
if syncReq.TimeoutMillis != "" {
182+
query["timeout"] = []string{syncReq.TimeoutMillis}
183+
}
184+
if syncReq.Since != "" {
185+
query["since"] = []string{syncReq.Since}
186+
}
187+
if syncReq.Filter != "" {
188+
query["filter"] = []string{syncReq.Filter}
189+
}
190+
if syncReq.FullState {
191+
query["full_state"] = []string{"true"}
192+
}
193+
if syncReq.SetPresence != "" {
194+
query["set_presence"] = []string{syncReq.SetPresence}
195+
}
196+
res := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "sync"}, WithQueries(query))
197+
body := ParseJSON(t, res)
198+
result := gjson.ParseBytes(body)
199+
nextBatch := GetJSONFieldStr(t, body, "next_batch")
200+
return result, nextBatch
143201
}
144202

145-
// SyncUntilGlobalAccountDataHas is a wrapper around `SyncUntil`.
146-
// It blocks and continually calls `/sync` until
147-
// - we an event in the global account data for which the `check` function returns True
148-
// If the `check` function fails the test, the failing event will be automatically logged.
149-
// Will time out after CSAPI.SyncUntilTimeout.
203+
// MustSyncUntil blocks and continually calls /sync (advancing the since token) until all the
204+
// check functions return no error. Returns the final/latest since token.
150205
//
151-
// Returns the `next_batch` token from the last /sync response. This can be passed as
152-
// `since` to sync from this point forward only.
153-
func (c *CSAPI) SyncUntilGlobalAccountDataHas(t *testing.T, check func(gjson.Result) bool) string {
154-
t.Helper()
155-
return c.SyncUntil(t, "", "", "account_data.events", check)
156-
}
157-
158-
// SyncUntilInvitedTo is a wrapper around SyncUntil.
159-
// It blocks and continually calls `/sync` until we've been invited to the given room.
160-
// Will time out after CSAPI.SyncUntilTimeout.
206+
// Initial /sync example: (no since token)
207+
// bob.InviteRoom(t, roomID, alice.UserID)
208+
// alice.JoinRoom(t, roomID, nil)
209+
// alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
161210
//
162-
// Returns the `next_batch` token from the last /sync response. This can be passed as
163-
// `since` to sync from this point forward only.
164-
func (c *CSAPI) SyncUntilInvitedTo(t *testing.T, roomID string) string {
165-
t.Helper()
166-
check := func(event gjson.Result) bool {
167-
return event.Get("type").Str == "m.room.member" &&
168-
event.Get("content.membership").Str == "invite" &&
169-
event.Get("state_key").Str == c.UserID
170-
}
171-
return c.SyncUntil(t, "", "", "rooms.invite."+GjsonEscape(roomID)+".invite_state.events", check)
172-
}
173-
174-
// SyncUntilJoined is a wrapper around SyncUntil.
175-
// It blocks and continually calls `/sync` until we've joined the given room.
176-
// Will time out after CSAPI.SyncUntilTimeout.
177-
func (c *CSAPI) SyncUntilJoined(t *testing.T, roomID string) {
178-
t.Helper()
179-
c.SyncUntilTimelineHas(t, roomID, func(event gjson.Result) bool {
180-
return event.Get("type").Str == "m.room.member" &&
181-
event.Get("content.membership").Str == "join" &&
182-
event.Get("state_key").Str == c.UserID
183-
})
184-
}
185-
186-
// SyncUntil blocks and continually calls /sync until
187-
// - the response contains a particular `key`, and
188-
// - its corresponding value is an array
189-
// - some element in that array makes the `check` function return true.
190-
// If the `check` function fails the test, the failing event will be automatically logged.
191-
// Will time out after CSAPI.SyncUntilTimeout.
211+
// Incremental /sync example: (test controls since token)
212+
// since := alice.MustSyncUntil(t, client.SyncReq{TimeoutMillis: "0"}) // get a since token
213+
// bob.InviteRoom(t, roomID, alice.UserID)
214+
// since = alice.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncInvitedTo(alice.UserID, roomID))
215+
// alice.JoinRoom(t, roomID, nil)
216+
// alice.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncJoinedTo(alice.UserID, roomID))
192217
//
193-
// Returns the `next_batch` token from the last /sync response. This can be passed as
194-
// `since` to sync from this point forward only.
195-
func (c *CSAPI) SyncUntil(t *testing.T, since, filter, key string, check func(gjson.Result) bool) string {
218+
// Checking multiple parts of /sync:
219+
// alice.MustSyncUntil(
220+
// t, client.SyncReq{},
221+
// client.SyncJoinedTo(alice.UserID, roomID),
222+
// client.SyncJoinedTo(alice.UserID, roomID2),
223+
// client.SyncJoinedTo(alice.UserID, roomID3),
224+
// )
225+
//
226+
// Check functions are unordered and independent. Once a check function returns true it is removed
227+
// from the list of checks and won't be called again.
228+
//
229+
// In the unlikely event that you want all the checkers to pass *explicitly* in a single /sync
230+
// response (e.g to assert some form of atomic update which updates multiple parts of the /sync
231+
// response at once) then make your own checker function which does this.
232+
//
233+
// In the unlikely event that you need ordering on your checks, call MustSyncUntil multiple times
234+
// with a single checker, and reuse the returned since token, as in the "Incremental sync" example.
235+
//
236+
// Will time out after CSAPI.SyncUntilTimeout. Returns the latest since token used.
237+
func (c *CSAPI) MustSyncUntil(t *testing.T, syncReq SyncReq, checks ...SyncCheckOpt) string {
196238
t.Helper()
197239
start := time.Now()
198-
checkCounter := 0
199-
// Print failing events in a defer() so we handle t.Fatalf in the same way as t.Errorf
200-
var wasFailed = t.Failed()
201-
var lastEvent *gjson.Result
202-
timedOut := false
203-
defer func() {
204-
if !wasFailed && t.Failed() {
205-
raw := ""
206-
if lastEvent != nil {
207-
raw = lastEvent.Raw
208-
}
209-
if !timedOut {
210-
t.Logf("SyncUntil: failing event %s", raw)
211-
}
240+
numResponsesReturned := 0
241+
checkers := make([]struct {
242+
check SyncCheckOpt
243+
errs []string
244+
}, len(checks))
245+
for i := range checks {
246+
c := checkers[i]
247+
c.check = checks[i]
248+
checkers[i] = c
249+
}
250+
printErrors := func() string {
251+
err := "Checkers:\n"
252+
for _, c := range checkers {
253+
err += strings.Join(c.errs, "\n")
254+
err += ", \n"
212255
}
213-
}()
256+
return err
257+
}
214258
for {
215259
if time.Since(start) > c.SyncUntilTimeout {
216-
timedOut = true
217-
t.Fatalf("SyncUntil: timed out. Called check function %d times", checkCounter)
260+
t.Fatalf("%s MustSyncUntil: timed out after %v. Seen %d /sync responses. %s", c.UserID, time.Since(start), numResponsesReturned, printErrors())
218261
}
219-
query := url.Values{
220-
"timeout": []string{"1000"},
221-
}
222-
if since != "" {
223-
query["since"] = []string{since}
224-
}
225-
if filter != "" {
226-
query["filter"] = []string{filter}
227-
}
228-
res := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "sync"}, WithQueries(query))
229-
body := ParseJSON(t, res)
230-
since = GetJSONFieldStr(t, body, "next_batch")
231-
keyRes := gjson.GetBytes(body, key)
232-
if keyRes.IsArray() {
233-
events := keyRes.Array()
234-
for i, ev := range events {
235-
lastEvent = &events[i]
236-
if check(ev) {
237-
return since
238-
}
239-
wasFailed = t.Failed()
240-
checkCounter++
262+
response, nextBatch := c.MustSync(t, syncReq)
263+
syncReq.Since = nextBatch
264+
numResponsesReturned += 1
265+
266+
for i := 0; i < len(checkers); i++ {
267+
err := checkers[i].check(c.UserID, response)
268+
if err == nil {
269+
// check passed, removed from checkers
270+
checkers = append(checkers[:i], checkers[i+1:]...)
271+
i--
272+
} else {
273+
c := checkers[i]
274+
c.errs = append(c.errs, fmt.Sprintf("[t=%v] Response #%d: %s", time.Since(start), numResponsesReturned, err))
275+
checkers[i] = c
241276
}
242277
}
278+
if len(checkers) == 0 {
279+
// every checker has passed!
280+
return syncReq.Since
281+
}
243282
}
244283
}
245284

@@ -509,3 +548,85 @@ func GjsonEscape(in string) string {
509548
in = strings.ReplaceAll(in, "*", `\*`)
510549
return in
511550
}
551+
552+
// Check that the timeline for `roomID` has an event which passes the check function.
553+
func SyncTimelineHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt {
554+
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
555+
err := loopArray(
556+
topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".timeline.events", check,
557+
)
558+
if err == nil {
559+
return nil
560+
}
561+
return fmt.Errorf("SyncTimelineHas(%s): %s", roomID, err)
562+
}
563+
}
564+
565+
// Checks that `userID` gets invited to `roomID`.
566+
//
567+
// This checks different parts of the /sync response depending on the client making the request.
568+
// If the client is also the person being invited to the room then the 'invite' block will be inspected.
569+
// If the client is different to the person being invited then the 'join' block will be inspected.
570+
func SyncInvitedTo(userID, roomID string) SyncCheckOpt {
571+
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
572+
// two forms which depend on what the client user is:
573+
// - passively viewing an invite for a room you're joined to (timeline events)
574+
// - actively being invited to a room.
575+
if clientUserID == userID {
576+
// active
577+
err := loopArray(
578+
topLevelSyncJSON, "rooms.invite."+GjsonEscape(roomID)+".invite_state.events",
579+
func(ev gjson.Result) bool {
580+
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "invite"
581+
},
582+
)
583+
if err != nil {
584+
return fmt.Errorf("SyncInvitedTo(%s): %s", roomID, err)
585+
}
586+
return nil
587+
}
588+
// passive
589+
return SyncTimelineHas(roomID, func(ev gjson.Result) bool {
590+
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "invite"
591+
})(clientUserID, topLevelSyncJSON)
592+
}
593+
}
594+
595+
// Check that `userID` gets joined to `roomID` by inspecting the join timeline for a membership event.
596+
func SyncJoinedTo(userID, roomID string) SyncCheckOpt {
597+
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
598+
// awkward wrapping to get the error message correct at the start :/
599+
err := SyncTimelineHas(roomID, func(ev gjson.Result) bool {
600+
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "join"
601+
})(clientUserID, topLevelSyncJSON)
602+
if err == nil {
603+
return nil
604+
}
605+
return fmt.Errorf("SyncJoinedTo(%s,%s): %s", userID, roomID, err)
606+
}
607+
}
608+
609+
// Calls the `check` function for each global account data event, and returns with success if the
610+
// `check` function returns true for at least one event.
611+
func SyncGlobalAccountDataHas(check func(gjson.Result) bool) SyncCheckOpt {
612+
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
613+
return loopArray(topLevelSyncJSON, "account_data.events", check)
614+
}
615+
}
616+
617+
func loopArray(object gjson.Result, key string, check func(gjson.Result) bool) error {
618+
array := object.Get(key)
619+
if !array.Exists() {
620+
return fmt.Errorf("Key %s does not exist", key)
621+
}
622+
if !array.IsArray() {
623+
return fmt.Errorf("Key %s exists but it isn't an array", key)
624+
}
625+
goArray := array.Array()
626+
for _, ev := range goArray {
627+
if check(ev) {
628+
return nil
629+
}
630+
}
631+
return fmt.Errorf("check function did not pass while iterating over %d elements: %v", len(goArray), array.Raw)
632+
}

tests/csapi/apidoc_room_create_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,5 @@ func TestRoomCreateWithInvites(t *testing.T) {
159159
"invite": []string{bob.UserID},
160160
})
161161

162-
bob.SyncUntil(t, "", "", "rooms.invite."+client.GjsonEscape(roomID)+".invite_state.events", func(event gjson.Result) bool {
163-
return event.Get("type").Str == "m.room.member" &&
164-
event.Get("content.membership").Str == "invite" &&
165-
event.Get("state_key").Str == bob.UserID &&
166-
event.Get("sender").Str == alice.UserID
167-
})
162+
bob.MustSyncUntil(t, client.SyncReq{}, client.SyncInvitedTo(bob.UserID, roomID))
168163
}

0 commit comments

Comments
 (0)