Skip to content

Commit 1a6243a

Browse files
committed
fix: clear all pending signals after workflow complete (#3798)
1 parent 7dd5e2b commit 1a6243a

File tree

9 files changed

+424
-61
lines changed

9 files changed

+424
-61
lines changed

engine/packages/gasoline/src/db/kv/keys/metric.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ pub enum GaugeMetric {
77
WorkflowSleeping(String),
88
WorkflowDead(String, String),
99
WorkflowComplete(String),
10+
// Deprecated
1011
SignalPending(String),
12+
SignalPending2(String),
1113
}
1214

1315
impl GaugeMetric {
@@ -18,6 +20,7 @@ impl GaugeMetric {
1820
GaugeMetric::WorkflowDead(_, _) => GaugeMetricVariant::WorkflowDead,
1921
GaugeMetric::WorkflowComplete(_) => GaugeMetricVariant::WorkflowComplete,
2022
GaugeMetric::SignalPending(_) => GaugeMetricVariant::SignalPending,
23+
GaugeMetric::SignalPending2(_) => GaugeMetricVariant::SignalPending2,
2124
}
2225
}
2326
}
@@ -28,9 +31,12 @@ enum GaugeMetricVariant {
2831
WorkflowSleeping = 1,
2932
WorkflowDead = 2,
3033
WorkflowComplete = 3,
34+
// Deprecated
3135
SignalPending = 4,
36+
SignalPending2 = 5,
3237
}
3338

39+
/// Stores gauge metrics for global database usage.
3440
#[derive(Debug)]
3541
pub struct GaugeMetricKey {
3642
pub metric: GaugeMetric,
@@ -79,6 +85,7 @@ impl TuplePack for GaugeMetricKey {
7985
}
8086
GaugeMetric::WorkflowComplete(workflow_name) => workflow_name.pack(w, tuple_depth)?,
8187
GaugeMetric::SignalPending(signal_name) => signal_name.pack(w, tuple_depth)?,
88+
GaugeMetric::SignalPending2(signal_name) => signal_name.pack(w, tuple_depth)?,
8289
};
8390

8491
std::result::Result::Ok(offset)
@@ -144,12 +151,23 @@ impl<'de> TupleUnpack<'de> for GaugeMetricKey {
144151
},
145152
)
146153
}
154+
GaugeMetricVariant::SignalPending2 => {
155+
let (input, signal_name) = String::unpack(input, tuple_depth)?;
156+
157+
(
158+
input,
159+
GaugeMetricKey {
160+
metric: GaugeMetric::SignalPending2(signal_name),
161+
},
162+
)
163+
}
147164
};
148165

149166
std::result::Result::Ok((input, v))
150167
}
151168
}
152169

170+
/// Used to list all global gauge metrics.
153171
pub struct GaugeMetricSubspaceKey {}
154172

155173
impl GaugeMetricSubspaceKey {

engine/packages/gasoline/src/db/kv/keys/workflow.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,3 +1189,127 @@ impl<'de> TupleUnpack<'de> for SilenceTsKey {
11891189
Ok((input, v))
11901190
}
11911191
}
1192+
1193+
#[derive(Debug, PartialEq, Eq)]
1194+
pub enum GaugeMetric {
1195+
SignalPending(String),
1196+
}
1197+
1198+
impl GaugeMetric {
1199+
fn variant(&self) -> GaugeMetricVariant {
1200+
match self {
1201+
GaugeMetric::SignalPending(_) => GaugeMetricVariant::SignalPending,
1202+
}
1203+
}
1204+
}
1205+
1206+
#[derive(strum::FromRepr)]
1207+
enum GaugeMetricVariant {
1208+
SignalPending = 0,
1209+
}
1210+
1211+
/// Stores gauge metrics for a single workflow.
1212+
#[derive(Debug)]
1213+
pub struct GaugeMetricKey {
1214+
pub workflow_id: Id,
1215+
pub metric: GaugeMetric,
1216+
}
1217+
1218+
impl GaugeMetricKey {
1219+
pub fn new(workflow_id: Id, metric: GaugeMetric) -> Self {
1220+
GaugeMetricKey {
1221+
workflow_id,
1222+
metric,
1223+
}
1224+
}
1225+
1226+
pub fn subspace(workflow_id: Id) -> GaugeMetricSubspaceKey {
1227+
GaugeMetricSubspaceKey::new(workflow_id)
1228+
}
1229+
}
1230+
1231+
impl FormalKey for GaugeMetricKey {
1232+
// IMPORTANT: Uses LE bytes, not BE
1233+
/// Count.
1234+
type Value = usize;
1235+
1236+
fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
1237+
Ok(usize::from_le_bytes(raw.try_into()?))
1238+
}
1239+
1240+
fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
1241+
Ok(value.to_le_bytes().to_vec())
1242+
}
1243+
}
1244+
1245+
impl TuplePack for GaugeMetricKey {
1246+
fn pack<W: std::io::Write>(
1247+
&self,
1248+
w: &mut W,
1249+
tuple_depth: TupleDepth,
1250+
) -> std::io::Result<VersionstampOffset> {
1251+
let mut offset = VersionstampOffset::None { size: 0 };
1252+
1253+
let t = (
1254+
WORKFLOW,
1255+
METRIC,
1256+
self.workflow_id,
1257+
self.metric.variant() as usize,
1258+
);
1259+
offset += t.pack(w, tuple_depth)?;
1260+
1261+
offset += match &self.metric {
1262+
GaugeMetric::SignalPending(signal_name) => signal_name.pack(w, tuple_depth)?,
1263+
};
1264+
1265+
Ok(offset)
1266+
}
1267+
}
1268+
1269+
impl<'de> TupleUnpack<'de> for GaugeMetricKey {
1270+
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
1271+
let (input, (_, _, workflow_id, variant)) =
1272+
<(usize, usize, Id, usize)>::unpack(input, tuple_depth)?;
1273+
let variant = GaugeMetricVariant::from_repr(variant).ok_or_else(|| {
1274+
PackError::Message(format!("invalid metric variant `{variant}` in key").into())
1275+
})?;
1276+
1277+
let (input, v) = match variant {
1278+
GaugeMetricVariant::SignalPending => {
1279+
let (input, signal_name) = String::unpack(input, tuple_depth)?;
1280+
1281+
(
1282+
input,
1283+
GaugeMetricKey {
1284+
workflow_id,
1285+
metric: GaugeMetric::SignalPending(signal_name),
1286+
},
1287+
)
1288+
}
1289+
};
1290+
1291+
Ok((input, v))
1292+
}
1293+
}
1294+
1295+
/// Used to list all gauge metrics for a workflow.
1296+
pub struct GaugeMetricSubspaceKey {
1297+
workflow_id: Id,
1298+
}
1299+
1300+
impl GaugeMetricSubspaceKey {
1301+
pub fn new(workflow_id: Id) -> Self {
1302+
GaugeMetricSubspaceKey { workflow_id }
1303+
}
1304+
}
1305+
1306+
impl TuplePack for GaugeMetricSubspaceKey {
1307+
fn pack<W: std::io::Write>(
1308+
&self,
1309+
w: &mut W,
1310+
tuple_depth: TupleDepth,
1311+
) -> std::io::Result<VersionstampOffset> {
1312+
let t = (WORKFLOW, METRIC, self.workflow_id);
1313+
t.pack(w, tuple_depth)
1314+
}
1315+
}

0 commit comments

Comments
 (0)