Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[target.x86_64-pc-windows-msvc]
rustflags = [
"-C", "target-feature=+crt-static",
]

[target.x86_64-pc-windows-gnu]
rustflags = [
"-C", "target-feature=+crt-static",
"-C", "link-arg=-static-libstdc++",
"-C", "link-arg=-static-libgcc",
"-C", "link-arg=-static",
]
69 changes: 68 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,10 @@ tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
typed-builder = "0.20.0"

[target.'cfg(windows)'.dependencies]
async-stream = "0.3.6"
futures-core = "0.3.31"
windows = { version = "0.58.0", features = ["Win32"] }

[build-dependencies]
tonic-build = "0.12.3"
22 changes: 19 additions & 3 deletions src/cmd/clean.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::pb::db::management_client::ManagementClient;
use crate::pb::db::ShutdownRequest;
use crate::util::uds::{create_channel, get_sock_path};
#[cfg(windows)]
use crate::util::named_pipe;
#[cfg(unix)]
use crate::util::uds;
use tonic::Request;
use tracing::info;

pub(crate) async fn run() -> Result<(), Box<dyn std::error::Error>> {
if get_sock_path().exists() {
let channel = create_channel().await?;
#[cfg(unix)]
if uds::get_sock_path().exists() {
let channel = uds::create_channel().await?;
let mut mgmt_client = ManagementClient::new(channel.clone());

let req = Request::new(ShutdownRequest::default());
Expand All @@ -21,5 +25,17 @@ pub(crate) async fn run() -> Result<(), Box<dyn std::error::Error>> {
info!("server is not running");
}

#[cfg(windows)]
{
let channel = named_pipe::create_channel().await?;
let mut mgmt_client = ManagementClient::new(channel.clone());

let req = Request::new(ShutdownRequest::default());

info!("shutting down server...");
mgmt_client.shutdown(req).await?;
info!("shutdown successfully");
}

Ok(())
}
38 changes: 30 additions & 8 deletions src/cmd/load.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::pb::db::management_client::ManagementClient;
use crate::pb::db::operation_client::OperationClient;
use crate::pb::db::{CreateTableRequest, HealthCheckRequest};
use crate::util::uds::{create_channel, get_sock_path};
#[cfg(windows)]
use crate::util::named_pipe;
#[cfg(unix)]
use crate::util::uds;
use clap::ValueEnum;
use tokio::process::Command;
use tonic::Request;
Expand Down Expand Up @@ -29,14 +32,33 @@ pub(crate) async fn run(
) -> Result<(), Box<dyn std::error::Error>> {
let exe_path = std::env::current_exe()?;

if !get_sock_path().exists() {
info!("starting server...");
let _child = Command::new(exe_path).arg("server").spawn()?;
info!("server start successfully");
}
#[cfg(unix)]
let (channel, mut mgmt_client) = {
if !uds::get_sock_path().exists() {
info!("starting server...");
let _child = Command::new(exe_path).arg("server").spawn()?;
info!("server start successfully");
}

let channel = uds::create_channel().await?;
let mgmt_client = ManagementClient::new(channel.clone());

(channel, mgmt_client)
};

#[cfg(windows)]
let (channel, mut mgmt_client) = {
let channel = match named_pipe::create_channel().await {
Ok(channel) => channel,
Err(_) => {
let _child = Command::new(exe_path).arg("server").spawn()?;
named_pipe::create_channel().await?
}
};
let mgmt_client = ManagementClient::new(channel.clone());

let channel = create_channel().await?;
let mut mgmt_client = ManagementClient::new(channel.clone());
(channel, mgmt_client)
};

let health_req = Request::new(HealthCheckRequest::default());
match mgmt_client.health_check(health_req).await {
Expand Down
10 changes: 8 additions & 2 deletions src/cmd/query.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use crate::pb::db::operation_client::OperationClient;
use crate::pb::db::RawQueryRequest;
use crate::util::uds::create_channel;
#[cfg(windows)]
use crate::util::named_pipe;
#[cfg(unix)]
use crate::util::uds;
use comfy_table::Table;
use tonic::Request;

pub(crate) async fn run(query: String) -> Result<(), Box<dyn std::error::Error>> {
let channel = create_channel().await?;
#[cfg(unix)]
let channel = uds::create_channel().await?;
#[cfg(windows)]
let channel = named_pipe::create_channel().await?;
let mut client = OperationClient::new(channel);

let req = Request::new(RawQueryRequest { query });
Expand Down
47 changes: 38 additions & 9 deletions src/cmd/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::pb::db::management_server::ManagementServer;
use crate::pb::db::operation_server::OperationServer;
use crate::pb::{ManagementImpl, OperationImpl};
#[cfg(windows)]
use crate::util::named_pipe::get_named_pipe_server_stream;
#[cfg(unix)]
use crate::util::uds::get_sock_path;
use duckdb::Connection;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::signal;
use tokio::sync::{mpsc, Mutex};
#[cfg(unix)]
use tonic::codegen::tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tracing::{debug, info};
Expand All @@ -17,6 +22,7 @@ async fn shutdown_signal(mut shutdown_rx: mpsc::Receiver<()>) {
.expect("failed to install SIGINT handler");
};

#[cfg(unix)]
let term_fut = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler")
Expand All @@ -28,34 +34,57 @@ async fn shutdown_signal(mut shutdown_rx: mpsc::Receiver<()>) {
shutdown_rx.recv().await;
};

#[cfg(unix)]
tokio::select! {
_ = ctrl_c_fut => {},
_ = term_fut => {},
_ = shutdown_fut => {},
}

#[cfg(windows)]
tokio::select! {
_ = ctrl_c_fut => {},
_ = shutdown_fut => {},
}

debug!("shutdown signal received");

debug!("removing socket file...");
std::fs::remove_file(get_sock_path()).expect("failed to remove socket file");
#[cfg(unix)]
{
debug!("removing socket file...");
std::fs::remove_file(get_sock_path()).expect("failed to remove socket file");
}
}

pub(crate) async fn run() -> Result<(), Box<dyn std::error::Error>> {
let sock_path = get_sock_path();
info!("listening on {:?}", sock_path);

let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let db_conn = Mutex::new(Connection::open_in_memory()?);

let mgmt = ManagementImpl::builder().shutdown_tx(shutdown_tx).build();
let ope = OperationImpl::builder().db_conn(db_conn).build();

let uds = UnixListener::bind(sock_path)?;
let uds_stream = UnixListenerStream::new(uds);
Server::builder()
#[cfg(unix)]
let uds_stream = {
let sock_path = get_sock_path();
info!("listening on {:?}", sock_path);

let uds = UnixListener::bind(sock_path)?;
UnixListenerStream::new(uds)
};

let builder = Server::builder()
.add_service(ManagementServer::new(mgmt))
.add_service(OperationServer::new(ope))
.add_service(OperationServer::new(ope));

#[cfg(unix)]
builder
.serve_with_incoming_shutdown(uds_stream, shutdown_signal(shutdown_rx))
.await?;

#[cfg(windows)]
builder
.serve_with_incoming_shutdown(get_named_pipe_server_stream(), shutdown_signal(shutdown_rx))
.await?;

Ok(())
}
3 changes: 3 additions & 0 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
#[cfg(windows)]
pub(crate) mod named_pipe;
#[cfg(unix)]
pub(crate) mod uds;
Loading