@@ -20,9 +20,9 @@ use clap_mangen::Man;
2020
2121use shuttle_common:: {
2222 constants:: {
23- API_URL_DEFAULT , DEFAULT_IDLE_MINUTES , EXECUTABLE_DIRNAME , SHUTTLE_CLI_DOCS_URL ,
24- SHUTTLE_GH_ISSUE_URL , SHUTTLE_IDLE_DOCS_URL , SHUTTLE_INSTALL_DOCS_URL , SHUTTLE_LOGIN_URL ,
25- STORAGE_DIRNAME ,
23+ API_URL_DEFAULT , DEFAULT_IDLE_MINUTES , EXECUTABLE_DIRNAME , RESOURCE_SCHEMA_VERSION ,
24+ SHUTTLE_CLI_DOCS_URL , SHUTTLE_GH_ISSUE_URL , SHUTTLE_IDLE_DOCS_URL ,
25+ SHUTTLE_INSTALL_DOCS_URL , SHUTTLE_LOGIN_URL , STORAGE_DIRNAME ,
2626 } ,
2727 deployment:: { DEPLOYER_END_MESSAGES_BAD , DEPLOYER_END_MESSAGES_GOOD } ,
2828 models:: {
@@ -34,14 +34,16 @@ use shuttle_common::{
3434 project,
3535 resource:: get_resource_tables,
3636 } ,
37- resource, semvers_are_compatible, ApiKey , LogItem , VersionInfo ,
37+ resource:: { self , ResourceInput , ShuttleResourceOutput } ,
38+ semvers_are_compatible, ApiKey , DatabaseResource , DbInput , LogItem , VersionInfo ,
39+ } ;
40+ use shuttle_proto:: {
41+ provisioner:: { provisioner_server:: Provisioner , DatabaseRequest } ,
42+ runtime:: { self , LoadRequest , StartRequest , StopRequest } ,
3843} ;
39- use shuttle_proto:: runtime;
40- use shuttle_proto:: runtime:: { LoadRequest , StartRequest , StopRequest } ;
41- use shuttle_service:: runner;
4244use shuttle_service:: {
4345 builder:: { build_workspace, BuiltService } ,
44- Environment ,
46+ runner , Environment ,
4547} ;
4648
4749use anyhow:: { anyhow, bail, Context , Result } ;
@@ -63,9 +65,8 @@ use strum::IntoEnumIterator;
6365use tar:: Builder ;
6466use tokio:: io:: { AsyncBufReadExt , BufReader } ;
6567use tokio:: process:: Child ;
66- use tokio:: task:: JoinHandle ;
6768use tokio:: time:: { sleep, Duration } ;
68- use tonic:: Status ;
69+ use tonic:: { Request , Status } ;
6970use tracing:: { debug, error, trace, warn} ;
7071use uuid:: Uuid ;
7172
@@ -932,9 +933,7 @@ impl Shuttle {
932933 async fn spin_local_runtime (
933934 run_args : & RunArgs ,
934935 service : & BuiltService ,
935- provisioner_server : & JoinHandle < Result < ( ) , tonic:: transport:: Error > > ,
936936 idx : u16 ,
937- provisioner_port : u16 ,
938937 ) -> Result < Option < ( Child , runtime:: Client ) > > {
939938 let crate_directory = service. crate_directory ( ) ;
940939 let secrets_path = if crate_directory. join ( "Secrets.dev.toml" ) . exists ( ) {
@@ -1039,31 +1038,21 @@ impl Shuttle {
10391038
10401039 // Child process and gRPC client for sending requests to it
10411040 let ( mut runtime, mut runtime_client) = runner:: start (
1042- service. is_wasm ,
1043- Environment :: Local ,
1044- & format ! ( "http://localhost:{provisioner_port}" ) ,
1045- None ,
10461041 portpicker:: pick_unused_port ( ) . expect ( "unable to find available port for gRPC server" ) ,
10471042 runtime_executable,
10481043 service. workspace_path . as_path ( ) ,
10491044 )
1050- . await
1051- . map_err ( |err| {
1052- provisioner_server. abort ( ) ;
1053- err
1054- } ) ?;
1045+ . await ?;
10551046
10561047 let service_name = service. service_name ( ) ?;
10571048 let deployment_id: Uuid = Default :: default ( ) ;
10581049
1059- // Clones to send to spawn
1060- let service_name_clone = service_name. clone ( ) . to_string ( ) ;
1061-
10621050 let child_stdout = runtime
10631051 . stdout
10641052 . take ( )
10651053 . context ( "child process did not have a handle to stdout" ) ?;
10661054 let mut reader = BufReader :: new ( child_stdout) . lines ( ) ;
1055+ let service_name_clone = service_name. clone ( ) ;
10671056 tokio:: spawn ( async move {
10681057 while let Some ( line) = reader. next_line ( ) . await . unwrap ( ) {
10691058 let log_item = LogItem :: new (
@@ -1075,23 +1064,27 @@ impl Shuttle {
10751064 }
10761065 } ) ;
10771066
1067+ //
1068+ // LOADING PHASE
1069+ //
1070+
10781071 let load_request = tonic:: Request :: new ( LoadRequest {
1072+ project_name : service_name. to_string ( ) ,
1073+ env : Environment :: Local . to_string ( ) ,
1074+ secrets : secrets. clone ( ) ,
10791075 path : service
10801076 . executable_path
10811077 . clone ( )
10821078 . into_os_string ( )
10831079 . into_string ( )
10841080 . expect ( "to convert path to string" ) ,
1085- service_name : service_name. to_string ( ) ,
1086- resources : Default :: default ( ) ,
1087- secrets,
1081+ ..Default :: default ( )
10881082 } ) ;
10891083
10901084 trace ! ( "loading service" ) ;
10911085 let response = runtime_client
10921086 . load ( load_request)
10931087 . or_else ( |err| async {
1094- provisioner_server. abort ( ) ;
10951088 runtime. kill ( ) . await ?;
10961089 Err ( err)
10971090 } )
@@ -1103,17 +1096,23 @@ impl Shuttle {
11031096 return Ok ( None ) ;
11041097 }
11051098
1106- let resources = response
1107- . resources
1108- . into_iter ( )
1109- . map ( resource:: Response :: from_bytes)
1110- . collect ( ) ;
1099+ //
1100+ // PROVISIONING PHASE
1101+ //
1102+
1103+ let resources = response. resources ;
1104+ let ( resources, mocked_responses) =
1105+ Shuttle :: local_provision_phase ( service_name. as_str ( ) , resources, secrets) . await ?;
11111106
11121107 println ! (
11131108 "{}" ,
1114- get_resource_tables( & resources , service_name. as_str( ) , false , false )
1109+ get_resource_tables( & mocked_responses , service_name. as_str( ) , false , false )
11151110 ) ;
11161111
1112+ //
1113+ // START PHASE
1114+ //
1115+
11171116 let addr = SocketAddr :: new (
11181117 if run_args. external {
11191118 Ipv4Addr :: UNSPECIFIED // 0.0.0.0
@@ -1133,13 +1132,13 @@ impl Shuttle {
11331132
11341133 let start_request = StartRequest {
11351134 ip : addr. to_string ( ) ,
1135+ resources,
11361136 } ;
11371137
11381138 trace ! ( ?start_request, "starting service" ) ;
11391139 let response = runtime_client
11401140 . start ( tonic:: Request :: new ( start_request) )
11411141 . or_else ( |err| async {
1142- provisioner_server. abort ( ) ;
11431142 runtime. kill ( ) . await ?;
11441143 Err ( err)
11451144 } )
@@ -1150,6 +1149,112 @@ impl Shuttle {
11501149 Ok ( Some ( ( runtime, runtime_client) ) )
11511150 }
11521151
1152+ async fn local_provision_phase (
1153+ project_name : & str ,
1154+ mut resources : Vec < Vec < u8 > > ,
1155+ secrets : HashMap < String , String > ,
1156+ ) -> Result < ( Vec < Vec < u8 > > , Vec < resource:: Response > ) > {
1157+ // for displaying the tables
1158+ let mut mocked_responses: Vec < resource:: Response > = Vec :: new ( ) ;
1159+ let prov = LocalProvisioner :: new ( ) ?;
1160+
1161+ // Fail early if any bytes is invalid json
1162+ let values = resources
1163+ . iter ( )
1164+ . map ( |bytes| {
1165+ serde_json:: from_slice :: < ResourceInput > ( bytes)
1166+ . context ( "deserializing resource input" )
1167+ } )
1168+ . collect :: < anyhow:: Result < Vec < _ > > > ( ) ?;
1169+
1170+ for ( bytes, shuttle_resource) in
1171+ resources
1172+ . iter_mut ( )
1173+ . zip ( values)
1174+ // ignore non-Shuttle resource items
1175+ . filter_map ( |( bytes, value) | match value {
1176+ ResourceInput :: Shuttle ( shuttle_resource) => Some ( ( bytes, shuttle_resource) ) ,
1177+ ResourceInput :: Custom ( _) => None ,
1178+ } )
1179+ . map ( |( bytes, shuttle_resource) | {
1180+ if shuttle_resource. version == RESOURCE_SCHEMA_VERSION {
1181+ Ok ( ( bytes, shuttle_resource) )
1182+ } else {
1183+ Err ( anyhow ! ( "
1184+ Shuttle resource request for {} with incompatible version found. Expected {}, found {}. \
1185+ Make sure that this deployer and the Shuttle resource are up to date.
1186+ " ,
1187+ shuttle_resource. r#type,
1188+ RESOURCE_SCHEMA_VERSION ,
1189+ shuttle_resource. version
1190+ ) )
1191+ }
1192+ } ) . collect :: < anyhow:: Result < Vec < _ > > > ( ) ?. into_iter ( )
1193+ {
1194+ match shuttle_resource. r#type {
1195+ resource:: Type :: Database ( db_type) => {
1196+ let config: DbInput = serde_json:: from_value ( shuttle_resource. config )
1197+ . context ( "deserializing resource config" ) ?;
1198+ let res = match config. local_uri {
1199+ Some ( local_uri) => DatabaseResource :: ConnectionString ( local_uri) ,
1200+ None => DatabaseResource :: Info (
1201+ prov. provision_database ( Request :: new ( DatabaseRequest {
1202+ project_name : project_name. to_string ( ) ,
1203+ db_type : Some ( db_type. into ( ) ) ,
1204+ } ) )
1205+ . await ?
1206+ . into_inner ( )
1207+ . into ( ) ,
1208+ ) ,
1209+ } ;
1210+ mocked_responses. push ( resource:: Response {
1211+ r#type : shuttle_resource. r#type ,
1212+ config : serde_json:: Value :: Null ,
1213+ data : serde_json:: to_value ( & res) . unwrap ( ) ,
1214+ } ) ;
1215+ * bytes = serde_json:: to_vec ( & ShuttleResourceOutput {
1216+ output : res,
1217+ custom : shuttle_resource. custom ,
1218+ } )
1219+ . unwrap ( ) ;
1220+ }
1221+ resource:: Type :: Secrets => {
1222+ // We already know the secrets at this stage, they are not provisioned like other resources
1223+ mocked_responses. push ( resource:: Response {
1224+ r#type : shuttle_resource. r#type ,
1225+ config : serde_json:: Value :: Null ,
1226+ data : serde_json:: to_value ( secrets. clone ( ) ) . unwrap ( ) ,
1227+ } ) ;
1228+ * bytes = serde_json:: to_vec ( & ShuttleResourceOutput {
1229+ output : secrets. clone ( ) ,
1230+ custom : shuttle_resource. custom ,
1231+ } )
1232+ . unwrap ( ) ;
1233+ }
1234+ resource:: Type :: Persist => {
1235+ // only show that this resource is "connected"
1236+ mocked_responses. push ( resource:: Response {
1237+ r#type : shuttle_resource. r#type ,
1238+ config : serde_json:: Value :: Null ,
1239+ data : serde_json:: Value :: Null ,
1240+ } ) ;
1241+ }
1242+ resource:: Type :: Container => {
1243+ let config = serde_json:: from_value ( shuttle_resource. config )
1244+ . context ( "deserializing resource config" ) ?;
1245+ let res = prov. start_container ( config) . await ?;
1246+ * bytes = serde_json:: to_vec ( & ShuttleResourceOutput {
1247+ output : res,
1248+ custom : shuttle_resource. custom ,
1249+ } )
1250+ . unwrap ( ) ;
1251+ }
1252+ }
1253+ }
1254+
1255+ Ok ( ( resources, mocked_responses) )
1256+ }
1257+
11531258 async fn stop_runtime (
11541259 runtime : & mut Child ,
11551260 runtime_client : & mut runtime:: Client ,
@@ -1165,14 +1270,13 @@ impl Shuttle {
11651270 } )
11661271 . await ?
11671272 . into_inner ( ) ;
1168- trace ! ( response = ?response, "client stop response: " ) ;
1273+ trace ! ( response = ?response, "client stop response: " ) ;
11691274 Ok ( ( ) )
11701275 }
11711276
11721277 async fn add_runtime_info (
11731278 runtime : Option < ( Child , runtime:: Client ) > ,
11741279 existing_runtimes : & mut Vec < ( Child , runtime:: Client ) > ,
1175- extra_servers : & [ & JoinHandle < Result < ( ) , tonic:: transport:: Error > > ] ,
11761280 ) -> Result < ( ) , Status > {
11771281 match runtime {
11781282 Some ( inner) => {
@@ -1181,10 +1285,6 @@ impl Shuttle {
11811285 }
11821286 None => {
11831287 trace ! ( "Runtime error: No runtime process. Crashed during startup?" ) ;
1184- for server in extra_servers {
1185- server. abort ( ) ;
1186- }
1187-
11881288 for rt_info in existing_runtimes {
11891289 let mut errored_out = false ;
11901290 // Stopping all runtimes gracefully first, but if this errors out the function kills the runtime forcefully.
@@ -1229,24 +1329,11 @@ impl Shuttle {
12291329 build_workspace ( working_directory, run_args. release , tx, false ) . await
12301330 }
12311331
1232- async fn setup_local_provisioner (
1233- ) -> Result < ( JoinHandle < Result < ( ) , tonic:: transport:: Error > > , u16 ) > {
1234- let provisioner = LocalProvisioner :: new ( ) ?;
1235- let provisioner_port =
1236- portpicker:: pick_unused_port ( ) . expect ( "unable to find available port for provisioner" ) ;
1237- let provisioner_server = provisioner. start ( SocketAddr :: new (
1238- Ipv4Addr :: LOCALHOST . into ( ) ,
1239- provisioner_port,
1240- ) ) ;
1241-
1242- Ok ( ( provisioner_server, provisioner_port) )
1243- }
1244-
12451332 #[ cfg( target_family = "unix" ) ]
12461333 async fn local_run ( & self , mut run_args : RunArgs ) -> Result < CommandOutcome > {
12471334 debug ! ( "starting local run" ) ;
12481335 let services = self . pre_local_run ( & run_args) . await ?;
1249- let ( provisioner_server , provisioner_port ) = Shuttle :: setup_local_provisioner ( ) . await ? ;
1336+
12501337 let mut sigterm_notif =
12511338 tokio:: signal:: unix:: signal ( tokio:: signal:: unix:: SignalKind :: terminate ( ) )
12521339 . expect ( "Can not get the SIGTERM signal receptor" ) ;
@@ -1264,10 +1351,10 @@ impl Shuttle {
12641351 // We must cover the case of starting multiple workspace services and receiving a signal in parallel.
12651352 // This must stop all the existing runtimes and creating new ones.
12661353 signal_received = tokio:: select! {
1267- res = Shuttle :: spin_local_runtime( & run_args, service, & provisioner_server , i as u16 , provisioner_port ) => {
1354+ res = Shuttle :: spin_local_runtime( & run_args, service, i as u16 ) => {
12681355 match res {
12691356 Ok ( runtime) => {
1270- Shuttle :: add_runtime_info( runtime, & mut runtimes, & [ & provisioner_server ] ) . await ?;
1357+ Shuttle :: add_runtime_info( runtime, & mut runtimes) . await ?;
12711358 } ,
12721359 Err ( e) => println!( "Runtime error: {e:?}" ) ,
12731360 }
@@ -1295,7 +1382,6 @@ impl Shuttle {
12951382 // If prior signal received is set to true we must stop all the existing runtimes and
12961383 // exit the `local_run`.
12971384 if signal_received {
1298- provisioner_server. abort ( ) ;
12991385 for ( mut rt, mut rt_client) in runtimes {
13001386 Shuttle :: stop_runtime ( & mut rt, & mut rt_client)
13011387 . await
@@ -1333,7 +1419,6 @@ impl Shuttle {
13331419 println!(
13341420 "cargo-shuttle received SIGTERM. Killing all the runtimes..."
13351421 ) ;
1336- provisioner_server. abort( ) ;
13371422 Shuttle :: stop_runtime( & mut rt, & mut rt_client) . await . unwrap_or_else( |err| {
13381423 trace!( status = ?err, "stopping the runtime errored out" ) ;
13391424 } ) ;
@@ -1343,7 +1428,6 @@ impl Shuttle {
13431428 println!(
13441429 "cargo-shuttle received SIGINT. Killing all the runtimes..."
13451430 ) ;
1346- provisioner_server. abort( ) ;
13471431 Shuttle :: stop_runtime( & mut rt, & mut rt_client) . await . unwrap_or_else( |err| {
13481432 trace!( status = ?err, "stopping the runtime errored out" ) ;
13491433 } ) ;
0 commit comments