Skip to content

Commit 270fad1

Browse files
committed
fix: clear all pending signals after workflow complete
1 parent 22e1cee commit 270fad1

File tree

7 files changed

+120
-67
lines changed

7 files changed

+120
-67
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/gasoline/src/db/kv/keys/workflow.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,10 @@ impl PendingSignalKey {
804804
pub fn subspace(workflow_id: Id, signal_name: String) -> PendingSignalSubspaceKey {
805805
PendingSignalSubspaceKey::new(workflow_id, signal_name)
806806
}
807+
808+
pub fn workflow_subspace(workflow_id: Id) -> PendingSignalSubspaceKey {
809+
PendingSignalSubspaceKey::workflow(workflow_id)
810+
}
807811
}
808812

809813
impl FormalKey for PendingSignalKey {
@@ -854,14 +858,21 @@ impl<'de> TupleUnpack<'de> for PendingSignalKey {
854858

855859
pub struct PendingSignalSubspaceKey {
856860
workflow_id: Id,
857-
signal_name: String,
861+
signal_name: Option<String>,
858862
}
859863

860864
impl PendingSignalSubspaceKey {
861865
pub fn new(workflow_id: Id, signal_name: String) -> Self {
862866
PendingSignalSubspaceKey {
863867
workflow_id,
864-
signal_name,
868+
signal_name: Some(signal_name),
869+
}
870+
}
871+
872+
pub fn workflow(workflow_id: Id) -> Self {
873+
PendingSignalSubspaceKey {
874+
workflow_id,
875+
signal_name: None,
865876
}
866877
}
867878
}
@@ -872,14 +883,16 @@ impl TuplePack for PendingSignalSubspaceKey {
872883
w: &mut W,
873884
tuple_depth: TupleDepth,
874885
) -> std::io::Result<VersionstampOffset> {
875-
let t = (
876-
WORKFLOW,
877-
SIGNAL,
878-
self.workflow_id,
879-
PENDING,
880-
&self.signal_name,
881-
);
882-
t.pack(w, tuple_depth)
886+
let mut offset = VersionstampOffset::None { size: 0 };
887+
888+
let t = (WORKFLOW, SIGNAL, self.workflow_id, PENDING);
889+
offset += t.pack(w, tuple_depth)?;
890+
891+
if let Some(signal_name) = &self.signal_name {
892+
offset += signal_name.pack(w, tuple_depth)?;
893+
}
894+
895+
Ok(offset)
883896
}
884897
}
885898

engine/packages/gasoline/src/db/kv/mod.rs

Lines changed: 90 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1719,12 +1719,15 @@ impl Database for DatabaseKv {
17191719
) -> WorkflowResult<()> {
17201720
let start_instant = Instant::now();
17211721

1722-
let wrote_to_wake_idx = self
1722+
let (wrote_to_wake_idx, pending_signal_cleared_count) = self
17231723
.pools
17241724
.udb()
17251725
.map_err(WorkflowError::PoolsGeneric)?
17261726
.run(|tx| {
17271727
async move {
1728+
let start = Instant::now();
1729+
let tx = tx.with_subspace(self.subspace.clone());
1730+
17281731
let sub_workflow_wake_subspace = self
17291732
.subspace
17301733
.subspace(&keys::wake::SubWorkflowWakeKey::subspace(workflow_id));
@@ -1742,34 +1745,29 @@ impl Database for DatabaseKv {
17421745
Serializable,
17431746
);
17441747

1745-
let (wrote_to_wake_idx, tag_keys, wake_deadline_entry) = tokio::try_join!(
1748+
let (wrote_to_wake_idx, tag_keys, wake_deadline) = tokio::try_join!(
17461749
// Check for other workflows waiting on this one, wake all
17471750
async {
17481751
let mut wrote_to_wake_idx = false;
17491752

17501753
while let Some(entry) = stream.try_next().await? {
1751-
let sub_workflow_wake_key =
1752-
self.subspace
1753-
.unpack::<keys::wake::SubWorkflowWakeKey>(&entry.key())?;
1754-
let workflow_name =
1755-
sub_workflow_wake_key.deserialize(entry.value())?;
1756-
1757-
let wake_condition_key = keys::wake::WorkflowWakeConditionKey::new(
1758-
workflow_name,
1759-
sub_workflow_wake_key.workflow_id,
1760-
keys::wake::WakeCondition::SubWorkflow {
1761-
sub_workflow_id: workflow_id,
1762-
},
1763-
);
1754+
let (sub_workflow_wake_key, workflow_name) =
1755+
tx.read_entry::<keys::wake::SubWorkflowWakeKey>(&entry)?;
17641756

17651757
// Add wake condition for workflow
1766-
tx.set(
1767-
&self.subspace.pack(&wake_condition_key),
1768-
&wake_condition_key.serialize(())?,
1769-
);
1758+
tx.write(
1759+
&keys::wake::WorkflowWakeConditionKey::new(
1760+
workflow_name,
1761+
sub_workflow_wake_key.workflow_id,
1762+
keys::wake::WakeCondition::SubWorkflow {
1763+
sub_workflow_id: workflow_id,
1764+
},
1765+
),
1766+
(),
1767+
)?;
17701768

17711769
// Clear secondary index
1772-
tx.clear(entry.key());
1770+
tx.delete(&sub_workflow_wake_key);
17731771

17741772
wrote_to_wake_idx = true;
17751773
}
@@ -1785,73 +1783,111 @@ impl Database for DatabaseKv {
17851783
Serializable,
17861784
)
17871785
.map(|res| {
1788-
self.subspace
1789-
.unpack::<keys::workflow::TagKey>(res?.key())
1786+
tx.unpack::<keys::workflow::TagKey>(res?.key())
17901787
.map_err(anyhow::Error::from)
17911788
})
17921789
.try_collect::<Vec<_>>(),
1793-
tx.get(&self.subspace.pack(&wake_deadline_key), Serializable),
1790+
tx.read_opt(&wake_deadline_key, Serializable),
17941791
)?;
17951792

17961793
for key in tag_keys {
1797-
let by_name_and_tag_key = keys::workflow::ByNameAndTagKey::new(
1794+
tx.delete(&keys::workflow::ByNameAndTagKey::new(
17981795
workflow_name.to_string(),
17991796
key.k,
18001797
key.v,
18011798
workflow_id,
1802-
);
1803-
tx.clear(&self.subspace.pack(&by_name_and_tag_key));
1799+
));
18041800
}
18051801

18061802
// Clear null key
1807-
{
1808-
let by_name_and_tag_key = keys::workflow::ByNameAndTagKey::null(
1809-
workflow_name.to_string(),
1810-
workflow_id,
1811-
);
1812-
tx.clear(&self.subspace.pack(&by_name_and_tag_key));
1813-
}
1803+
tx.delete(&keys::workflow::ByNameAndTagKey::null(
1804+
workflow_name.to_string(),
1805+
workflow_id,
1806+
));
18141807

18151808
// Get and clear the pending deadline wake condition, if any. This could be put in the
18161809
// `pull_workflows` function (where we clear secondary indexes) but we chose to clear it
18171810
// here and in `commit_workflow` because it's not a secondary index so there's no worry of
18181811
// it inserting more wake conditions. This reduces the load on `pull_workflows`. The
18191812
// reason this isn't immediately cleared in `pull_workflows` along with the rest of the
18201813
// wake conditions is because it might be in the future.
1821-
if let Some(raw) = wake_deadline_entry {
1822-
let deadline_ts = wake_deadline_key.deserialize(&raw)?;
1823-
1824-
let wake_condition_key = keys::wake::WorkflowWakeConditionKey::new(
1814+
if let Some(deadline_ts) = wake_deadline {
1815+
tx.delete(&keys::wake::WorkflowWakeConditionKey::new(
18251816
workflow_name.to_string(),
18261817
workflow_id,
18271818
keys::wake::WakeCondition::Deadline { deadline_ts },
1828-
);
1829-
1830-
tx.clear(&self.subspace.pack(&wake_condition_key));
1819+
));
18311820
}
18321821

18331822
// Clear "has wake condition"
1834-
let has_wake_condition_key =
1835-
keys::workflow::HasWakeConditionKey::new(workflow_id);
1836-
tx.clear(&self.subspace.pack(&has_wake_condition_key));
1823+
tx.delete(&keys::workflow::HasWakeConditionKey::new(workflow_id));
18371824

18381825
// Write output
18391826
let output_key = keys::workflow::OutputKey::new(workflow_id);
18401827

18411828
for (i, chunk) in output_key.split_ref(output)?.into_iter().enumerate() {
18421829
let chunk_key = output_key.chunk(i);
18431830

1844-
tx.set(&self.subspace.pack(&chunk_key), &chunk);
1831+
tx.set(&tx.pack(&chunk_key), &chunk);
1832+
}
1833+
1834+
// Mark all remaining pending signals as ack'd
1835+
let now = rivet_util::timestamp::now();
1836+
1837+
// This subspace contains all pending signals for the workflow
1838+
let pending_signal_subspace = self.subspace.subspace(
1839+
&keys::workflow::PendingSignalKey::workflow_subspace(workflow_id),
1840+
);
1841+
1842+
let mut stream = tx.get_ranges_keyvalues(
1843+
universaldb::RangeOption {
1844+
mode: StreamingMode::WantAll,
1845+
..(&pending_signal_subspace).into()
1846+
},
1847+
Serializable,
1848+
);
1849+
1850+
let mut pending_signal_cleared_count = 0;
1851+
1852+
loop {
1853+
if start.elapsed() > EARLY_TXN_TIMEOUT {
1854+
tracing::warn!(
1855+
"timed out processing pending signals for complete workflow"
1856+
);
1857+
break;
1858+
}
1859+
1860+
let Some(entry) = stream.try_next().await? else {
1861+
break;
1862+
};
1863+
1864+
let (key, _) = tx.read_entry::<keys::workflow::PendingSignalKey>(&entry)?;
1865+
1866+
// Ack signal
1867+
tx.add_conflict_key(&key, ConflictRangeType::Read)?;
1868+
1869+
tx.write(&keys::signal::AckTsKey::new(key.signal_id), now)?;
1870+
1871+
update_metric(
1872+
&tx,
1873+
Some(keys::metric::GaugeMetric::SignalPending(
1874+
key.signal_name.to_string(),
1875+
)),
1876+
None,
1877+
);
1878+
1879+
// Clear pending signal key
1880+
tx.delete(&key);
1881+
1882+
pending_signal_cleared_count += 1;
18451883
}
18461884

18471885
// Clear lease
1848-
let lease_key = keys::workflow::LeaseKey::new(workflow_id);
1849-
tx.clear(&self.subspace.pack(&lease_key));
1850-
let worker_id_key = keys::workflow::WorkerIdKey::new(workflow_id);
1851-
tx.clear(&self.subspace.pack(&worker_id_key));
1886+
tx.delete(&keys::workflow::LeaseKey::new(workflow_id));
1887+
tx.delete(&keys::workflow::WorkerIdKey::new(workflow_id));
18521888

18531889
update_metric(
1854-
&tx.with_subspace(self.subspace.clone()),
1890+
&tx,
18551891
Some(keys::metric::GaugeMetric::WorkflowActive(
18561892
workflow_name.to_string(),
18571893
)),
@@ -1860,7 +1896,7 @@ impl Database for DatabaseKv {
18601896
)),
18611897
);
18621898

1863-
Ok(wrote_to_wake_idx)
1899+
Ok((wrote_to_wake_idx, pending_signal_cleared_count))
18641900
}
18651901
})
18661902
.custom_instrument(tracing::info_span!("complete_workflows_tx"))
@@ -1874,6 +1910,10 @@ impl Database for DatabaseKv {
18741910
self.bump(BumpSubSubject::Worker);
18751911
}
18761912

1913+
if pending_signal_cleared_count != 0 {
1914+
tracing::debug!(count=%pending_signal_cleared_count, "cleared pending signals after workflow completed");
1915+
}
1916+
18771917
let dt = start_instant.elapsed().as_secs_f64();
18781918
metrics::COMPLETE_WORKFLOW_DURATION
18791919
.with_label_values(&[workflow_name])

engine/packages/pegboard/src/workflows/runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use universaldb::{
1111
use universalpubsub::PublishOpts;
1212
use vbare::OwnedVersionedData;
1313

14-
use crate::{keys, metrics, workflows::actor::Allocate};
14+
use crate::{keys, workflows::actor::Allocate};
1515

1616
/// Batch size of how many events to ack.
1717
const EVENT_ACK_BATCH_SIZE: i64 = 500;

engine/packages/pegboard/src/workflows/runner2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use universaldb::{
1111
use universalpubsub::PublishOpts;
1212
use vbare::OwnedVersionedData;
1313

14-
use crate::{keys, metrics, workflows::actor::Allocate};
14+
use crate::{keys, workflows::actor::Allocate};
1515

1616
const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);
1717

engine/packages/universalpubsub/src/driver/postgres/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ impl PubSubDriver for PostgresDriver {
389389
let encoded = BASE64.encode(payload);
390390
let hashed = self.hash_subject(subject);
391391

392-
tracing::debug!("attempting to get connection for publish");
392+
tracing::trace!("attempting to get connection for publish");
393393

394394
// Wait for listen connection to be ready first if this channel has subscribers
395395
// This ensures that if we're reconnecting, the LISTEN is re-registered before NOTIFY

engine/packages/workflow-worker/Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ edition.workspace = true
77

88
[dependencies]
99
anyhow.workspace = true
10-
gas.workspace = true
1110
epoxy.workspace = true
12-
rivet-config.workspace = true
13-
tracing.workspace = true
14-
11+
gas.workspace = true
1512
namespace.workspace = true
1613
pegboard.workspace = true
14+
rivet-config.workspace = true
15+
tracing.workspace = true

0 commit comments

Comments
 (0)