Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ pub async fn process_sub_channel(

let query_evt = tokio::select! {
biased;
Some(query_evt) = evt_rx.recv() => query_evt,
res = evt_rx.recv() => match res {
Some(query_evt) => query_evt,
None => {
break;
},
},
_ = deadline_check => {
if tx.receiver_count() == 0 {
info!(sub_id = %id, "All listeners for subscription are gone and didn't come back within {MAX_UNSUB_TIME:?}");
Expand Down
88 changes: 43 additions & 45 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ pub const SUB_DB_PATH: &str = "sub.sqlite";
impl Matcher {
fn new(
id: Uuid,
subs_path: Utf8PathBuf,
subs_path: &Utf8PathBuf,
schema: &Schema,
state_conn: &Connection,
evt_tx: mpsc::Sender<QueryEvent>,
Expand Down Expand Up @@ -910,9 +910,16 @@ impl Matcher {
}
})?;

let (matcher, handle) = Self::new(id, subs_path, schema, &state_conn, evt_tx, &sql)?;
let (matcher, handle) = Self::new(id, &subs_path, schema, &state_conn, evt_tx, &sql)?;

spawn_counted(matcher.run_restore(state_conn, tripwire));
spawn_counted(async move {
if let Err(e) = matcher.run_restore(state_conn, tripwire).await {
error!(sub_id = %id, "could not run restore: {e}");
if let Err(e) = Self::cleanup(id, subs_path) {
error!(sub_id = %id, "could not cleanup: {e}");
}
}
});

Ok(handle)
}
Expand All @@ -927,7 +934,7 @@ impl Matcher {
sql: &str,
tripwire: Tripwire,
) -> Result<MatcherHandle, MatcherError> {
let (mut matcher, handle) = Self::new(id, subs_path, schema, &state_conn, evt_tx, sql)?;
let (mut matcher, handle) = Self::new(id, &subs_path, schema, &state_conn, evt_tx, sql)?;

let pk_cols = matcher
.pks
Expand Down Expand Up @@ -1028,14 +1035,25 @@ impl Matcher {
Ok::<_, MatcherError>(())
})?;

spawn_counted(matcher.run(state_conn, tripwire));
spawn_counted(async move {
if let Err(e) = matcher.run(state_conn, tripwire).await {
error!(sub_id = %id, "could not setup subscription: {e}");
if let Err(e) = Self::cleanup(id, subs_path.clone()) {
error!(sub_id = %id, "could not cleanup: {e}");
}
}
});

Ok(handle)
}

async fn run_restore(mut self, mut state_conn: CrConn, tripwire: Tripwire) {
async fn run_restore(
mut self,
mut state_conn: CrConn,
tripwire: Tripwire,
) -> Result<(), MatcherError> {
info!(sub_id = %self.id, "Restoring subscription");
let init_res = block_in_place(|| {
block_in_place(|| {
self.last_rowid = self
.conn
.query_row(
Expand All @@ -1054,24 +1072,13 @@ impl Matcher {
_ = self.last_change_tx.send(max_change_id);

Ok::<_, MatcherError>(())
});

if let Err(e) = init_res {
error!(sub_id = %self.id, "could not re-initialize subscription: {e}");
return;
}
})?;

if let Err(e) = block_in_place(|| self.setup(&mut state_conn)) {
error!(sub_id = %self.id, "could not setup connection: {e}");
return;
}
block_in_place(|| self.setup(&mut state_conn))?;
self.set_status("running")?;

if let Err(e) = self.set_status("running") {
error!(sub_id = %self.id, "could not set status: {e}");
return;
}

self.cmd_loop(state_conn, tripwire).await
self.cmd_loop(state_conn, tripwire).await;
Ok(())
}

fn set_status(&self, status: &str) -> Result<(), MatcherError> {
Expand Down Expand Up @@ -1281,16 +1288,12 @@ impl Matcher {
info!(sub_id = %self.id, "matcher loop is done");
}

async fn run(mut self, mut state_conn: CrConn, tripwire: Tripwire) {
async fn run(mut self, mut state_conn: CrConn, tripwire: Tripwire) -> Result<(), MatcherError> {
info!(sub_id = %self.id, "Running initial query");
if let Err(e) = self
.evt_tx
self.evt_tx
.send(QueryEvent::Columns(self.col_names.clone()))
.await
{
error!(sub_id = %self.id, "could not send back columns, probably means no receivers! {e}");
return;
}
.map_err(|_| MatcherError::EventReceiverClosed)?;

let mut query_cols = vec![];
for i in 0..(self.parsed.columns.len()) {
Expand Down Expand Up @@ -1422,36 +1425,31 @@ impl Matcher {
"done w/ block_in_place for initial query, elapsed: {:?}",
elapsed
);
if let Err(e) = self
.evt_tx
self.evt_tx
.send(QueryEvent::EndOfQuery {
time: elapsed.as_secs_f64(),
change_id: Some(ChangeId(0)),
})
.await
{
error!(sub_id = %self.id, "could not return end of query event: {e}");
return;
}
.map_err(|_| MatcherError::EventReceiverClosed)?;
// db_version
}
Err(e) => {
warn!(sub_id = %self.id, "could not complete initial query: {e}");
_ = self
.evt_tx
self.evt_tx
.send(QueryEvent::Error(e.to_compact_string()))
.await;
.await
.map_err(|_| MatcherError::EventReceiverClosed)?;

return;
return Err(e);
}
};

if let Err(e) = block_in_place(|| self.setup(&mut state_conn)) {
error!(sub_id = %self.id, "could not setup connection: {e}");
return;
}
block_in_place(|| self.setup(&mut state_conn))?;

self.cmd_loop(state_conn, tripwire).await;

self.cmd_loop(state_conn, tripwire).await
Ok(())
}

fn handle_candidates(
Expand Down
Loading