Skip to content

Commit c9ae892

Browse files
committed
Remove special-purpose write buffers and use only the generic one
1 parent 3130d23 commit c9ae892

File tree

2 files changed

+58
-170
lines changed

2 files changed

+58
-170
lines changed

crates/syn2mas/src/mas_writer/mod.rs

Lines changed: 14 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ impl<'conn> MasWriter<'conn> {
531531
/// - If the database writer connection pool had an error.
532532
#[allow(clippy::missing_panics_doc)] // not a real panic
533533
#[tracing::instrument(skip_all, level = Level::DEBUG)]
534-
pub async fn write_users(&mut self, users: Vec<MasNewUser>) -> Result<(), Error> {
534+
pub fn write_users(&mut self, users: Vec<MasNewUser>) -> BoxFuture<'_, Result<(), Error>> {
535535
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
536536
// `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement
537537
// without having to change the statement SQL thus altering the query plan.
@@ -577,7 +577,7 @@ impl<'conn> MasWriter<'conn> {
577577
).execute(&mut *conn).await.into_database("writing users to MAS")?;
578578

579579
Ok(())
580-
})).await
580+
})).boxed()
581581
}
582582

583583
/// Write a batch of user passwords to the database.
@@ -589,14 +589,10 @@ impl<'conn> MasWriter<'conn> {
589589
/// - If the database writer connection pool had an error.
590590
#[allow(clippy::missing_panics_doc)] // not a real panic
591591
#[tracing::instrument(skip_all, level = Level::DEBUG)]
592-
pub async fn write_passwords(
592+
pub fn write_passwords(
593593
&mut self,
594594
passwords: Vec<MasNewUserPassword>,
595-
) -> Result<(), Error> {
596-
if passwords.is_empty() {
597-
return Ok(());
598-
}
599-
595+
) -> BoxFuture<'_, Result<(), Error>> {
600596
self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
601597
let mut user_password_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
602598
let mut user_ids: Vec<Uuid> = Vec::with_capacity(passwords.len());
@@ -631,17 +627,14 @@ impl<'conn> MasWriter<'conn> {
631627
).execute(&mut *conn).await.into_database("writing users to MAS")?;
632628

633629
Ok(())
634-
})).await
630+
})).boxed()
635631
}
636632

637633
#[tracing::instrument(skip_all, level = Level::DEBUG)]
638-
pub async fn write_email_threepids(
634+
pub fn write_email_threepids(
639635
&mut self,
640636
threepids: Vec<MasNewEmailThreepid>,
641-
) -> Result<(), Error> {
642-
if threepids.is_empty() {
643-
return Ok(());
644-
}
637+
) -> BoxFuture<'_, Result<(), Error>> {
645638
self.writer_pool.spawn_with_connection(move |conn| {
646639
Box::pin(async move {
647640
let mut user_email_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
@@ -678,17 +671,14 @@ impl<'conn> MasWriter<'conn> {
678671

679672
Ok(())
680673
})
681-
}).await
674+
}).boxed()
682675
}
683676

684677
#[tracing::instrument(skip_all, level = Level::DEBUG)]
685-
pub async fn write_unsupported_threepids(
678+
pub fn write_unsupported_threepids(
686679
&mut self,
687680
threepids: Vec<MasNewUnsupportedThreepid>,
688-
) -> Result<(), Error> {
689-
if threepids.is_empty() {
690-
return Ok(());
691-
}
681+
) -> BoxFuture<'_, Result<(), Error>> {
692682
self.writer_pool.spawn_with_connection(move |conn| {
693683
Box::pin(async move {
694684
let mut user_ids: Vec<Uuid> = Vec::with_capacity(threepids.len());
@@ -723,17 +713,14 @@ impl<'conn> MasWriter<'conn> {
723713

724714
Ok(())
725715
})
726-
}).await
716+
}).boxed()
727717
}
728718

729719
#[tracing::instrument(skip_all, level = Level::DEBUG)]
730720
pub fn write_upstream_oauth_links(
731721
&mut self,
732722
links: Vec<MasNewUpstreamOauthLink>,
733723
) -> BoxFuture<'_, Result<(), Error>> {
734-
if links.is_empty() {
735-
return async { Ok(()) }.boxed();
736-
}
737724
self.writer_pool.spawn_with_connection(move |conn| {
738725
Box::pin(async move {
739726
let mut link_ids: Vec<Uuid> = Vec::with_capacity(links.len());
@@ -783,124 +770,6 @@ impl<'conn> MasWriter<'conn> {
783770
// stream to two tables at once...)
784771
const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
785772

786-
// TODO replace with just `MasWriteBuffer`
787-
pub struct MasUserWriteBuffer<'writer, 'conn> {
788-
users: Vec<MasNewUser>,
789-
passwords: Vec<MasNewUserPassword>,
790-
writer: &'writer mut MasWriter<'conn>,
791-
}
792-
793-
impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> {
794-
pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
795-
MasUserWriteBuffer {
796-
users: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
797-
passwords: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
798-
writer,
799-
}
800-
}
801-
802-
pub async fn finish(mut self) -> Result<(), Error> {
803-
self.flush_users().await?;
804-
self.flush_passwords().await?;
805-
Ok(())
806-
}
807-
808-
pub async fn flush_users(&mut self) -> Result<(), Error> {
809-
// via copy: 13s
810-
// not via copy: 14s
811-
// difference probably gets worse with latency
812-
self.writer
813-
.write_users(std::mem::take(&mut self.users))
814-
.await?;
815-
816-
self.users.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
817-
Ok(())
818-
}
819-
820-
pub async fn flush_passwords(&mut self) -> Result<(), Error> {
821-
self.writer
822-
.write_passwords(std::mem::take(&mut self.passwords))
823-
.await?;
824-
self.passwords.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
825-
826-
Ok(())
827-
}
828-
829-
pub async fn write_user(&mut self, user: MasNewUser) -> Result<(), Error> {
830-
self.users.push(user);
831-
if self.users.len() >= WRITE_BUFFER_BATCH_SIZE {
832-
self.flush_users().await?;
833-
}
834-
Ok(())
835-
}
836-
837-
pub async fn write_password(&mut self, password: MasNewUserPassword) -> Result<(), Error> {
838-
self.passwords.push(password);
839-
if self.passwords.len() >= WRITE_BUFFER_BATCH_SIZE {
840-
self.flush_passwords().await?;
841-
}
842-
Ok(())
843-
}
844-
}
845-
846-
// TODO replace with just `MasWriteBuffer`
847-
pub struct MasThreepidWriteBuffer<'writer, 'conn> {
848-
email: Vec<MasNewEmailThreepid>,
849-
unsupported: Vec<MasNewUnsupportedThreepid>,
850-
writer: &'writer mut MasWriter<'conn>,
851-
}
852-
853-
impl<'writer, 'conn> MasThreepidWriteBuffer<'writer, 'conn> {
854-
pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
855-
MasThreepidWriteBuffer {
856-
email: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
857-
unsupported: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
858-
writer,
859-
}
860-
}
861-
862-
pub async fn finish(mut self) -> Result<(), Error> {
863-
self.flush_emails().await?;
864-
self.flush_unsupported().await?;
865-
Ok(())
866-
}
867-
868-
pub async fn flush_emails(&mut self) -> Result<(), Error> {
869-
self.writer
870-
.write_email_threepids(std::mem::take(&mut self.email))
871-
.await?;
872-
self.email.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
873-
Ok(())
874-
}
875-
876-
pub async fn flush_unsupported(&mut self) -> Result<(), Error> {
877-
self.writer
878-
.write_unsupported_threepids(std::mem::take(&mut self.unsupported))
879-
.await?;
880-
self.unsupported.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
881-
Ok(())
882-
}
883-
884-
pub async fn write_email(&mut self, user: MasNewEmailThreepid) -> Result<(), Error> {
885-
self.email.push(user);
886-
if self.email.len() >= WRITE_BUFFER_BATCH_SIZE {
887-
self.flush_emails().await?;
888-
}
889-
Ok(())
890-
}
891-
892-
pub async fn write_password(
893-
&mut self,
894-
unsupported: MasNewUnsupportedThreepid,
895-
) -> Result<(), Error> {
896-
self.unsupported.push(unsupported);
897-
if self.unsupported.len() >= WRITE_BUFFER_BATCH_SIZE {
898-
self.flush_unsupported().await?;
899-
}
900-
Ok(())
901-
}
902-
}
903-
904773
/// A function that can accept and flush buffers from a `MasWriteBuffer`.
905774
/// Intended uses are the methods on `MasWriter` such as `write_users`.
906775
type WriteBufferFlusher<'conn, T> =
@@ -934,6 +803,9 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
934803
}
935804

936805
pub async fn flush(&mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
806+
if self.rows.is_empty() {
807+
return Ok(());
808+
}
937809
let rows = std::mem::take(&mut self.rows);
938810
self.rows.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
939811
(self.flusher)(writer, rows).await?;

crates/syn2mas/src/migration.rs

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use uuid::Uuid;
2626
use crate::{
2727
mas_writer::{
2828
self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
29-
MasNewUserPassword, MasThreepidWriteBuffer, MasUserWriteBuffer, MasWriteBuffer, MasWriter,
29+
MasNewUserPassword, MasWriteBuffer, MasWriter,
3030
},
3131
synapse_reader::{
3232
self, ExtractLocalpartError, FullUserId, SynapseExternalId, SynapseThreepid, SynapseUser,
@@ -132,7 +132,8 @@ async fn migrate_users(
132132
server_name: &str,
133133
rng: &mut impl RngCore,
134134
) -> Result<UsersMigrated, Error> {
135-
let mut write_buffer = MasUserWriteBuffer::new(mas);
135+
let mut user_buffer = MasWriteBuffer::new(MasWriter::write_users);
136+
let mut password_buffer = MasWriteBuffer::new(MasWriter::write_passwords);
136137
let mut users_stream = pin!(synapse.read_users());
137138
// TODO is 1:1 capacity enough for a hashmap?
138139
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint);
@@ -143,23 +144,24 @@ async fn migrate_users(
143144

144145
user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id);
145146

146-
write_buffer
147-
.write_user(mas_user)
147+
user_buffer
148+
.write(mas, mas_user)
148149
.await
149150
.into_mas("writing user")?;
150151

151152
if let Some(mas_password) = mas_password_opt {
152-
write_buffer
153-
.write_password(mas_password)
153+
password_buffer
154+
.write(mas, mas_password)
154155
.await
155156
.into_mas("writing password")?;
156157
}
157158
}
158159

159-
write_buffer
160-
.finish()
160+
user_buffer.finish(mas).await.into_mas("writing users")?;
161+
password_buffer
162+
.finish(mas)
161163
.await
162-
.into_mas("writing users & passwords")?;
164+
.into_mas("writing passwords")?;
163165

164166
Ok(UsersMigrated {
165167
user_localparts_to_uuid,
@@ -174,7 +176,8 @@ async fn migrate_threepids(
174176
rng: &mut impl RngCore,
175177
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
176178
) -> Result<(), Error> {
177-
let mut write_buffer = MasThreepidWriteBuffer::new(mas);
179+
let mut email_buffer = MasWriteBuffer::new(MasWriter::write_email_threepids);
180+
let mut unsupported_buffer = MasWriteBuffer::new(MasWriter::write_unsupported_threepids);
178181
let mut users_stream = pin!(synapse.read_threepids());
179182

180183
while let Some(threepid_res) = users_stream.next().await {
@@ -198,32 +201,45 @@ async fn migrate_threepids(
198201
};
199202

200203
if medium == "email" {
201-
write_buffer
202-
.write_email(MasNewEmailThreepid {
203-
user_id,
204-
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
205-
created_at.into(),
206-
rng,
207-
)),
208-
email: address,
209-
created_at,
210-
})
204+
email_buffer
205+
.write(
206+
mas,
207+
MasNewEmailThreepid {
208+
user_id,
209+
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
210+
created_at.into(),
211+
rng,
212+
)),
213+
email: address,
214+
created_at,
215+
},
216+
)
211217
.await
212218
.into_mas("writing email")?;
213219
} else {
214-
write_buffer
215-
.write_password(MasNewUnsupportedThreepid {
216-
user_id,
217-
medium,
218-
address,
219-
created_at,
220-
})
220+
unsupported_buffer
221+
.write(
222+
mas,
223+
MasNewUnsupportedThreepid {
224+
user_id,
225+
medium,
226+
address,
227+
created_at,
228+
},
229+
)
221230
.await
222231
.into_mas("writing unsupported threepid")?;
223232
}
224233
}
225234

226-
write_buffer.finish().await.into_mas("writing threepids")?;
235+
email_buffer
236+
.finish(mas)
237+
.await
238+
.into_mas("writing email threepids")?;
239+
unsupported_buffer
240+
.finish(mas)
241+
.await
242+
.into_mas("writing unsupported threepids")?;
227243

228244
Ok(())
229245
}

0 commit comments

Comments
 (0)