Skip to content

Commit dfe376e

Browse files
committed
feat: add --dir option for the cli
1 parent 6564b1d commit dfe376e

File tree

3 files changed

+88
-9
lines changed

3 files changed

+88
-9
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ can connect using psql or language drivers to execute `SELECT` queries against
5353
them.
5454

5555
```
56-
datafusion-postgres 0.1.0
57-
A postgres interface for datatfusion. Serve any CSV/JSON/Arrow files as tables.
56+
datafusion-postgres 0.4.0
57+
A postgres interface for datafusion. Serve any CSV/JSON/Arrow files as tables.
5858
5959
USAGE:
60-
datafusion-postgres [OPTIONS]
60+
datafusion-postgres-cli [OPTIONS]
6161
6262
FLAGS:
6363
-h, --help Prints help information
@@ -67,8 +67,11 @@ OPTIONS:
6767
--arrow <arrow-tables>... Arrow files to register as table, using syntax `table_name:file_path`
6868
--avro <avro-tables>... Avro files to register as table, using syntax `table_name:file_path`
6969
--csv <csv-tables>... CSV files to register as table, using syntax `table_name:file_path`
70+
-d, --dir <directory> Directory to serve, all supported files will be registered as tables
71+
--host <host> Host address the server listens to, default to 127.0.0.1 [default: 127.0.0.1]
7072
--json <json-tables>... JSON files to register as table, using syntax `table_name:file_path`
7173
--parquet <parquet-tables>... Parquet files to register as table, using syntax `table_name:file_path`
74+
-p <port> Port the server listens to, default to 5432 [default: 5432]
7275
```
7376

7477
For example, we use this command to host `ETTm1.csv` dataset as table `ettm1`.

datafusion-postgres-cli/src/main.rs

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
use std::ffi::OsStr;
2+
use std::fs;
3+
14
use datafusion::execution::options::{
25
ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
36
};
@@ -26,6 +29,9 @@ struct Opt {
2629
/// Avro files to register as table, using syntax `table_name:file_path`
2730
#[structopt(long("avro"))]
2831
avro_tables: Vec<String>,
32+
/// Directory to serve, all supported files will be registered as tables
33+
#[structopt(long("dir"), short("d"))]
34+
directory: Option<String>,
2935
/// Port the server listens to, default to 5432
3036
#[structopt(short, default_value = "5432")]
3137
port: u16,
@@ -40,12 +46,69 @@ fn parse_table_def(table_def: &str) -> (&str, &str) {
4046
.expect("Use this pattern to register table: table_name:file_path")
4147
}
4248

43-
#[tokio::main]
44-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
45-
let opts = Opt::from_args();
49+
impl Opt {
50+
fn include_directory_files(&mut self) {
51+
if let Some(directory) = &self.directory {
52+
if let Ok(entries) = fs::read_dir(directory) {
53+
for entry in entries.flatten() {
54+
let path = entry.path();
55+
if !path.is_file() {
56+
continue;
57+
}
4658

47-
let session_context = SessionContext::new();
59+
if let Some(ext) = path.extension().and_then(OsStr::to_str) {
60+
let ext_lower = ext.to_lowercase();
61+
if let Some(base_name) = path.file_stem().and_then(|s| s.to_str()) {
62+
match ext_lower.as_ref() {
63+
"json" => {
64+
self.json_tables.push(format!(
65+
"{}:{}",
66+
base_name,
67+
path.to_string_lossy()
68+
));
69+
}
70+
"avro" => {
71+
self.avro_tables.push(format!(
72+
"{}:{}",
73+
base_name,
74+
path.to_string_lossy()
75+
));
76+
}
77+
"parquet" => {
78+
self.parquet_tables.push(format!(
79+
"{}:{}",
80+
base_name,
81+
path.to_string_lossy()
82+
));
83+
}
84+
"csv" => {
85+
self.csv_tables.push(format!(
86+
"{}:{}",
87+
base_name,
88+
path.to_string_lossy()
89+
));
90+
}
91+
"arrow" => {
92+
self.arrow_tables.push(format!(
93+
"{}:{}",
94+
base_name,
95+
path.to_string_lossy()
96+
));
97+
}
98+
_ => {}
99+
}
100+
}
101+
}
102+
}
103+
}
104+
}
105+
}
106+
}
48107

108+
async fn setup_session_context(
109+
session_context: &SessionContext,
110+
opts: &Opt,
111+
) -> Result<(), Box<dyn std::error::Error>> {
49112
// Register CSV tables
50113
for (table_name, table_path) in opts.csv_tables.iter().map(|s| parse_table_def(s.as_ref())) {
51114
session_context
@@ -99,6 +162,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
99162
println!("Loaded {} as table {}", table_path, table_name);
100163
}
101164

165+
Ok(())
166+
}
167+
168+
#[tokio::main]
169+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
170+
let mut opts = Opt::from_args();
171+
opts.include_directory_files();
172+
173+
let session_context = SessionContext::new();
174+
175+
setup_session_context(&session_context, &opts).await?;
176+
102177
let server_options = ServerOptions::new()
103178
.with_host(opts.host)
104179
.with_port(opts.port);

datafusion-postgres/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@ mod encoder;
33
mod handlers;
44
mod information_schema;
55

6-
pub use handlers::{DfSessionService, HandlerFactory, Parser};
7-
86
use std::sync::Arc;
97

108
use datafusion::prelude::SessionContext;
119
use getset::{Getters, Setters, WithSetters};
1210
use pgwire::tokio::process_socket;
1311
use tokio::net::TcpListener;
1412

13+
use handlers::HandlerFactory;
14+
pub use handlers::{DfSessionService, Parser};
15+
1516
#[derive(Getters, Setters, WithSetters)]
1617
#[getset(get = "pub", set = "pub", set_with = "pub")]
1718
pub struct ServerOptions {

0 commit comments

Comments
 (0)