|
7 | 7 | //!
|
8 | 8 | //! This module is responsible for writing new records to MAS' database.
|
9 | 9 |
|
10 |
| -use std::fmt::Display; |
| 10 | +use std::{fmt::Display, net::IpAddr}; |
11 | 11 |
|
12 | 12 | use chrono::{DateTime, Utc};
|
13 | 13 | use futures_util::{future::BoxFuture, FutureExt, TryStreamExt};
|
@@ -230,6 +230,18 @@ pub struct MasNewUpstreamOauthLink {
|
230 | 230 | pub created_at: DateTime<Utc>,
|
231 | 231 | }
|
232 | 232 |
|
| 233 | +pub struct MasNewCompatSession { |
| 234 | + pub session_id: Uuid, |
| 235 | + pub user_id: Uuid, |
| 236 | + pub device_id: String, |
| 237 | + pub human_name: Option<String>, |
| 238 | + pub created_at: DateTime<Utc>, |
| 239 | + pub is_synapse_admin: bool, |
| 240 | + pub last_active_at: Option<DateTime<Utc>>, |
| 241 | + pub last_active_ip: Option<IpAddr>, |
| 242 | + pub user_agent: Option<String>, |
| 243 | +} |
| 244 | + |
233 | 245 | /// The 'version' of the password hashing scheme used for passwords when they
|
234 | 246 | /// are migrated from Synapse to MAS.
|
235 | 247 | /// This is version 1, as in the previous syn2mas script.
|
@@ -761,6 +773,85 @@ impl<'conn> MasWriter<'conn> {
|
761 | 773 | })
|
762 | 774 | }).boxed()
|
763 | 775 | }
|
| 776 | + |
| 777 | + #[tracing::instrument(skip_all, level = Level::DEBUG)] |
| 778 | + pub fn write_compat_sessions( |
| 779 | + &mut self, |
| 780 | + sessions: Vec<MasNewCompatSession>, |
| 781 | + ) -> BoxFuture<'_, Result<(), Error>> { |
| 782 | + self.writer_pool |
| 783 | + .spawn_with_connection(move |conn| { |
| 784 | + Box::pin(async move { |
| 785 | + let mut session_ids: Vec<Uuid> = Vec::with_capacity(sessions.len()); |
| 786 | + let mut user_ids: Vec<Uuid> = Vec::with_capacity(sessions.len()); |
| 787 | + let mut device_ids: Vec<String> = Vec::with_capacity(sessions.len()); |
| 788 | + let mut human_names: Vec<Option<String>> = Vec::with_capacity(sessions.len()); |
| 789 | + let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(sessions.len()); |
| 790 | + let mut is_synapse_admins: Vec<bool> = Vec::with_capacity(sessions.len()); |
| 791 | + let mut last_active_ats: Vec<Option<DateTime<Utc>>> = |
| 792 | + Vec::with_capacity(sessions.len()); |
| 793 | + let mut last_active_ips: Vec<Option<IpAddr>> = |
| 794 | + Vec::with_capacity(sessions.len()); |
| 795 | + let mut user_agents: Vec<Option<String>> = Vec::with_capacity(sessions.len()); |
| 796 | + |
| 797 | + for MasNewCompatSession { |
| 798 | + session_id, |
| 799 | + user_id, |
| 800 | + device_id, |
| 801 | + human_name, |
| 802 | + created_at, |
| 803 | + is_synapse_admin, |
| 804 | + last_active_at, |
| 805 | + last_active_ip, |
| 806 | + user_agent, |
| 807 | + } in sessions |
| 808 | + { |
| 809 | + session_ids.push(session_id); |
| 810 | + user_ids.push(user_id); |
| 811 | + device_ids.push(device_id); |
| 812 | + human_names.push(human_name); |
| 813 | + created_ats.push(created_at); |
| 814 | + is_synapse_admins.push(is_synapse_admin); |
| 815 | + last_active_ats.push(last_active_at); |
| 816 | + last_active_ips.push(last_active_ip); |
| 817 | + user_agents.push(user_agent); |
| 818 | + } |
| 819 | + |
| 820 | + sqlx::query!( |
| 821 | + r#" |
| 822 | + INSERT INTO syn2mas__compat_sessions ( |
| 823 | + compat_session_id, user_id, |
| 824 | + device_id, human_name, |
| 825 | + created_at, is_synapse_admin, |
| 826 | + last_active_at, last_active_ip, |
| 827 | + user_agent) |
| 828 | + SELECT * FROM UNNEST( |
| 829 | + $1::UUID[], $2::UUID[], |
| 830 | + $3::TEXT[], $4::TEXT[], |
| 831 | + $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[], |
| 832 | + $7::TIMESTAMP WITH TIME ZONE[], $8::INET[], |
| 833 | + $9::TEXT[]) |
| 834 | + "#, |
| 835 | + &session_ids[..], |
| 836 | + &user_ids[..], |
| 837 | + &device_ids[..], |
| 838 | + &human_names[..] as &[Option<String>], |
| 839 | + &created_ats[..], |
| 840 | + &is_synapse_admins[..], |
| 841 | + // We need to override the typing for arrays of optionals (sqlx limitation) |
| 842 | + &last_active_ats[..] as &[Option<DateTime<Utc>>], |
| 843 | + &last_active_ips[..] as &[Option<IpAddr>], |
| 844 | + &user_agents[..] as &[Option<String>], |
| 845 | + ) |
| 846 | + .execute(&mut *conn) |
| 847 | + .await |
| 848 | + .into_database("writing compat sessions to MAS")?; |
| 849 | + |
| 850 | + Ok(()) |
| 851 | + }) |
| 852 | + }) |
| 853 | + .boxed() |
| 854 | + } |
764 | 855 | }
|
765 | 856 |
|
766 | 857 | // How many entries to buffer at once, before writing a batch of rows to the
|
@@ -839,8 +930,8 @@ mod test {
|
839 | 930 |
|
840 | 931 | use crate::{
|
841 | 932 | mas_writer::{
|
842 |
| - MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, |
843 |
| - MasNewUserPassword, |
| 933 | + MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid, |
| 934 | + MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, |
844 | 935 | },
|
845 | 936 | LockedMasDatabase, MasWriter,
|
846 | 937 | };
|
@@ -1105,4 +1196,41 @@ mod test {
|
1105 | 1196 |
|
1106 | 1197 | assert_db_snapshot!(&mut conn);
|
1107 | 1198 | }
|
| 1199 | + |
| 1200 | + /// Tests writing a single user, with a device (compat session). |
| 1201 | + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] |
| 1202 | + async fn test_write_user_with_device(pool: PgPool) { |
| 1203 | + let mut conn = pool.acquire().await.unwrap(); |
| 1204 | + let mut writer = make_mas_writer(&pool, &mut conn).await; |
| 1205 | + |
| 1206 | + writer |
| 1207 | + .write_users(vec![MasNewUser { |
| 1208 | + user_id: Uuid::from_u128(1u128), |
| 1209 | + username: "alice".to_owned(), |
| 1210 | + created_at: DateTime::default(), |
| 1211 | + locked_at: None, |
| 1212 | + can_request_admin: false, |
| 1213 | + }]) |
| 1214 | + .await |
| 1215 | + .expect("failed to write user"); |
| 1216 | + |
| 1217 | + writer |
| 1218 | + .write_compat_sessions(vec![MasNewCompatSession { |
| 1219 | + user_id: Uuid::from_u128(1u128), |
| 1220 | + session_id: Uuid::from_u128(5u128), |
| 1221 | + created_at: DateTime::default(), |
| 1222 | + device_id: "ADEVICE".to_owned(), |
| 1223 | + human_name: Some("alice's pinephone".to_owned()), |
| 1224 | + is_synapse_admin: true, |
| 1225 | + last_active_at: Some(DateTime::default()), |
| 1226 | + last_active_ip: Some("203.0.113.1".parse().unwrap()), |
| 1227 | + user_agent: Some("Browser/5.0".to_owned()), |
| 1228 | + }]) |
| 1229 | + .await |
| 1230 | + .expect("failed to write link"); |
| 1231 | + |
| 1232 | + writer.finish().await.expect("failed to finish MasWriter"); |
| 1233 | + |
| 1234 | + assert_db_snapshot!(&mut conn); |
| 1235 | + } |
1108 | 1236 | }
|
0 commit comments