@@ -85,6 +85,8 @@ pub trait StateStoreIntegrationTests {
8585 async fn test_display_names_saving ( & self ) ;
8686 /// Test operations with the send queue.
8787 async fn test_send_queue ( & self ) ;
88+ /// Test priority of operations with the send queue.
89+ async fn test_send_queue_priority ( & self ) ;
8890 /// Test operations related to send queue dependents.
8991 async fn test_send_queue_dependents ( & self ) ;
9092 /// Test saving/restoring server capabilities.
@@ -1212,7 +1214,7 @@ impl StateStoreIntegrationTests for DynStateStore {
12121214 let event0 =
12131215 SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "msg0" ) . into ( ) )
12141216 . unwrap ( ) ;
1215- self . save_send_queue_request ( room_id, txn0. clone ( ) , event0. into ( ) ) . await . unwrap ( ) ;
1217+ self . save_send_queue_request ( room_id, txn0. clone ( ) , event0. into ( ) , 0 ) . await . unwrap ( ) ;
12161218
12171219 // Reading it will work.
12181220 let pending = self . load_send_queue_requests ( room_id) . await . unwrap ( ) ;
@@ -1236,7 +1238,7 @@ impl StateStoreIntegrationTests for DynStateStore {
12361238 )
12371239 . unwrap ( ) ;
12381240
1239- self . save_send_queue_request ( room_id, txn, event. into ( ) ) . await . unwrap ( ) ;
1241+ self . save_send_queue_request ( room_id, txn, event. into ( ) , 0 ) . await . unwrap ( ) ;
12401242 }
12411243
12421244 // Reading all the events should work.
@@ -1334,7 +1336,7 @@ impl StateStoreIntegrationTests for DynStateStore {
13341336 let event =
13351337 SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "room2" ) . into ( ) )
13361338 . unwrap ( ) ;
1337- self . save_send_queue_request ( room_id2, txn. clone ( ) , event. into ( ) ) . await . unwrap ( ) ;
1339+ self . save_send_queue_request ( room_id2, txn. clone ( ) , event. into ( ) , 0 ) . await . unwrap ( ) ;
13381340 }
13391341
13401342 // Add and remove one event for room3.
@@ -1344,7 +1346,7 @@ impl StateStoreIntegrationTests for DynStateStore {
13441346 let event =
13451347 SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "room3" ) . into ( ) )
13461348 . unwrap ( ) ;
1347- self . save_send_queue_request ( room_id3, txn. clone ( ) , event. into ( ) ) . await . unwrap ( ) ;
1349+ self . save_send_queue_request ( room_id3, txn. clone ( ) , event. into ( ) , 0 ) . await . unwrap ( ) ;
13481350
13491351 self . remove_send_queue_request ( room_id3, & txn) . await . unwrap ( ) ;
13501352 }
@@ -1357,6 +1359,64 @@ impl StateStoreIntegrationTests for DynStateStore {
13571359 assert ! ( outstanding_rooms. iter( ) . any( |room| room == room_id2) ) ;
13581360 }
13591361
1362+ async fn test_send_queue_priority ( & self ) {
1363+ let room_id = room_id ! ( "!test_send_queue:localhost" ) ;
1364+
1365+ // No queued event in store at first.
1366+ let events = self . load_send_queue_requests ( room_id) . await . unwrap ( ) ;
1367+ assert ! ( events. is_empty( ) ) ;
1368+
1369+ // Saving one request should work.
1370+ let low0_txn = TransactionId :: new ( ) ;
1371+ let ev0 =
1372+ SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "low0" ) . into ( ) )
1373+ . unwrap ( ) ;
1374+ self . save_send_queue_request ( room_id, low0_txn. clone ( ) , ev0. into ( ) , 2 ) . await . unwrap ( ) ;
1375+
1376+ // Saving one request with higher priority should work.
1377+ let high_txn = TransactionId :: new ( ) ;
1378+ let ev1 =
1379+ SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "high" ) . into ( ) )
1380+ . unwrap ( ) ;
1381+ self . save_send_queue_request ( room_id, high_txn. clone ( ) , ev1. into ( ) , 10 ) . await . unwrap ( ) ;
1382+
1383+ // Saving another request with the low priority should work.
1384+ let low1_txn = TransactionId :: new ( ) ;
1385+ let ev2 =
1386+ SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "low1" ) . into ( ) )
1387+ . unwrap ( ) ;
1388+ self . save_send_queue_request ( room_id, low1_txn. clone ( ) , ev2. into ( ) , 2 ) . await . unwrap ( ) ;
1389+
1390+ // The requests should be ordered from higher priority to lower, and when equal,
1391+ // should use the insertion order instead.
1392+ let pending = self . load_send_queue_requests ( room_id) . await . unwrap ( ) ;
1393+
1394+ assert_eq ! ( pending. len( ) , 3 ) ;
1395+ {
1396+ assert_eq ! ( pending[ 0 ] . transaction_id, high_txn) ;
1397+
1398+ let deserialized = pending[ 0 ] . as_event ( ) . unwrap ( ) . deserialize ( ) . unwrap ( ) ;
1399+ assert_let ! ( AnyMessageLikeEventContent :: RoomMessage ( content) = deserialized) ;
1400+ assert_eq ! ( content. body( ) , "high" ) ;
1401+ }
1402+
1403+ {
1404+ assert_eq ! ( pending[ 1 ] . transaction_id, low0_txn) ;
1405+
1406+ let deserialized = pending[ 1 ] . as_event ( ) . unwrap ( ) . deserialize ( ) . unwrap ( ) ;
1407+ assert_let ! ( AnyMessageLikeEventContent :: RoomMessage ( content) = deserialized) ;
1408+ assert_eq ! ( content. body( ) , "low0" ) ;
1409+ }
1410+
1411+ {
1412+ assert_eq ! ( pending[ 2 ] . transaction_id, low1_txn) ;
1413+
1414+ let deserialized = pending[ 2 ] . as_event ( ) . unwrap ( ) . deserialize ( ) . unwrap ( ) ;
1415+ assert_let ! ( AnyMessageLikeEventContent :: RoomMessage ( content) = deserialized) ;
1416+ assert_eq ! ( content. body( ) , "low1" ) ;
1417+ }
1418+ }
1419+
13601420 async fn test_send_queue_dependents ( & self ) {
13611421 let room_id = room_id ! ( "!test_send_queue_dependents:localhost" ) ;
13621422
@@ -1365,7 +1425,7 @@ impl StateStoreIntegrationTests for DynStateStore {
13651425 let event0 =
13661426 SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "hey" ) . into ( ) )
13671427 . unwrap ( ) ;
1368- self . save_send_queue_request ( room_id, txn0. clone ( ) , event0. into ( ) ) . await . unwrap ( ) ;
1428+ self . save_send_queue_request ( room_id, txn0. clone ( ) , event0. into ( ) , 0 ) . await . unwrap ( ) ;
13691429
13701430 // No dependents, to start with.
13711431 assert ! ( self . load_dependent_queued_requests( room_id) . await . unwrap( ) . is_empty( ) ) ;
@@ -1427,7 +1487,7 @@ impl StateStoreIntegrationTests for DynStateStore {
14271487 let event1 =
14281488 SerializableEventContent :: new ( & RoomMessageEventContent :: text_plain ( "hey2" ) . into ( ) )
14291489 . unwrap ( ) ;
1430- self . save_send_queue_request ( room_id, txn1. clone ( ) , event1. into ( ) ) . await . unwrap ( ) ;
1490+ self . save_send_queue_request ( room_id, txn1. clone ( ) , event1. into ( ) , 0 ) . await . unwrap ( ) ;
14311491
14321492 self . save_dependent_queued_request (
14331493 room_id,
@@ -1609,6 +1669,12 @@ macro_rules! statestore_integration_tests {
16091669 store. test_send_queue( ) . await ;
16101670 }
16111671
1672+ #[ async_test]
1673+ async fn test_send_queue_priority( ) {
1674+ let store = get_store( ) . await . expect( "creating store failed" ) . into_state_store( ) ;
1675+ store. test_send_queue_priority( ) . await ;
1676+ }
1677+
16121678 #[ async_test]
16131679 async fn test_send_queue_dependents( ) {
16141680 let store = get_store( ) . await . expect( "creating store failed" ) . into_state_store( ) ;
0 commit comments