@@ -14,11 +14,12 @@ use mas_axum_utils::record_error;
14
14
use mas_data_model:: { CompatSession , CompatSsoLoginState , Device , SiteConfig , TokenType , User } ;
15
15
use mas_matrix:: HomeserverConnection ;
16
16
use mas_storage:: {
17
- BoxClock , BoxRepository , BoxRng , Clock , RepositoryAccess ,
17
+ BoxClock , BoxRepository , BoxRepositoryFactory , BoxRng , Clock , RepositoryAccess ,
18
18
compat:: {
19
19
CompatAccessTokenRepository , CompatRefreshTokenRepository , CompatSessionRepository ,
20
20
CompatSsoLoginRepository ,
21
21
} ,
22
+ queue:: { QueueJobRepositoryExt as _, SyncDevicesJob } ,
22
23
user:: { UserPasswordRepository , UserRepository } ,
23
24
} ;
24
25
use opentelemetry:: { Key , KeyValue , metrics:: Counter } ;
@@ -268,7 +269,7 @@ pub(crate) async fn post(
268
269
mut rng : BoxRng ,
269
270
clock : BoxClock ,
270
271
State ( password_manager) : State < PasswordManager > ,
271
- mut repo : BoxRepository ,
272
+ State ( repository_factory ) : State < BoxRepositoryFactory > ,
272
273
activity_tracker : BoundActivityTracker ,
273
274
State ( homeserver) : State < Arc < dyn HomeserverConnection > > ,
274
275
State ( site_config) : State < SiteConfig > ,
@@ -279,6 +280,7 @@ pub(crate) async fn post(
279
280
) -> Result < impl IntoResponse , RouteError > {
280
281
let user_agent = user_agent. map ( |ua| ua. as_str ( ) . to_owned ( ) ) ;
281
282
let login_type = input. credentials . login_type ( ) ;
283
+ let mut repo = repository_factory. create ( ) . await ?;
282
284
let ( mut session, user) = match ( password_manager. is_enabled ( ) , input. credentials ) {
283
285
(
284
286
true ,
@@ -301,15 +303,17 @@ pub(crate) async fn post(
301
303
}
302
304
} ;
303
305
306
+ // Try getting the localpart out of the MXID
307
+ let username = homeserver. localpart ( & user) . unwrap_or ( & user) ;
308
+
304
309
user_password_login (
305
310
& mut rng,
306
311
& clock,
307
312
& password_manager,
308
313
& limiter,
309
314
requester,
310
315
& mut repo,
311
- & homeserver,
312
- user,
316
+ username,
313
317
password,
314
318
input. device_id , // TODO check for validity
315
319
input. initial_device_display_name ,
@@ -322,7 +326,6 @@ pub(crate) async fn post(
322
326
& mut rng,
323
327
& clock,
324
328
& mut repo,
325
- & homeserver,
326
329
& token,
327
330
input. device_id ,
328
331
input. initial_device_display_name ,
@@ -368,12 +371,53 @@ pub(crate) async fn post(
368
371
None
369
372
} ;
370
373
374
+ // Ideally, we'd keep the lock whilst we actually create the device, but we
375
+ // really want to stop holding the transaction while we talk to the
376
+ // homeserver.
377
+ //
378
+ // In practice, this is fine, because:
379
+ // - the session exists after we commited the transaction, so a sync job won't
380
+ // try to delete it
381
+ // - we've acquired a lock on the user before creating the session, meaning
382
+ // we've made sure that sync jobs finished before we create the new session
383
+ // - we're in the read-commited isolation level, which means the sync will see
384
+ // what we've committed and won't try to delete the session once we release
385
+ // the lock
371
386
repo. save ( ) . await ?;
372
387
373
388
activity_tracker
374
389
. record_compat_session ( & clock, & session)
375
390
. await ;
376
391
392
+ // This session will have for sure the device on it, both methods create a
393
+ // device
394
+ let Some ( device) = & session. device else {
395
+ unreachable ! ( )
396
+ } ;
397
+
398
+ // Now we can create the device on the homeserver, without holding the
399
+ // transaction
400
+ if let Err ( err) = homeserver
401
+ . create_device ( & user_id, device. as_str ( ) , session. human_name . as_deref ( ) )
402
+ . await
403
+ {
404
+ // Something went wrong, let's end this session and schedule a device sync
405
+ let mut repo = repository_factory. create ( ) . await ?;
406
+ let session = repo. compat_session ( ) . finish ( & clock, session) . await ?;
407
+
408
+ repo. queue_job ( )
409
+ . schedule_job (
410
+ & mut rng,
411
+ & clock,
412
+ SyncDevicesJob :: new_for_id ( session. user_id ) ,
413
+ )
414
+ . await ?;
415
+
416
+ repo. save ( ) . await ?;
417
+
418
+ return Err ( RouteError :: ProvisionDeviceFailed ( err) ) ;
419
+ }
420
+
377
421
LOGIN_COUNTER . add (
378
422
1 ,
379
423
& [
@@ -395,7 +439,6 @@ async fn token_login(
395
439
rng : & mut ( dyn RngCore + Send ) ,
396
440
clock : & dyn Clock ,
397
441
repo : & mut BoxRepository ,
398
- homeserver : & dyn HomeserverConnection ,
399
442
token : & str ,
400
443
requested_device_id : Option < String > ,
401
444
initial_device_display_name : Option < String > ,
@@ -461,7 +504,8 @@ async fn token_login(
461
504
return Err ( RouteError :: InvalidLoginToken ) ;
462
505
}
463
506
464
- // Lock the user sync to make sure we don't get into a race condition
507
+ // We're about to create a device, let's explicitly acquire a lock, so that
508
+ // any concurrent sync will read after we've committed
465
509
repo. user ( )
466
510
. acquire_lock_for_sync ( & browser_session. user )
467
511
. await ?;
@@ -471,20 +515,14 @@ async fn token_login(
471
515
} else {
472
516
Device :: generate ( rng)
473
517
} ;
474
- let mxid = homeserver. mxid ( & browser_session. user . username ) ;
475
- homeserver
476
- . create_device (
477
- & mxid,
478
- device. as_str ( ) ,
479
- initial_device_display_name. as_deref ( ) ,
480
- )
481
- . await
482
- . map_err ( RouteError :: ProvisionDeviceFailed ) ?;
483
518
484
519
repo. app_session ( )
485
520
. finish_sessions_to_replace_device ( clock, & browser_session. user , & device)
486
521
. await ?;
487
522
523
+ // We first create the session in the database, commit the transaction, then
524
+ // create it on the homeserver, scheduling a device sync job afterwards to
525
+ // make sure we don't end up in an inconsistent state.
488
526
let compat_session = repo
489
527
. compat_session ( )
490
528
. add (
@@ -512,15 +550,11 @@ async fn user_password_login(
512
550
limiter : & Limiter ,
513
551
requester : RequesterFingerprint ,
514
552
repo : & mut BoxRepository ,
515
- homeserver : & dyn HomeserverConnection ,
516
- username : String ,
553
+ username : & str ,
517
554
password : String ,
518
555
requested_device_id : Option < String > ,
519
556
initial_device_display_name : Option < String > ,
520
557
) -> Result < ( CompatSession , User ) , RouteError > {
521
- // Try getting the localpart out of the MXID
522
- let username = homeserver. localpart ( & username) . unwrap_or ( & username) ;
523
-
524
558
// Find the user
525
559
let user = repo
526
560
. user ( )
@@ -566,25 +600,16 @@ async fn user_password_login(
566
600
. await ?;
567
601
}
568
602
569
- // Lock the user sync to make sure we don't get into a race condition
603
+ // We're about to create a device, let's explicitly acquire a lock, so that
604
+ // any concurrent sync will read after we've committed
570
605
repo. user ( ) . acquire_lock_for_sync ( & user) . await ?;
571
606
572
- let mxid = homeserver. mxid ( & user. username ) ;
573
-
574
607
// Now that the user credentials have been verified, start a new compat session
575
608
let device = if let Some ( requested_device_id) = requested_device_id {
576
609
Device :: from ( requested_device_id)
577
610
} else {
578
611
Device :: generate ( & mut rng)
579
612
} ;
580
- homeserver
581
- . create_device (
582
- & mxid,
583
- device. as_str ( ) ,
584
- initial_device_display_name. as_deref ( ) ,
585
- )
586
- . await
587
- . map_err ( RouteError :: ProvisionDeviceFailed ) ?;
588
613
589
614
repo. app_session ( )
590
615
. finish_sessions_to_replace_device ( clock, & user, & device)
0 commit comments