Skip to content

Commit 7a1349a

Browse files
committed
clean up sub if we never run cmd_loop
Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
1 parent f06b591 commit 7a1349a

File tree

2 files changed

+49
-46
lines changed

2 files changed

+49
-46
lines changed

crates/corro-agent/src/api/public/pubsub.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,12 @@ pub async fn process_sub_channel(
164164

165165
let query_evt = tokio::select! {
166166
biased;
167-
Some(query_evt) = evt_rx.recv() => query_evt,
167+
res = evt_rx.recv() => match res {
168+
Some(query_evt) => query_evt,
169+
None => {
170+
break;
171+
},
172+
},
168173
_ = deadline_check => {
169174
if tx.receiver_count() == 0 {
170175
info!(sub_id = %id, "All listeners for subscription are gone and didn't come back within {MAX_UNSUB_TIME:?}");

crates/corro-types/src/pubsub.rs

Lines changed: 43 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ pub const SUB_DB_PATH: &str = "sub.sqlite";
586586
impl Matcher {
587587
fn new(
588588
id: Uuid,
589-
subs_path: Utf8PathBuf,
589+
subs_path: &Utf8PathBuf,
590590
schema: &Schema,
591591
state_conn: &Connection,
592592
evt_tx: mpsc::Sender<QueryEvent>,
@@ -910,9 +910,16 @@ impl Matcher {
910910
}
911911
})?;
912912

913-
let (matcher, handle) = Self::new(id, subs_path, schema, &state_conn, evt_tx, &sql)?;
913+
let (matcher, handle) = Self::new(id, &subs_path, schema, &state_conn, evt_tx, &sql)?;
914914

915-
spawn_counted(matcher.run_restore(state_conn, tripwire));
915+
spawn_counted(async move {
916+
if let Err(e) = matcher.run_restore(state_conn, tripwire).await {
917+
error!(sub_id = %id, "could not run restore: {e}");
918+
if let Err(e) = Self::cleanup(id, subs_path) {
919+
error!(sub_id = %id, "could not cleanup: {e}");
920+
}
921+
}
922+
});
916923

917924
Ok(handle)
918925
}
@@ -927,7 +934,7 @@ impl Matcher {
927934
sql: &str,
928935
tripwire: Tripwire,
929936
) -> Result<MatcherHandle, MatcherError> {
930-
let (mut matcher, handle) = Self::new(id, subs_path, schema, &state_conn, evt_tx, sql)?;
937+
let (mut matcher, handle) = Self::new(id, &subs_path, schema, &state_conn, evt_tx, sql)?;
931938

932939
let pk_cols = matcher
933940
.pks
@@ -1028,14 +1035,25 @@ impl Matcher {
10281035
Ok::<_, MatcherError>(())
10291036
})?;
10301037

1031-
spawn_counted(matcher.run(state_conn, tripwire));
1038+
spawn_counted(async move {
1039+
if let Err(e) = matcher.run(state_conn, tripwire).await {
1040+
error!(sub_id = %id, "could not setup subscription: {e}");
1041+
if let Err(e) = Self::cleanup(id, subs_path.clone()) {
1042+
error!(sub_id = %id, "could not cleanup: {e}");
1043+
}
1044+
}
1045+
});
10321046

10331047
Ok(handle)
10341048
}
10351049

1036-
async fn run_restore(mut self, mut state_conn: CrConn, tripwire: Tripwire) {
1050+
async fn run_restore(
1051+
mut self,
1052+
mut state_conn: CrConn,
1053+
tripwire: Tripwire,
1054+
) -> Result<(), MatcherError> {
10371055
info!(sub_id = %self.id, "Restoring subscription");
1038-
let init_res = block_in_place(|| {
1056+
block_in_place(|| {
10391057
self.last_rowid = self
10401058
.conn
10411059
.query_row(
@@ -1054,24 +1072,13 @@ impl Matcher {
10541072
_ = self.last_change_tx.send(max_change_id);
10551073

10561074
Ok::<_, MatcherError>(())
1057-
});
1058-
1059-
if let Err(e) = init_res {
1060-
error!(sub_id = %self.id, "could not re-initialize subscription: {e}");
1061-
return;
1062-
}
1075+
})?;
10631076

1064-
if let Err(e) = block_in_place(|| self.setup(&mut state_conn)) {
1065-
error!(sub_id = %self.id, "could not setup connection: {e}");
1066-
return;
1067-
}
1077+
block_in_place(|| self.setup(&mut state_conn))?;
1078+
self.set_status("running")?;
10681079

1069-
if let Err(e) = self.set_status("running") {
1070-
error!(sub_id = %self.id, "could not set status: {e}");
1071-
return;
1072-
}
1073-
1074-
self.cmd_loop(state_conn, tripwire).await
1080+
self.cmd_loop(state_conn, tripwire).await;
1081+
Ok(())
10751082
}
10761083

10771084
fn set_status(&self, status: &str) -> Result<(), MatcherError> {
@@ -1281,16 +1288,12 @@ impl Matcher {
12811288
info!(sub_id = %self.id, "matcher loop is done");
12821289
}
12831290

1284-
async fn run(mut self, mut state_conn: CrConn, tripwire: Tripwire) {
1291+
async fn run(mut self, mut state_conn: CrConn, tripwire: Tripwire) -> Result<(), MatcherError> {
12851292
info!(sub_id = %self.id, "Running initial query");
1286-
if let Err(e) = self
1287-
.evt_tx
1293+
self.evt_tx
12881294
.send(QueryEvent::Columns(self.col_names.clone()))
12891295
.await
1290-
{
1291-
error!(sub_id = %self.id, "could not send back columns, probably means no receivers! {e}");
1292-
return;
1293-
}
1296+
.map_err(|_| MatcherError::EventReceiverClosed)?;
12941297

12951298
let mut query_cols = vec![];
12961299
for i in 0..(self.parsed.columns.len()) {
@@ -1422,36 +1425,31 @@ impl Matcher {
14221425
"done w/ block_in_place for initial query, elapsed: {:?}",
14231426
elapsed
14241427
);
1425-
if let Err(e) = self
1426-
.evt_tx
1428+
self.evt_tx
14271429
.send(QueryEvent::EndOfQuery {
14281430
time: elapsed.as_secs_f64(),
14291431
change_id: Some(ChangeId(0)),
14301432
})
14311433
.await
1432-
{
1433-
error!(sub_id = %self.id, "could not return end of query event: {e}");
1434-
return;
1435-
}
1434+
.map_err(|_| MatcherError::EventReceiverClosed)?;
14361435
// db_version
14371436
}
14381437
Err(e) => {
14391438
warn!(sub_id = %self.id, "could not complete initial query: {e}");
1440-
_ = self
1441-
.evt_tx
1439+
self.evt_tx
14421440
.send(QueryEvent::Error(e.to_compact_string()))
1443-
.await;
1441+
.await
1442+
.map_err(|_| MatcherError::EventReceiverClosed)?;
14441443

1445-
return;
1444+
return Err(e);
14461445
}
14471446
};
14481447

1449-
if let Err(e) = block_in_place(|| self.setup(&mut state_conn)) {
1450-
error!(sub_id = %self.id, "could not setup connection: {e}");
1451-
return;
1452-
}
1448+
block_in_place(|| self.setup(&mut state_conn))?;
1449+
1450+
self.cmd_loop(state_conn, tripwire).await;
14531451

1454-
self.cmd_loop(state_conn, tripwire).await
1452+
Ok(())
14551453
}
14561454

14571455
fn handle_candidates(

0 commit comments

Comments
 (0)