Skip to content

Commit 4f348fe

Browse files
committed
feat(engine): optionally drain old runners automatically when a new runner version connects
1 parent c9cfd90 commit 4f348fe

File tree

8 files changed

+248
-56
lines changed

8 files changed

+248
-56
lines changed

Cargo.lock

Lines changed: 45 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/openapi.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/api-types/src/namespaces/runner_configs.rs

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,10 @@ pub struct RunnerConfig {
1414
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
1515
#[serde(rename_all = "snake_case")]
1616
pub enum RunnerConfigKind {
17-
Normal {},
17+
Normal {
18+
#[serde(default, skip_serializing_if = "Option::is_none")]
19+
drain_on_version_upgrade: Option<bool>,
20+
},
1821
Serverless {
1922
url: String,
2023
headers: Option<HashMap<String, String>>,
@@ -31,7 +34,11 @@ impl Into<rivet_types::runner_configs::RunnerConfig> for RunnerConfig {
3134
fn into(self) -> rivet_types::runner_configs::RunnerConfig {
3235
let RunnerConfig { kind, metadata } = self;
3336
let kind = match kind {
34-
RunnerConfigKind::Normal {} => rivet_types::runner_configs::RunnerConfigKind::Normal {},
37+
RunnerConfigKind::Normal {
38+
drain_on_version_upgrade,
39+
} => rivet_types::runner_configs::RunnerConfigKind::Normal {
40+
drain_on_version_upgrade,
41+
},
3542
RunnerConfigKind::Serverless {
3643
url,
3744
headers,

engine/packages/pegboard/src/workflows/runner.rs

Lines changed: 87 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ use universalpubsub::PublishOpts;
1313
use vbare::OwnedVersionedData;
1414

1515
use crate::{keys, metrics, workflows::actor::Allocate};
16+
use rivet_types::runner_configs::RunnerConfigKind;
1617

1718
/// Batch size of how many events to ack.
1819
const EVENT_ACK_BATCH_SIZE: i64 = 500;
@@ -132,6 +133,23 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
132133
create_ts: ctx.create_ts(),
133134
})
134135
.await?;
136+
137+
// Drain older runner versions if configured
138+
let older_runners = ctx
139+
.activity(DrainOlderVersionsInput {
140+
namespace_id: input.namespace_id,
141+
name: input.name.clone(),
142+
version: input.version,
143+
})
144+
.await?;
145+
for workflow_id in older_runners {
146+
ctx.signal(Stop {
147+
reset_actor_rescheduling: false,
148+
})
149+
.to_workflow_id(workflow_id)
150+
.send()
151+
.await?;
152+
}
135153
}
136154

137155
// Check for pending actors (which happen when there is not enough runner capacity)
@@ -1159,6 +1177,75 @@ async fn send_message_to_runner(ctx: &ActivityCtx, input: &SendMessageToRunnerIn
11591177
Ok(())
11601178
}
11611179

1180+
#[derive(Debug, Serialize, Deserialize, Hash)]
1181+
struct DrainOlderVersionsInput {
1182+
namespace_id: Id,
1183+
name: String,
1184+
version: u32,
1185+
}
1186+
1187+
/// If drain_on_version_upgrade is enabled for this runner config, find all runners with older
1188+
/// versions and return their workflow IDs so they can be stopped.
1189+
#[activity(DrainOlderVersions)]
1190+
async fn drain_older_versions(
1191+
ctx: &ActivityCtx,
1192+
input: &DrainOlderVersionsInput,
1193+
) -> Result<Vec<Id>> {
1194+
// Fetch runner config
1195+
let config = ctx
1196+
.op(crate::ops::runner_config::get::Input {
1197+
runners: vec![(input.namespace_id, input.name.clone())],
1198+
bypass_cache: false,
1199+
})
1200+
.await?;
1201+
1202+
// Check if drain_on_version_upgrade is enabled
1203+
let Some(config) = config.into_iter().next() else {
1204+
return Ok(vec![]);
1205+
};
1206+
let RunnerConfigKind::Normal {
1207+
drain_on_version_upgrade,
1208+
} = config.config.kind
1209+
else {
1210+
return Ok(vec![]);
1211+
};
1212+
if !drain_on_version_upgrade.unwrap_or(false) {
1213+
return Ok(vec![]);
1214+
}
1215+
1216+
// Scan RunnerAllocIdxKey for older versions
1217+
ctx.udb()?
1218+
.run(|tx| async move {
1219+
let tx = tx.with_subspace(keys::subspace());
1220+
let mut older_runners = Vec::new();
1221+
1222+
let runner_alloc_subspace = keys::subspace().subspace(
1223+
&keys::ns::RunnerAllocIdxKey::subspace(input.namespace_id, input.name.clone()),
1224+
);
1225+
1226+
let mut stream = tx.get_ranges_keyvalues(
1227+
universaldb::RangeOption {
1228+
mode: StreamingMode::WantAll,
1229+
..(&runner_alloc_subspace).into()
1230+
},
1231+
Snapshot,
1232+
);
1233+
1234+
while let Some(entry) = stream.try_next().await? {
1235+
let (key, data) = tx.read_entry::<keys::ns::RunnerAllocIdxKey>(&entry)?;
1236+
1237+
// Only collect runners with older versions
1238+
if key.version < input.version {
1239+
older_runners.push(data.workflow_id);
1240+
}
1241+
}
1242+
1243+
Ok(older_runners)
1244+
})
1245+
.custom_instrument(tracing::info_span!("drain_older_versions_tx"))
1246+
.await
1247+
}
1248+
11621249
#[signal("pegboard_runner_check_queue")]
11631250
pub struct CheckQueue {}
11641251

engine/packages/pegboard/src/workflows/runner2.rs

Lines changed: 87 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ use universalpubsub::PublishOpts;
1313
use vbare::OwnedVersionedData;
1414

1515
use crate::{keys, metrics, workflows::actor::Allocate};
16+
use rivet_types::runner_configs::RunnerConfigKind;
1617

1718
const EARLY_TXN_TIMEOUT: Duration = Duration::from_millis(2500);
1819

@@ -56,6 +57,23 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
5657
})
5758
.await?;
5859

60+
// Drain older runner versions if configured
61+
let older_runners = ctx
62+
.activity(DrainOlderVersionsInput {
63+
namespace_id: input.namespace_id,
64+
name: input.name.clone(),
65+
version: input.version,
66+
})
67+
.await?;
68+
for workflow_id in older_runners {
69+
ctx.signal(Stop {
70+
reset_actor_rescheduling: false,
71+
})
72+
.to_workflow_id(workflow_id)
73+
.send()
74+
.await?;
75+
}
76+
5977
// Check for pending actors (which happen when there is not enough runner capacity)
6078
let res = ctx
6179
.activity(AllocatePendingActorsInput {
@@ -811,6 +829,75 @@ async fn send_messages_to_runner(
811829
Ok(())
812830
}
813831

832+
#[derive(Debug, Serialize, Deserialize, Hash)]
833+
struct DrainOlderVersionsInput {
834+
namespace_id: Id,
835+
name: String,
836+
version: u32,
837+
}
838+
839+
/// If drain_on_version_upgrade is enabled for this runner config, find all runners with older
840+
/// versions and return their workflow IDs so they can be stopped.
841+
#[activity(DrainOlderVersions)]
842+
async fn drain_older_versions(
843+
ctx: &ActivityCtx,
844+
input: &DrainOlderVersionsInput,
845+
) -> Result<Vec<Id>> {
846+
// Fetch runner config
847+
let config = ctx
848+
.op(crate::ops::runner_config::get::Input {
849+
runners: vec![(input.namespace_id, input.name.clone())],
850+
bypass_cache: false,
851+
})
852+
.await?;
853+
854+
// Check if drain_on_version_upgrade is enabled
855+
let Some(config) = config.into_iter().next() else {
856+
return Ok(vec![]);
857+
};
858+
let RunnerConfigKind::Normal {
859+
drain_on_version_upgrade,
860+
} = config.config.kind
861+
else {
862+
return Ok(vec![]);
863+
};
864+
if !drain_on_version_upgrade.unwrap_or(false) {
865+
return Ok(vec![]);
866+
}
867+
868+
// Scan RunnerAllocIdxKey for older versions
869+
ctx.udb()?
870+
.run(|tx| async move {
871+
let tx = tx.with_subspace(keys::subspace());
872+
let mut older_runners = Vec::new();
873+
874+
let runner_alloc_subspace = keys::subspace().subspace(
875+
&keys::ns::RunnerAllocIdxKey::subspace(input.namespace_id, input.name.clone()),
876+
);
877+
878+
let mut stream = tx.get_ranges_keyvalues(
879+
universaldb::RangeOption {
880+
mode: StreamingMode::WantAll,
881+
..(&runner_alloc_subspace).into()
882+
},
883+
Snapshot,
884+
);
885+
886+
while let Some(entry) = stream.try_next().await? {
887+
let (key, data) = tx.read_entry::<keys::ns::RunnerAllocIdxKey>(&entry)?;
888+
889+
// Only collect runners with older versions
890+
if key.version < input.version {
891+
older_runners.push(data.workflow_id);
892+
}
893+
}
894+
895+
Ok(older_runners)
896+
})
897+
.custom_instrument(tracing::info_span!("drain_older_versions_tx"))
898+
.await
899+
}
900+
814901
#[signal("pegboard_runner_init")]
815902
pub struct Init {}
816903

engine/packages/types/src/runner_configs.rs

Lines changed: 15 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,10 @@ pub struct RunnerConfig {
1414
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
1515
#[serde(rename_all = "snake_case")]
1616
pub enum RunnerConfigKind {
17-
Normal {},
17+
Normal {
18+
#[serde(default, skip_serializing_if = "Option::is_none")]
19+
drain_on_version_upgrade: Option<bool>,
20+
},
1821
Serverless {
1922
url: String,
2023
headers: HashMap<String, String>,
@@ -33,9 +36,13 @@ impl From<RunnerConfig> for rivet_data::generated::namespace_runner_config_v2::R
3336
rivet_data::generated::namespace_runner_config_v2::RunnerConfig {
3437
metadata: metadata.and_then(|value| serde_json::to_string(&value).ok()),
3538
kind: match kind {
36-
RunnerConfigKind::Normal {} => {
37-
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal
38-
}
39+
RunnerConfigKind::Normal {
40+
drain_on_version_upgrade,
41+
} => rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal(
42+
rivet_data::generated::namespace_runner_config_v2::Normal {
43+
drain_on_version_upgrade,
44+
},
45+
),
3946
RunnerConfigKind::Serverless {
4047
url,
4148
headers,
@@ -69,8 +76,10 @@ impl From<rivet_data::generated::namespace_runner_config_v2::RunnerConfig> for R
6976
RunnerConfig {
7077
metadata: metadata.and_then(|raw| serde_json::from_str(&raw).ok()),
7178
kind: match kind {
72-
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal => {
73-
RunnerConfigKind::Normal {}
79+
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal(o) => {
80+
RunnerConfigKind::Normal {
81+
drain_on_version_upgrade: o.drain_on_version_upgrade,
82+
}
7483
}
7584
rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Serverless(
7685
o,

engine/sdks/rust/data/src/versioned/namespace_runner_config.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,7 @@ impl NamespaceRunnerConfig {
114114
),
115115
))
116116
}
117-
namespace_runner_config_v2::RunnerConfigKind::Normal => {
117+
namespace_runner_config_v2::RunnerConfigKind::Normal(_) => {
118118
bail!("namespace runner config v1 does not support normal runner config")
119119
}
120120
}

engine/sdks/schemas/data/namespace.runner_config.v2.bare

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,9 @@ type Serverless struct {
1010
runners_margin: u32
1111
}
1212

13-
type Normal void
13+
type Normal struct {
14+
drain_on_version_upgrade: optional<bool>
15+
}
1416

1517
type RunnerConfigKind union {
1618
Serverless |

0 commit comments

Comments
 (0)