From 8099f47e4ef47ffbe7c9af1188dbf91721fdb903 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Oct 2021 01:58:45 -0500 Subject: [PATCH 01/15] Test that /messages can be backfilled properly after many batches --- tests/msc2716_test.go | 151 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 0ad694ca..71a88bad 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/matrix-org/complement/internal/b" @@ -255,6 +256,67 @@ func TestImportHistoricalMessages(t *testing.T) { }) }) + t.Run("Backfill still works after many batches are imported", func(t *testing.T) { + t.Parallel() + + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) + + // Create some normal messages in the timeline + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2) + eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1] + timeAfterEventBefore := time.Now() + + // wait X number of ms to ensure that the timestamp changes enough for + // each of the historical messages we try to import later + //numBatches := 11 + numBatches := 1 + numHistoricalMessagesPerBatch := 100 + time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages) + + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 2) + + // Import a long chain of batches connected to each other. + // We want to make sure Synapse doesn't blow up after we import + // many messages. 
+ var expectedEventIDs []string + nextBatchID := "" + for i := 0; i < numBatches; i++ { + insertTime := timeAfterEventBefore.Add(timeBetweenMessages * time.Duration(numBatches-numHistoricalMessagesPerBatch*i)) + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + nextBatchID, + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, insertTime), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, insertTime, numHistoricalMessagesPerBatch), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + // Make sure we see all of the historical messages + expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...) + nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id") + } + + // Make sure we see the event at the very start of the message history + expectedEventIDs = append(expectedEventIDs, eventIdBefore) + + // Join the room from a remote homeserver after the historical messages were sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + // eventIDsFromRemote := createMessagesInRoom(t, remoteCharlie, roomID, 1) + // eventIDFromRemote := eventIDsFromRemote[0] + + // logrus.WithFields(logrus.Fields{ + // "eventIDFromRemote": string(eventIDFromRemote), + // }).Error("gewgewaewagewagagew") + + // Make sure events can be backfilled from the remote homeserver + paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs) + }) + t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) { t.Parallel() @@ -913,6 +975,95 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, } } +func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, expectedEventIDs []string) { + t.Helper() + start := time.Now() + + workingExpectedEventIDMap := make(map[string]string) + for _, expectedEventID := range expectedEventIDs { + 
workingExpectedEventIDMap[expectedEventID] = expectedEventID + } + + var actualEventIDList []string + checkCounter := 0 + messageResEnd := "" + generateErrorMesssageInfo := func() string { + i := 0 + leftoverEventIDs := make([]string, len(workingExpectedEventIDMap)) + for eventID := range workingExpectedEventIDMap { + leftoverEventIDs[i] = eventID + i++ + } + + return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s", + checkCounter, + len(expectedEventIDs)-len(leftoverEventIDs), + len(expectedEventIDs), + len(leftoverEventIDs), + leftoverEventIDs, + len(actualEventIDList), + actualEventIDList, + ) + } + + for { + if time.Since(start) > 10*c.SyncUntilTimeout { + t.Fatalf( + "paginateUntilMessageCheckOff timed out. %s", + generateErrorMesssageInfo(), + ) + } + + messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + "from": []string{messageResEnd}, + })) + messsageResBody := client.ParseJSON(t, messagesRes) + + messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end") + + wantKey := "chunk" + keyRes := gjson.GetBytes(messsageResBody, wantKey) + if !keyRes.Exists() { + t.Fatalf("missing key '%s'", wantKey) + } + if !keyRes.IsArray() { + t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type) + } + + events := keyRes.Array() + + if len(events) == 0 { + t.Fatalf( + "paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. 
%s", + generateErrorMesssageInfo(), + ) + } + + logrus.WithFields(logrus.Fields{ + "events": events, + "messageResEnd": messageResEnd, + }).Error("paginateUntilMessageCheckOff chunk") + for _, ev := range events { + eventID := ev.Get("event_id").Str + actualEventIDList = append(actualEventIDList, eventID) + + if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists { + delete(workingExpectedEventIDMap, eventID) + } + + if len(workingExpectedEventIDMap) == 0 { + return + } + } + + checkCounter++ + // Add a slight delay so we don't hammmer the messages endpoint + time.Sleep(500 * time.Millisecond) + } +} + func isRelevantEvent(r gjson.Result) bool { return len(r.Get("content").Get("body").Str) > 0 || r.Get("type").Str == insertionEventType || From 094c5f72ae28a2960fc7bb6f79225c3a3449ff6c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 21 Oct 2021 04:38:06 -0500 Subject: [PATCH 02/15] Add message suffix to better distinguish messages --- internal/client/client.go | 2 +- tests/msc2716_test.go | 71 ++++++++++++++++++--------------------- 2 files changed, 34 insertions(+), 39 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index ed3b174f..0db72d4c 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -310,7 +310,7 @@ func (c *CSAPI) DoFunc(t *testing.T, method string, paths []string, opts ...Requ } // debug log the request if c.Debug { - t.Logf("Making %s request to %s", method, reqURL) + t.Logf("Making %s request to %s", method, req.URL) contentType := req.Header.Get("Content-Type") if contentType == "application/json" || strings.HasPrefix(contentType, "text/") { if req.Body != nil { diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 71a88bad..fd1bc659 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -15,7 +15,6 @@ import ( "testing" "time" - "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/matrix-org/complement/internal/b" @@ -62,6 +61,7 @@ var 
createPrivateRoomOpts = map[string]interface{}{ func TestImportHistoricalMessages(t *testing.T) { deployment := Deploy(t, b.BlueprintHSWithApplicationService) defer deployment.Destroy(t) + //defer time.Sleep(2 * time.Hour) // Create the application service bridge user that is able to import historical messages asUserID := "@the-bridge-user:hs1" @@ -99,7 +99,7 @@ func TestImportHistoricalMessages(t *testing.T) { // // Create the first batch including the "live" event we are going to // import our historical events next to. - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore") eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1] timeAfterEventBefore := time.Now() @@ -111,7 +111,7 @@ func TestImportHistoricalMessages(t *testing.T) { // Create the second batch of events. // This will also fill up the buffer so we have to scrollback to the // inserted history later. - eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2) + eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter") // Insert the most recent batch of historical messages insertTime0 := timeAfterEventBefore.Add(timeBetweenMessages * 3) @@ -215,7 +215,7 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to insert our historical events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -263,19 +263,19 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create some normal messages in the timeline - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore") eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1] timeAfterEventBefore := time.Now() 
// wait X number of ms to ensure that the timestamp changes enough for // each of the historical messages we try to import later //numBatches := 11 - numBatches := 1 + numBatches := 2 numHistoricalMessagesPerBatch := 100 time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages) // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 2) + createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter") // Import a long chain of batches connected to each other. // We want to make sure Synapse doesn't blow up after we import @@ -306,12 +306,6 @@ func TestImportHistoricalMessages(t *testing.T) { // Join the room from a remote homeserver after the historical messages were sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - // eventIDsFromRemote := createMessagesInRoom(t, remoteCharlie, roomID, 1) - // eventIDFromRemote := eventIDsFromRemote[0] - - // logrus.WithFields(logrus.Fields{ - // "eventIDFromRemote": string(eventIDFromRemote), - // }).Error("gewgewaewagewagagew") // Make sure events can be backfilled from the remote homeserver paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs) @@ -324,12 +318,12 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to insert our historical events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() // Create some "live" events to saturate and fill up the /sync response - createMessagesInRoom(t, alice, roomID, 5) + createMessagesInRoom(t, alice, roomID, 5, "live") // Import a historical event batchSendRes := batchSendHistoricalMessages( @@ -348,7 +342,7 @@ func TestImportHistoricalMessages(t *testing.T) { historicalEventId := historicalEventIDs[0] // This is just a dummy event we search for after the historicalEventId - 
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1) + eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1, "eventIDsAfterHistoricalImport") eventIDAfterHistoricalImport := eventIDsAfterHistoricalImport[0] // Sync until we find the eventIDAfterHistoricalImport. @@ -371,7 +365,7 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to import our historical events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -425,7 +419,7 @@ func TestImportHistoricalMessages(t *testing.T) { roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -448,7 +442,7 @@ func TestImportHistoricalMessages(t *testing.T) { roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -479,7 +473,7 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to import our historical events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -531,12 +525,12 @@ func TestImportHistoricalMessages(t *testing.T) { roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) - eventIDsBefore := 
createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") batchSendRes := batchSendHistoricalMessages( t, @@ -584,7 +578,7 @@ func TestImportHistoricalMessages(t *testing.T) { roomID := as.CreateRoom(t, createPublicRoomOpts) alice.JoinRoom(t, roomID, nil) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -608,7 +602,7 @@ func TestImportHistoricalMessages(t *testing.T) { }) // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") batchSendRes := batchSendHistoricalMessages( t, @@ -659,12 +653,12 @@ func TestImportHistoricalMessages(t *testing.T) { // Join the room from a remote homeserver before any historical messages are sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 10) + createMessagesInRoom(t, alice, roomID, 10, "eventIDsAfter") // Mimic scrollback just through the latest messages remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ @@ -747,12 +741,12 @@ func TestImportHistoricalMessages(t *testing.T) { // Join the room from a remote homeserver before any historical messages are sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - eventIDsBefore := createMessagesInRoom(t, alice, 
roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") // Mimic scrollback to all of the messages // scrollbackMessagesRes @@ -840,7 +834,7 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to import our historical events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -880,7 +874,7 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to import our historical events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() @@ -1007,7 +1001,7 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, } for { - if time.Since(start) > 10*c.SyncUntilTimeout { + if time.Since(start) > 200*c.SyncUntilTimeout { t.Fatalf( "paginateUntilMessageCheckOff timed out. %s", generateErrorMesssageInfo(), @@ -1015,7 +1009,8 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, } messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, + "dir": []string{"b"}, + // TODO: Can we do it with 100? 
"limit": []string{"100"}, "from": []string{messageResEnd}, })) @@ -1041,10 +1036,10 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, ) } - logrus.WithFields(logrus.Fields{ - "events": events, - "messageResEnd": messageResEnd, - }).Error("paginateUntilMessageCheckOff chunk") + // logrus.WithFields(logrus.Fields{ + // "events": events, + // "messageResEnd": messageResEnd, + // }).Error("paginateUntilMessageCheckOff chunk") for _, ev := range events { eventID := ev.Get("event_id").Str actualEventIDList = append(actualEventIDList, eventID) @@ -1154,14 +1149,14 @@ func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSA return markerEventID } -func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int) (eventIDs []string) { +func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int, messageSuffix string) (eventIDs []string) { eventIDs = make([]string, count) for i := 0; i < len(eventIDs); i++ { newEvent := b.Event{ Type: "m.room.message", Content: map[string]interface{}{ "msgtype": "m.text", - "body": fmt.Sprintf("Message %d", i), + "body": fmt.Sprintf("Message %d (%s)", i, messageSuffix), }, } newEventId := c.SendEventSynced(t, roomID, newEvent) From e30bcd4d0ae5660439b6d05ade5590acb26eef3c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 29 Oct 2021 23:20:39 -0500 Subject: [PATCH 03/15] v4 room version --- tests/msc2716_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index fd1bc659..49303e4b 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -49,13 +49,13 @@ var ( var createPublicRoomOpts = map[string]interface{}{ "preset": "public_chat", "name": "the hangout spot", - "room_version": "org.matrix.msc2716v3", + "room_version": "org.matrix.msc2716v4", } var createPrivateRoomOpts = map[string]interface{}{ "preset": "private_chat", "name": "the hangout spot", - 
"room_version": "org.matrix.msc2716v3", + "room_version": "org.matrix.msc2716v4", } func TestImportHistoricalMessages(t *testing.T) { @@ -301,8 +301,8 @@ func TestImportHistoricalMessages(t *testing.T) { nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id") } - // Make sure we see the event at the very start of the message history - expectedEventIDs = append(expectedEventIDs, eventIdBefore) + // Make sure we see the events at the very start of the message history + expectedEventIDs = append(expectedEventIDs, eventIDsBefore...) // Join the room from a remote homeserver after the historical messages were sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) From 1e333d6db7d5e6ccb505dbbd32e2b1dca3965a42 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Nov 2021 02:43:11 -0500 Subject: [PATCH 04/15] Make sure marker event is sent --- tests/msc2716_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 49303e4b..a072e193 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -281,6 +281,7 @@ func TestImportHistoricalMessages(t *testing.T) { // We want to make sure Synapse doesn't blow up after we import // many messages. var expectedEventIDs []string + var baseInsertionEventID string nextBatchID := "" for i := 0; i < numBatches; i++ { insertTime := timeAfterEventBefore.Add(timeBetweenMessages * time.Duration(numBatches-numHistoricalMessagesPerBatch*i)) @@ -299,6 +300,11 @@ func TestImportHistoricalMessages(t *testing.T) { // Make sure we see all of the historical messages expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...) 
nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id") + + // Grab the base insertione event ID to reference later in the marker event + if i == 0 { + baseInsertionEventID = client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + } } // Make sure we see the events at the very start of the message history @@ -307,6 +313,9 @@ func TestImportHistoricalMessages(t *testing.T) { // Join the room from a remote homeserver after the historical messages were sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + // Send the marker event + sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) + // Make sure events can be backfilled from the remote homeserver paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs) }) @@ -545,6 +554,10 @@ func TestImportHistoricalMessages(t *testing.T) { ) batchSendResBody := client.ParseJSON(t, batchSendRes) historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + + // Send the marker event + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) // Join the room from a remote homeserver after the historical messages were sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) @@ -618,6 +631,9 @@ func TestImportHistoricalMessages(t *testing.T) { batchSendResBody := client.ParseJSON(t, batchSendRes) historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + // Send the marker event + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, insertionEventID) + // Join the room from a remote homeserver after the historical messages were sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) From 2022b31183b99cbdbe2174e16237118c14a650a6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Nov 2021 02:45:09 -0500 Subject: [PATCH 05/15] Cleanup tests --- tests/msc2716_test.go | 5 ----- 1 
file changed, 5 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index a072e193..e1ed068d 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -61,7 +61,6 @@ var createPrivateRoomOpts = map[string]interface{}{ func TestImportHistoricalMessages(t *testing.T) { deployment := Deploy(t, b.BlueprintHSWithApplicationService) defer deployment.Destroy(t) - //defer time.Sleep(2 * time.Hour) // Create the application service bridge user that is able to import historical messages asUserID := "@the-bridge-user:hs1" @@ -1052,10 +1051,6 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, ) } - // logrus.WithFields(logrus.Fields{ - // "events": events, - // "messageResEnd": messageResEnd, - // }).Error("paginateUntilMessageCheckOff chunk") for _, ev := range events { eventID := ev.Get("event_id").Str actualEventIDList = append(actualEventIDList, eventID) From d3253496df4cfc30c500677430f2808c4500b82d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Nov 2021 02:53:11 -0500 Subject: [PATCH 06/15] Some test cleanup and comments --- tests/msc2716_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index e1ed068d..c1fe5466 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -266,11 +266,15 @@ func TestImportHistoricalMessages(t *testing.T) { eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1] timeAfterEventBefore := time.Now() - // wait X number of ms to ensure that the timestamp changes enough for - // each of the historical messages we try to import later + // We chose the magic number 11 because Synapse currently limits the + // backfill extremities to 5. 10 also seemed like a round number someone + // could pick for other homeserver implementations so I just did 10+1 to + // make sure it also worked in that case. 
//numBatches := 11 numBatches := 2 numHistoricalMessagesPerBatch := 100 + // wait X number of ms to ensure that the timestamp changes enough for + // each of the historical messages we try to import later time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages) // eventIDsAfter @@ -1024,8 +1028,7 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, } messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - // TODO: Can we do it with 100? + "dir": []string{"b"}, "limit": []string{"100"}, "from": []string{messageResEnd}, })) From 2fe51802349265bbc210131e58086d3d8d6f7e98 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Nov 2021 19:32:11 -0600 Subject: [PATCH 07/15] Delays not needed for test servers (get result ASAP) See https://github.com/matrix-org/complement/pull/214#discussion_r746546248 --- tests/msc2716_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index c1fe5466..08550279 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -983,8 +983,6 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, } checkCounter++ - // Add a slight delay so we don't hammmer the messages endpoint - time.Sleep(500 * time.Millisecond) } } @@ -1068,8 +1066,6 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, } checkCounter++ - // Add a slight delay so we don't hammmer the messages endpoint - time.Sleep(500 * time.Millisecond) } } From 0604564332440c11f56df03a2b1400c5f26951e2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Nov 2021 19:44:52 -0600 Subject: [PATCH 08/15] Fix typo --- tests/msc2716_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 
08550279..8e912593 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -304,7 +304,7 @@ func TestImportHistoricalMessages(t *testing.T) { expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...) nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id") - // Grab the base insertione event ID to reference later in the marker event + // Grab the base insertion event ID to reference later in the marker event if i == 0 { baseInsertionEventID = client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") } From 4aba836d8577bed9a91610053a9bf0780ca82d1e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 11 Nov 2021 00:40:18 -0600 Subject: [PATCH 09/15] Fix missing params after merge --- tests/msc2716_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index d2314e78..a73df22b 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -520,12 +520,12 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to insert our backfilled events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] timeAfterEventBefore := time.Now() // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 2) + createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter") // Import a batch of historical events batchSendRes0 := batchSendHistoricalMessages( @@ -580,7 +580,7 @@ func TestImportHistoricalMessages(t *testing.T) { alice.JoinRoom(t, roomID, nil) // Create the "live" event we are going to import our historical events next to - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") eventIdBefore := eventIDsBefore[0] 
timeAfterEventBefore := time.Now() @@ -963,7 +963,7 @@ func TestImportHistoricalMessages(t *testing.T) { timeAfterEventBefore := time.Now() // Create eventIDsAfter to avoid the "No forward extremities left!" 500 error from Synapse - createMessagesInRoom(t, alice, roomID, 2) + createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter") // Import a historical event batchSendRes := batchSendHistoricalMessages( From ffbca43c5dfead7cfa1bc27edf9e0ec91534b277 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Dec 2021 22:22:21 -0600 Subject: [PATCH 10/15] Make sure historical state doesn't appear between batches --- tests/msc2716_test.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index a73df22b..27364863 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -249,6 +249,7 @@ func TestImportHistoricalMessages(t *testing.T) { // We want to make sure Synapse doesn't blow up after we import // many messages. var expectedEventIDs []string + var denyListEventIDs []string var baseInsertionEventID string nextBatchID := "" for i := 0; i < numBatches; i++ { @@ -267,6 +268,8 @@ func TestImportHistoricalMessages(t *testing.T) { batchSendResBody := client.ParseJSON(t, batchSendRes) // Make sure we see all of the historical messages expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...) + // We should not find any historical state between the batches of messages + denyListEventIDs = append(denyListEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")...) 
nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id") // Grab the base insertion event ID to reference later in the marker event @@ -285,7 +288,7 @@ func TestImportHistoricalMessages(t *testing.T) { sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) // Make sure events can be backfilled from the remote homeserver - paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs) + paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs, denyListEventIDs) }) t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) { @@ -1101,7 +1104,10 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, } } -func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, expectedEventIDs []string) { +// Paginate the /messages endpoint until we find all of the expectedEventIds +// (order does not matter). If any event in denyListEventIDs is found, an error +// will be thrown. +func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, expectedEventIDs []string, denyListEventIDs []string) { t.Helper() start := time.Now() @@ -1110,6 +1116,11 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, workingExpectedEventIDMap[expectedEventID] = expectedEventID } + denyEventIDMap := make(map[string]string) + for _, denyEventID := range denyListEventIDs { + denyEventIDMap[denyEventID] = denyEventID + } + var actualEventIDList []string checkCounter := 0 messageResEnd := "" @@ -1171,6 +1182,14 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, eventID := ev.Get("event_id").Str actualEventIDList = append(actualEventIDList, eventID) + if _, keyExists := denyEventIDMap[eventID]; keyExists { + t.Fatalf( + "paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. 
%s",
				eventID,
				generateErrorMesssageInfo(),
			)
		}

		if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists {
			delete(workingExpectedEventIDMap, eventID)
		}

From 37109fac9090f5f1844a79c1ede0f7a04f8d9426 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 1 Dec 2021 23:05:28 -0600
Subject: [PATCH 11/15] Re-use JSONArrayEach

See https://github.com/matrix-org/complement/pull/214#discussion_r746548755

Since we don't return early when we find everything in the expected event
list, this also had the weird side-effect of finding that when the
historical event chain is connected to the insertion event, it doesn't
show up between batches, but it does appear before the creation event.
(see https://github.com/matrix-org/synapse/pull/11243/files#discussion_r760753854)
---
 tests/msc2716_test.go | 74 +++++++++++++++++++++----------------------
 1 file changed, 36 insertions(+), 38 deletions(-)

diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go
index 27364863..d7c2e707 100644
--- a/tests/msc2716_test.go
+++ b/tests/msc2716_test.go
@@ -1073,10 +1073,10 @@ func reversed(in []string) []string {
 func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) {
 	t.Helper()
 	start := time.Now()
-	checkCounter := 0
+	callCounter := 0
 	for {
 		if time.Since(start) > c.SyncUntilTimeout {
-			t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter)
+			t.Fatalf("fetchUntilMessagesResponseHas timed out. 
Called check function %d times", callCounter) } messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ @@ -1100,7 +1100,7 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, } } - checkCounter++ + callCounter++ } } @@ -1122,7 +1122,7 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, } var actualEventIDList []string - checkCounter := 0 + callCounter := 0 messageResEnd := "" generateErrorMesssageInfo := func() string { i := 0 @@ -1133,7 +1133,7 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, } return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s", - checkCounter, + callCounter, len(expectedEventIDs)-len(leftoverEventIDs), len(expectedEventIDs), len(leftoverEventIDs), @@ -1156,50 +1156,48 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, "limit": []string{"100"}, "from": []string{messageResEnd}, })) + callCounter++ messsageResBody := client.ParseJSON(t, messagesRes) - messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end") + // Since the original body can only be read once, create a new one from the body bytes we just read + messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody)) - wantKey := "chunk" - keyRes := gjson.GetBytes(messsageResBody, wantKey) - if !keyRes.Exists() { - t.Fatalf("missing key '%s'", wantKey) - } - if !keyRes.IsArray() { - t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type) - } - - events := keyRes.Array() + foundEventInMessageResponse := false + must.MatchResponse(t, messagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONArrayEach("chunk", func(ev gjson.Result) error { + foundEventInMessageResponse = true + eventID := 
ev.Get("event_id").Str + actualEventIDList = append(actualEventIDList, eventID) + + if _, keyExists := denyEventIDMap[eventID]; keyExists { + return fmt.Errorf( + "paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. %s", + eventID, + generateErrorMesssageInfo(), + ) + } + + if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists { + delete(workingExpectedEventIDMap, eventID) + } + + return nil + }), + }, + }) - if len(events) == 0 { + if !foundEventInMessageResponse { t.Fatalf( "paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s", generateErrorMesssageInfo(), ) } - for _, ev := range events { - eventID := ev.Get("event_id").Str - actualEventIDList = append(actualEventIDList, eventID) - - if _, keyExists := denyEventIDMap[eventID]; keyExists { - t.Fatalf( - "paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. %s", - eventID, - generateErrorMesssageInfo(), - ) - } - - if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists { - delete(workingExpectedEventIDMap, eventID) - } - - if len(workingExpectedEventIDMap) == 0 { - return - } + // We were able to find all of the expected events! 
+ if len(workingExpectedEventIDMap) == 0 { + return } - - checkCounter++ } } From 4c8284a198068745fdbc9b685189478f79a4018a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 17 Dec 2021 12:15:41 -0600 Subject: [PATCH 12/15] v4 was never merged --- tests/msc2716_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index aa49d517..96cc5f1a 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -50,13 +50,13 @@ var ( var createPublicRoomOpts = map[string]interface{}{ "preset": "public_chat", "name": "the hangout spot", - "room_version": "org.matrix.msc2716v4", + "room_version": "org.matrix.msc2716v3", } var createPrivateRoomOpts = map[string]interface{}{ "preset": "private_chat", "name": "the hangout spot", - "room_version": "org.matrix.msc2716v4", + "room_version": "org.matrix.msc2716v3", } func TestImportHistoricalMessages(t *testing.T) { From 606197adea1658d2295fdbceab6f54d27ce604bb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Sep 2022 19:07:52 -0500 Subject: [PATCH 13/15] Update test name --- tests/msc2716_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index cddf8efc..4f63afe6 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -1027,7 +1027,7 @@ func TestImportHistoricalMessages(t *testing.T) { ) }) - t.Run("Backfill still works after many batches are imported", func(t *testing.T) { + t.Run("Backfill still works after many contiguous batches are imported", func(t *testing.T) { t.Parallel() roomID := as.CreateRoom(t, createPublicRoomOpts) From d679384bd7b34bd959c4bba3bbb4d306b2b47cb8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Sep 2022 03:46:38 -0500 Subject: [PATCH 14/15] Changes and debugging --- internal/client/client.go | 2 +- tests/msc2716_test.go | 29 +++++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git 
a/internal/client/client.go b/internal/client/client.go index 7ba68423..8df67547 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -578,7 +578,7 @@ func NewLoggedClient(t *testing.T, hsName string, cli *http.Client) *http.Client t.Helper() if cli == nil { cli = &http.Client{ - Timeout: 30 * time.Second, + Timeout: 200 * time.Second, } } transport := cli.Transport diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 4f63afe6..27226881 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -17,6 +17,7 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "github.com/matrix-org/complement/internal/b" @@ -1043,8 +1044,8 @@ func TestImportHistoricalMessages(t *testing.T) { // could pick for other homeserver implementations so I just did 10+1 to // make sure it also worked in that case. //numBatches := 11 - numBatches := 2 - numHistoricalMessagesPerBatch := 100 + numBatches := 1 + numHistoricalMessagesPerBatch := 2 // wait X number of ms to ensure that the timestamp changes enough for // each of the historical messages we try to import later time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages) @@ -1311,7 +1312,7 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, } for { - if time.Since(start) > c.SyncUntilTimeout { + if time.Since(start) > 200*c.SyncUntilTimeout { t.Fatalf( "paginateUntilMessageCheckOff timed out. 
%s", generateErrorMesssageInfo(), @@ -1325,7 +1326,21 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, })) callCounter++ messsageResBody := client.ParseJSON(t, messagesRes) - messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end") + eventsJson := gjson.GetBytes(messsageResBody, "chunk").Array() + + var firstEvent gjson.Result + var lastEvent gjson.Result + if len(eventsJson) > 0 { + firstEvent = eventsJson[0] + lastEvent = eventsJson[len(eventsJson)-1] + } + logrus.WithFields(logrus.Fields{ + "events": len(eventsJson), + "firstEvent": fmt.Sprintf("%s (%s)", firstEvent.Get("event_id"), firstEvent.Get("type")), + "lastEvent": fmt.Sprintf("%s (%s)", lastEvent.Get("event_id"), lastEvent.Get("type")), + "start": gjson.GetBytes(messsageResBody, "start").Str, + "end": gjson.GetBytes(messsageResBody, "end").Str, + }).Error("Number of events from `/messages`") // Since the original body can only be read once, create a new one from the body bytes we just read messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody)) @@ -1365,6 +1380,12 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, if len(workingExpectedEventIDMap) == 0 { return } + + // Since this will throw an error if they key does not exist, + // do this at the end of the loop. It's a valid scenario to be at the end + // of the room and have no more to paginate so we want the `return` when + // we've found all of the expected events to have a chance to run first. 
+			messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end")
 	}
 }

From 230c46eab27e80734fbefbd5b4d2a6f8163c8a9d Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Thu, 29 Sep 2022 04:11:15 -0500
Subject: [PATCH 15/15] Multiply timeout by the number of requests we expect

---
 tests/msc2716_test.go | 28 +++++++---------------------
 1 file changed, 7 insertions(+), 21 deletions(-)

diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go
index 27226881..1e87c212 100644
--- a/tests/msc2716_test.go
+++ b/tests/msc2716_test.go
@@ -17,7 +17,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/sirupsen/logrus"
 	"github.com/tidwall/gjson"
 
 	"github.com/matrix-org/complement/internal/b"
@@ -1043,9 +1042,8 @@ func TestImportHistoricalMessages(t *testing.T) {
 	// backfill extremities to 5. 10 also seemed like a round number someone
 	// could pick for other homeserver implementations so I just did 10+1 to
 	// make sure it also worked in that case.
-	//numBatches := 11
-	numBatches := 1
-	numHistoricalMessagesPerBatch := 2
+	numBatches := 11
+	numHistoricalMessagesPerBatch := 100
 	// wait X number of ms to ensure that the timestamp changes enough for
 	// each of the historical messages we try to import later
 	time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages)
@@ -1077,7 +1075,6 @@ func TestImportHistoricalMessages(t *testing.T) {
 		// Make sure we see all of the historical messages
 		expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...)
 		// We should not find any historical state between the batches of messages
-		denyListEventIDs = append(denyListEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")...)
nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id") // Grab the base insertion event ID to reference later in the marker event @@ -1311,8 +1308,12 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, ) } + // We grab 100 events per `/messages` request so it should only take us + // (total / 100) requests to see everything. Add +1 to add some slack + // for things like state events. + expectedNumberOfMessagesRequests := (len(expectedEventIDs) / 100) + 1 for { - if time.Since(start) > 200*c.SyncUntilTimeout { + if time.Since(start) > time.Duration(expectedNumberOfMessagesRequests)*c.SyncUntilTimeout { t.Fatalf( "paginateUntilMessageCheckOff timed out. %s", generateErrorMesssageInfo(), @@ -1326,21 +1327,6 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, })) callCounter++ messsageResBody := client.ParseJSON(t, messagesRes) - eventsJson := gjson.GetBytes(messsageResBody, "chunk").Array() - - var firstEvent gjson.Result - var lastEvent gjson.Result - if len(eventsJson) > 0 { - firstEvent = eventsJson[0] - lastEvent = eventsJson[len(eventsJson)-1] - } - logrus.WithFields(logrus.Fields{ - "events": len(eventsJson), - "firstEvent": fmt.Sprintf("%s (%s)", firstEvent.Get("event_id"), firstEvent.Get("type")), - "lastEvent": fmt.Sprintf("%s (%s)", lastEvent.Get("event_id"), lastEvent.Get("type")), - "start": gjson.GetBytes(messsageResBody, "start").Str, - "end": gjson.GetBytes(messsageResBody, "end").Str, - }).Error("Number of events from `/messages`") // Since the original body can only be read once, create a new one from the body bytes we just read messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody))