diff --git a/engine/packages/gasoline/src/db/kv/keys/metric.rs b/engine/packages/gasoline/src/db/kv/keys/metric.rs
index c20c60d2c7..9f48097f4c 100644
--- a/engine/packages/gasoline/src/db/kv/keys/metric.rs
+++ b/engine/packages/gasoline/src/db/kv/keys/metric.rs
@@ -7,7 +7,9 @@ pub enum GaugeMetric {
 	WorkflowSleeping(String),
 	WorkflowDead(String, String),
 	WorkflowComplete(String),
+	// Deprecated
 	SignalPending(String),
+	SignalPending2(String),
 }
 
 impl GaugeMetric {
@@ -18,6 +20,7 @@ impl GaugeMetric {
 			GaugeMetric::WorkflowDead(_, _) => GaugeMetricVariant::WorkflowDead,
 			GaugeMetric::WorkflowComplete(_) => GaugeMetricVariant::WorkflowComplete,
 			GaugeMetric::SignalPending(_) => GaugeMetricVariant::SignalPending,
+			GaugeMetric::SignalPending2(_) => GaugeMetricVariant::SignalPending2,
 		}
 	}
 }
@@ -28,9 +31,12 @@ enum GaugeMetricVariant {
 	WorkflowSleeping = 1,
 	WorkflowDead = 2,
 	WorkflowComplete = 3,
+	// Deprecated
 	SignalPending = 4,
+	SignalPending2 = 5,
 }
 
+/// Stores gauge metrics for global database usage.
 #[derive(Debug)]
 pub struct GaugeMetricKey {
 	pub metric: GaugeMetric,
@@ -79,6 +85,7 @@ impl TuplePack for GaugeMetricKey {
 			}
 			GaugeMetric::WorkflowComplete(workflow_name) => workflow_name.pack(w, tuple_depth)?,
 			GaugeMetric::SignalPending(signal_name) => signal_name.pack(w, tuple_depth)?,
+			GaugeMetric::SignalPending2(signal_name) => signal_name.pack(w, tuple_depth)?,
 		};
 
 		std::result::Result::Ok(offset)
@@ -144,12 +151,23 @@ impl<'de> TupleUnpack<'de> for GaugeMetricKey {
 				},
 			)
 		}
+		GaugeMetricVariant::SignalPending2 => {
+			let (input, signal_name) = String::unpack(input, tuple_depth)?;
+
+			(
+				input,
+				GaugeMetricKey {
+					metric: GaugeMetric::SignalPending2(signal_name),
+				},
+			)
+		}
 		};
 
 		std::result::Result::Ok((input, v))
 	}
 }
 
+/// Used to list all global gauge metrics.
 pub struct GaugeMetricSubspaceKey {}
 
 impl GaugeMetricSubspaceKey {
diff --git a/engine/packages/gasoline/src/db/kv/keys/workflow.rs b/engine/packages/gasoline/src/db/kv/keys/workflow.rs
index 4be980b6a9..4f8c75fb52 100644
--- a/engine/packages/gasoline/src/db/kv/keys/workflow.rs
+++ b/engine/packages/gasoline/src/db/kv/keys/workflow.rs
@@ -1189,3 +1189,127 @@ impl<'de> TupleUnpack<'de> for SilenceTsKey {
 		Ok((input, v))
 	}
 }
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum GaugeMetric {
+	SignalPending(String),
+}
+
+impl GaugeMetric {
+	fn variant(&self) -> GaugeMetricVariant {
+		match self {
+			GaugeMetric::SignalPending(_) => GaugeMetricVariant::SignalPending,
+		}
+	}
+}
+
+#[derive(strum::FromRepr)]
+enum GaugeMetricVariant {
+	SignalPending = 0,
+}
+
+/// Stores gauge metrics for a single workflow.
+#[derive(Debug)]
+pub struct GaugeMetricKey {
+	pub workflow_id: Id,
+	pub metric: GaugeMetric,
+}
+
+impl GaugeMetricKey {
+	pub fn new(workflow_id: Id, metric: GaugeMetric) -> Self {
+		GaugeMetricKey {
+			workflow_id,
+			metric,
+		}
+	}
+
+	pub fn subspace(workflow_id: Id) -> GaugeMetricSubspaceKey {
+		GaugeMetricSubspaceKey::new(workflow_id)
+	}
+}
+
+impl FormalKey for GaugeMetricKey {
+	// IMPORTANT: Uses LE bytes, not BE
+	/// Count.
+	type Value = usize;
+
+	fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
+		Ok(usize::from_le_bytes(raw.try_into()?))
+	}
+
+	fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
+		Ok(value.to_le_bytes().to_vec())
+	}
+}
+
+impl TuplePack for GaugeMetricKey {
+	fn pack<W: std::io::Write>(
+		&self,
+		w: &mut W,
+		tuple_depth: TupleDepth,
+	) -> std::io::Result<VersionstampOffset> {
+		let mut offset = VersionstampOffset::None { size: 0 };
+
+		let t = (
+			WORKFLOW,
+			METRIC,
+			self.workflow_id,
+			self.metric.variant() as usize,
+		);
+		offset += t.pack(w, tuple_depth)?;
+
+		offset += match &self.metric {
+			GaugeMetric::SignalPending(signal_name) => signal_name.pack(w, tuple_depth)?,
+		};
+
+		Ok(offset)
+	}
+}
+
+impl<'de> TupleUnpack<'de> for GaugeMetricKey {
+	fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
+		let (input, (_, _, workflow_id, variant)) =
+			<(usize, usize, Id, usize)>::unpack(input, tuple_depth)?;
+		let variant = GaugeMetricVariant::from_repr(variant).ok_or_else(|| {
+			PackError::Message(format!("invalid metric variant `{variant}` in key").into())
+		})?;
+
+		let (input, v) = match variant {
+			GaugeMetricVariant::SignalPending => {
+				let (input, signal_name) = String::unpack(input, tuple_depth)?;
+
+				(
+					input,
+					GaugeMetricKey {
+						workflow_id,
+						metric: GaugeMetric::SignalPending(signal_name),
+					},
+				)
+			}
+		};
+
+		Ok((input, v))
+	}
+}
+
+/// Used to list all gauge metrics for a workflow.
+pub struct GaugeMetricSubspaceKey {
+	workflow_id: Id,
+}
+
+impl GaugeMetricSubspaceKey {
+	pub fn new(workflow_id: Id) -> Self {
+		GaugeMetricSubspaceKey { workflow_id }
+	}
+}
+
+impl TuplePack for GaugeMetricSubspaceKey {
+	fn pack<W: std::io::Write>(
+		&self,
+		w: &mut W,
+		tuple_depth: TupleDepth,
+	) -> std::io::Result<VersionstampOffset> {
+		let t = (WORKFLOW, METRIC, self.workflow_id);
+		t.pack(w, tuple_depth)
+	}
+}
diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs
index 5552cdb7f5..fb81625850 100644
--- a/engine/packages/gasoline/src/db/kv/mod.rs
+++ b/engine/packages/gasoline/src/db/kv/mod.rs
@@ -227,6 +227,14 @@ impl DatabaseKv {
 				signal_name.to_string(),
 			)),
 		);
+		update_wf_metric(
+			&tx.with_subspace(self.subspace.clone()),
+			workflow_id,
+			None,
+			Some(keys::workflow::GaugeMetric::SignalPending(
+				signal_name.to_string(),
+			)),
+		);
 
 		Ok(())
 	}
@@ -735,7 +743,8 @@ impl Database for DatabaseKv {
 						total_workflow_counts.push((workflow_name, count));
 					}
 				}
-				keys::metric::GaugeMetric::SignalPending(signal_name) => {
+				keys::metric::GaugeMetric::SignalPending(_) => {}
+				keys::metric::GaugeMetric::SignalPending2(signal_name) => {
 					metrics::SIGNAL_PENDING
 						.with_label_values(&[signal_name.as_str()])
 						.set(count as i64);
@@ -1719,12 +1728,14 @@ impl Database for DatabaseKv {
 	) -> WorkflowResult<()> {
 		let start_instant = Instant::now();
 
-		let wrote_to_wake_idx = self
+		let (wrote_to_wake_idx, pending_signal_cleared_count) = self
 			.pools
 			.udb()
 			.map_err(WorkflowError::PoolsGeneric)?
 			.run(|tx| {
 				async move {
+					let tx = tx.with_subspace(self.subspace.clone());
+
 					let sub_workflow_wake_subspace = self
 						.subspace
 						.subspace(&keys::wake::SubWorkflowWakeKey::subspace(workflow_id));
@@ -1742,34 +1753,29 @@ impl Database for DatabaseKv {
 						Serializable,
 					);
 
-					let (wrote_to_wake_idx, tag_keys, wake_deadline_entry) = tokio::try_join!(
+					let (wrote_to_wake_idx, tag_keys, wake_deadline) = tokio::try_join!(
 						// Check for other workflows waiting on this one, wake all
 						async {
 							let mut wrote_to_wake_idx = false;
 
 							while let Some(entry) = stream.try_next().await? {
-								let sub_workflow_wake_key =
-									self.subspace
-										.unpack::<keys::wake::SubWorkflowWakeKey>(&entry.key())?;
-								let workflow_name =
-									sub_workflow_wake_key.deserialize(entry.value())?;
-
-								let wake_condition_key = keys::wake::WorkflowWakeConditionKey::new(
-									workflow_name,
-									sub_workflow_wake_key.workflow_id,
-									keys::wake::WakeCondition::SubWorkflow {
-										sub_workflow_id: workflow_id,
-									},
-								);
+								let (sub_workflow_wake_key, workflow_name) =
+									tx.read_entry::<keys::wake::SubWorkflowWakeKey>(&entry)?;
 
 								// Add wake condition for workflow
-								tx.set(
-									&self.subspace.pack(&wake_condition_key),
-									&wake_condition_key.serialize(())?,
-								);
+								tx.write(
+									&keys::wake::WorkflowWakeConditionKey::new(
+										workflow_name,
+										sub_workflow_wake_key.workflow_id,
+										keys::wake::WakeCondition::SubWorkflow {
+											sub_workflow_id: workflow_id,
+										},
+									),
+									(),
+								)?;
 
 								// Clear secondary index
-								tx.clear(entry.key());
+								tx.delete(&sub_workflow_wake_key);
 
 								wrote_to_wake_idx = true;
 							}
@@ -1785,32 +1791,27 @@ impl Database for DatabaseKv {
 							Serializable,
 						)
 						.map(|res| {
-							self.subspace
-								.unpack::<keys::workflow::TagKey>(res?.key())
+							tx.unpack::<keys::workflow::TagKey>(res?.key())
 								.map_err(anyhow::Error::from)
 						})
 						.try_collect::<Vec<_>>(),
-						tx.get(&self.subspace.pack(&wake_deadline_key), Serializable),
+						tx.read_opt(&wake_deadline_key, Serializable),
 					)?;
 
 					for key in tag_keys {
-						let by_name_and_tag_key = keys::workflow::ByNameAndTagKey::new(
+						tx.delete(&keys::workflow::ByNameAndTagKey::new(
 							workflow_name.to_string(),
 							key.k,
 							key.v,
 							workflow_id,
-						);
-						tx.clear(&self.subspace.pack(&by_name_and_tag_key));
+						));
 					}
 
 					// Clear null key
-					{
-						let by_name_and_tag_key = keys::workflow::ByNameAndTagKey::null(
-							workflow_name.to_string(),
-							workflow_id,
-						);
-						tx.clear(&self.subspace.pack(&by_name_and_tag_key));
-					}
+					tx.delete(&keys::workflow::ByNameAndTagKey::null(
+						workflow_name.to_string(),
+						workflow_id,
+					));
 
 					// Get and clear the pending deadline wake condition, if any. This could be put in the
 					// `pull_workflows` function (where we clear secondary indexes) but we chose to clear it
@@ -1818,22 +1819,16 @@ impl Database for DatabaseKv {
 					// it inserting more wake conditions. This reduces the load on `pull_workflows`. The
 					// reason this isn't immediately cleared in `pull_workflows` along with the rest of the
 					// wake conditions is because it might be in the future.
-					if let Some(raw) = wake_deadline_entry {
-						let deadline_ts = wake_deadline_key.deserialize(&raw)?;
-
-						let wake_condition_key = keys::wake::WorkflowWakeConditionKey::new(
+					if let Some(deadline_ts) = wake_deadline {
+						tx.delete(&keys::wake::WorkflowWakeConditionKey::new(
 							workflow_name.to_string(),
 							workflow_id,
 							keys::wake::WakeCondition::Deadline { deadline_ts },
-						);
-
-						tx.clear(&self.subspace.pack(&wake_condition_key));
+						));
 					}
 
 					// Clear "has wake condition"
-					let has_wake_condition_key =
-						keys::workflow::HasWakeConditionKey::new(workflow_id);
-					tx.clear(&self.subspace.pack(&has_wake_condition_key));
+					tx.delete(&keys::workflow::HasWakeConditionKey::new(workflow_id));
 
 					// Write output
 					let output_key = keys::workflow::OutputKey::new(workflow_id);
@@ -1841,17 +1836,54 @@ impl Database for DatabaseKv {
 
 					for (i, chunk) in output_key.split_ref(output)?.into_iter().enumerate() {
 						let chunk_key = output_key.chunk(i);
-						tx.set(&self.subspace.pack(&chunk_key), &chunk);
+						tx.set(&tx.pack(&chunk_key), &chunk);
 					}
 
 					// Clear lease
-					let lease_key = keys::workflow::LeaseKey::new(workflow_id);
-					tx.clear(&self.subspace.pack(&lease_key));
-					let worker_id_key = keys::workflow::WorkerIdKey::new(workflow_id);
-					tx.clear(&self.subspace.pack(&worker_id_key));
+					tx.delete(&keys::workflow::LeaseKey::new(workflow_id));
+					tx.delete(&keys::workflow::WorkerIdKey::new(workflow_id));
+
+					// Clear pending signals metric for observability
+					let metrics_subspace = self
+						.subspace
+						.subspace(&keys::workflow::GaugeMetricKey::subspace(workflow_id));
+					let mut stream = tx.get_ranges_keyvalues(
+						universaldb::RangeOption {
+							mode: StreamingMode::WantAll,
+							..(&metrics_subspace).into()
+						},
+						Serializable,
+					);
+
+					let mut pending_signal_cleared_count = 0;
+					loop {
+						let Some(entry) = stream.try_next().await? else {
+							break;
+						};
+
+						let (key, metric_count) =
+							tx.read_entry::<keys::workflow::GaugeMetricKey>(&entry)?;
+
+						// Ignore negatives and zero
+						if isize::from_le_bytes(metric_count.to_le_bytes()) <= 0 {
+							continue;
+						}
+
+						match key.metric {
+							keys::workflow::GaugeMetric::SignalPending(signal_name) => {
+								update_metric_by(
+									&tx,
+									Some(keys::metric::GaugeMetric::SignalPending2(signal_name)),
+									None,
+									metric_count,
+								);
+								pending_signal_cleared_count += metric_count;
+							}
+						}
+					}
 
 					update_metric(
-						&tx.with_subspace(self.subspace.clone()),
+						&tx,
 						Some(keys::metric::GaugeMetric::WorkflowActive(
 							workflow_name.to_string(),
 						)),
@@ -1860,7 +1892,7 @@ impl Database for DatabaseKv {
 						)),
 					);
 
-					Ok(wrote_to_wake_idx)
+					Ok((wrote_to_wake_idx, pending_signal_cleared_count))
 				}
 			})
 			.custom_instrument(tracing::info_span!("complete_workflows_tx"))
@@ -1874,6 +1906,10 @@ impl Database for DatabaseKv {
 			self.bump(BumpSubSubject::Worker);
 		}
 
+		if pending_signal_cleared_count != 0 {
+			tracing::debug!(count=%pending_signal_cleared_count, "cleared pending signals after workflow completed");
+		}
+
 		let dt = start_instant.elapsed().as_secs_f64();
 		metrics::COMPLETE_WORKFLOW_DURATION
 			.with_label_values(&[workflow_name])
@@ -2152,7 +2188,15 @@ impl Database for DatabaseKv {
 			update_metric(
 				&tx.with_subspace(self.subspace.clone()),
 				Some(keys::metric::GaugeMetric::SignalPending(
-					key.signal_name.to_string(),
+					key.signal_name.clone(),
+				)),
+				None,
+			);
+			update_wf_metric(
+				&tx.with_subspace(self.subspace.clone()),
+				workflow_id,
+				Some(keys::workflow::GaugeMetric::SignalPending(
+					key.signal_name.clone(),
 				)),
 				None,
 			);
@@ -2987,6 +3031,15 @@ fn update_metric(
 	tx: &universaldb::Transaction,
 	previous: Option<keys::metric::GaugeMetric>,
 	current: Option<keys::metric::GaugeMetric>,
+) {
+	update_metric_by(tx, previous, current, 1)
+}
+
+fn update_metric_by(
+	tx: &universaldb::Transaction,
+	previous: Option<keys::metric::GaugeMetric>,
+	current: Option<keys::metric::GaugeMetric>,
+	by: usize,
 ) {
 	if &previous == &current {
 		return;
@@ -2995,7 +3048,7 @@ fn update_metric(
 	if let Some(previous) = previous {
 		tx.atomic_op(
 			&keys::metric::GaugeMetricKey::new(previous),
-			&(-1isize).to_le_bytes(),
+			&(by as isize * -1).to_le_bytes(),
 			MutationType::Add,
 		);
 	}
@@ -3003,6 +3056,33 @@ fn update_metric(
 	if let Some(current) = current {
 		tx.atomic_op(
 			&keys::metric::GaugeMetricKey::new(current),
+			&by.to_le_bytes(),
+			MutationType::Add,
+		);
+	}
+}
+
+fn update_wf_metric(
+	tx: &universaldb::Transaction,
+	workflow_id: Id,
+	previous: Option<keys::workflow::GaugeMetric>,
+	current: Option<keys::workflow::GaugeMetric>,
+) {
+	if &previous == &current {
+		return;
+	}
+
+	if let Some(previous) = previous {
+		tx.atomic_op(
+			&keys::workflow::GaugeMetricKey::new(workflow_id, previous),
+			&(-1isize).to_le_bytes(),
+			MutationType::Add,
+		);
+	}
+
+	if let Some(current) = current {
+		tx.atomic_op(
+			&keys::workflow::GaugeMetricKey::new(workflow_id, current),
 			&1usize.to_le_bytes(),
 			MutationType::Add,
 		);
diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs
index 84b8fc7a0b..4f276d9fca 100644
--- a/engine/packages/pegboard/src/workflows/runner.rs
+++ b/engine/packages/pegboard/src/workflows/runner.rs
@@ -11,7 +11,7 @@ use universaldb::{
 use universalpubsub::PublishOpts;
 use vbare::OwnedVersionedData;
 
-use crate::{keys, metrics, workflows::actor::Allocate};
+use crate::{keys, workflows::actor::Allocate};
 
 /// Batch size of how many events to ack.
 const EVENT_ACK_BATCH_SIZE: i64 = 500;
diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs
index 8479f4b0b6..98d94c1be5 100644
--- a/engine/packages/pegboard/src/workflows/runner2.rs
+++ b/engine/packages/pegboard/src/workflows/runner2.rs
@@ -11,7 +11,7 @@ use universaldb::{
 use universalpubsub::PublishOpts;
 use vbare::OwnedVersionedData;
 
-use crate::{keys, metrics, workflows::actor::Allocate};
+use crate::{keys, workflows::actor::Allocate};
 
 const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);
diff --git a/engine/packages/pegboard/src/workflows/serverless/conn.rs b/engine/packages/pegboard/src/workflows/serverless/conn.rs
index e7fb039765..5a1d67142b 100644
--- a/engine/packages/pegboard/src/workflows/serverless/conn.rs
+++ b/engine/packages/pegboard/src/workflows/serverless/conn.rs
@@ -1,6 +1,6 @@
 use std::time::Instant;
 
-use anyhow::Context;
+use anyhow::{Context, bail};
 use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
 use futures_util::{FutureExt, StreamExt};
 use gas::prelude::*;
diff --git a/engine/packages/universalpubsub/src/driver/postgres/mod.rs b/engine/packages/universalpubsub/src/driver/postgres/mod.rs
index 502b3ac5fd..56d9aa47eb 100644
--- a/engine/packages/universalpubsub/src/driver/postgres/mod.rs
+++ b/engine/packages/universalpubsub/src/driver/postgres/mod.rs
@@ -389,7 +389,7 @@ impl PubSubDriver for PostgresDriver {
 		let encoded = BASE64.encode(payload);
 		let hashed = self.hash_subject(subject);
 
-		tracing::debug!("attempting to get connection for publish");
+		tracing::trace!("attempting to get connection for publish");
 
 		// Wait for listen connection to be ready first if this channel has subscribers
 		// This ensures that if we're reconnecting, the LISTEN is re-registered before NOTIFY
diff --git a/engine/packages/workflow-worker/Cargo.toml b/engine/packages/workflow-worker/Cargo.toml
index a8f79fbdbd..ce52047310 100644
--- a/engine/packages/workflow-worker/Cargo.toml
+++ b/engine/packages/workflow-worker/Cargo.toml
@@ -7,10 +7,9 @@ edition.workspace = true
 
 [dependencies]
 anyhow.workspace = true
-gas.workspace = true
 epoxy.workspace = true
-rivet-config.workspace = true
-tracing.workspace = true
-
+gas.workspace = true
 namespace.workspace = true
 pegboard.workspace = true
+rivet-config.workspace = true
+tracing.workspace = true
diff --git a/scripts/debug/decode_runner_config.js b/scripts/debug/decode_runner_config.js
new file mode 100644
index 0000000000..20c3ff7b40
--- /dev/null
+++ b/scripts/debug/decode_runner_config.js
@@ -0,0 +1,142 @@
+// BARE decoder for namespace runner config v3
+// Minimal implementation - no external dependencies
+
+const hexString = process.argv[2];
+if (!hexString) {
+	console.error("Usage: node decode_runner_config.js <hex string>");
+	process.exit(1);
+}
+
+const buffer = Buffer.from(hexString, "hex");
+
+// Skip version (first u16, 2 bytes)
+const version = buffer.readUInt16LE(0);
+const dataBuffer = buffer.slice(2);
+
+console.log("Embedded VBARE Version:", version);
+
+class BareDecoder {
+	constructor(buffer) {
+		this.buffer = buffer;
+		this.offset = 0;
+	}
+
+	readByte() {
+		return this.buffer[this.offset++];
+	}
+
+	readUint() {
+		// Read variable-length unsigned integer (LEB128)
+		let result = 0;
+		let shift = 0;
+		while (true) {
+			const byte = this.readByte();
+			result |= (byte & 0x7f) << shift;
+			if ((byte & 0x80) === 0) break;
+			shift += 7;
+		}
+		return result;
+	}
+
+	readU32() {
+		// Read fixed 32-bit unsigned integer (little-endian)
+		const value = this.buffer.readUInt32LE(this.offset);
+		this.offset += 4;
+		return value;
+	}
+
+	readData() {
+		// Read length-prefixed byte array
+		const length = this.readUint();
+		const data = this.buffer.slice(this.offset, this.offset + length);
+		this.offset += length;
+		return data;
+	}
+
+	readString() {
+		// Read length-prefixed UTF-8 string
+		return this.readData().toString("utf8");
+	}
+
+	readBool() {
+		return this.readByte() !== 0;
+	}
+
+	readEnum() {
+		return this.readUint();
+	}
+
+	readOptional(readFn) {
+		const hasValue = this.readByte();
+		if (hasValue === 0) return null;
+		return readFn.call(this);
+	}
+
+	readMap(keyReadFn, valueReadFn) {
+		const length = this.readUint();
+		const map = {};
+		for (let i = 0; i < length; i++) {
+			const key = keyReadFn.call(this);
+			const value = valueReadFn.call(this);
+			map[key] = value;
+		}
+		return map;
+	}
+
+	readUnion(variants, tagNames) {
+		const tag = this.readUint();
+		const value = variants[tag].call(this);
+		const result = {
+			tag: tagNames ? tagNames[tag] : tag,
+		};
+		// Only include value if it's not void (undefined/null or the string representation)
+		if (value !== undefined && value !== null && value !== "Normal") {
+			result.value = value;
+		}
+		return result;
+	}
+}
+
+// Decode the runner config
+const decoder = new BareDecoder(dataBuffer);
+
+console.log("Decoding runner config from hex:", hexString);
+
+// RunnerConfig struct
+const runnerConfig = {};
+
+// kind: RunnerConfigKind union
+const kind = decoder.readUnion(
+	[
+		// 0: Serverless
+		() => {
+			const serverless = {};
+			serverless.url = decoder.readString();
+			serverless.headers = decoder.readMap(
+				() => decoder.readString(),
+				() => decoder.readString(),
+			);
+			serverless.request_lifespan = decoder.readU32();
+			serverless.slots_per_runner = decoder.readU32();
+			serverless.min_runners = decoder.readU32();
+			serverless.max_runners = decoder.readU32();
+			serverless.runners_margin = decoder.readU32();
+			return serverless;
+		},
+		// 1: Normal (void)
+		() => "Normal",
+	],
+	["Serverless", "Normal"],
+);
+runnerConfig.kind = kind;
+
+// metadata: optional
+runnerConfig.metadata = decoder.readOptional(() => decoder.readString());
+
+// drain_on_version_upgrade: bool
+runnerConfig.drain_on_version_upgrade = decoder.readBool();
+
+console.log("Decoded runner config:");
+console.log(JSON.stringify(runnerConfig, null, 2));
+
+console.log("\nBytes consumed:", decoder.offset, "/", dataBuffer.length);
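A note on the `// IMPORTANT: Uses LE bytes, not BE` comment in the new `FormalKey` impl: the gauge counters are only ever mutated through atomic `Add` operations, and (as in FoundationDB) atomic add interprets both the operand and the stored value as little-endian integers, so the key's value must be serialized LE for increments and decrements to compose. A minimal standalone sketch of that convention, including the signed reinterpretation the cleanup loop performs via `isize::from_le_bytes(metric_count.to_le_bytes())`; the `atomic_add_le` helper here is hypothetical, not the engine's API:

```rust
// Hypothetical stand-in for the database's atomic ADD mutation: both the
// operand and the stored value are treated as little-endian integers.
fn atomic_add_le(stored: &mut [u8; 8], operand: i64) {
	let current = i64::from_le_bytes(*stored);
	*stored = current.wrapping_add(operand).to_le_bytes();
}

fn main() {
	let mut gauge = 0i64.to_le_bytes();
	atomic_add_le(&mut gauge, 1); // signal published: +1
	atomic_add_le(&mut gauge, -1); // signal pulled: -1
	// The key nominally stores a `usize`, but racing decrements can push it
	// below zero, which is why the cleanup loop reinterprets the bits as
	// signed before its `<= 0` check.
	assert_eq!(i64::from_le_bytes(gauge), 0);
}
```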
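The key codecs recover the packed variant discriminant with `strum::FromRepr`, so an unknown discriminant in a stored key surfaces as a `PackError` instead of a panic. A self-contained sketch of that round trip (requires the `strum` crate; the variant numbers mirror the global enum above):

```rust
// Round-trip of a key variant discriminant, as in the TupleUnpack impls.
#[derive(Debug, PartialEq, strum::FromRepr)]
enum GaugeMetricVariant {
	SignalPending = 4,
	SignalPending2 = 5,
}

fn main() {
	let packed = GaugeMetricVariant::SignalPending2 as usize;
	assert_eq!(
		GaugeMetricVariant::from_repr(packed),
		Some(GaugeMetricVariant::SignalPending2)
	);
	// Unknown discriminants yield None, which the unpack impls map to a
	// PackError rather than panicking.
	assert!(GaugeMetricVariant::from_repr(99).is_none());
}
```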
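For the debug script, the only non-obvious primitive is `readUint`, which walks BARE's unsigned varint encoding (LEB128): seven payload bits per byte, with the high bit marking continuation. The script is invoked as `node scripts/debug/decode_runner_config.js <hex>`, where the hex is a dump of a stored runner config value including its version prefix. An equivalent Rust sketch of the varint decode, independent of the script:

```rust
// LEB128 / BARE unsigned varint: 7 payload bits per byte, high bit set on
// every byte except the last. Returns the value and the bytes consumed.
fn read_uint(buf: &[u8]) -> (u64, usize) {
	let (mut result, mut shift, mut offset) = (0u64, 0u32, 0usize);
	loop {
		let byte = buf[offset];
		offset += 1;
		result |= u64::from(byte & 0x7f) << shift;
		if byte & 0x80 == 0 {
			break;
		}
		shift += 7;
	}
	(result, offset)
}

fn main() {
	// 300 = 0b1_0010_1100: low 7 bits 0x2c with the continuation bit set
	// (0xac), then the remaining bits 0b10 (0x02).
	assert_eq!(read_uint(&[0xac, 0x02]), (300, 2));
	// Single-byte values encode as themselves.
	assert_eq!(read_uint(&[0x7f]), (127, 1));
}
```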