Skip to content

Commit 69353a4

Browse files
Get schemas
1 parent e790432 commit 69353a4

File tree

7 files changed

+281
-53
lines changed

7 files changed

+281
-53
lines changed

crates/datafusion-app/src/extensions/s3.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use crate::config::ExecutionConfig;
2121
use crate::extensions::{DftSessionStateBuilder, Extension};
22-
use log::info;
22+
use log::{debug, info};
2323
use std::sync::Arc;
2424

2525
use url::Url;
@@ -52,20 +52,18 @@ impl Extension for AwsS3Extension {
5252
for s3_config in s3_configs {
5353
match s3_config.to_object_store() {
5454
Ok(object_store) => {
55-
info!("Created object store: {}", object_store);
55+
debug!("created object store: {}", object_store);
5656
if let Some(object_store_url) = s3_config.object_store_url() {
57-
info!("Endpoint exists");
5857
if let Ok(parsed_endpoint) = Url::parse(object_store_url) {
59-
info!("Parsed endpoint");
6058
builder
6159
.runtime_env()
6260
.register_object_store(&parsed_endpoint, Arc::new(object_store));
63-
info!("Registered s3 object store");
61+
info!("registered s3 object store at {object_store_url}");
6462
}
6563
}
6664
}
6765
Err(e) => {
68-
log::error!("Error creating object store: {:?}", e);
66+
log::error!("error creating object store: {:?}", e);
6967
}
7068
}
7169
}

crates/datafusion-app/src/flightsql.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
use std::sync::Arc;
1919

2020
use arrow_flight::{
21-
decode::FlightRecordBatchStream, sql::client::FlightSqlServiceClient, FlightInfo,
21+
decode::FlightRecordBatchStream,
22+
sql::{client::FlightSqlServiceClient, CommandGetDbSchemas},
23+
FlightInfo,
2224
};
2325
#[cfg(feature = "flightsql")]
2426
use base64::engine::{general_purpose::STANDARD, Engine as _};
@@ -195,19 +197,19 @@ impl FlightSQLContext {
195197
}
196198
}
197199
Err(e) => Err(DataFusionError::External(
198-
format!("Call to do_get failed: {}", e.to_string()).into(),
200+
format!("Call to do_get failed: {}", e).into(),
199201
)),
200202
}
201203
} else {
202-
return Err(DataFusionError::External("Missing ticket".into()));
204+
Err(DataFusionError::External("Missing ticket".into()))
203205
}
204206
} else {
205-
return Err(DataFusionError::External("Missing client".into()));
207+
Err(DataFusionError::External("Missing client".into()))
206208
}
207209
}
208210

209211
pub async fn get_catalogs_flight_info(&self) -> DFResult<FlightInfo> {
210-
let client = self.client.clone();
212+
let client = Arc::clone(&self.client);
211213
let mut guard = client.lock().await;
212214
if let Some(client) = guard.as_mut() {
213215
client
@@ -221,8 +223,31 @@ impl FlightSQLContext {
221223
}
222224
}
223225

226+
pub async fn get_db_schemas_flight_info(
227+
&self,
228+
catalog: Option<String>,
229+
schema_pattern: Option<String>,
230+
) -> DFResult<FlightInfo> {
231+
let client = Arc::clone(&self.client);
232+
let mut guard = client.lock().await;
233+
if let Some(client) = guard.as_mut() {
234+
let cmd = CommandGetDbSchemas {
235+
catalog,
236+
db_schema_filter_pattern: schema_pattern,
237+
};
238+
client
239+
.get_db_schemas(cmd)
240+
.await
241+
.map_err(|e| DataFusionError::ArrowError(e, None))
242+
} else {
243+
Err(DataFusionError::External(
244+
"No FlightSQL client configured. Add one in `~/.config/dft/config.toml`".into(),
245+
))
246+
}
247+
}
248+
224249
pub async fn do_get(&self, flight_info: FlightInfo) -> DFResult<Vec<FlightRecordBatchStream>> {
225-
let client = self.client.clone();
250+
let client = Arc::clone(&self.client);
226251
let mut guard = client.lock().await;
227252
if let Some(client) = guard.as_mut() {
228253
let mut streams = Vec::new();

crates/datafusion-app/src/local.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,24 +72,6 @@ pub struct ExecutionContext {
7272
observability: ObservabilityContext,
7373
}
7474

75-
impl Default for ExecutionContext {
76-
fn default() -> Self {
77-
let cfg = SessionConfig::new().with_information_schema(true);
78-
let session_ctx = SessionContext::new_with_config(cfg);
79-
#[cfg(feature = "observability")]
80-
let observability =
81-
ObservabilityContext::try_new(ObservabilityConfig::default(), "test").unwrap();
82-
Self {
83-
config: ExecutionConfig::default(),
84-
session_ctx,
85-
ddl_path: None,
86-
executor: None,
87-
#[cfg(feature = "observability")]
88-
observability,
89-
}
90-
}
91-
}
92-
9375
impl std::fmt::Debug for ExecutionContext {
9476
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
9577
f.debug_struct("ExecutionContext").finish()
@@ -181,6 +163,27 @@ impl ExecutionContext {
181163
Ok(ctx)
182164
}
183165

166+
/// Useful for testing execution functionality
167+
pub fn test() -> Self {
168+
let cfg = SessionConfig::new().with_information_schema(true);
169+
let session_ctx = SessionContext::new_with_config(cfg);
170+
let exec_cfg = ExecutionConfig::default();
171+
// Okay to `unwrap` in a test
172+
let app_catalog = create_app_catalog(&exec_cfg, "test", ".0.1.0").unwrap();
173+
session_ctx.register_catalog("test", app_catalog);
174+
#[cfg(feature = "observability")]
175+
let observability =
176+
ObservabilityContext::try_new(ObservabilityConfig::default(), "test").unwrap();
177+
Self {
178+
config: ExecutionConfig::default(),
179+
session_ctx,
180+
ddl_path: None,
181+
executor: None,
182+
#[cfg(feature = "observability")]
183+
observability,
184+
}
185+
}
186+
184187
pub fn config(&self) -> &ExecutionConfig {
185188
&self.config
186189
}

src/args.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,21 @@ impl DftArgs {
129129

130130
#[derive(Clone, Debug, Subcommand)]
131131
pub enum FlightSqlCommand {
132-
/// Executes `GetFlightInfo` and `DoGet` on the provided SQL query
133-
StatementQuery { sql: String },
134-
/// Executes `GetCatalogsFlightInfo` and `DoGet`
132+
/// Executes `CommandStatementQuery` and `DoGet` to return results
133+
StatementQuery {
134+
/// The query to execute
135+
sql: String,
136+
},
137+
/// Executes `CommandGetCatalogs` and `DoGet` to return results
135138
GetCatalogs,
139+
/// Executes `CommandGetDbSchemas` and `DoGet` to return results
140+
GetDbSchemas {
141+
/// The catalog to retrieve schemas
142+
#[clap(long, short)]
143+
catalog: Option<String>,
144+
#[clap(long, short)]
145+
schema_pattern: Option<String>,
146+
},
136147
}
137148

138149
#[derive(Clone, Debug, Subcommand)]

src/cli/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,24 @@ impl CliApp {
102102
.await?;
103103
let flight_batch_stream = stream::select_all(streams);
104104
self.print_any_stream(flight_batch_stream).await;
105-
105+
Ok(())
106+
}
107+
FlightSqlCommand::GetDbSchemas {
108+
catalog,
109+
schema_pattern,
110+
} => {
111+
let flight_info = self
112+
.app_execution
113+
.flightsql_ctx()
114+
.get_db_schemas_flight_info(catalog, schema_pattern)
115+
.await?;
116+
let streams = self
117+
.app_execution
118+
.flightsql_ctx()
119+
.do_get(flight_info)
120+
.await?;
121+
let flight_batch_stream = stream::select_all(streams);
122+
self.print_any_stream(flight_batch_stream).await;
106123
Ok(())
107124
}
108125
}

src/server/flightsql/service.rs

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use arrow_flight::error::FlightError;
2121
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
2222
use arrow_flight::sql::server::FlightSqlService;
2323
use arrow_flight::sql::{
24-
Any, CommandGetCatalogs, CommandStatementQuery, SqlInfo, TicketStatementQuery,
24+
Any, CommandGetCatalogs, CommandGetDbSchemas, CommandStatementQuery, SqlInfo,
25+
TicketStatementQuery,
2526
};
2627
use arrow_flight::{FlightDescriptor, FlightEndpoint, FlightInfo, Ticket};
2728
use color_eyre::Result;
@@ -90,13 +91,13 @@ impl FlightSqlServiceImpl {
9091
.boxed();
9192
Ok(Response::new(flight_data_stream))
9293
} else {
93-
Err(Status::internal("Plan not found for id"))
94+
Err(Status::internal("plan not found for id"))
9495
}
9596
}
9697
Err(e) => {
9798
error!("error decoding handle to uuid for {request_id}: {:?}", e);
9899
Err(Status::internal(
99-
"Error decoding handle to uuid for {request_id}",
100+
"error decoding handle to uuid for {request_id}",
100101
))
101102
}
102103
}
@@ -131,20 +132,22 @@ impl FlightSqlServiceImpl {
131132
.try_record_request(ctx, req)
132133
.await
133134
{
134-
error!("Error recording request: {}", e.to_string())
135+
error!("error recording request: {}", e.to_string())
135136
}
136137

137138
histogram!(latency_metric).record(duration.get_milliseconds() as f64);
138139
}
139140

140-
async fn get_flight_info_statement_handler(
141+
async fn create_flight_info(
141142
&self,
142143
query: String,
143144
request_id: Uuid,
144-
request: Request<FlightDescriptor>,
145+
_request: Request<FlightDescriptor>,
145146
) -> Result<Response<FlightInfo>, Status> {
146-
info!("get_flight_info_statement query: {:?}", query);
147-
debug!("get_flight_info_statement request: {:?}", request);
147+
debug!(
148+
"creating flight info for request id {request_id} with query: {:?}",
149+
query
150+
);
148151
let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
149152
match DFParser::parse_sql_with_dialect(&query, &dialect) {
150153
Ok(statements) => {
@@ -174,25 +177,25 @@ impl FlightSqlServiceImpl {
174177
debug!("flight info: {:?}", info);
175178

176179
let mut guard = self.requests.lock().map_err(|_| {
177-
Status::internal("Failed to acquire lock on requests")
180+
Status::internal("failed to acquire lock on requests")
178181
})?;
179182
guard.insert(request_id, logical_plan);
180183

181184
Ok(Response::new(info))
182185
} else {
183186
error!("error encoding ticket");
184-
Err(Status::internal("Error encoding ticket"))
187+
Err(Status::internal("error encoding ticket"))
185188
}
186189
}
187190
Err(e) => {
188191
error!("error planning SQL query: {:?}", e);
189-
Err(Status::internal("Error planning SQL query"))
192+
Err(Status::internal("error planning SQL query"))
190193
}
191194
}
192195
}
193196
Err(e) => {
194197
error!("error parsing SQL query: {:?}", e);
195-
Err(Status::internal("Error parsing SQL query"))
198+
Err(Status::internal("error parsing SQL query"))
196199
}
197200
}
198201
}
@@ -229,9 +232,40 @@ impl FlightSqlService for FlightSqlServiceImpl {
229232
let start = Timestamp::now();
230233
let request_id = uuid::Uuid::new_v4();
231234
let query = "SELECT DISTINCT table_catalog FROM information_schema.tables".to_string();
232-
let res = self
233-
.get_flight_info_statement_handler(query, request_id, request)
234-
.await;
235+
let res = self.create_flight_info(query, request_id, request).await;
236+
237+
// TODO: Move recording to after response is sent to not impact response latency
238+
self.record_request(
239+
start,
240+
Some(request_id.to_string()),
241+
res.as_ref().err(),
242+
"/get_flight_info_catalogs".to_string(),
243+
"get_flight_info_catalogs_latency_ms",
244+
)
245+
.await;
246+
res
247+
}
248+
249+
async fn get_flight_info_schemas(
250+
&self,
251+
command: CommandGetDbSchemas,
252+
request: Request<FlightDescriptor>,
253+
) -> Result<Response<FlightInfo>, Status> {
254+
counter!("requests", "endpoint" => "get_flight_info").increment(1);
255+
let start = Timestamp::now();
256+
let request_id = uuid::Uuid::new_v4();
257+
let CommandGetDbSchemas {
258+
catalog,
259+
db_schema_filter_pattern,
260+
} = command;
261+
let query = match (catalog, db_schema_filter_pattern) {
262+
(Some(catalog), Some(filter)) => format!("SELECT DISTINCT table_catalog, table_schema FROM information_schema.tables WHERE table_catalog = '{catalog}' AND table_schema ILIKE '%{filter}%' ORDER BY table_catalog, table_schema"),
263+
(None, Some(filter)) => format!("SELECT DISTINCT table_catalog, table_schema FROM information_schema.tables WHERE table_schema ILIKE '%{filter}%' ORDER BY table_catalog, table_schema"),
264+
(Some(catalog), None) => format!("SELECT DISTINCT table_catalog, table_schema FROM information_schema.tables WHERE table_catalog = '{catalog}' ORDER BY table_catalog, table_schema"),
265+
(None, None) => "SELECT DISTINCT table_catalog, table_schema FROM information_schema.tables ORDER BY table_catalog, table_schema".to_string()
266+
};
267+
println!("QUERY: {query}");
268+
let res = self.create_flight_info(query, request_id, request).await;
235269

236270
// TODO: Move recording to after response is sent to not impact response latency
237271
self.record_request(
@@ -255,7 +289,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
255289
let CommandStatementQuery { query, .. } = query;
256290
let request_id = uuid::Uuid::new_v4();
257291
let res = self
258-
.get_flight_info_statement_handler(query.clone(), request_id, request)
292+
.create_flight_info(query.clone(), request_id, request)
259293
.await;
260294

261295
// TODO: Move recording to after response is sent to not impact response latency

0 commit comments

Comments
 (0)