Skip to content

Commit 370a014

Browse files
authored
Regression test for element-hq/synapse#16928 (#710)
1 parent 891d188 commit 370a014

File tree

4 files changed

+169
-5
lines changed

4 files changed

+169
-5
lines changed

federation/server_room.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/matrix-org/complement/b"
1111
"github.com/matrix-org/complement/ct"
12+
"github.com/matrix-org/complement/helpers"
1213
)
1314

1415
type Event struct {
@@ -40,6 +41,8 @@ type ServerRoom struct {
4041
TimelineMutex sync.RWMutex
4142
ForwardExtremities []string
4243
Depth int64
44+
waiters map[string][]*helpers.Waiter // room ID -> []Waiter
45+
waitersMu *sync.Mutex
4346
}
4447

4548
// newRoom creates an empty room structure with no events
@@ -49,6 +52,8 @@ func newRoom(roomVer gomatrixserverlib.RoomVersion, roomId string) *ServerRoom {
4952
Version: roomVer,
5053
State: make(map[string]gomatrixserverlib.PDU),
5154
ForwardExtremities: make([]string, 0),
55+
waiters: make(map[string][]*helpers.Waiter),
56+
waitersMu: &sync.Mutex{},
5257
}
5358
}
5459

@@ -66,6 +71,41 @@ func (r *ServerRoom) AddEvent(ev gomatrixserverlib.PDU) {
6671
r.Depth = ev.Depth()
6772
}
6873
r.ForwardExtremities = []string{ev.EventID()}
74+
75+
// inform waiters
76+
r.waitersMu.Lock()
77+
defer r.waitersMu.Unlock()
78+
for _, w := range r.waiters[ev.EventID()] {
79+
w.Finish()
80+
}
81+
delete(r.waiters, ev.EventID()) // clear the waiters
82+
}
83+
84+
// WaiterForEvent creates a Waiter which waits until the given event ID is added to the room.
85+
// This can be used as a synchronisation point to wait until the server under test has sent
86+
// a given PDU in a /send transaction to the Complement server. This is the equivalent to listening
87+
// for the PDU in the /send transaction and then unblocking the Waiter. Note that calling
88+
// this function doesn't actually block. Call .Wait(time.Duration) on the waiter to block.
89+
//
90+
// Note: you must still add HandleTransactionRequests(nil,nil) to your server for the server to
91+
// automatically add events to the room.
92+
func (r *ServerRoom) WaiterForEvent(eventID string) *helpers.Waiter {
93+
// we need to lock the timeline so we can check the timeline without racing. We need to check
94+
// the timeline so we can immediately finish the waiter if the event ID is already in the timeline.
95+
r.TimelineMutex.Lock()
96+
defer r.TimelineMutex.Unlock()
97+
w := helpers.NewWaiter()
98+
r.waitersMu.Lock()
99+
r.waiters[eventID] = append(r.waiters[eventID], w)
100+
r.waitersMu.Unlock()
101+
// check if the event is already there and if so immediately end the wait
102+
for _, ev := range r.Timeline {
103+
if ev.EventID() == eventID {
104+
w.Finish()
105+
break
106+
}
107+
}
108+
return w
69109
}
70110

71111
// AuthEvents returns the state event IDs of the auth events which authenticate this event

match/json.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,10 @@ func JSONArrayEach(wantKey string, fn func(gjson.Result) error) JSON {
274274
}
275275

276276
if !body.Exists() {
277-
return fmt.Errorf("missing key '%s'", wantKey)
277+
return fmt.Errorf("JSONArrayEach: missing key '%s'", wantKey)
278278
}
279279
if !body.IsArray() {
280-
return fmt.Errorf("key '%s' is not an array", wantKey)
280+
return fmt.Errorf("JSONArrayEach: key '%s' is not an array", wantKey)
281281
}
282282
var err error
283283
body.ForEach(func(_, val gjson.Result) bool {
@@ -294,10 +294,10 @@ func JSONMapEach(wantKey string, fn func(k, v gjson.Result) error) JSON {
294294
return func(body gjson.Result) error {
295295
res := body.Get(wantKey)
296296
if !res.Exists() {
297-
return fmt.Errorf("missing key '%s'", wantKey)
297+
return fmt.Errorf("JSONMapEach: missing key '%s'", wantKey)
298298
}
299299
if !res.IsObject() {
300-
return fmt.Errorf("key '%s' is not an object", wantKey)
300+
return fmt.Errorf("JSONMapEach: key '%s' is not an object", wantKey)
301301
}
302302
var err error
303303
res.ForEach(func(key, val gjson.Result) bool {

should/should.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func MatchJSONBytes(rawJson []byte, matchers ...match.JSON) error {
147147
body := gjson.ParseBytes(rawJson)
148148
for _, jm := range matchers {
149149
if err := jm(body); err != nil {
150-
return fmt.Errorf("MatchJSONBytes %s with input = %v", err, string(rawJson))
150+
return fmt.Errorf("MatchJSONBytes %s with input = %v", err, body.Get("@pretty").String())
151151
}
152152
}
153153
return nil

tests/federation_sync_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package tests
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/matrix-org/complement"
10+
"github.com/matrix-org/complement/b"
11+
"github.com/matrix-org/complement/client"
12+
"github.com/matrix-org/complement/federation"
13+
"github.com/matrix-org/complement/helpers"
14+
"github.com/matrix-org/complement/match"
15+
"github.com/matrix-org/complement/must"
16+
"github.com/matrix-org/complement/runtime"
17+
"github.com/tidwall/gjson"
18+
)
19+
20+
// Tests https://github.com/element-hq/synapse/issues/16928
21+
// To set up:
22+
// - Alice and Bob join the same room, sends E1.
23+
// - Alice sends event E3.
24+
// - Bob forks the graph at E1 and sends S2.
25+
// - Alice sends event E4 on the same fork as E3.
26+
// - Alice sends event E5 merging the forks.
27+
// - Alice sync with timeline_limit=1 and a filter that skips E5
28+
func TestSyncOmitsStateChangeOnFilteredEvents(t *testing.T) {
29+
runtime.SkipIf(t, runtime.Dendrite) // S2 is put in the timeline, not state.
30+
deployment := complement.Deploy(t, 1)
31+
defer deployment.Destroy(t)
32+
srv := federation.NewServer(t, deployment,
33+
federation.HandleKeyRequests(),
34+
federation.HandleMakeSendJoinRequests(),
35+
federation.HandleTransactionRequests(nil, nil),
36+
)
37+
srv.UnexpectedRequestsAreErrors = false
38+
cancel := srv.Listen()
39+
defer cancel()
40+
41+
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
42+
bob := srv.UserID("bob")
43+
ver := alice.GetDefaultRoomVersion(t)
44+
serverRoom := srv.MustMakeRoom(t, ver, federation.InitialRoomEvents(ver, bob))
45+
46+
// Join Alice to the new room on the federation server and send E1.
47+
alice.MustJoinRoom(t, serverRoom.RoomID, []string{srv.ServerName()})
48+
e1 := alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
49+
Type: "m.room.message",
50+
Content: map[string]interface{}{
51+
"msgtype": "m.text",
52+
"body": "E1",
53+
},
54+
})
55+
56+
// wait until bob's server sees e1
57+
serverRoom.WaiterForEvent(e1).Waitf(t, 5*time.Second, "failed to see e1 (%s) on complement server", e1)
58+
59+
// create S2 but don't send it yet, prev_events will be set to [e1]
60+
roomName := "I am the room name, S2"
61+
s2 := srv.MustCreateEvent(t, serverRoom, federation.Event{
62+
Type: "m.room.name",
63+
StateKey: b.Ptr(""),
64+
Sender: bob,
65+
Content: map[string]interface{}{
66+
"name": roomName,
67+
},
68+
})
69+
70+
// Alice sends E3 & E4
71+
alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
72+
Type: "m.room.message",
73+
Content: map[string]interface{}{
74+
"msgtype": "m.text",
75+
"body": "E3",
76+
},
77+
})
78+
alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
79+
Type: "m.room.message",
80+
Content: map[string]interface{}{
81+
"msgtype": "m.text",
82+
"body": "E4",
83+
},
84+
})
85+
86+
// fork the dag earlier at e1 and send s2
87+
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{s2.JSON()}, nil)
88+
89+
// wait until we see S2 to ensure the server has processed this.
90+
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(serverRoom.RoomID, s2.EventID()))
91+
92+
// now send E5, merging the forks
93+
alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
94+
Type: "please_filter_me",
95+
Content: map[string]interface{}{
96+
"msgtype": "m.text",
97+
"body": "E5",
98+
},
99+
})
100+
101+
// now do a sync request which filters events of type 'please_filter_me', and ensure we still see the
102+
// correct room name. Note we don't need to SyncUntil here as we have all the data in the right places
103+
// already.
104+
res, _ := alice.MustSync(t, client.SyncReq{
105+
Filter: `{
106+
"room": {
107+
"timeline": {
108+
"not_types": ["please_filter_me"],
109+
"limit": 1
110+
}
111+
}
112+
}`,
113+
})
114+
must.MatchGJSON(t, res, match.JSONCheckOff(
115+
// look in this array
116+
fmt.Sprintf("rooms.join.%s.state.events", client.GjsonEscape(serverRoom.RoomID)),
117+
// for these items
118+
[]interface{}{s2.EventID()},
119+
// and map them first into this format
120+
match.CheckOffMapper(func(r gjson.Result) interface{} {
121+
return r.Get("event_id").Str
122+
}), match.CheckOffAllowUnwanted(),
123+
))
124+
}

0 commit comments

Comments
 (0)