Skip to content

Commit a129485

Browse files
committed
fix: clear all pending signals after workflow complete
1 parent 373f6cf commit a129485

File tree

9 files changed

+422
-63
lines changed

9 files changed

+422
-63
lines changed

engine/packages/gasoline/src/db/kv/keys/metric.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ pub enum GaugeMetric {
77
WorkflowSleeping(String),
88
WorkflowDead(String, String),
99
WorkflowComplete(String),
10+
// Deprecated
1011
SignalPending(String),
12+
SignalPending2(String),
1113
}
1214

1315
impl GaugeMetric {
@@ -18,6 +20,7 @@ impl GaugeMetric {
1820
GaugeMetric::WorkflowDead(_, _) => GaugeMetricVariant::WorkflowDead,
1921
GaugeMetric::WorkflowComplete(_) => GaugeMetricVariant::WorkflowComplete,
2022
GaugeMetric::SignalPending(_) => GaugeMetricVariant::SignalPending,
23+
GaugeMetric::SignalPending2(_) => GaugeMetricVariant::SignalPending2,
2124
}
2225
}
2326
}
@@ -28,7 +31,9 @@ enum GaugeMetricVariant {
2831
WorkflowSleeping = 1,
2932
WorkflowDead = 2,
3033
WorkflowComplete = 3,
34+
// Deprecated
3135
SignalPending = 4,
36+
SignalPending2 = 5,
3237
}
3338

3439
#[derive(Debug)]
@@ -79,6 +84,7 @@ impl TuplePack for GaugeMetricKey {
7984
}
8085
GaugeMetric::WorkflowComplete(workflow_name) => workflow_name.pack(w, tuple_depth)?,
8186
GaugeMetric::SignalPending(signal_name) => signal_name.pack(w, tuple_depth)?,
87+
GaugeMetric::SignalPending2(signal_name) => signal_name.pack(w, tuple_depth)?,
8288
};
8389

8490
std::result::Result::Ok(offset)
@@ -144,6 +150,16 @@ impl<'de> TupleUnpack<'de> for GaugeMetricKey {
144150
},
145151
)
146152
}
153+
GaugeMetricVariant::SignalPending2 => {
154+
let (input, signal_name) = String::unpack(input, tuple_depth)?;
155+
156+
(
157+
input,
158+
GaugeMetricKey {
159+
metric: GaugeMetric::SignalPending2(signal_name),
160+
},
161+
)
162+
}
147163
};
148164

149165
std::result::Result::Ok((input, v))

engine/packages/gasoline/src/db/kv/keys/workflow.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,3 +1189,125 @@ impl<'de> TupleUnpack<'de> for SilenceTsKey {
11891189
Ok((input, v))
11901190
}
11911191
}
1192+
1193+
#[derive(Debug, PartialEq, Eq)]
1194+
pub enum GaugeMetric {
1195+
SignalPending(String),
1196+
}
1197+
1198+
impl GaugeMetric {
1199+
fn variant(&self) -> GaugeMetricVariant {
1200+
match self {
1201+
GaugeMetric::SignalPending(_) => GaugeMetricVariant::SignalPending,
1202+
}
1203+
}
1204+
}
1205+
1206+
#[derive(strum::FromRepr)]
1207+
enum GaugeMetricVariant {
1208+
SignalPending = 0,
1209+
}
1210+
1211+
#[derive(Debug)]
1212+
pub struct GaugeMetricKey {
1213+
pub workflow_id: Id,
1214+
pub metric: GaugeMetric,
1215+
}
1216+
1217+
impl GaugeMetricKey {
1218+
pub fn new(workflow_id: Id, metric: GaugeMetric) -> Self {
1219+
GaugeMetricKey {
1220+
workflow_id,
1221+
metric,
1222+
}
1223+
}
1224+
1225+
pub fn subspace(workflow_id: Id) -> GaugeMetricSubspaceKey {
1226+
GaugeMetricSubspaceKey::new(workflow_id)
1227+
}
1228+
}
1229+
1230+
impl FormalKey for GaugeMetricKey {
1231+
// IMPORTANT: Uses LE bytes, not BE
1232+
/// Count.
1233+
type Value = usize;
1234+
1235+
fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
1236+
Ok(usize::from_le_bytes(raw.try_into()?))
1237+
}
1238+
1239+
fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
1240+
Ok(value.to_le_bytes().to_vec())
1241+
}
1242+
}
1243+
1244+
impl TuplePack for GaugeMetricKey {
1245+
fn pack<W: std::io::Write>(
1246+
&self,
1247+
w: &mut W,
1248+
tuple_depth: TupleDepth,
1249+
) -> std::io::Result<VersionstampOffset> {
1250+
let mut offset = VersionstampOffset::None { size: 0 };
1251+
1252+
let t = (
1253+
WORKFLOW,
1254+
METRIC,
1255+
self.workflow_id,
1256+
self.metric.variant() as usize,
1257+
);
1258+
offset += t.pack(w, tuple_depth)?;
1259+
1260+
offset += match &self.metric {
1261+
GaugeMetric::SignalPending(signal_name) => signal_name.pack(w, tuple_depth)?,
1262+
};
1263+
1264+
Ok(offset)
1265+
}
1266+
}
1267+
1268+
impl<'de> TupleUnpack<'de> for GaugeMetricKey {
1269+
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
1270+
let (input, (_, _, workflow_id, variant)) =
1271+
<(usize, usize, Id, usize)>::unpack(input, tuple_depth)?;
1272+
let variant = GaugeMetricVariant::from_repr(variant).ok_or_else(|| {
1273+
PackError::Message(format!("invalid metric variant `{variant}` in key").into())
1274+
})?;
1275+
1276+
let (input, v) = match variant {
1277+
GaugeMetricVariant::SignalPending => {
1278+
let (input, signal_name) = String::unpack(input, tuple_depth)?;
1279+
1280+
(
1281+
input,
1282+
GaugeMetricKey {
1283+
workflow_id,
1284+
metric: GaugeMetric::SignalPending(signal_name),
1285+
},
1286+
)
1287+
}
1288+
};
1289+
1290+
Ok((input, v))
1291+
}
1292+
}
1293+
1294+
pub struct GaugeMetricSubspaceKey {
1295+
workflow_id: Id,
1296+
}
1297+
1298+
impl GaugeMetricSubspaceKey {
1299+
pub fn new(workflow_id: Id) -> Self {
1300+
GaugeMetricSubspaceKey { workflow_id }
1301+
}
1302+
}
1303+
1304+
impl TuplePack for GaugeMetricSubspaceKey {
1305+
fn pack<W: std::io::Write>(
1306+
&self,
1307+
w: &mut W,
1308+
tuple_depth: TupleDepth,
1309+
) -> std::io::Result<VersionstampOffset> {
1310+
let t = (WORKFLOW, METRIC, self.workflow_id);
1311+
t.pack(w, tuple_depth)
1312+
}
1313+
}

0 commit comments

Comments
 (0)