Skip to content

Commit 0bcfc03

Browse files
committed
Remove special-purpose write buffers and use only the generic one
1 parent fa52f7f commit 0bcfc03

File tree

2 files changed

+58
-170
lines changed

2 files changed

+58
-170
lines changed

crates/syn2mas/src/mas_writer/mod.rs

Lines changed: 14 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ impl<'conn> MasWriter<'conn> {
531531
/// - If the database writer connection pool had an error.
532532
#[allow(clippy::missing_panics_doc)] // not a real panic
533533
#[tracing::instrument(skip_all, level = Level::DEBUG)]
534-
pub async fn write_users(&mut self, users: Vec<MasNewUser>) -> Result<(), Error> {
534+
pub fn write_users(&mut self, users: Vec<MasNewUser>) -> BoxFuture<'_, Result<(), Error>> {
535535
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
536536
// `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement
537537
// without having to change the statement SQL thus altering the query plan.
@@ -577,7 +577,7 @@ impl<'conn> MasWriter<'conn> {
577577
).execute(&mut *conn).await.into_database("writing users to MAS")?;
578578

579579
Ok(())
580-
})).await
580+
})).boxed()
581581
}
582582

583583
/// Write a batch of user passwords to the database.
@@ -589,14 +589,10 @@ impl<'conn> MasWriter<'conn> {
589589
/// - If the database writer connection pool had an error.
590590
#[allow(clippy::missing_panics_doc)] // not a real panic
591591
#[tracing::instrument(skip_all, level = Level::DEBUG)]
592-
pub async fn write_passwords(
592+
pub fn write_passwords(
593593
&mut self,
594594
passwords: Vec<MasNewUserPassword>,
595-
) -> Result<(), Error> {
596-
if passwords.is_empty() {
597-
return Ok(());
598-
}
599-
595+
) -> BoxFuture<'_, Result<(), Error>> {
600596
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
601597
let mut user_password_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
602598
let mut user_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
@@ -631,17 +627,14 @@ impl<'conn> MasWriter<'conn> {
631627
).execute(&mut *conn).await.into_database("writing users to MAS")?;
632628

633629
Ok(())
634-
})).await
630+
})).boxed()
635631
}
636632

637633
#[tracing::instrument(skip_all, level = Level::DEBUG)]
638-
pub async fn write_email_threepids(
634+
pub fn write_email_threepids(
639635
&mut self,
640636
threepids: Vec<MasNewEmailThreepid>,
641-
) -> Result<(), Error> {
642-
if threepids.is_empty() {
643-
return Ok(());
644-
}
637+
) -> BoxFuture<'_, Result<(), Error>> {
645638
self.writer_pool.spawn_with_connection(move |conn| {
646639
Box::pin(async move {
647640
let mut user_email_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
@@ -678,17 +671,14 @@ impl<'conn> MasWriter<'conn> {
678671

679672
Ok(())
680673
})
681-
}).await
674+
}).boxed()
682675
}
683676

684677
#[tracing::instrument(skip_all, level = Level::DEBUG)]
685-
pub async fn write_unsupported_threepids(
678+
pub fn write_unsupported_threepids(
686679
&mut self,
687680
threepids: Vec<MasNewUnsupportedThreepid>,
688-
) -> Result<(), Error> {
689-
if threepids.is_empty() {
690-
return Ok(());
691-
}
681+
) -> BoxFuture<'_, Result<(), Error>> {
692682
self.writer_pool.spawn_with_connection(move |conn| {
693683
Box::pin(async move {
694684
let mut user_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
@@ -723,17 +713,14 @@ impl<'conn> MasWriter<'conn> {
723713

724714
Ok(())
725715
})
726-
}).await
716+
}).boxed()
727717
}
728718

729719
#[tracing::instrument(skip_all, level = Level::DEBUG)]
730720
pub fn write_upstream_oauth_links(
731721
&mut self,
732722
links: Vec<MasNewUpstreamOauthLink>,
733723
) -> BoxFuture<'_, Result<(), Error>> {
734-
if links.is_empty() {
735-
return async { Ok(()) }.boxed();
736-
}
737724
self.writer_pool.spawn_with_connection(move |conn| {
738725
Box::pin(async move {
739726
let mut link_ids: Vec<Uuid> = Vec::with_capacity(links.len());
@@ -783,124 +770,6 @@ impl<'conn> MasWriter<'conn> {
783770
// stream to two tables at once...)
784771
const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
785772

786-
// TODO replace with just `MasWriteBuffer`
787-
pub struct MasUserWriteBuffer<'writer, 'conn> {
788-
users: Vec<MasNewUser>,
789-
passwords: Vec<MasNewUserPassword>,
790-
writer: &'writer mut MasWriter<'conn>,
791-
}
792-
793-
impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> {
794-
pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
795-
MasUserWriteBuffer {
796-
users: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
797-
passwords: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
798-
writer,
799-
}
800-
}
801-
802-
pub async fn finish(mut self) -> Result<(), Error> {
803-
self.flush_users().await?;
804-
self.flush_passwords().await?;
805-
Ok(())
806-
}
807-
808-
pub async fn flush_users(&mut self) -> Result<(), Error> {
809-
// via copy: 13s
810-
// not via copy: 14s
811-
// difference probably gets worse with latency
812-
self.writer
813-
.write_users(std::mem::take(&mut self.users))
814-
.await?;
815-
816-
self.users.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
817-
Ok(())
818-
}
819-
820-
pub async fn flush_passwords(&mut self) -> Result<(), Error> {
821-
self.writer
822-
.write_passwords(std::mem::take(&mut self.passwords))
823-
.await?;
824-
self.passwords.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
825-
826-
Ok(())
827-
}
828-
829-
pub async fn write_user(&mut self, user: MasNewUser) -> Result<(), Error> {
830-
self.users.push(user);
831-
if self.users.len() >= WRITE_BUFFER_BATCH_SIZE {
832-
self.flush_users().await?;
833-
}
834-
Ok(())
835-
}
836-
837-
pub async fn write_password(&mut self, password: MasNewUserPassword) -> Result<(), Error> {
838-
self.passwords.push(password);
839-
if self.passwords.len() >= WRITE_BUFFER_BATCH_SIZE {
840-
self.flush_passwords().await?;
841-
}
842-
Ok(())
843-
}
844-
}
845-
846-
// TODO replace with just `MasWriteBuffer`
847-
pub struct MasThreepidWriteBuffer<'writer, 'conn> {
848-
email: Vec<MasNewEmailThreepid>,
849-
unsupported: Vec<MasNewUnsupportedThreepid>,
850-
writer: &'writer mut MasWriter<'conn>,
851-
}
852-
853-
impl<'writer, 'conn> MasThreepidWriteBuffer<'writer, 'conn> {
854-
pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
855-
MasThreepidWriteBuffer {
856-
email: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
857-
unsupported: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
858-
writer,
859-
}
860-
}
861-
862-
pub async fn finish(mut self) -> Result<(), Error> {
863-
self.flush_emails().await?;
864-
self.flush_unsupported().await?;
865-
Ok(())
866-
}
867-
868-
pub async fn flush_emails(&mut self) -> Result<(), Error> {
869-
self.writer
870-
.write_email_threepids(std::mem::take(&mut self.email))
871-
.await?;
872-
self.email.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
873-
Ok(())
874-
}
875-
876-
pub async fn flush_unsupported(&mut self) -> Result<(), Error> {
877-
self.writer
878-
.write_unsupported_threepids(std::mem::take(&mut self.unsupported))
879-
.await?;
880-
self.unsupported.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
881-
Ok(())
882-
}
883-
884-
pub async fn write_email(&mut self, user: MasNewEmailThreepid) -> Result<(), Error> {
885-
self.email.push(user);
886-
if self.email.len() >= WRITE_BUFFER_BATCH_SIZE {
887-
self.flush_emails().await?;
888-
}
889-
Ok(())
890-
}
891-
892-
pub async fn write_password(
893-
&mut self,
894-
unsupported: MasNewUnsupportedThreepid,
895-
) -> Result<(), Error> {
896-
self.unsupported.push(unsupported);
897-
if self.unsupported.len() >= WRITE_BUFFER_BATCH_SIZE {
898-
self.flush_unsupported().await?;
899-
}
900-
Ok(())
901-
}
902-
}
903-
904773
/// A function that can accept and flush buffers from a `MasWriteBuffer`.
905774
/// Intended uses are the methods on `MasWriter` such as `write_users`.
906775
type WriteBufferFlusher<'conn, T> =
@@ -934,6 +803,9 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
934803
}
935804

936805
pub async fn flush(&mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
806+
if self.rows.is_empty() {
807+
return Ok(());
808+
}
937809
let rows = std::mem::take(&mut self.rows);
938810
self.rows.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
939811
(self.flusher)(writer, rows).await?;

crates/syn2mas/src/migration.rs

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use uuid::Uuid;
2626
use crate::{
2727
mas_writer::{
2828
self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
29-
MasNewUserPassword, MasThreepidWriteBuffer, MasUserWriteBuffer, MasWriteBuffer, MasWriter,
29+
MasNewUserPassword, MasWriteBuffer, MasWriter,
3030
},
3131
synapse_reader::{
3232
self, ExtractLocalpartError, FullUserId, SynapseExternalId, SynapseThreepid, SynapseUser,
@@ -131,7 +131,8 @@ async fn migrate_users(
131131
server_name: &str,
132132
rng: &mut impl RngCore,
133133
) -> Result<UsersMigrated, Error> {
134-
let mut write_buffer = MasUserWriteBuffer::new(mas);
134+
let mut user_buffer = MasWriteBuffer::new(MasWriter::write_users);
135+
let mut password_buffer = MasWriteBuffer::new(MasWriter::write_passwords);
135136
let mut users_stream = pin!(synapse.read_users());
136137
// TODO is 1:1 capacity enough for a hashmap?
137138
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint);
@@ -142,23 +143,24 @@ async fn migrate_users(
142143

143144
user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id);
144145

145-
write_buffer
146-
.write_user(mas_user)
146+
user_buffer
147+
.write(mas, mas_user)
147148
.await
148149
.into_mas("writing user")?;
149150

150151
if let Some(mas_password) = mas_password_opt {
151-
write_buffer
152-
.write_password(mas_password)
152+
password_buffer
153+
.write(mas, mas_password)
153154
.await
154155
.into_mas("writing password")?;
155156
}
156157
}
157158

158-
write_buffer
159-
.finish()
159+
user_buffer.finish(mas).await.into_mas("writing users")?;
160+
password_buffer
161+
.finish(mas)
160162
.await
161-
.into_mas("writing users & passwords")?;
163+
.into_mas("writing passwords")?;
162164

163165
Ok(UsersMigrated {
164166
user_localparts_to_uuid,
@@ -173,7 +175,8 @@ async fn migrate_threepids(
173175
rng: &mut impl RngCore,
174176
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
175177
) -> Result<(), Error> {
176-
let mut write_buffer = MasThreepidWriteBuffer::new(mas);
178+
let mut email_buffer = MasWriteBuffer::new(MasWriter::write_email_threepids);
179+
let mut unsupported_buffer = MasWriteBuffer::new(MasWriter::write_unsupported_threepids);
177180
let mut users_stream = pin!(synapse.read_threepids());
178181

179182
while let Some(threepid_res) = users_stream.next().await {
@@ -197,32 +200,45 @@ async fn migrate_threepids(
197200
};
198201

199202
if medium == "email" {
200-
write_buffer
201-
.write_email(MasNewEmailThreepid {
202-
user_id,
203-
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
204-
created_at.into(),
205-
rng,
206-
)),
207-
email: address,
208-
created_at,
209-
})
203+
email_buffer
204+
.write(
205+
mas,
206+
MasNewEmailThreepid {
207+
user_id,
208+
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
209+
created_at.into(),
210+
rng,
211+
)),
212+
email: address,
213+
created_at,
214+
},
215+
)
210216
.await
211217
.into_mas("writing email")?;
212218
} else {
213-
write_buffer
214-
.write_password(MasNewUnsupportedThreepid {
215-
user_id,
216-
medium,
217-
address,
218-
created_at,
219-
})
219+
unsupported_buffer
220+
.write(
221+
mas,
222+
MasNewUnsupportedThreepid {
223+
user_id,
224+
medium,
225+
address,
226+
created_at,
227+
},
228+
)
220229
.await
221230
.into_mas("writing unsupported threepid")?;
222231
}
223232
}
224233

225-
write_buffer.finish().await.into_mas("writing threepids")?;
234+
email_buffer
235+
.finish(mas)
236+
.await
237+
.into_mas("writing email threepids")?;
238+
unsupported_buffer
239+
.finish(mas)
240+
.await
241+
.into_mas("writing unsupported threepids")?;
226242

227243
Ok(())
228244
}

0 commit comments

Comments
 (0)