@@ -21,7 +21,6 @@ use opentelemetry::KeyValue;
21
21
use rand:: { RngCore , SeedableRng } ;
22
22
use thiserror:: Error ;
23
23
use thiserror_ext:: ContextInto ;
24
- use tokio_util:: sync:: PollSender ;
25
24
use tracing:: { Instrument as _, Level , info} ;
26
25
use ulid:: Ulid ;
27
26
use uuid:: { NonNilUuid , Uuid } ;
@@ -275,6 +274,9 @@ async fn migrate_users(
275
274
let mut password_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_passwords) ;
276
275
277
276
while let Ok ( user) = rx. recv_async ( ) . await {
277
+ // Force yielding to the scheduler, else we'll have long tick times
278
+ tokio:: task:: yield_now ( ) . await ;
279
+
278
280
// Handling an edge case: some AS users may have invalid localparts containing
279
281
// extra `:` characters. These users are ignored and a warning is logged.
280
282
if user. appservice_id . is_some ( )
@@ -616,6 +618,9 @@ async fn migrate_devices(
616
618
let mut write_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_sessions) ;
617
619
618
620
while let Ok ( device) = rx. recv_async ( ) . await {
621
+ // Force yielding to the scheduler, else we'll have long tick times
622
+ tokio:: task:: yield_now ( ) . await ;
623
+
619
624
let SynapseDevice {
620
625
user_id : synapse_user_id,
621
626
device_id,
@@ -769,6 +774,9 @@ async fn migrate_unrefreshable_access_tokens(
769
774
MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_sessions) ;
770
775
771
776
while let Ok ( token) = rx. recv_async ( ) . await {
777
+ // Force yielding to the scheduler, else we'll have long tick times
778
+ tokio:: task:: yield_now ( ) . await ;
779
+
772
780
let SynapseAccessToken {
773
781
user_id : synapse_user_id,
774
782
device_id,
0 commit comments