Skip to content

Commit 2889a21

Browse files
authored
feat: cloud client (#82)
1 parent 05a9ae1 commit 2889a21

31 files changed

+9167
-114
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@
77
/book/book
88
/dat
99
/node_modules
10+
11+
.zed/

Cargo.lock

Lines changed: 39 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ reqwest = { version = "0.12", default-features = false, features = [
3535
serde = { version = "1.0", features = ["derive"] }
3636
serde_json = "1.0"
3737
strum = { version = "0.27", features = ["derive"] }
38-
thiserror = "1"
38+
thiserror = "2"
3939
tonic = { version = "0.12.3" }
4040
tower = { version = "0.5", features = ["limit", "filter", "util"] }
4141
tracing = { version = "0.1", features = ["log"] }

delta-sharing/cli/src/main.rs

Lines changed: 3 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,13 @@
1-
use std::sync::Arc;
2-
31
use chrono::Days;
42
use clap::{Parser, Subcommand};
5-
use delta_sharing_common::{
6-
memory::InMemoryResourceStore, rest::AnonymousAuthenticator, ConstantPolicy,
7-
KernelQueryHandler, ServerHandler,
8-
};
9-
use delta_sharing_postgres::GraphStore;
103
use delta_sharing_profiles::{DefaultClaims, DeltaProfileManager, ProfileManager, TokenManager};
11-
use delta_sharing_server::run_rest_server_full;
12-
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
134

14-
use crate::error::{Error, Result};
5+
use crate::error::Result;
6+
use crate::server::{handle_rest, ServerArgs};
157

168
mod config;
179
mod error;
10+
mod server;
1811

1912
#[derive(Parser)]
2013
#[command(name = "delta-sharing", version, about = "CLI to manage delta.sharing services.", long_about = None)]
@@ -47,21 +40,6 @@ enum Commands {
4740
Migrate,
4841
}
4942

50-
#[derive(Parser)]
51-
struct ServerArgs {
52-
#[clap(long, default_value = "0.0.0.0")]
53-
host: String,
54-
55-
#[clap(long, short, default_value_t = 8080)]
56-
port: u16,
57-
58-
#[arg(short, long, default_value = "config.yaml")]
59-
config: String,
60-
61-
#[clap(long, help = "use database", default_value_t = false)]
62-
use_db: bool,
63-
}
64-
6543
#[derive(Parser)]
6644
struct ClientArgs {
6745
#[clap(help = "Sets the server address")]
@@ -117,87 +95,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
11795
Ok(())
11896
}
11997

120-
async fn get_db_handler() -> Result<ServerHandler> {
121-
let db_url = std::env::var("DATABASE_URL")
122-
.map_err(|_| Error::Generic("missing DATABASE_URL".to_string()))?;
123-
let store = Arc::new(GraphStore::connect(&db_url).await.unwrap());
124-
let policy = Arc::new(ConstantPolicy::default());
125-
store.migrate().await.unwrap();
126-
let handler = ServerHandler {
127-
query: KernelQueryHandler::new_multi_thread(
128-
store.clone(),
129-
Default::default(),
130-
policy.clone(),
131-
),
132-
store,
133-
policy,
134-
};
135-
Ok(handler)
136-
}
137-
138-
fn get_memory_handler() -> ServerHandler {
139-
let store = Arc::new(InMemoryResourceStore::new());
140-
let policy = Arc::new(ConstantPolicy::default());
141-
ServerHandler {
142-
query: KernelQueryHandler::new_multi_thread(
143-
store.clone(),
144-
Default::default(),
145-
policy.clone(),
146-
),
147-
store,
148-
policy,
149-
}
150-
}
151-
152-
fn init_tracing() {
153-
tracing_subscriber::registry()
154-
.with(
155-
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
156-
// axum logs rejections from built-in extractors with the `axum::rejection`
157-
// target, at `TRACE` level. `axum::rejection=trace` enables showing those events
158-
format!(
159-
"{}=debug,tower_http=debug,axum::rejection=trace",
160-
env!("CARGO_CRATE_NAME")
161-
)
162-
.into()
163-
}),
164-
)
165-
.with(tracing_subscriber::fmt::layer())
166-
.init();
167-
}
168-
169-
/// Handle the rest server command.
170-
///
171-
/// This function starts a delta-sharing server using the REST protocol.
172-
async fn handle_rest(args: ServerArgs) -> Result<()> {
173-
init_tracing();
174-
175-
if args.use_db {
176-
let handler = get_db_handler().await?;
177-
run_rest_server_full(args.host, args.port, handler, AnonymousAuthenticator)
178-
.await
179-
.map_err(|_| Error::Generic("Server failed".to_string()))
180-
} else {
181-
let handler = get_memory_handler();
182-
run_rest_server_full(args.host, args.port, handler, AnonymousAuthenticator)
183-
.await
184-
.map_err(|_| Error::Generic("Server failed".to_string()))
185-
}
186-
}
187-
188-
/// Handle the server command.
189-
///
190-
/// This function starts a delta-sharing server using the gRPC protocol.
191-
// async fn handle_grpc(args: ServerArgs) -> Result<()> {
192-
// init_tracing();
193-
//
194-
// let handler = get_handler(args.config)?;
195-
//
196-
// run_grpc_server(args.host, args.port, handler)
197-
// .await
198-
// .map_err(|_| Error::Generic("Server failed".to_string()))
199-
// }
200-
20198
/// Handle the profile command.
20299
async fn handle_profile(args: ProfileArgs) -> Result<()> {
203100
let token_manager = TokenManager::new_from_secret(args.secret.as_bytes(), None);

delta-sharing/cli/src/server.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::sync::{Arc, LazyLock};
2+
3+
use clap::Parser;
4+
use delta_sharing_common::{
5+
memory::InMemoryResourceStore, rest::AnonymousAuthenticator, ConstantPolicy,
6+
KernelQueryHandler, ServerHandler,
7+
};
8+
use delta_sharing_postgres::GraphStore;
9+
use delta_sharing_server::run_rest_server_full;
10+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
11+
12+
use crate::error::{Error, Result};
13+
14+
#[derive(Parser)]
15+
pub struct ServerArgs {
16+
#[clap(long, default_value = "0.0.0.0")]
17+
host: String,
18+
19+
#[clap(long, short, default_value_t = 8080)]
20+
port: u16,
21+
22+
#[arg(short, long, default_value = "config.yaml")]
23+
config: String,
24+
25+
#[clap(long, help = "use database", default_value_t = false)]
26+
use_db: bool,
27+
}
28+
29+
async fn get_db_handler() -> Result<ServerHandler> {
30+
let db_url = std::env::var("DATABASE_URL")
31+
.map_err(|_| Error::Generic("missing DATABASE_URL".to_string()))?;
32+
let store = Arc::new(GraphStore::connect(&db_url).await.unwrap());
33+
let policy = Arc::new(ConstantPolicy::default());
34+
store.migrate().await.unwrap();
35+
let handler = ServerHandler {
36+
query: KernelQueryHandler::new_multi_thread(
37+
store.clone(),
38+
Default::default(),
39+
policy.clone(),
40+
),
41+
store,
42+
policy,
43+
};
44+
Ok(handler)
45+
}
46+
47+
fn get_memory_handler() -> ServerHandler {
48+
let store = Arc::new(InMemoryResourceStore::new());
49+
let policy = Arc::new(ConstantPolicy::default());
50+
ServerHandler {
51+
query: KernelQueryHandler::new_multi_thread(
52+
store.clone(),
53+
Default::default(),
54+
policy.clone(),
55+
),
56+
store,
57+
policy,
58+
}
59+
}
60+
61+
fn init_tracing() {
62+
tracing_subscriber::registry()
63+
.with(
64+
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
65+
// axum logs rejections from built-in extractors with the `axum::rejection`
66+
// target, at `TRACE` level. `axum::rejection=trace` enables showing those events
67+
format!(
68+
"{}=debug,tower_http=debug,axum::rejection=trace",
69+
env!("CARGO_CRATE_NAME")
70+
)
71+
.into()
72+
}),
73+
)
74+
.with(tracing_subscriber::fmt::layer())
75+
.init();
76+
}
77+
78+
/// Handle the rest server command.
79+
///
80+
/// This function starts a delta-sharing server using the REST protocol.
81+
pub async fn handle_rest(args: ServerArgs) -> Result<()> {
82+
init_tracing();
83+
84+
println!("{}", WELCOME.as_str());
85+
86+
if args.use_db {
87+
let handler = get_db_handler().await?;
88+
run_rest_server_full(args.host, args.port, handler, AnonymousAuthenticator)
89+
.await
90+
.map_err(|_| Error::Generic("Server failed".to_string()))
91+
} else {
92+
let handler = get_memory_handler();
93+
run_rest_server_full(args.host, args.port, handler, AnonymousAuthenticator)
94+
.await
95+
.map_err(|_| Error::Generic("Server failed".to_string()))
96+
}
97+
}
98+
99+
static WELCOME: LazyLock<String> = LazyLock::new(|| {
100+
format!(
101+
r#"
102+
_____ _ _ _____ _ _ _____ _____
103+
| __ \ | | | / ____| | (_) | __ \ / ____|
104+
| | | | ___| | |_ __ _ | (___ | |__ __ _ _ __ _ _ __ __ _ ______ | |__) | (___
105+
| | | |/ _ \ | __/ _` | \___ \| '_ \ / _` | '__| | '_ \ / _` | |______| | _ / \___ \
106+
| |__| | __/ | || (_| | ____) | | | | (_| | | | | | | | (_| | | | \ \ ____) |
107+
|_____/ \___|_|\__\__,_| |_____/|_| |_|\__,_|_| |_|_| |_|\__, | |_| \_\_____/
108+
__/ |
109+
version: {} |___/
110+
"#,
111+
env!("CARGO_PKG_VERSION")
112+
)
113+
});

delta-sharing/client/src/client/token.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ impl<T> Default for TokenCache<T> {
3131

3232
impl<T: Clone + Send> TokenCache<T> {
3333
/// Override the minimum remaining TTL for a cached token to be used
34-
#[cfg(feature = "aws")]
3534
pub fn with_min_ttl(self, min_ttl: Duration) -> Self {
3635
Self { min_ttl, ..self }
3736
}

0 commit comments

Comments
 (0)