diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go index 81295a30..f6787294 100644 --- a/tests/msc2716_test.go +++ b/tests/msc2716_test.go @@ -549,338 +549,340 @@ func TestImportHistoricalMessages(t *testing.T) { t.Skip("Skipping until implemented") }) - t.Run("Historical messages are visible when joining on federated server - auto-generated base insertion event", func(t *testing.T) { - t.Parallel() - - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) - - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() + t.Run("Federation", func(t *testing.T) { + t.Run("Historical messages are visible when joining on federated server - auto-generated base insertion event", func(t *testing.T) { + t.Parallel() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - "", - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 3) - // Join the room from a remote homeserver after the historical messages were sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") - // Make sure all of the events have been backfilled - fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { - if ev.Get("event_id").Str == eventIdBefore { - return true - } + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) - return false - }) + // Join the room from a remote homeserver after the historical messages were sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + // Make sure all of the events have been backfilled + fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { + if ev.Get("event_id").Str == eventIdBefore { + return true + } - must.MatchResponse(t, messagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + return false + }) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + must.MatchResponse(t, messagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) - }) - - t.Run("Historical messages are visible when joining on federated server - pre-made insertion event", func(t *testing.T) { - t.Parallel() - - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() - - // Create insertion event in the normal DAG - batchID := "mynextBatchID123" - insertionEvent := b.Event{ - Type: insertionEventType, - Content: map[string]interface{}{ - nextBatchIDContentField: batchID, - historicalContentField: true, - }, - } - // We can't use as.SendEventSynced(...) because application services can't use the /sync API - txnId := getTxnID("sendInsertionAndEnsureBackfilled-txn") - insertionSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", insertionEvent.Type, txnId}, client.WithJSONBody(t, insertionEvent.Content)) - insertionSendBody := client.ParseJSON(t, insertionSendRes) - insertionEventID := client.GetJSONFieldStr(t, insertionSendBody, "event_id") - // Make sure the insertion event has reached the homeserver - alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHas(roomID, func(ev gjson.Result) bool { - return ev.Get("event_id").Str == insertionEventID - })) + t.Run("Historical messages are visible when joining on federated server - pre-made insertion event", func(t *testing.T) { + t.Parallel() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - batchID, - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, alice, roomID, insertionEventID) + // Create insertion event in the normal DAG + batchID := "mynextBatchID123" + insertionEvent := b.Event{ + Type: insertionEventType, + Content: map[string]interface{}{ + nextBatchIDContentField: batchID, + historicalContentField: true, + }, + } + // We can't use as.SendEventSynced(...) because application services can't use the /sync API + txnId := getTxnID("sendInsertionAndEnsureBackfilled-txn") + insertionSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", insertionEvent.Type, txnId}, client.WithJSONBody(t, insertionEvent.Content)) + insertionSendBody := client.ParseJSON(t, insertionSendRes) + insertionEventID := client.GetJSONFieldStr(t, insertionSendBody, "event_id") + // Make sure the insertion event has reached the homeserver + alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHas(roomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == insertionEventID + })) + + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 3) - // Join the room from a remote homeserver after the historical messages were sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + batchID, + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - // Make sure all of the events have been backfilled - fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { - if ev.Get("event_id").Str == eventIdBefore { - return true - } + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, alice, roomID, insertionEventID) - return false - }) + // Join the room from a remote homeserver after the historical messages were sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + // Make sure all of the events have been backfilled + fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { + if ev.Get("event_id").Str == eventIdBefore { + return true + } - must.MatchResponse(t, messagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + return false + }) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + must.MatchResponse(t, messagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) - }) - t.Run("Historical messages are visible when already joined on federated server", func(t *testing.T) { - t.Parallel() + t.Run("Historical messages are visible when already joined on federated server", func(t *testing.T) { + t.Parallel() - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - // Join the room from a remote homeserver before any historical messages are sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + // Join the room from a remote homeserver before any historical messages are sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 10) + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 10) - // Mimic scrollback just through the latest messages - remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - // Limited so we can only see a portion of the latest messages - "limit": []string{"5"}, - })) + // Mimic scrollback just through the latest messages + remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + // Limited so we can only see a portion of the latest messages + "limit": []string{"5"}, + })) - numMessagesSent := 2 - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - "", - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + numMessagesSent := 2 + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") - if len(historicalEventIDs) != numMessagesSent { - t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) - } + if len(historicalEventIDs) != numMessagesSent { + t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) + } - beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) - beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) - eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) - // Since the original body can only be read once, create a new one from the body bytes we just read - beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) - - // Make sure the history isn't visible before we expect it to be there. - // This is to avoid some bug in the homeserver using some unknown - // mechanism to distribute the historical messages to other homeservers. - must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONArrayEach("chunk", func(r gjson.Result) error { - // Throw if we find one of the historical events in the message response - for _, historicalEventID := range historicalEventIDs { - if r.Get("event_id").Str == historicalEventID { - return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) + eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) + // Since the original body can only be read once, create a new one from the body bytes we just read + beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) + + // Make sure the history isn't visible before we expect it to be there. + // This is to avoid some bug in the homeserver using some unknown + // mechanism to distribute the historical messages to other homeservers. + must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONArrayEach("chunk", func(r gjson.Result) error { + // Throw if we find one of the historical events in the message response + for _, historicalEventID := range historicalEventIDs { + if r.Get("event_id").Str == historicalEventID { + return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + } } - } - - return nil - }), - }, - }) - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) - - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + return nil + }), + }, + }) - // Make sure all of the historical messages are visible when we scrollback again - must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + // Make sure all of the historical messages are visible when we scrollback again + must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) - }) - t.Run("When messages have already been scrolled back through, new historical messages are visible in next scroll back on federated server", func(t *testing.T) { - t.Parallel() + t.Run("When messages have already been scrolled back through, new historical messages are visible in next scroll back on federated server", func(t *testing.T) { + t.Parallel() - roomID := as.CreateRoom(t, createPublicRoomOpts) - alice.JoinRoom(t, roomID, nil) + roomID := as.CreateRoom(t, createPublicRoomOpts) + alice.JoinRoom(t, roomID, nil) - // Join the room from a remote homeserver before any historical messages are sent - remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) + // Join the room from a remote homeserver before any historical messages are sent + remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) - eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) - eventIdBefore := eventIDsBefore[0] - timeAfterEventBefore := time.Now() + eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1) + eventIdBefore := eventIDsBefore[0] + timeAfterEventBefore := time.Now() - // eventIDsAfter - createMessagesInRoom(t, alice, roomID, 3) + // eventIDsAfter + createMessagesInRoom(t, alice, roomID, 3) - // Mimic scrollback to all of the messages - // scrollbackMessagesRes - remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) + // Mimic scrollback to all of the messages + // scrollbackMessagesRes + remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) - // Historical messages are inserted where we have already scrolled back to - numMessagesSent := 2 - batchSendRes := batchSendHistoricalMessages( - t, - as, - roomID, - eventIdBefore, - "", - createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), - createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), - // Status - 200, - ) - batchSendResBody := client.ParseJSON(t, batchSendRes) - historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") - baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") + // Historical messages are inserted where we have already scrolled back to + numMessagesSent := 2 + batchSendRes := batchSendHistoricalMessages( + t, + as, + roomID, + eventIdBefore, + "", + createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), + createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, numMessagesSent), + // Status + 200, + ) + batchSendResBody := client.ParseJSON(t, batchSendRes) + historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") + baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") - if len(historicalEventIDs) != numMessagesSent { - t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) - } + if len(historicalEventIDs) != numMessagesSent { + t.Fatalf("Expected %d event_ids in the response that correspond to the %d events we sent in the request but saw %d: %s", numMessagesSent, numMessagesSent, len(historicalEventIDs), historicalEventIDs) + } - beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) - beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) - eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) - // Since the original body can only be read once, create a new one from the body bytes we just read - beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) - // Make sure the history isn't visible before we expect it to be there. - // This is to avoid some bug in the homeserver using some unknown - // mechanism to distribute the historical messages to other homeservers. - must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONArrayEach("chunk", func(r gjson.Result) error { - // Throw if we find one of the historical events in the message response - for _, historicalEventID := range historicalEventIDs { - if r.Get("event_id").Str == historicalEventID { - return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + beforeMarkerMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + beforeMarkerMesssageResBody := client.ParseJSON(t, beforeMarkerMessagesRes) + eventDebugStringsFromBeforeMarkerResponse := mustGetRelevantEventDebugStringsFromMessagesResponse(t, "chunk", beforeMarkerMesssageResBody, relevantToScrollbackEventFilter) + // Since the original body can only be read once, create a new one from the body bytes we just read + beforeMarkerMessagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(beforeMarkerMesssageResBody)) + // Make sure the history isn't visible before we expect it to be there. + // This is to avoid some bug in the homeserver using some unknown + // mechanism to distribute the historical messages to other homeservers. + must.MatchResponse(t, beforeMarkerMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONArrayEach("chunk", func(r gjson.Result) error { + // Throw if we find one of the historical events in the message response + for _, historicalEventID := range historicalEventIDs { + if r.Get("event_id").Str == historicalEventID { + return fmt.Errorf("Historical event (%s) found on remote homeserver before marker event was sent out\nmessage response (%d): %v\nhistoricalEventIDs (%d): %v", historicalEventID, len(eventDebugStringsFromBeforeMarkerResponse), eventDebugStringsFromBeforeMarkerResponse, len(historicalEventIDs), historicalEventIDs) + } } - } - return nil - }), - }, - }) + return nil + }), + }, + }) - // Send the marker event which lets remote homeservers know there are - // some historical messages back at the given insertion event. - sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) - - // FIXME: In the future, we should probably replace the following logic - // with `validateBatchSendRes` to re-use and have some more robust - // assertion logic here. We're currently not using it because the message - // order isn't quite perfect when a remote federated homeserver gets - // backfilled. - // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) - remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ - "dir": []string{"b"}, - "limit": []string{"100"}, - })) - - // Make sure all of the historical messages are visible when we scrollback again - must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ - JSON: []match.JSON{ - match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { - return r.Get("event_id").Str - }, nil), - }, + // Send the marker event which lets remote homeservers know there are + // some historical messages back at the given insertion event. + sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID) + + // FIXME: In the future, we should probably replace the following logic + // with `validateBatchSendRes` to re-use and have some more robust + // assertion logic here. We're currently not using it because the message + // order isn't quite perfect when a remote federated homeserver gets + // backfilled. + // validateBatchSendRes(t, remoteCharlie, roomID, batchSendRes, false) + remoteMessagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + + // Make sure all of the historical messages are visible when we scrollback again + must.MatchResponse(t, remoteMessagesRes, match.HTTPResponse{ + JSON: []match.JSON{ + match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} { + return r.Get("event_id").Str + }, nil), + }, + }) }) })