
Commit ada3fe1

feat(deployer): load phase caching, automatic startup (#1640)
* feat(deployer): cache resources after provisioning for use on wakeups
* fix(deployer): fix old deployment handling, always start previous running deploy on wakeup
* nit: comment
* fix and speed up deployer integration
* fix deployer integration
1 parent ba57785 commit ada3fe1

11 files changed: 136 additions & 141 deletions


Makefile

Lines changed: 3 additions & 3 deletions
@@ -134,14 +134,14 @@ DOCKER_COMPOSE_ENV=\
 	SHUTTLE_ENV=$(SHUTTLE_ENV)\
 	SHUTTLE_SERVICE_VERSION=$(SHUTTLE_SERVICE_VERSION)
 
-.PHONY: clean cargo-clean images the-shuttle-images shuttle-% postgres otel deploy test docker-compose.rendered.yml up down
+.PHONY: clean deep-clean images the-shuttle-images shuttle-% postgres otel deploy test docker-compose.rendered.yml up down
 
 clean:
 	rm .shuttle-*
 	rm docker-compose.rendered.yml
 
-cargo-clean:
-	find . -type d \( -name target -or -name .shuttle-executables \) | xargs rm -rf
+deep-clean:
+	find . -type d \( -name target -or -name .shuttle-executables -or -name node_modules \) | xargs rm -rf
 
 images: the-shuttle-images postgres otel

deployer/src/deployment/queue.rs

Lines changed: 1 addition & 1 deletion
@@ -245,7 +245,7 @@ impl Queued {
             project_id: self.project_id,
             tracing_context: Default::default(),
             is_next,
-            claim: self.claim,
+            claim: Some(self.claim),
             secrets,
         };
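
The one-line change above follows from Built::claim becoming Option<Claim> (see run.rs below): a queued build always carries a claim, so it wraps it in Some, while the wakeup path constructs a Built with claim: None. A minimal standalone sketch of the two construction sites, with illustrative types in place of the deployer's real ones:

// Sketch only: `Claim` and both constructors are illustrative, not the real API.
struct Claim;

struct Built {
    /// must be set if this run will perform requests to backends
    claim: Option<Claim>,
}

// Fresh deployments come out of the build queue holding the caller's claim,
// so they are allowed to talk to the provisioner and other backends.
fn built_from_queue(claim: Claim) -> Built {
    Built { claim: Some(claim) }
}

// Wakeup restarts (see lib.rs below) have no claim and are expected to run
// entirely from the cached resources written by an earlier deployment.
fn built_from_wakeup() -> Built {
    Built { claim: None }
}

fn main() {
    let _ = built_from_queue(Claim);
    let _ = built_from_wakeup();
}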

deployer/src/deployment/run.rs

Lines changed: 86 additions & 60 deletions
@@ -150,7 +150,7 @@ pub async fn task(
     }
 }
 
-#[instrument(skip(active_deployment_getter, deployment_id, runtime_manager))]
+#[instrument(skip_all)]
 async fn kill_old_deployments(
     service_id: Ulid,
     deployment_id: Uuid,
@@ -230,16 +230,14 @@ pub struct Built {
     pub project_id: Ulid,
     pub tracing_context: HashMap<String, String>,
     pub is_next: bool,
-    pub claim: Claim,
+    /// must be set if this run will perform requests to backends
+    pub claim: Option<Claim>,
     pub secrets: HashMap<String, String>,
 }
 
 impl Built {
-    #[instrument(
-        name = "Loading resources",
-        skip(self, resource_manager, runtime_manager, kill_old_deployments, cleanup, provisioner_client),
-        fields(deployment_id = %self.id, state = %State::Loading)
-    )]
+    #[instrument(name = "Loading resources", skip_all, fields(deployment_id = %self.id, state = %State::Loading))]
+    #[allow(clippy::too_many_arguments)]
     pub async fn handle(
         self,
         mut resource_manager: impl ResourceManager,
@@ -256,6 +254,9 @@ impl Built {
         let executable_path = project_path
             .join(EXECUTABLE_DIRNAME)
             .join(self.id.to_string());
+        let cached_resources_path = project_path
+            .join(EXECUTABLE_DIRNAME)
+            .join(format!("{}.resources", self.id));
 
         // Let the runtime expose its user HTTP port on port 8000
         let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 8000);
@@ -279,62 +280,87 @@
             .await
             .map_err(Error::Runtime)?;
 
-        info!("Loading resources");
+        // Check for cached resources for this deployment id. This only succeeds on wakeup from idle or project restart.
+        let resources = if let Some(bytes) = std::fs::read(&cached_resources_path)
+            .ok()
+            .and_then(|bytes| serde_json::from_slice(bytes.as_slice()).ok())
+        {
+            info!("Using cached resources");
 
-        let mut new_secrets = self.secrets;
-        let prev_resources = resource_manager
-            .get_resources(&self.service_id, self.claim.clone())
-            .await
-            .map_err(|err| Error::Load(err.to_string()))?
-            .resources
-            .into_iter()
-            .map(resource::Response::try_from)
-            // Ignore and trace the errors for resources with corrupted data, returning just the valid resources.
-            // TODO: investigate how the resource data can get corrupted.
-            .filter_map(|resource| {
-                resource
-                    .map_err(|err| {
-                        error!(error = ?err, "failed to parse resource data");
-                    })
-                    .ok()
-            })
-            // inject old secrets into the secrets added in this deployment
-            .inspect(|r| {
-                if r.r#type == shuttle_common::resource::Type::Secrets {
-                    match serde_json::from_value::<SecretStore>(r.data.clone()) {
-                        Ok(ss) => {
-                            // Combine old and new, but insert old first so that new ones override.
-                            let mut combined = HashMap::from_iter(ss.into_iter());
-                            combined.extend(new_secrets.clone().into_iter());
-                            new_secrets = combined;
-                        }
-                        Err(err) => {
-                            error!(error = ?err, "failed to parse old secrets data");
+            bytes
+        }
+        // Default case for handling resources and provisioning
+        else {
+            info!("Loading resources");
+            let claim = self
+                .claim
+                .expect("claim must be present when loading resources");
+
+            let mut new_secrets = self.secrets;
+            let prev_resources = resource_manager
+                .get_resources(&self.service_id, claim.clone())
+                .await
+                .map_err(|err| Error::Load(err.to_string()))?
+                .resources
+                .into_iter()
+                .map(resource::Response::try_from)
+                // Ignore and trace the errors for resources with corrupted data, returning just the valid resources.
+                // TODO: investigate how the resource data can get corrupted.
+                .filter_map(|resource| {
+                    resource
+                        .map_err(|err| {
+                            error!(error = ?err, "failed to parse resource data");
+                        })
+                        .ok()
+                })
+                // inject old secrets into the secrets added in this deployment
+                .inspect(|r| {
+                    if r.r#type == shuttle_common::resource::Type::Secrets {
+                        match serde_json::from_value::<SecretStore>(r.data.clone()) {
+                            Ok(ss) => {
+                                // Combine old and new, but insert old first so that new ones override.
+                                let mut combined = HashMap::from_iter(ss.into_iter());
+                                combined.extend(new_secrets.clone().into_iter());
+                                new_secrets = combined;
+                            }
+                            Err(err) => {
+                                error!(error = ?err, "failed to parse old secrets data");
+                            }
                         }
                     }
-                }
-            })
-            .collect::<Vec<_>>();
-
-        let resources = load(
-            self.service_name.clone(),
-            runtime_client.clone(),
-            &new_secrets,
-        )
-        .await?;
-
-        let resources = provision(
-            self.service_name.as_str(),
-            self.service_id,
-            provisioner_client,
-            resource_manager,
-            self.claim,
-            prev_resources,
-            resources,
-            new_secrets,
-        )
-        .await
-        .map_err(Error::Provision)?;
+                })
+                .collect::<Vec<_>>();
+
+            let resources = load(
+                self.service_name.clone(),
+                runtime_client.clone(),
+                &new_secrets,
+            )
+            .await?;
+
+            let resources = provision(
+                self.service_name.as_str(),
+                self.service_id,
+                provisioner_client,
+                resource_manager,
+                claim,
+                prev_resources,
+                resources,
+                new_secrets,
+            )
+            .await
+            .map_err(Error::Provision)?;
+
+            // cache the final resources output for use in wakeups
+            // this should only happen on deployment, and not on wakeups
+            std::fs::write(
+                &cached_resources_path,
+                serde_json::to_vec(&resources).expect("resources to serialize"),
+            )
+            .map_err(|_| Error::Load("Failed to save resource cache".into()))?;
+
+            resources
+        };
 
        kill_old_deployments.await?;
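
The change above is essentially a read-through file cache keyed by deployment id: use the resources serialized on disk if they parse, otherwise provision from scratch and persist the output for the next wakeup. A condensed sketch of that pattern, assuming the serde_json crate and an illustrative provision closure rather than the deployer's real load/provision signatures:

use std::path::Path;

// Sketch: return cached resource bytes if present and valid JSON, otherwise
// provision from scratch and write the result back for future wakeups.
fn cached_or_provision(
    cache_path: &Path,
    provision: impl FnOnce() -> Vec<u8>,
) -> std::io::Result<Vec<u8>> {
    if let Some(bytes) = std::fs::read(cache_path)
        .ok()
        .filter(|bytes| serde_json::from_slice::<serde_json::Value>(bytes).is_ok())
    {
        return Ok(bytes); // cache hit: wakeup from idle or project restart
    }
    let bytes = provision(); // cache miss: first run of this deployment
    std::fs::write(cache_path, &bytes)?;
    Ok(bytes)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("demo.resources");
    // First call misses and writes the cache; the second call hits it.
    let first = cached_or_provision(&path, || br#"[{"type":"database"}]"#.to_vec())?;
    let second = cached_or_provision(&path, || unreachable!())?;
    assert_eq!(first, second);
    Ok(())
}

On a cache hit no claim is needed, which is what lets the wakeup path construct Built with claim: None.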

deployer/src/handlers/mod.rs

Lines changed: 5 additions & 27 deletions
@@ -39,7 +39,7 @@ use shuttle_proto::logger::LogsRequest;
 
 use crate::persistence::{Deployment, Persistence, State};
 use crate::{
-    deployment::{Built, DeploymentManager, Queued},
+    deployment::{DeploymentManager, Queued},
     persistence::resource::ResourceManager,
 };
 pub use {self::error::Error, self::error::Result, self::local::set_jwt_bearer};
@@ -430,32 +430,10 @@
     }
 }
 
-#[instrument(skip_all, fields(shuttle.project.name = %project_name, %deployment_id))]
-pub async fn start_deployment(
-    Extension(persistence): Extension<Persistence>,
-    Extension(deployment_manager): Extension<DeploymentManager>,
-    Extension(claim): Extension<Claim>,
-    Extension(project_id): Extension<Ulid>,
-    CustomErrorPath((project_name, deployment_id)): CustomErrorPath<(String, Uuid)>,
-) -> Result<()> {
-    if let Some(deployment) = persistence.get_runnable_deployment(&deployment_id).await? {
-        let built = Built {
-            id: deployment.id,
-            service_name: deployment.service_name,
-            service_id: deployment.service_id,
-            project_id,
-            tracing_context: Default::default(),
-            is_next: deployment.is_next,
-            claim,
-            secrets: Default::default(),
-        };
-        deployment_manager.run_push(built).await;
-
-        Ok(())
-    } else {
-        Err(Error::NotFound("deployment not found".to_string()))
-    }
-}
+/// Deprecated.
+/// Now always starts the last running deployment on start up / wake up.
+/// Kept around for compatibility.
+pub async fn start_deployment() {}
 
 #[instrument(skip_all, fields(shuttle.project.name = %project_name, %deployment_id))]
 pub async fn get_logs(
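
start_deployment keeps its name and, presumably, its route wiring, so older clients hitting the endpoint get an empty 200 rather than a 404; only the body disappears. A hedged axum sketch of that pattern; the path and router setup here are hypothetical, not copied from the deployer:

use axum::{routing::put, Router};

/// Deprecated: startup now happens automatically on wakeup, so the handler is
/// a no-op kept only so the old route still answers 200 OK.
async fn start_deployment() {}

fn main() {
    // Hypothetical path; the real route would live under the project scope.
    let _router: Router = Router::new().route(
        "/deployments/:deployment_id/start",
        put(start_deployment),
    );
}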

deployer/src/lib.rs

Lines changed: 26 additions & 9 deletions
@@ -17,7 +17,7 @@ mod runtime_manager;
 
 pub use crate::args::Args;
 pub use crate::deployment::state_change_layer::StateChangeLayer;
-use crate::deployment::DeploymentManager;
+use crate::deployment::{Built, DeploymentManager};
 use shuttle_common::backends::client::gateway;
 
 const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -29,6 +29,9 @@ pub async fn start(
     log_fetcher: logger::Client,
     args: Args,
 ) {
+    let project_id = Ulid::from_string(args.project_id.as_str())
+        .expect("to have a valid ULID as project_id arg");
+
     // when _set is dropped once axum exits, the deployment tasks will be aborted.
     let deployment_manager = DeploymentManager::builder()
         .build_log_recorder(log_recorder)
@@ -47,18 +50,32 @@
 
     persistence.cleanup_invalid_states().await.unwrap();
 
-    let runnable_deployments = persistence.get_all_runnable_deployments().await.unwrap();
-    info!(count = %runnable_deployments.len(), "stopping all but last running deploy");
-
-    // Make sure we don't stop the last running deploy. This works because they are returned in descending order.
-    let project_id = Ulid::from_string(args.project_id.as_str())
-        .expect("to have a valid ULID as project_id arg");
-    for existing_deployment in runnable_deployments.into_iter().skip(1) {
+    let deployments = persistence.get_all_runnable_deployments().await.unwrap();
+    info!(count = %deployments.len(), "Deployments considered in the running state");
+    // This works because they are returned in descending order.
+    let mut deployments = deployments.into_iter();
+    let last_running_deployment = deployments.next();
+    info!("Marking all but last running deployment as stopped");
+    for older_deployment in deployments {
         persistence
-            .stop_running_deployment(existing_deployment)
+            .stop_running_deployment(older_deployment)
             .await
             .unwrap();
     }
+    if let Some(deployment) = last_running_deployment {
+        info!("Starting up last running deployment");
+        let built = Built {
+            id: deployment.id,
+            service_name: deployment.service_name,
+            service_id: deployment.service_id,
+            project_id,
+            tracing_context: Default::default(),
+            is_next: deployment.is_next,
+            claim: None,
+            secrets: Default::default(),
+        };
+        deployment_manager.run_push(built).await;
+    }
 
     let mut builder = handlers::RouterBuilder::new(
         persistence,
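
The startup block relies on get_all_runnable_deployments returning deployments newest-first; taking next() off the iterator splits the newest (to restart) from the rest (to stop). A plain-Rust sketch of that split, with string ids standing in for the real DeploymentRunnable rows:

fn main() {
    // Assumed ordering: descending by recency, as the persistence query returns.
    let runnable = vec!["deploy-c", "deploy-b", "deploy-a"];

    let mut deployments = runnable.into_iter();
    let last_running = deployments.next(); // newest: the one to restart

    for older in deployments {
        println!("marking {older} as stopped");
    }
    if let Some(newest) = last_running {
        println!("restarting {newest} with claim: None");
    }
}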

deployer/src/persistence/mod.rs

Lines changed: 1 addition & 33 deletions
@@ -222,7 +222,7 @@ impl Persistence {
     }
 
     pub async fn get_active_deployment(&self, service_id: &Ulid) -> Result<Option<Deployment>> {
-        sqlx::query_as("SELECT * FROM deployments WHERE service_id = ? AND state = ?")
+        sqlx::query_as("SELECT * FROM deployments WHERE service_id = ? AND state = ? ORDER BY last_update DESC")
            .bind(service_id.to_string())
            .bind(State::Running)
            .fetch_optional(&self.pool)
@@ -301,24 +301,6 @@
             .map_err(Error::from)
     }
 
-    /// Gets a deployment if it is runnable
-    pub async fn get_runnable_deployment(&self, id: &Uuid) -> Result<Option<DeploymentRunnable>> {
-        sqlx::query_as(
-            r#"SELECT d.id, service_id, s.name AS service_name, d.is_next
-            FROM deployments AS d
-            JOIN services AS s ON s.id = d.service_id
-            WHERE state IN (?, ?, ?)
-            AND d.id = ?"#,
-        )
-        .bind(State::Running)
-        .bind(State::Stopped)
-        .bind(State::Completed)
-        .bind(id)
-        .fetch_optional(&self.pool)
-        .await
-        .map_err(Error::from)
-    }
-
     pub async fn stop_running_deployment(&self, deployable: DeploymentRunnable) -> Result<()> {
         update_deployment(
             &self.pool,
@@ -931,20 +913,6 @@ mod tests {
             p.insert_deployment(deployment).await.unwrap();
         }
 
-        let runnable = p.get_runnable_deployment(&id_1).await.unwrap();
-        assert_eq!(
-            runnable,
-            Some(DeploymentRunnable {
-                id: id_1,
-                service_name: "foo".to_string(),
-                service_id: foo_id,
-                is_next: false,
-            })
-        );
-
-        let runnable = p.get_runnable_deployment(&id_crashed).await.unwrap();
-        assert_eq!(runnable, None);
-
         let runnable = p.get_all_runnable_deployments().await.unwrap();
         assert_eq!(
             runnable,
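
The ORDER BY last_update DESC added to get_active_deployment matters because fetch_optional simply takes the first row the query yields: if several rows are ever in the Running state, the newest should win. An in-memory analogue of the fixed query, with tuples standing in for table rows:

// (last_update, deployment_id) pairs, all assumed to be in the Running state.
fn get_active_deployment(mut rows: Vec<(u64, &str)>) -> Option<(u64, &str)> {
    rows.sort_by(|a, b| b.0.cmp(&a.0)); // ORDER BY last_update DESC
    rows.into_iter().next() // fetch_optional: first row or None
}

fn main() {
    let rows = vec![(100, "old"), (250, "new"), (175, "mid")];
    assert_eq!(get_active_deployment(rows), Some((250, "new")));
}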
