@@ -22,7 +22,7 @@ use chrono::{DateTime, Utc};
2222use compact_str:: CompactString ;
2323use futures_util:: StreamExt as _;
2424use mas_storage:: Clock ;
25- use rand:: RngCore ;
25+ use rand:: { RngCore , SeedableRng } ;
2626use thiserror:: Error ;
2727use thiserror_ext:: ContextInto ;
2828use tracing:: { info, Level , Span } ;
@@ -133,7 +133,7 @@ struct MigrationState {
133133#[ tracing:: instrument( skip_all, fields( indicatif. pb_show) ) ]
134134pub async fn migrate (
135135 mut synapse : SynapseReader < ' _ > ,
136- mut mas : MasWriter ,
136+ mas : MasWriter ,
137137 server_name : String ,
138138 clock : & dyn Clock ,
139139 rng : & mut impl RngCore ,
@@ -144,7 +144,7 @@ pub async fn migrate(
144144 span. pb_set_length ( 8 ) ;
145145 let counts = synapse. count_rows ( ) . await . into_synapse ( "counting rows" ) ?;
146146
147- let mut state = MigrationState {
147+ let state = MigrationState {
148148 server_name,
149149 // We oversize the hashmaps, as the estimates are innaccurate, and we would like to avoid
150150 // reallocations.
@@ -158,7 +158,8 @@ pub async fn migrate(
158158
159159 span. pb_set_message ( "migrating user rows" ) ;
160160 span. pb_inc ( 1 ) ;
161- migrate_users ( & mut synapse, & mut mas, counts. users , & mut state, rng) . await ?;
161+ let ( mut state, mut mas) = migrate_users ( & mut synapse, mas, counts. users , state, rng) . await ?;
162+
162163 span. pb_set_message ( "migrating threepids" ) ;
163164 span. pb_inc ( 1 ) ;
164165 migrate_threepids ( & mut synapse, & mut mas, counts. threepids , rng, & state) . await ?;
@@ -213,65 +214,73 @@ pub async fn migrate(
213214#[ tracing:: instrument( skip_all, fields( indicatif. pb_show) , level = Level :: INFO ) ]
214215async fn migrate_users (
215216 synapse : & mut SynapseReader < ' _ > ,
216- mas : & mut MasWriter ,
217+ mut mas : MasWriter ,
217218 count_hint : usize ,
218- state : & mut MigrationState ,
219+ state : MigrationState ,
219220 rng : & mut impl RngCore ,
220- ) -> Result < ( ) , Error > {
221+ ) -> Result < ( MigrationState , MasWriter ) , Error > {
221222 let start = Instant :: now ( ) ;
222223
223- let mut user_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_users) ;
224- let mut password_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_passwords) ;
225224 let mut users_stream = pin ! ( synapse. read_users( ) . with_progress_bar( count_hint, 10_000 ) ) ;
225+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( 1024 * 1024 ) ;
226226
227- while let Some ( user_res) = users_stream. next ( ) . await {
228- let user = user_res. into_synapse ( "reading user" ) ?;
229- let ( mas_user, mas_password_opt) = transform_user ( & user, & state. server_name , rng) ?;
227+ let mut rng = rand_chacha:: ChaCha8Rng :: from_rng ( rng) . expect ( "failed to seed rng" ) ;
228+ let task = tokio:: spawn ( async move {
229+ let mut user_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_users) ;
230+ let mut password_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_passwords) ;
230231
231- let mut flags = UserFlags :: empty ( ) ;
232- if bool:: from ( user. admin ) {
233- flags |= UserFlags :: IS_SYNAPSE_ADMIN ;
234- }
235- if bool:: from ( user. deactivated ) {
236- flags |= UserFlags :: IS_DEACTIVATED ;
237- }
238- if bool:: from ( user. is_guest ) {
239- flags |= UserFlags :: IS_GUEST ;
240- }
232+ while let Some ( user) = rx. recv ( ) . await {
233+ let ( mas_user, mas_password_opt) = transform_user ( & user, & state. server_name , & mut rng) ?;
234+
235+ let mut flags = UserFlags :: empty ( ) ;
236+ if bool:: from ( user. admin ) {
237+ flags |= UserFlags :: IS_SYNAPSE_ADMIN ;
238+ }
239+ if bool:: from ( user. deactivated ) {
240+ flags |= UserFlags :: IS_DEACTIVATED ;
241+ }
242+ if bool:: from ( user. is_guest ) {
243+ flags |= UserFlags :: IS_GUEST ;
244+ }
241245
242- state. users . insert (
243- CompactString :: new ( & mas_user. username ) ,
244- UserInfo {
245- mas_user_id : mas_user. user_id ,
246- flags,
247- } ,
248- ) ;
246+ user_buffer
247+ . write ( & mut mas, mas_user)
248+ . await
249+ . into_mas ( "writing user" ) ?;
250+
251+ if let Some ( mas_password) = mas_password_opt {
252+ password_buffer
253+ . write ( & mut mas, mas_password)
254+ . await
255+ . into_mas ( "writing password" ) ?;
256+ }
257+ }
249258
250259 user_buffer
251- . write ( mas, mas_user)
260+ . finish ( & mut mas)
261+ . await
262+ . into_mas ( "writing users" ) ?;
263+ password_buffer
264+ . finish ( & mut mas)
252265 . await
253- . into_mas ( "writing user " ) ?;
266+ . into_mas ( "writing passwords " ) ?;
254267
255- if let Some ( mas_password ) = mas_password_opt {
256- password_buffer
257- . write ( mas , mas_password )
258- . await
259- . into_mas ( "writing password ") ?;
260- }
268+ Ok ( ( state , mas ) )
269+ } ) ;
270+
271+ while let Some ( user_res ) = users_stream . next ( ) . await {
272+ let user = user_res . into_synapse ( "reading user ") ?;
273+ tx . send ( user ) . await . expect ( "failed to send in channel" ) ;
261274 }
262275
263- user_buffer. finish ( mas) . await . into_mas ( "writing users" ) ?;
264- password_buffer
265- . finish ( mas)
266- . await
267- . into_mas ( "writing passwords" ) ?;
276+ let ( state, mas) = task. await . expect ( "task panicked" ) ?;
268277
269278 info ! (
270279 "users migrated in {:.1}s" ,
271280 Instant :: now( ) . duration_since( start) . as_secs_f64( )
272281 ) ;
273282
274- Ok ( ( ) )
283+ Ok ( ( state , mas ) )
275284}
276285
277286#[ tracing:: instrument( skip_all, fields( indicatif. pb_show) , level = Level :: INFO ) ]
0 commit comments