Draft: Test that /messages works on remote homeserver and can be backfilled properly after many batches (MSC2716)
#214
Changes from 3 commits
```diff
@@ -49,18 +49,19 @@ var (
 var createPublicRoomOpts = map[string]interface{}{
 	"preset":       "public_chat",
 	"name":         "the hangout spot",
-	"room_version": "org.matrix.msc2716v3",
+	"room_version": "org.matrix.msc2716v4",
 }
 
 var createPrivateRoomOpts = map[string]interface{}{
 	"preset":       "private_chat",
 	"name":         "the hangout spot",
-	"room_version": "org.matrix.msc2716v3",
+	"room_version": "org.matrix.msc2716v4",
 }
 
 func TestImportHistoricalMessages(t *testing.T) {
 	deployment := Deploy(t, b.BlueprintHSWithApplicationService)
 	defer deployment.Destroy(t)
+	//defer time.Sleep(2 * time.Hour)
 
 	// Create the application service bridge user that is able to import historical messages
 	asUserID := "@the-bridge-user:hs1"
@@ -98,7 +99,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		//
 		// Create the first batch including the "live" event we are going to
 		// import our historical events next to.
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1]
 		timeAfterEventBefore := time.Now()
 
@@ -110,7 +111,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		// Create the second batch of events.
 		// This will also fill up the buffer so we have to scrollback to the
 		// inserted history later.
-		eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2)
+		eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter")
 
 		// Insert the most recent batch of historical messages
 		insertTime0 := timeAfterEventBefore.Add(timeBetweenMessages * 3)
@@ -214,7 +215,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		alice.JoinRoom(t, roomID, nil)
 
 		// Create the "live" event we are going to insert our historical events next to
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -255,19 +256,74 @@ func TestImportHistoricalMessages(t *testing.T) {
 		})
 	})
 
+	t.Run("Backfill still works after many batches are imported", func(t *testing.T) {
+		t.Parallel()
+
+		roomID := as.CreateRoom(t, createPublicRoomOpts)
+		alice.JoinRoom(t, roomID, nil)
+
+		// Create some normal messages in the timeline
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore")
+		eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1]
+		timeAfterEventBefore := time.Now()
+
+		// wait X number of ms to ensure that the timestamp changes enough for
+		// each of the historical messages we try to import later
+		//numBatches := 11
+		numBatches := 2
+		numHistoricalMessagesPerBatch := 100
+		time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages)
+
+		// eventIDsAfter
+		createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter")
+
+		// Import a long chain of batches connected to each other.
+		// We want to make sure Synapse doesn't blow up after we import
+		// many messages.
+		var expectedEventIDs []string
+		nextBatchID := ""
+		for i := 0; i < numBatches; i++ {
+			insertTime := timeAfterEventBefore.Add(timeBetweenMessages * time.Duration(numBatches-numHistoricalMessagesPerBatch*i))
+			batchSendRes := batchSendHistoricalMessages(
+				t,
+				as,
+				roomID,
+				eventIdBefore,
+				nextBatchID,
+				createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, insertTime),
+				createMessageEventsForBatchSendRequest([]string{virtualUserID}, insertTime, numHistoricalMessagesPerBatch),
+				// Status
+				200,
+			)
+			batchSendResBody := client.ParseJSON(t, batchSendRes)
+			// Make sure we see all of the historical messages
+			expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...)
+			nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id")
+		}
+
+		// Make sure we see the events at the very start of the message history
+		expectedEventIDs = append(expectedEventIDs, eventIDsBefore...)
+
+		// Join the room from a remote homeserver after the historical messages were sent
+		remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
+
+		// Make sure events can be backfilled from the remote homeserver
+		paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs)
+	})
+
 	t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) {
 		t.Parallel()
 
 		roomID := as.CreateRoom(t, createPublicRoomOpts)
 		alice.JoinRoom(t, roomID, nil)
 
 		// Create the "live" event we are going to insert our historical events next to
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
 		// Create some "live" events to saturate and fill up the /sync response
-		createMessagesInRoom(t, alice, roomID, 5)
+		createMessagesInRoom(t, alice, roomID, 5, "live")
 
 		// Import a historical event
 		batchSendRes := batchSendHistoricalMessages(
@@ -286,7 +342,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		historicalEventId := historicalEventIDs[0]
 
 		// This is just a dummy event we search for after the historicalEventId
-		eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1, "eventIDsAfterHistoricalImport")
 		eventIDAfterHistoricalImport := eventIDsAfterHistoricalImport[0]
 
 		// Sync until we find the eventIDAfterHistoricalImport.
@@ -309,7 +365,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		alice.JoinRoom(t, roomID, nil)
 
 		// Create the "live" event we are going to import our historical events next to
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -363,7 +419,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		roomID := as.CreateRoom(t, createPublicRoomOpts)
 		alice.JoinRoom(t, roomID, nil)
 
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -386,7 +442,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		roomID := as.CreateRoom(t, createPublicRoomOpts)
 		alice.JoinRoom(t, roomID, nil)
 
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -417,7 +473,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		alice.JoinRoom(t, roomID, nil)
 
 		// Create the "live" event we are going to import our historical events next to
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -469,12 +525,12 @@ func TestImportHistoricalMessages(t *testing.T) {
 		roomID := as.CreateRoom(t, createPublicRoomOpts)
 		alice.JoinRoom(t, roomID, nil)
 
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
 		// eventIDsAfter
-		createMessagesInRoom(t, alice, roomID, 3)
+		createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")
 
 		batchSendRes := batchSendHistoricalMessages(
 			t,
@@ -522,7 +578,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		roomID := as.CreateRoom(t, createPublicRoomOpts)
 		alice.JoinRoom(t, roomID, nil)
 
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -546,7 +602,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		})
 
 		// eventIDsAfter
-		createMessagesInRoom(t, alice, roomID, 3)
+		createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")
 
 		batchSendRes := batchSendHistoricalMessages(
 			t,
@@ -597,12 +653,12 @@ func TestImportHistoricalMessages(t *testing.T) {
 		// Join the room from a remote homeserver before any historical messages are sent
 		remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
 
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
 		// eventIDsAfter
-		createMessagesInRoom(t, alice, roomID, 10)
+		createMessagesInRoom(t, alice, roomID, 10, "eventIDsAfter")
 
 		// Mimic scrollback just through the latest messages
 		remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
@@ -685,12 +741,12 @@ func TestImportHistoricalMessages(t *testing.T) {
 		// Join the room from a remote homeserver before any historical messages are sent
 		remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
 
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
 		// eventIDsAfter
-		createMessagesInRoom(t, alice, roomID, 3)
+		createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")
 
 		// Mimic scrollback to all of the messages
 		// scrollbackMessagesRes
@@ -778,7 +834,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		alice.JoinRoom(t, roomID, nil)
 
 		// Create the "live" event we are going to import our historical events next to
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -818,7 +874,7 @@ func TestImportHistoricalMessages(t *testing.T) {
 		alice.JoinRoom(t, roomID, nil)
 
 		// Create the "live" event we are going to import our historical events next to
-		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
+		eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
 		eventIdBefore := eventIDsBefore[0]
 		timeAfterEventBefore := time.Now()
 
@@ -913,6 +969,96 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
 	}
 }
 
+func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, expectedEventIDs []string) {
+	t.Helper()
+	start := time.Now()
+
+	workingExpectedEventIDMap := make(map[string]string)
+	for _, expectedEventID := range expectedEventIDs {
+		workingExpectedEventIDMap[expectedEventID] = expectedEventID
+	}
+
+	var actualEventIDList []string
+	checkCounter := 0
+	messageResEnd := ""
+	generateErrorMesssageInfo := func() string {
+		i := 0
+		leftoverEventIDs := make([]string, len(workingExpectedEventIDMap))
+		for eventID := range workingExpectedEventIDMap {
+			leftoverEventIDs[i] = eventID
+			i++
+		}
+
+		return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s",
+			checkCounter,
+			len(expectedEventIDs)-len(leftoverEventIDs),
+			len(expectedEventIDs),
+			len(leftoverEventIDs),
+			leftoverEventIDs,
+			len(actualEventIDList),
+			actualEventIDList,
+		)
+	}
+
+	for {
+		if time.Since(start) > 200*c.SyncUntilTimeout {
```
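The diff view above is cut off in the middle of the pagination loop. As a hedged sketch only (not the PR's actual code), the rest of such a check-off loop could look roughly like the following, built from helpers that already appear elsewhere in this diff (client.MustDoFunc, client.ParseJSON, client.GetJSONFieldStr, gjson); the exact failure message, page size, and query handling are assumptions:

```go
			t.Fatalf("paginateUntilMessageCheckOff timed out: %s", generateErrorMesssageInfo())
		}

		// Paginate backwards through the room history, continuing from where
		// the previous page left off.
		query := url.Values{
			"dir":   []string{"b"},
			"limit": []string{"100"},
		}
		if messageResEnd != "" {
			query["from"] = []string{messageResEnd}
		}
		messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(query))
		checkCounter++

		messageResBody := client.ParseJSON(t, messagesRes)
		messageResEnd = client.GetJSONFieldStr(t, messageResBody, "end")

		// Check off every expected event ID we see in this page.
		gjson.GetBytes(messageResBody, "chunk").ForEach(func(_, ev gjson.Result) bool {
			eventID := ev.Get("event_id").Str
			actualEventIDList = append(actualEventIDList, eventID)
			delete(workingExpectedEventIDMap, eventID)
			return true // keep iterating over the whole page
		})

		// Done once every expected event has been seen.
		if len(workingExpectedEventIDMap) == 0 {
			break
		}
	}
}
```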
Referenced in the review discussion below:

```go
SyncUntilTimeout: 5 * time.Second,
```
@kegsay Sorry this wasn't clear: undrafting shows up as "marked this pull request as ready for review", but I deliberately didn't assign this one to you yet because of this problem.
The test itself is good to go (the timeout can be switched back to normal and numBatches back to 11) and could be shipped, but I want to make it acceptably fast to run against Synapse before merging. I used numBatches=2 during testing because it's much faster to see results while developing.
Thanks for the review pass though, and I'll fix up these other spots ⏩
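For reference, the two switches described above would roughly look like this in the diff idiom; these final values are taken from the comment, not from code in this PR:

```diff
-		numBatches := 2
+		numBatches := 11
-		if time.Since(start) > 200*c.SyncUntilTimeout {
+		if time.Since(start) > c.SyncUntilTimeout {
```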
@MadLittleMods what is the status of this PR? It's still marked as a Draft.
Status is still the same as the last update in this thread. It's too slow on Synapse for me to be comfortable merging it yet.
In terms of optimizing Synapse to make this test viable to run time-wise, I'm a bit blocked on a race condition in some recent code: matrix-org/synapse#12394 (comment) -> matrix-org/synapse#12646
> In terms of optimizing Synapse to make this test viable to run time-wise, I'm a bit blocked on a race condition in some recent code: matrix-org/synapse#12394 (comment) -> matrix-org/synapse#12646

Now matrix-org/synapse#12988 I think.
Made progress on optimizing Synapse:
- Saves 10s per `/messages` call: "Fix `have_seen_event` cache not being invalidated" (synapse#13863)
- Still need to figure out "Optimize backfill receiving to have less missing `prev_event` thrashing (scratch)" (synapse#13864)
Why not use JSONArrayEach?
complement/internal/match/json.go, line 164 in 136fd60:

```go
func JSONArrayEach(wantKey string, fn func(gjson.Result) error) JSON
```
IIRC, I think it was because we can't break early with JSONArrayEach (you can only break early by returning an error, which fails the test). JSONArrayEach could be refactored to have the item function return (keepGoing, err), but I'd rather do that in another PR since it touches so many other tests.
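For illustration only, here is a rough sketch of the kind of early-exit iterator described above. It is not part of this PR; the helper name, package, and signature are assumptions, and it sidesteps the existing match.JSON plumbing so it stays self-contained:

```go
package matchsketch

import (
	"fmt"

	"github.com/tidwall/gjson"
)

// jsonArrayEachUntil is a hypothetical JSONArrayEach-style helper whose callback
// can stop iteration early by returning keepGoing=false without failing anything.
func jsonArrayEachUntil(body []byte, wantKey string, fn func(gjson.Result) (keepGoing bool, err error)) error {
	res := gjson.GetBytes(body, wantKey)
	if !res.Exists() {
		return fmt.Errorf("missing key '%s'", wantKey)
	}
	if !res.IsArray() {
		return fmt.Errorf("key '%s' is not an array", wantKey)
	}
	var innerErr error
	res.ForEach(func(_, item gjson.Result) bool {
		keepGoing, err := fn(item)
		if err != nil {
			innerErr = err
			return false // stop iterating and surface the error
		}
		return keepGoing // returning false stops iteration without an error
	})
	return innerErr
}
```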
Is the early return really a deal breaker? I don't think it'll affect runtime performance that much?
Updated to re-use JSONArrayEach. It's not so bad ⏩
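Roughly, re-using JSONArrayEach for the check-off could look like the sketch below (not necessarily the exact code in this PR): the callback never returns an error, it just deletes whatever it sees from the working map, since an event missing from one /messages page may simply be on a later page. This assumes complement's usual must.MatchResponse / match.HTTPResponse pattern:

```go
must.MatchResponse(t, messagesRes, match.HTTPResponse{
	JSON: []match.JSON{
		match.JSONArrayEach("chunk", func(ev gjson.Result) error {
			// Check off any expected event we see; never fail here because the
			// remaining events may show up on a later page of /messages.
			delete(workingExpectedEventIDMap, ev.Get("event_id").Str)
			return nil
		}),
	},
})
```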
So this function doesn't actually check the events are in the correct order, just that they exist?
Correct, just a checkoff function