From 7679e69405e40af79bc6efaf1ac7344c18381fb5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Dec 2021 18:13:20 -0600 Subject: [PATCH 1/2] Send marker events in all federation tests The marker event is important to let the remote homeservers know there are historical messages at the given insertion event it points to. It's just a lucky happen-stance that the tests passed before without the need for the marker event. Marker events are required/expected in order for the history to be properly visible on remote homservers. --- tests/msc2716_test.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index db5317f6..75beab9f 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -575,6 +575,11 @@ func TestImportHistoricalMessages(t *testing.T) { ) batchSendResBody := client.ParseJSON(t, batchSendRes) historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) // Join the room from a remote homeserver after the historical messages were sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) @@ -654,6 +659,10 @@ func TestImportHistoricalMessages(t *testing.T) { batchSendResBody := client.ParseJSON(t, batchSendRes) historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, insertionEventID) + // Join the room from a remote homeserver after the historical messages were sent remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) @@ -756,7 +765,8 @@ func TestImportHistoricalMessages(t *testing.T) { }, }) - // Send the marker event + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) // FIXME: In the future, we should probably replace the following logic @@ -849,7 +859,8 @@ func TestImportHistoricalMessages(t *testing.T) { }, }) - // Send the marker event + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) // FIXME: In the future, we should probably replace the following logic @@ -959,7 +970,8 @@ func TestImportHistoricalMessages(t *testing.T) { batchEventID := client.GetJSONFieldStr(t, batchSendResBody, "batch_event_id") baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") - // Send the marker event + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. markerEventID := sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) redactEventID(t, alice, roomID, insertionEventID, 403) @@ -1110,11 +1122,11 @@ func ensureVirtualUserRegistered(t *testing.T, c *client.CSAPI, virtualUserLocal } } +// Send the marker event which lets remote homeservers know there are +// some historical messages back at the given insertion event. func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSAPI, roomID, insertionEventID string) (markerEventID string) { t.Helper() - // Send a marker event to let all of the homeservers know about the - // insertion point where all of the historical messages are at markerEvent := b.Event{ Type: markerEventType, Content: map[string]interface{}{ From e71e02cf2b0d49b0d3c7ea1f953b95d69e282f5f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Dec 2021 18:16:07 -0600 Subject: [PATCH 2/2] Put the federation tests under their own section be able to filter --- tests/msc2716_test.go | 580 +++++++++++++++++++++--------------------- 1 file changed, 291 insertions(+), 289 deletions(-) diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 75beab9f..e636645b 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -549,338 +549,340 @@ func TestImportHistoricalMessages(t *testing.T) { t.Skip("Skipping until implemented") }) - t.Run("Historical messages are visible when joining on federated server - auto-generated base insertion event", func(t *testing.T) { - t.Parallel() - - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) - - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() + t.Run("Federation", func(t *testing.T) { + t.Run("Historical messages are visible when joining on federated server - auto-generated base insertion event", func(t *testing.T) { + t.Parallel() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - "", - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 3) - // Join the room from a remote homeserver after the historical messages were sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") - // Make sure all of the events have been backfilled - fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { - if ev.Get("event_id").Str == eventIdBefore { - return true - } + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) - return false - }) + // Join the room from a remote homeserver after the historical messages were sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + // Make sure all of the events have been backfilled + fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { + if ev.Get("event_id").Str == eventIdBefore { + return true + } - must.MatchResponse(t, messagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + return false + }) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + must.MatchResponse(t, messagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) - }) - - t.Run("Historical messages are visible when joining on federated server - pre-made insertion event", func(t *testing.T) { - t.Parallel() - - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() - - // Create insertion event in the normal DAG - batchID := "mynextBatchID123" - insertionEvent := b.Event{ - Type: insertionEventType, - Content: map[string]interface{}{ - nextBatchIDContentField: batchID, - historicalContentField: true, - }, - } - // We can't use as.SendEventSynced(...) because application services can't use the /sync API - txnId := getTxnID("sendInsertionAndEnsureBackfilled-txn") - insertionSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", insertionEvent.Type, txnId}, client.WithJSONBody(t, insertionEvent.Content)) - insertionSendBody := client.ParseJSON(t, insertionSendRes) - insertionEventID := client.GetJSONFieldStr(t, insertionSendBody, "event_id") - // Make sure the insertion event has reached the homeserver - alice.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool { - return ev.Get("event_id").Str == insertionEventID - }) + t.Run("Historical messages are visible when joining on federated server - pre-made insertion event", func(t *testing.T) { + t.Parallel() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - batchID, - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, alice, roomID, insertionEventID) + // Create insertion event in the normal DAG + batchID := "mynextBatchID123" + insertionEvent := b.Event{ + Type: insertionEventType, + Content: map[string]interface{}{ + nextBatchIDContentField: batchID, + historicalContentField: true, + }, + } + // We can't use as.SendEventSynced(...) because application services can't use the /sync API + txnId := getTxnID("sendInsertionAndEnsureBackfilled-txn") + insertionSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", insertionEvent.Type, txnId}, client.WithJSONBody(t, insertionEvent.Content)) + insertionSendBody := client.ParseJSON(t, insertionSendRes) + insertionEventID := client.GetJSONFieldStr(t, insertionSendBody, "event_id") + // Make sure the insertion event has reached the homeserver + alice.SyncUntilTimelineHas(t, roomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == insertionEventID + }) + + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 3) - // Join the room from a remote homeserver after the historical messages were sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + batchID, + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - // Make sure all of the events have been backfilled - fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { - if ev.Get("event_id").Str == eventIdBefore { - return true - } + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, insertionEventID) - return false - }) + // Join the room from a remote homeserver after the historical messages were sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + // Make sure all of the events have been backfilled + fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { + if ev.Get("event_id").Str == eventIdBefore { + return true + } - must.MatchResponse(t, messagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + return false + }) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + must.MatchResponse(t, messagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) - }) - t.Run("Historical messages are visible when already joined on federated server", func(t *testing.T) { - t.Parallel() + t.Run("Historical messages are visible when already joined on federated server", func(t *testing.T) { + t.Parallel() - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - // Join the room from a remote homeserver before any historical messages are sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + // Join the room from a remote homeserver before any historical messages are sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 10) + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 10) - // Mimic scrollback just through the latest messages - remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - // Limited so we can only see a portion of the latest messages - "limit": []string{"5"}, - })) + // Mimic scrollback just through the latest messages + remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + // Limited so we can only see a portion of the latest messages + "limit": []string{"5"}, + })) - numMessagesSent := 2 - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - "", - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + numMessagesSent := 2 + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") - if len(historicalEventIDs) != numMessagesSent { - t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) - } + if len(historicalEventIDs) != numMessagesSent { + t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) + } - beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) - beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) - eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) - // Since the original body can only be read once, create a new one from the body bytes we just read - beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) - - // Make sure the history isn't visible before we expect it to be there. - // This is to avoid some bug in the homeserver using some unknown - // mechanism to distribute the historical messages to other homeservers. - must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONArrayEach("chunk", func(r gjson.Result) error { - // Throw if we find one of the historical events in the message response - for _, historicalEventID := range historicalEventIDs { - if r.Get("event_id").Str == historicalEventID { - return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) + eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) + // Since the original body can only be read once, create a new one from the body bytes we just read + beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) + + // Make sure the history isn't visible before we expect it to be there. + // This is to avoid some bug in the homeserver using some unknown + // mechanism to distribute the historical messages to other homeservers. + must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONArrayEach("chunk", func(r gjson.Result) error { + // Throw if we find one of the historical events in the message response + for _, historicalEventID := range historicalEventIDs { + if r.Get("event_id").Str == historicalEventID { + return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + } } - } - - return nil - }), - }, - }) - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) - - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + return nil + }), + }, + }) - // Make sure all of the historical messages are visible when we scrollback again - must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + // Make sure all of the historical messages are visible when we scrollback again + must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) - }) - t.Run("When messages have already been scrolled back through, new historical messages are visible in next scroll back on federated server", func(t *testing.T) { - t.Parallel() + t.Run("When messages have already been scrolled back through, new historical messages are visible in next scroll back on federated server", func(t *testing.T) { + t.Parallel() - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - // Join the room from a remote homeserver before any historical messages are sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + // Join the room from a remote homeserver before any historical messages are sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 3) - // Mimic scrollback to all of the messages - // scrollbackMessagesRes - remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + // Mimic scrollback to all of the messages + // scrollbackMessagesRes + remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) - // Historical messages are inserted where we have already scrolled back to - numMessagesSent := 2 - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - "", - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + // Historical messages are inserted where we have already scrolled back to + numMessagesSent := 2 + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") - if len(historicalEventIDs) != numMessagesSent { - t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) - } + if len(historicalEventIDs) != numMessagesSent { + t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) + } - beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) - beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) - eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) - // Since the original body can only be read once, create a new one from the body bytes we just read - beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) - // Make sure the history isn't visible before we expect it to be there. - // This is to avoid some bug in the homeserver using some unknown - // mechanism to distribute the historical messages to other homeservers. - must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONArrayEach("chunk", func(r gjson.Result) error { - // Throw if we find one of the historical events in the message response - for _, historicalEventID := range historicalEventIDs { - if r.Get("event_id").Str == historicalEventID { - return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) + eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) + // Since the original body can only be read once, create a new one from the body bytes we just read + beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) + // Make sure the history isn't visible before we expect it to be there. + // This is to avoid some bug in the homeserver using some unknown + // mechanism to distribute the historical messages to other homeservers. + must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONArrayEach("chunk", func(r gjson.Result) error { + // Throw if we find one of the historical events in the message response + for _, historicalEventID := range historicalEventIDs { + if r.Get("event_id").Str == historicalEventID { + return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + } } - } - return nil - }), - }, - }) + return nil + }), + }, + }) - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) - - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) - - // Make sure all of the historical messages are visible when we scrollback again - must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + // Make sure all of the historical messages are visible when we scrollback again + must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) })