Skip to content

Commit 26af6f5

Browse files
authored
feat: add --dir option for the cli (#82)
* feat: add --dir option for the cli * feat: report error when loading directory fails
1 parent 6564b1d commit 26af6f5

File tree

3 files changed

+94
-9
lines changed

3 files changed

+94
-9
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ can connect using psql or language drivers to execute `SELECT` queries against
5353
them.
5454

5555
```
56-
datafusion-postgres 0.1.0
57-
A postgres interface for datatfusion. Serve any CSV/JSON/Arrow files as tables.
56+
datafusion-postgres 0.4.0
57+
A postgres interface for datafusion. Serve any CSV/JSON/Arrow files as tables.
5858
5959
USAGE:
60-
datafusion-postgres [OPTIONS]
60+
datafusion-postgres-cli [OPTIONS]
6161
6262
FLAGS:
6363
-h, --help Prints help information
@@ -67,8 +67,11 @@ OPTIONS:
6767
--arrow <arrow-tables>... Arrow files to register as table, using syntax `table_name:file_path`
6868
--avro <avro-tables>... Avro files to register as table, using syntax `table_name:file_path`
6969
--csv <csv-tables>... CSV files to register as table, using syntax `table_name:file_path`
70+
-d, --dir <directory> Directory to serve, all supported files will be registered as tables
71+
--host <host> Host address the server listens to, default to 127.0.0.1 [default: 127.0.0.1]
7072
--json <json-tables>... JSON files to register as table, using syntax `table_name:file_path`
7173
--parquet <parquet-tables>... Parquet files to register as table, using syntax `table_name:file_path`
74+
-p <port> Port the server listens to, default to 5432 [default: 5432]
7275
```
7376

7477
For example, we use this command to host `ETTm1.csv` dataset as table `ettm1`.

datafusion-postgres-cli/src/main.rs

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
use std::ffi::OsStr;
2+
use std::fs;
3+
14
use datafusion::execution::options::{
25
ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
36
};
@@ -26,6 +29,9 @@ struct Opt {
2629
/// Avro files to register as table, using syntax `table_name:file_path`
2730
#[structopt(long("avro"))]
2831
avro_tables: Vec<String>,
32+
/// Directory to serve, all supported files will be registered as tables
33+
#[structopt(long("dir"), short("d"))]
34+
directory: Option<String>,
2935
/// Port the server listens to, default to 5432
3036
#[structopt(short, default_value = "5432")]
3137
port: u16,
@@ -40,12 +46,75 @@ fn parse_table_def(table_def: &str) -> (&str, &str) {
4046
.expect("Use this pattern to register table: table_name:file_path")
4147
}
4248

43-
#[tokio::main]
44-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
45-
let opts = Opt::from_args();
49+
impl Opt {
50+
fn include_directory_files(&mut self) -> Result<(), Box<dyn std::error::Error>> {
51+
if let Some(directory) = &self.directory {
52+
match fs::read_dir(directory) {
53+
Ok(entries) => {
54+
for entry in entries.flatten() {
55+
let path = entry.path();
56+
if !path.is_file() {
57+
continue;
58+
}
4659

47-
let session_context = SessionContext::new();
60+
if let Some(ext) = path.extension().and_then(OsStr::to_str) {
61+
let ext_lower = ext.to_lowercase();
62+
if let Some(base_name) = path.file_stem().and_then(|s| s.to_str()) {
63+
match ext_lower.as_ref() {
64+
"json" => {
65+
self.json_tables.push(format!(
66+
"{}:{}",
67+
base_name,
68+
path.to_string_lossy()
69+
));
70+
}
71+
"avro" => {
72+
self.avro_tables.push(format!(
73+
"{}:{}",
74+
base_name,
75+
path.to_string_lossy()
76+
));
77+
}
78+
"parquet" => {
79+
self.parquet_tables.push(format!(
80+
"{}:{}",
81+
base_name,
82+
path.to_string_lossy()
83+
));
84+
}
85+
"csv" => {
86+
self.csv_tables.push(format!(
87+
"{}:{}",
88+
base_name,
89+
path.to_string_lossy()
90+
));
91+
}
92+
"arrow" => {
93+
self.arrow_tables.push(format!(
94+
"{}:{}",
95+
base_name,
96+
path.to_string_lossy()
97+
));
98+
}
99+
_ => {}
100+
}
101+
}
102+
}
103+
}
104+
}
105+
Err(e) => {
106+
return Err(format!("Failed to load directory {}: {}", directory, e).into());
107+
}
108+
}
109+
}
110+
Ok(())
111+
}
112+
}
48113

114+
async fn setup_session_context(
115+
session_context: &SessionContext,
116+
opts: &Opt,
117+
) -> Result<(), Box<dyn std::error::Error>> {
49118
// Register CSV tables
50119
for (table_name, table_path) in opts.csv_tables.iter().map(|s| parse_table_def(s.as_ref())) {
51120
session_context
@@ -99,6 +168,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
99168
println!("Loaded {} as table {}", table_path, table_name);
100169
}
101170

171+
Ok(())
172+
}
173+
174+
#[tokio::main]
175+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
176+
let mut opts = Opt::from_args();
177+
opts.include_directory_files()?;
178+
179+
let session_context = SessionContext::new();
180+
181+
setup_session_context(&session_context, &opts).await?;
182+
102183
let server_options = ServerOptions::new()
103184
.with_host(opts.host)
104185
.with_port(opts.port);

datafusion-postgres/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ mod encoder;
33
mod handlers;
44
mod information_schema;
55

6-
pub use handlers::{DfSessionService, HandlerFactory, Parser};
7-
86
use std::sync::Arc;
97

108
use datafusion::prelude::SessionContext;
119
use getset::{Getters, Setters, WithSetters};
1210
use pgwire::tokio::process_socket;
1311
use tokio::net::TcpListener;
1412

13+
use handlers::HandlerFactory;
14+
pub use handlers::{DfSessionService, Parser};
15+
1516
#[derive(Getters, Setters, WithSetters)]
1617
#[getset(get = "pub", set = "pub", set_with = "pub")]
1718
pub struct ServerOptions {

0 commit comments

Comments
 (0)