Skip to content

Commit 33b521f

Browse files
committed
Add MasWriter support for compat sessions
1 parent 4731d3c commit 33b521f

6 files changed

+286
-26
lines changed

crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/src/mas_writer/mod.rs

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//!
88
//! This module is responsible for writing new records to MAS' database.
99
10-
use std::fmt::Display;
10+
use std::{fmt::Display, net::IpAddr};
1111

1212
use chrono::{DateTime, Utc};
1313
use futures_util::{future::BoxFuture, FutureExt, TryStreamExt};
@@ -230,6 +230,18 @@ pub struct MasNewUpstreamOauthLink {
230230
pub created_at: DateTime<Utc>,
231231
}
232232

233+
pub struct MasNewCompatSession {
234+
pub session_id: Uuid,
235+
pub user_id: Uuid,
236+
pub device_id: String,
237+
pub human_name: Option<String>,
238+
pub created_at: DateTime<Utc>,
239+
pub is_synapse_admin: bool,
240+
pub last_active_at: Option<DateTime<Utc>>,
241+
pub last_active_ip: Option<IpAddr>,
242+
pub user_agent: Option<String>,
243+
}
244+
233245
/// The 'version' of the password hashing scheme used for passwords when they
234246
/// are migrated from Synapse to MAS.
235247
/// This is version 1, as in the previous syn2mas script.
@@ -761,6 +773,85 @@ impl<'conn> MasWriter<'conn> {
761773
})
762774
}).boxed()
763775
}
776+
777+
#[tracing::instrument(skip_all, level = Level::DEBUG)]
778+
pub fn write_compat_sessions(
779+
&mut self,
780+
sessions: Vec<MasNewCompatSession>,
781+
) -> BoxFuture<'_, Result<(), Error>> {
782+
self.writer_pool
783+
.spawn_with_connection(move |conn| {
784+
Box::pin(async move {
785+
let mut session_ids: Vec<Uuid> = Vec::with_capacity(sessions.len());
786+
let mut user_ids: Vec<Uuid> = Vec::with_capacity(sessions.len());
787+
let mut device_ids: Vec<String> = Vec::with_capacity(sessions.len());
788+
let mut human_names: Vec<Option<String>> = Vec::with_capacity(sessions.len());
789+
let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(sessions.len());
790+
let mut is_synapse_admins: Vec<bool> = Vec::with_capacity(sessions.len());
791+
let mut last_active_ats: Vec<Option<DateTime<Utc>>> =
792+
Vec::with_capacity(sessions.len());
793+
let mut last_active_ips: Vec<Option<IpAddr>> =
794+
Vec::with_capacity(sessions.len());
795+
let mut user_agents: Vec<Option<String>> = Vec::with_capacity(sessions.len());
796+
797+
for MasNewCompatSession {
798+
session_id,
799+
user_id,
800+
device_id,
801+
human_name,
802+
created_at,
803+
is_synapse_admin,
804+
last_active_at,
805+
last_active_ip,
806+
user_agent,
807+
} in sessions
808+
{
809+
session_ids.push(session_id);
810+
user_ids.push(user_id);
811+
device_ids.push(device_id);
812+
human_names.push(human_name);
813+
created_ats.push(created_at);
814+
is_synapse_admins.push(is_synapse_admin);
815+
last_active_ats.push(last_active_at);
816+
last_active_ips.push(last_active_ip);
817+
user_agents.push(user_agent);
818+
}
819+
820+
sqlx::query!(
821+
r#"
822+
INSERT INTO syn2mas__compat_sessions (
823+
compat_session_id, user_id,
824+
device_id, human_name,
825+
created_at, is_synapse_admin,
826+
last_active_at, last_active_ip,
827+
user_agent)
828+
SELECT * FROM UNNEST(
829+
$1::UUID[], $2::UUID[],
830+
$3::TEXT[], $4::TEXT[],
831+
$5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],
832+
$7::TIMESTAMP WITH TIME ZONE[], $8::INET[],
833+
$9::TEXT[])
834+
"#,
835+
&session_ids[..],
836+
&user_ids[..],
837+
&device_ids[..],
838+
&human_names[..] as &[Option<String>],
839+
&created_ats[..],
840+
&is_synapse_admins[..],
841+
// We need to override the typing for arrays of optionals (sqlx limitation)
842+
&last_active_ats[..] as &[Option<DateTime<Utc>>],
843+
&last_active_ips[..] as &[Option<IpAddr>],
844+
&user_agents[..] as &[Option<String>],
845+
)
846+
.execute(&mut *conn)
847+
.await
848+
.into_database("writing compat sessions to MAS")?;
849+
850+
Ok(())
851+
})
852+
})
853+
.boxed()
854+
}
764855
}
765856

766857
// How many entries to buffer at once, before writing a batch of rows to the
@@ -839,8 +930,8 @@ mod test {
839930

840931
use crate::{
841932
mas_writer::{
842-
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
843-
MasNewUserPassword,
933+
MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid,
934+
MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword,
844935
},
845936
LockedMasDatabase, MasWriter,
846937
};
@@ -1105,4 +1196,41 @@ mod test {
11051196

11061197
assert_db_snapshot!(&mut conn);
11071198
}
1199+
1200+
/// Tests writing a single user, with a device (compat session).
1201+
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
1202+
async fn test_write_user_with_device(pool: PgPool) {
1203+
let mut conn = pool.acquire().await.unwrap();
1204+
let mut writer = make_mas_writer(&pool, &mut conn).await;
1205+
1206+
writer
1207+
.write_users(vec![MasNewUser {
1208+
user_id: Uuid::from_u128(1u128),
1209+
username: "alice".to_owned(),
1210+
created_at: DateTime::default(),
1211+
locked_at: None,
1212+
can_request_admin: false,
1213+
}])
1214+
.await
1215+
.expect("failed to write user");
1216+
1217+
writer
1218+
.write_compat_sessions(vec![MasNewCompatSession {
1219+
user_id: Uuid::from_u128(1u128),
1220+
session_id: Uuid::from_u128(5u128),
1221+
created_at: DateTime::default(),
1222+
device_id: "ADEVICE".to_owned(),
1223+
human_name: Some("alice's pinephone".to_owned()),
1224+
is_synapse_admin: true,
1225+
last_active_at: Some(DateTime::default()),
1226+
last_active_ip: Some("203.0.113.1".parse().unwrap()),
1227+
user_agent: Some("Browser/5.0".to_owned()),
1228+
}])
1229+
.await
1230+
.expect("failed to write link");
1231+
1232+
writer.finish().await.expect("failed to finish MasWriter");
1233+
1234+
assert_db_snapshot!(&mut conn);
1235+
}
11081236
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
---
2+
source: crates/syn2mas/src/mas_writer/mod.rs
3+
expression: db_snapshot
4+
---
5+
compat_sessions:
6+
- compat_session_id: 00000000-0000-0000-0000-000000000005
7+
created_at: "1970-01-01 00:00:00+00"
8+
device_id: ADEVICE
9+
finished_at: ~
10+
human_name: "alice's pinephone"
11+
is_synapse_admin: "true"
12+
last_active_at: "1970-01-01 00:00:00+00"
13+
last_active_ip: 203.0.113.1/32
14+
user_agent: Browser/5.0
15+
user_id: 00000000-0000-0000-0000-000000000001
16+
user_session_id: ~
17+
users:
18+
- can_request_admin: "false"
19+
created_at: "1970-01-01 00:00:00+00"
20+
locked_at: ~
21+
primary_user_email_id: ~
22+
user_id: 00000000-0000-0000-0000-000000000001
23+
username: alice

crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@ ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords;
1313
ALTER TABLE syn2mas__user_emails RENAME TO user_emails;
1414
ALTER TABLE syn2mas__user_unsupported_third_party_ids RENAME TO user_unsupported_third_party_ids;
1515
ALTER TABLE syn2mas__upstream_oauth_links RENAME TO upstream_oauth_links;
16+
ALTER TABLE syn2mas__compat_sessions RENAME TO compat_sessions;
17+
ALTER TABLE syn2mas__compat_access_tokens RENAME TO compat_access_tokens;
18+
ALTER TABLE syn2mas__compat_refresh_tokens RENAME TO compat_refresh_tokens;

crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,6 @@ ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords;
4242
ALTER TABLE user_emails RENAME TO syn2mas__user_emails;
4343
ALTER TABLE user_unsupported_third_party_ids RENAME TO syn2mas__user_unsupported_third_party_ids;
4444
ALTER TABLE upstream_oauth_links RENAME TO syn2mas__upstream_oauth_links;
45+
ALTER TABLE compat_sessions RENAME TO syn2mas__compat_sessions;
46+
ALTER TABLE compat_access_tokens RENAME TO syn2mas__compat_access_tokens;
47+
ALTER TABLE compat_refresh_tokens RENAME TO syn2mas__compat_refresh_tokens;

0 commit comments

Comments
 (0)