Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 14 additions & 142 deletions crates/syn2mas/src/mas_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ impl<'conn> MasWriter<'conn> {
/// - If the database writer connection pool had an error.
#[allow(clippy::missing_panics_doc)] // not a real panic
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_users(&mut self, users: Vec<MasNewUser>) -> Result<(), Error> {
pub fn write_users(&mut self, users: Vec<MasNewUser>) -> BoxFuture<'_, Result<(), Error>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning with boxing here, and why not -> impl Future?

self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
// `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement
// without having to change the statement SQL thus altering the query plan.
Expand Down Expand Up @@ -577,7 +577,7 @@ impl<'conn> MasWriter<'conn> {
).execute(&mut *conn).await.into_database("writing users to MAS")?;

Ok(())
})).await
})).boxed()
}

/// Write a batch of user passwords to the database.
Expand All @@ -589,14 +589,10 @@ impl<'conn> MasWriter<'conn> {
/// - If the database writer connection pool had an error.
#[allow(clippy::missing_panics_doc)] // not a real panic
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_passwords(
pub fn write_passwords(
&mut self,
passwords: Vec<MasNewUserPassword>,
) -> Result<(), Error> {
if passwords.is_empty() {
return Ok(());
}

) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
let mut user_password_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
let mut user_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
Expand Down Expand Up @@ -631,17 +627,14 @@ impl<'conn> MasWriter<'conn> {
).execute(&mut *conn).await.into_database("writing users to MAS")?;

Ok(())
})).await
})).boxed()
}

#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_email_threepids(
pub fn write_email_threepids(
&mut self,
threepids: Vec<MasNewEmailThreepid>,
) -> Result<(), Error> {
if threepids.is_empty() {
return Ok(());
}
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut user_email_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
Expand Down Expand Up @@ -678,17 +671,14 @@ impl<'conn> MasWriter<'conn> {

Ok(())
})
}).await
}).boxed()
}

#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub async fn write_unsupported_threepids(
pub fn write_unsupported_threepids(
&mut self,
threepids: Vec<MasNewUnsupportedThreepid>,
) -> Result<(), Error> {
if threepids.is_empty() {
return Ok(());
}
) -> BoxFuture<'_, Result<(), Error>> {
self.writer_pool.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut user_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
Expand Down Expand Up @@ -723,17 +713,14 @@ impl<'conn> MasWriter<'conn> {

Ok(())
})
}).await
}).boxed()
}

#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn write_upstream_oauth_links(
&mut self,
links: Vec<MasNewUpstreamOauthLink>,
) -> BoxFuture<'_, Result<(), Error>> {
if links.is_empty() {
return async { Ok(()) }.boxed();
}
self.writer_pool.spawn_with_connection(move |conn| {
Box::pin(async move {
let mut link_ids: Vec<Uuid> = Vec::with_capacity(links.len());
Expand Down Expand Up @@ -783,124 +770,6 @@ impl<'conn> MasWriter<'conn> {
// stream to two tables at once...)
const WRITE_BUFFER_BATCH_SIZE: usize = 4096;

// TODO replace with just `MasWriteBuffer`
pub struct MasUserWriteBuffer<'writer, 'conn> {
users: Vec<MasNewUser>,
passwords: Vec<MasNewUserPassword>,
writer: &'writer mut MasWriter<'conn>,
}

impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> {
pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
MasUserWriteBuffer {
users: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
passwords: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
writer,
}
}

pub async fn finish(mut self) -> Result<(), Error> {
self.flush_users().await?;
self.flush_passwords().await?;
Ok(())
}

pub async fn flush_users(&mut self) -> Result<(), Error> {
// via copy: 13s
// not via copy: 14s
// difference probably gets worse with latency
self.writer
.write_users(std::mem::take(&mut self.users))
.await?;

self.users.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
Ok(())
}

pub async fn flush_passwords(&mut self) -> Result<(), Error> {
self.writer
.write_passwords(std::mem::take(&mut self.passwords))
.await?;
self.passwords.reserve_exact(WRITE_BUFFER_BATCH_SIZE);

Ok(())
}

pub async fn write_user(&mut self, user: MasNewUser) -> Result<(), Error> {
self.users.push(user);
if self.users.len() >= WRITE_BUFFER_BATCH_SIZE {
self.flush_users().await?;
}
Ok(())
}

pub async fn write_password(&mut self, password: MasNewUserPassword) -> Result<(), Error> {
self.passwords.push(password);
if self.passwords.len() >= WRITE_BUFFER_BATCH_SIZE {
self.flush_passwords().await?;
}
Ok(())
}
}

// TODO replace with just `MasWriteBuffer`
pub struct MasThreepidWriteBuffer<'writer, 'conn> {
email: Vec<MasNewEmailThreepid>,
unsupported: Vec<MasNewUnsupportedThreepid>,
writer: &'writer mut MasWriter<'conn>,
}

impl<'writer, 'conn> MasThreepidWriteBuffer<'writer, 'conn> {
pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
MasThreepidWriteBuffer {
email: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
unsupported: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
writer,
}
}

pub async fn finish(mut self) -> Result<(), Error> {
self.flush_emails().await?;
self.flush_unsupported().await?;
Ok(())
}

pub async fn flush_emails(&mut self) -> Result<(), Error> {
self.writer
.write_email_threepids(std::mem::take(&mut self.email))
.await?;
self.email.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
Ok(())
}

pub async fn flush_unsupported(&mut self) -> Result<(), Error> {
self.writer
.write_unsupported_threepids(std::mem::take(&mut self.unsupported))
.await?;
self.unsupported.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
Ok(())
}

pub async fn write_email(&mut self, user: MasNewEmailThreepid) -> Result<(), Error> {
self.email.push(user);
if self.email.len() >= WRITE_BUFFER_BATCH_SIZE {
self.flush_emails().await?;
}
Ok(())
}

pub async fn write_password(
&mut self,
unsupported: MasNewUnsupportedThreepid,
) -> Result<(), Error> {
self.unsupported.push(unsupported);
if self.unsupported.len() >= WRITE_BUFFER_BATCH_SIZE {
self.flush_unsupported().await?;
}
Ok(())
}
}

/// A function that can accept and flush buffers from a `MasWriteBuffer`.
/// Intended uses are the methods on `MasWriter` such as `write_users`.
type WriteBufferFlusher<'conn, T> =
Expand Down Expand Up @@ -934,6 +803,9 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
}

pub async fn flush(&mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
if self.rows.is_empty() {
return Ok(());
}
let rows = std::mem::take(&mut self.rows);
self.rows.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
(self.flusher)(writer, rows).await?;
Expand Down
72 changes: 44 additions & 28 deletions crates/syn2mas/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use uuid::Uuid;
use crate::{
mas_writer::{
self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
MasNewUserPassword, MasThreepidWriteBuffer, MasUserWriteBuffer, MasWriteBuffer, MasWriter,
MasNewUserPassword, MasWriteBuffer, MasWriter,
},
synapse_reader::{
self, ExtractLocalpartError, FullUserId, SynapseExternalId, SynapseThreepid, SynapseUser,
Expand Down Expand Up @@ -132,7 +132,8 @@ async fn migrate_users(
server_name: &str,
rng: &mut impl RngCore,
) -> Result<UsersMigrated, Error> {
let mut write_buffer = MasUserWriteBuffer::new(mas);
let mut user_buffer = MasWriteBuffer::new(MasWriter::write_users);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure how I feel about passing function pointers instead of using traits… I guess that works, not sure how it would look like with traits

let mut password_buffer = MasWriteBuffer::new(MasWriter::write_passwords);
let mut users_stream = pin!(synapse.read_users());
// TODO is 1:1 capacity enough for a hashmap?
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint);
Expand All @@ -143,23 +144,24 @@ async fn migrate_users(

user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id);

write_buffer
.write_user(mas_user)
user_buffer
.write(mas, mas_user)
.await
.into_mas("writing user")?;

if let Some(mas_password) = mas_password_opt {
write_buffer
.write_password(mas_password)
password_buffer
.write(mas, mas_password)
.await
.into_mas("writing password")?;
}
}

write_buffer
.finish()
user_buffer.finish(mas).await.into_mas("writing users")?;
password_buffer
.finish(mas)
.await
.into_mas("writing users & passwords")?;
.into_mas("writing passwords")?;

Ok(UsersMigrated {
user_localparts_to_uuid,
Expand All @@ -174,7 +176,8 @@ async fn migrate_threepids(
rng: &mut impl RngCore,
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
) -> Result<(), Error> {
let mut write_buffer = MasThreepidWriteBuffer::new(mas);
let mut email_buffer = MasWriteBuffer::new(MasWriter::write_email_threepids);
let mut unsupported_buffer = MasWriteBuffer::new(MasWriter::write_unsupported_threepids);
let mut users_stream = pin!(synapse.read_threepids());

while let Some(threepid_res) = users_stream.next().await {
Expand All @@ -198,32 +201,45 @@ async fn migrate_threepids(
};

if medium == "email" {
write_buffer
.write_email(MasNewEmailThreepid {
user_id,
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
created_at.into(),
rng,
)),
email: address,
created_at,
})
email_buffer
.write(
mas,
MasNewEmailThreepid {
user_id,
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
created_at.into(),
rng,
)),
email: address,
created_at,
},
)
.await
.into_mas("writing email")?;
} else {
write_buffer
.write_password(MasNewUnsupportedThreepid {
user_id,
medium,
address,
created_at,
})
unsupported_buffer
.write(
mas,
MasNewUnsupportedThreepid {
user_id,
medium,
address,
created_at,
},
)
.await
.into_mas("writing unsupported threepid")?;
}
}

write_buffer.finish().await.into_mas("writing threepids")?;
email_buffer
.finish(mas)
.await
.into_mas("writing email threepids")?;
unsupported_buffer
.finish(mas)
.await
.into_mas("writing unsupported threepids")?;

Ok(())
}
Expand Down