Skip to content

Commit 0c14abf

Browse files
committed
Allow restoring indexes in the background by regularly committing
1 parent 7f4f991 commit 0c14abf

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

crates/syn2mas/src/mas_writer/mod.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,32 @@ impl WriterConnectionPool {
157157
}
158158
}
159159

160+
pub async fn commit(&mut self) -> Result<(), Error> {
161+
let mut connections = Vec::with_capacity(self.num_connections);
162+
while let Some(connection_or_error) = self.connection_rx.recv().await {
163+
let mut connection = connection_or_error?;
164+
query("COMMIT;")
165+
.execute(&mut connection)
166+
.await
167+
.into_database("commit writer transaction")?;
168+
query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
169+
.execute(&mut connection)
170+
.await
171+
.into_database("begin writer transaction")?;
172+
173+
connections.push(connection);
174+
}
175+
176+
// Put all the connections back in the pool
177+
for connection in connections {
178+
self.connection_tx
179+
.try_send(Ok(connection))
180+
.expect("channel closed");
181+
}
182+
183+
Ok(())
184+
}
185+
160186
/// Finishes writing to the database, committing all changes.
161187
///
162188
/// # Errors
@@ -400,7 +426,7 @@ impl<'conn> MasWriter<'conn> {
400426
#[tracing::instrument(name = "syn2mas.mas_writer.new", skip_all)]
401427
pub async fn new(
402428
mut conn: LockedMasDatabase<'conn>,
403-
index_restore_conn: PgConnection,
429+
mut index_restore_conn: PgConnection,
404430
mut writer_connections: Vec<PgConnection>,
405431
) -> Result<Self, Error> {
406432
// Given that we don't have any concurrent transactions here,
@@ -511,6 +537,10 @@ impl<'conn> MasWriter<'conn> {
511537
.into_database("begin MAS writer transaction")?;
512538
}
513539

540+
query("SET AUTOCOMMIT ON;")
541+
.execute(index_restore_conn.as_mut())
542+
.await
543+
.into_database("set conn autocommit")?;
514544
let (constraint_restore_tx, index_restore_tx, restorer_task) =
515545
Self::restore_task(index_restore_conn);
516546

@@ -655,6 +685,10 @@ impl<'conn> MasWriter<'conn> {
655685
Ok(())
656686
}
657687

688+
pub async fn commit(&mut self) -> Result<(), Error> {
689+
self.writer_pool.commit().await
690+
}
691+
658692
/// Finish writing to the MAS database, flushing and committing all changes.
659693
///
660694
/// # Errors

crates/syn2mas/src/migration.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ pub async fn migrate(
117117
)
118118
.await?;
119119

120+
mas.commit().await.into_mas("committing users")?;
120121
mas.liberate_table("syn2mas__users")
121122
.await
122123
.into_mas("liberating users table")?;
@@ -139,6 +140,7 @@ pub async fn migrate(
139140
)
140141
.await?;
141142

143+
mas.commit().await.into_mas("committing threepids")?;
142144
mas.liberate_table("syn2mas__user_emails")
143145
.await
144146
.into_mas("liberating user_emails table")?;
@@ -162,6 +164,7 @@ pub async fn migrate(
162164
)
163165
.await?;
164166

167+
mas.commit().await.into_mas("committing external IDs")?;
165168
mas.liberate_table("syn2mas__upstream_oauth_links")
166169
.await
167170
.into_mas("liberating upstream_oauth_links table")?;
@@ -211,6 +214,7 @@ pub async fn migrate(
211214
)
212215
.await?;
213216

217+
mas.commit().await.into_mas("committing tokens")?;
214218
mas.liberate_table("syn2mas__compat_access_tokens")
215219
.await
216220
.into_mas("liberating compat_access_tokens table")?;
@@ -235,6 +239,7 @@ pub async fn migrate(
235239
)
236240
.await?;
237241

242+
mas.commit().await.into_mas("committing devices")?;
238243
mas.liberate_table("syn2mas__compat_sessions")
239244
.await
240245
.into_mas("liberating compat_sessions table")?;

0 commit comments

Comments
 (0)