3939 insertionEventType = "org.matrix.msc2716.insertion"
4040 markerEventType = "org.matrix.msc2716.marker"
4141
42- historicalContentField = "org.matrix.msc2716.historical"
43- nextChunkIdContentField = "org.matrix.msc2716.next_chunk_id"
44- chunkIdContentField = "org.matrix.msc2716.chunk_id"
45- markerInsertionContentField = "org.matrix.msc2716.marker.insertion"
46- markerInsertionPrevEventsContentField = "org.matrix.msc2716.marker.insertion_prev_events"
42+ historicalContentField = "org.matrix.msc2716.historical"
43+ nextChunkIDContentField = "org.matrix.msc2716.next_chunk_id"
44+ markerInsertionContentField = "org.matrix.msc2716.marker.insertion"
4745)
4846
4947func TestBackfillingHistory (t * testing.T ) {
@@ -166,7 +164,7 @@ func TestBackfillingHistory(t *testing.T) {
166164 JSON : []match.JSON {
167165 match .JSONArrayEach ("chunk" , func (r gjson.Result ) error {
168166 // Find all events in order
169- if len ( r . Get ( "content" ). Get ( "body" ). Str ) > 0 || r . Get ( "type" ). Str == insertionEventType || r . Get ( "type" ). Str == markerEventType {
167+ if isRelevantEvent ( r ) {
170168 // Pop the next message off the expected list
171169 nextEventIdInOrder := workingExpectedEventIDOrder [0 ]
172170 workingExpectedEventIDOrder = workingExpectedEventIDOrder [1 :]
@@ -229,7 +227,7 @@ func TestBackfillingHistory(t *testing.T) {
229227
230228 must .MatchResponse (t , messagesRes , match.HTTPResponse {
231229 JSON : []match.JSON {
232- match .JSONCheckOffAllowUnwanted ("chunk" , [] interface {}{ historicalEventIDs [ 0 ], historicalEventIDs [ 1 ], historicalEventIDs [ 2 ]} , func (r gjson.Result ) interface {} {
230+ match .JSONCheckOffAllowUnwanted ("chunk" , makeInterfaceSlice ( historicalEventIDs ) , func (r gjson.Result ) interface {} {
233231 return r .Get ("event_id" ).Str
234232 }, nil ),
235233 },
@@ -379,14 +377,16 @@ func TestBackfillingHistory(t *testing.T) {
379377 // Join the room from a remote homeserver after the backfilled messages were sent
380378 remoteCharlie .JoinRoom (t , roomID , []string {"hs1" })
381379
380+ // TODO: I think we need to update this to be similar to
381+ // SyncUntilTimelineHas but going back in time because this can be flakey
382382 messagesRes := remoteCharlie .MustDoFunc (t , "GET" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "messages" }, client .WithContentType ("application/json" ), client .WithQueries (url.Values {
383383 "dir" : []string {"b" },
384384 "limit" : []string {"100" },
385385 }))
386386
387387 must .MatchResponse (t , messagesRes , match.HTTPResponse {
388388 JSON : []match.JSON {
389- match .JSONCheckOffAllowUnwanted ("chunk" , [] interface {}{ historicalEventIDs [ 0 ], historicalEventIDs [ 1 ]} , func (r gjson.Result ) interface {} {
389+ match .JSONCheckOffAllowUnwanted ("chunk" , makeInterfaceSlice ( historicalEventIDs ) , func (r gjson.Result ) interface {} {
390390 return r .Get ("event_id" ).Str
391391 }, nil ),
392392 },
@@ -434,6 +434,57 @@ func TestBackfillingHistory(t *testing.T) {
434434 )
435435 batchSendResBody := client .ParseJSON (t , batchSendRes )
436436 historicalEventIDs := getEventsFromBatchSendResponseBody (t , batchSendResBody )
437+ baseInsertionEventID := historicalEventIDs [len (historicalEventIDs )- 1 ]
438+
439+ // 2 historical events + 2 insertion events
440+ if len (historicalEventIDs ) != 4 {
441+ t .Fatalf ("Expected eventID list should be length 15 but saw %d: %s" , len (historicalEventIDs ), historicalEventIDs )
442+ }
443+
444+ beforeMarkerMessagesRes := remoteCharlie .MustDoFunc (t , "GET" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "messages" }, client .WithContentType ("application/json" ), client .WithQueries (url.Values {
445+ "dir" : []string {"b" },
446+ "limit" : []string {"100" },
447+ }))
448+ beforeMarkerMesssageResBody := client .ParseJSON (t , beforeMarkerMessagesRes )
449+ eventDebugStringsFromBeforeMarkerResponse := getRelevantEventDebugStringsFromMessagesResponse (t , beforeMarkerMesssageResBody )
450+ // Since the original body can only be read once, create a new one from the body bytes we just read
451+ beforeMarkerMessagesRes .Body = ioutil .NopCloser (bytes .NewBuffer (beforeMarkerMesssageResBody ))
452+
453+ // Make sure the history isn't visible before we expect it to be there.
454+ // This is to avoid some bug in the homeserver using some unknown
455+ // mechanism to distribute the historical messages to other homeservers.
456+ must .MatchResponse (t , beforeMarkerMessagesRes , match.HTTPResponse {
457+ JSON : []match.JSON {
458+ match .JSONArrayEach ("chunk" , func (r gjson.Result ) error {
459+ // Throw if we find one of the historical events in the message response
460+ for _ , historicalEventID := range historicalEventIDs {
461+ if r .Get ("event_id" ).Str == historicalEventID {
462+ return fmt .Errorf ("Historical event (%s) found on remote homeserver before marker event was sent out\n message response (%d): %v\n historicalEventIDs (%d): %v" , historicalEventID , len (eventDebugStringsFromBeforeMarkerResponse ), eventDebugStringsFromBeforeMarkerResponse , len (historicalEventIDs ), historicalEventIDs )
463+ }
464+ }
465+
466+ return nil
467+ }),
468+ },
469+ })
470+
471+ // Send a marker event to let all of the homeservers know about the
472+ // insertion point where all of the historical messages are at
473+ markerEvent := b.Event {
474+ Type : markerEventType ,
475+ Content : map [string ]interface {}{
476+ markerInsertionContentField : baseInsertionEventID ,
477+ },
478+ }
479+ // We can't use as.SendEventSynced(...) because application services can't use the /sync API
480+ markerSendRes := as .MustDoFunc (t , "PUT" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "send" , markerEvent .Type , "txn-m123" }, client .WithJSONBody (t , markerEvent .Content ))
481+ markerSendBody := client .ParseJSON (t , markerSendRes )
482+ markerEventID := client .GetJSONFieldStr (t , markerSendBody , "event_id" )
483+
484+ // Make sure the marker event has reached the remote homeserver
485+ remoteCharlie .SyncUntilTimelineHas (t , roomID , func (ev gjson.Result ) bool {
486+ return ev .Get ("event_id" ).Str == markerEventID
487+ })
437488
438489 messagesRes := remoteCharlie .MustDoFunc (t , "GET" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "messages" }, client .WithContentType ("application/json" ), client .WithQueries (url.Values {
439490 "dir" : []string {"b" },
@@ -442,7 +493,7 @@ func TestBackfillingHistory(t *testing.T) {
442493
443494 must .MatchResponse (t , messagesRes , match.HTTPResponse {
444495 JSON : []match.JSON {
445- match .JSONCheckOffAllowUnwanted ("chunk" , [] interface {}{ historicalEventIDs [ 0 ], historicalEventIDs [ 1 ]} , func (r gjson.Result ) interface {} {
496+ match .JSONCheckOffAllowUnwanted ("chunk" , makeInterfaceSlice ( historicalEventIDs ) , func (r gjson.Result ) interface {} {
446497 return r .Get ("event_id" ).Str
447498 }, nil ),
448499 },
@@ -492,14 +543,16 @@ func TestBackfillingHistory(t *testing.T) {
492543 batchSendResBody := client .ParseJSON (t , batchSendRes )
493544 historicalEventIDs := getEventsFromBatchSendResponseBody (t , batchSendResBody )
494545
546+ // TODO: Send marker event
547+
495548 messagesRes := remoteCharlie .MustDoFunc (t , "GET" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "messages" }, client .WithContentType ("application/json" ), client .WithQueries (url.Values {
496549 "dir" : []string {"b" },
497550 "limit" : []string {"100" },
498551 }))
499552
500553 must .MatchResponse (t , messagesRes , match.HTTPResponse {
501554 JSON : []match.JSON {
502- match .JSONCheckOffAllowUnwanted ("chunk" , [] interface {}{ historicalEventIDs [ 0 ], historicalEventIDs [ 1 ]} , func (r gjson.Result ) interface {} {
555+ match .JSONCheckOffAllowUnwanted ("chunk" , makeInterfaceSlice ( historicalEventIDs ) , func (r gjson.Result ) interface {} {
503556 return r .Get ("event_id" ).Str
504557 }, nil ),
505558 },
@@ -508,6 +561,15 @@ func TestBackfillingHistory(t *testing.T) {
508561 })
509562}
510563
564+ func makeInterfaceSlice (slice []string ) []interface {} {
565+ interfaceSlice := make ([]interface {}, len (slice ))
566+ for i := range slice {
567+ interfaceSlice [i ] = slice [i ]
568+ }
569+
570+ return interfaceSlice
571+ }
572+
511573func reversed (in []string ) []string {
512574 out := make ([]string , len (in ))
513575 for i := 0 ; i < len (in ); i ++ {
@@ -516,6 +578,10 @@ func reversed(in []string) []string {
516578 return out
517579}
518580
581+ func isRelevantEvent (r gjson.Result ) bool {
582+ return len (r .Get ("content" ).Get ("body" ).Str ) > 0 || r .Get ("type" ).Str == insertionEventType || r .Get ("type" ).Str == markerEventType
583+ }
584+
519585func getRelevantEventDebugStringsFromMessagesResponse (t * testing.T , body []byte ) (eventIDsFromResponse []string ) {
520586 t .Helper ()
521587
@@ -529,7 +595,7 @@ func getRelevantEventDebugStringsFromMessagesResponse(t *testing.T, body []byte)
529595 }
530596
531597 res .ForEach (func (key , r gjson.Result ) bool {
532- if len ( r . Get ( "content" ). Get ( "body" ). Str ) > 0 || r . Get ( "type" ). Str == insertionEventType || r . Get ( "type" ). Str == markerEventType {
598+ if isRelevantEvent ( r ) {
533599 eventIDsFromResponse = append (eventIDsFromResponse , r .Get ("event_id" ).Str + " (" + r .Get ("content" ).Get ("body" ).Str + ")" )
534600 }
535601 return true
0 commit comments