|
3 | 3 | // https://github.com/spkenv/spk |
4 | 4 |
|
5 | 5 | use clap::Args; |
6 | | -use miette::Result; |
| 6 | +use miette::{IntoDiagnostic, Result}; |
7 | 7 | use spfs_cli_common as cli; |
8 | 8 |
|
9 | 9 | /// Start an spfs server |
@@ -42,37 +42,65 @@ impl CmdServer { |
42 | 42 |
|
43 | 43 | let payload_service = |
44 | 44 | spfs::server::PayloadService::new(repo.clone(), self.payloads_root.clone()); |
45 | | - let http_server = { |
46 | | - let payload_service = payload_service.clone(); |
47 | | - hyper::Server::bind(&self.http_address).serve(hyper::service::make_service_fn( |
48 | | - move |_| { |
49 | | - let s = payload_service.clone(); |
50 | | - async move { Ok::<_, std::convert::Infallible>(s) } |
51 | | - }, |
52 | | - )) |
53 | | - }; |
54 | | - let http_future = http_server.with_graceful_shutdown(async { |
55 | | - if let Err(err) = tokio::signal::ctrl_c().await { |
56 | | - tracing::error!(?err, "Failed to setup graceful shutdown handler"); |
57 | | - }; |
58 | | - tracing::info!("shutting down http server..."); |
59 | | - }); |
60 | 45 | let grpc_future = tonic::transport::Server::builder() |
61 | 46 | .add_service(spfs::server::Repository::new_srv()) |
62 | 47 | .add_service(spfs::server::TagService::new_srv(repo.clone())) |
63 | 48 | .add_service(spfs::server::DatabaseService::new_srv(repo)) |
64 | | - .add_service(payload_service.into_srv()) |
| 49 | + .add_service(payload_service.clone().into_srv()) |
65 | 50 | .serve_with_shutdown(self.grpc_address, async { |
66 | 51 | if let Err(err) = tokio::signal::ctrl_c().await { |
67 | 52 | tracing::error!(?err, "Failed to setup graceful shutdown handler"); |
68 | 53 | }; |
69 | 54 | tracing::info!("shutting down gRPC server..."); |
70 | 55 | }); |
| 56 | + let http_listener = tokio::net::TcpListener::bind(self.http_address) |
| 57 | + .await |
| 58 | + .into_diagnostic()?; |
| 59 | + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); |
| 60 | + tokio::task::spawn(async move { |
| 61 | + if let Err(err) = tokio::signal::ctrl_c().await { |
| 62 | + tracing::error!(?err, "failed to setup graceful shutdown handler"); |
| 63 | + } else { |
| 64 | + tracing::info!("shutting down HTTP server..."); |
| 65 | + shutdown_tx.send(()).ok(); |
| 66 | + } |
| 67 | + }); |
| 68 | + let http_future = async move { |
| 69 | + loop { |
| 70 | + let conn = tokio::select! { |
| 71 | + conn = http_listener.accept() => conn, |
| 72 | + _ = &mut shutdown_rx => { |
| 73 | + break; |
| 74 | + } |
| 75 | + }; |
| 76 | + let stream = match conn { |
| 77 | + Ok((stream, _)) => { |
| 78 | + tracing::debug!("Accepted connection from {:?}", stream.peer_addr()); |
| 79 | + stream |
| 80 | + } |
| 81 | + Err(err) => { |
| 82 | + tracing::error!("Error accepting connection: {:?}", err); |
| 83 | + continue; |
| 84 | + } |
| 85 | + }; |
| 86 | + let io = hyper_util::rt::TokioIo::new(stream); |
| 87 | + let service = payload_service.clone(); |
| 88 | + tokio::task::spawn(async move { |
| 89 | + if let Err(err) = hyper::server::conn::http1::Builder::new() |
| 90 | + .serve_connection(io, service) |
| 91 | + .await |
| 92 | + { |
| 93 | + tracing::error!("Error serving connection: {:?}", err); |
| 94 | + } |
| 95 | + }); |
| 96 | + } |
| 97 | + Result::<(), miette::Report>::Ok(()) |
| 98 | + }; |
71 | 99 | tracing::info!("listening on: {}, {}", self.grpc_address, self.http_address); |
72 | 100 |
|
73 | 101 | // TODO: stop the other server when one fails so that |
74 | 102 | // the process can exit |
75 | | - let (grpc_result, http_result) = tokio::join!(grpc_future, http_future,); |
| 103 | + let (grpc_result, http_result) = tokio::join!(grpc_future, http_future); |
76 | 104 | if let Err(err) = grpc_result { |
77 | 105 | tracing::error!("gRPC server failed: {:?}", err); |
78 | 106 | } |
|
0 commit comments