Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datafusion-postgres-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use datafusion::execution::options::{
ArrowReadOptions, AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
use datafusion::prelude::SessionContext;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_postgres::{serve, ServerOptions}; // Assuming the crate name is `datafusion_postgres`
use structopt::StructOpt;

Expand Down Expand Up @@ -179,7 +179,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut opts = Opt::from_args();
opts.include_directory_files()?;

let session_context = SessionContext::new();
let session_config = SessionConfig::new().with_information_schema(true);
let session_context = SessionContext::new_with_config(session_config);

setup_session_context(&session_context, &opts).await?;

Expand Down
68 changes: 1 addition & 67 deletions datafusion-postgres/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use pgwire::api::{ClientInfo, NoopErrorHandler, PgWireServerHandlers, Type};
use tokio::sync::Mutex;

use crate::datatypes;
use crate::information_schema::{columns_df, schemata_df, tables_df};
use pgwire::error::{PgWireError, PgWireResult};

pub struct HandlerFactory(pub Arc<DfSessionService>);
Expand Down Expand Up @@ -91,31 +90,6 @@ impl DfSessionService {
Ok(QueryResponse::new(Arc::new(fields), Box::pin(row_stream)))
}

// Mock pg_namespace response
async fn mock_pg_namespace<'a>(&self) -> PgWireResult<QueryResponse<'a>> {
let fields = Arc::new(vec![FieldInfo::new(
"nspname".to_string(),
None,
None,
Type::VARCHAR,
FieldFormat::Text,
)]);

let fields_ref = fields.clone();
let rows = self
.session_context
.catalog_names()
.into_iter()
.map(move |name| {
let mut encoder = pgwire::api::results::DataRowEncoder::new(fields_ref.clone());
encoder.encode_field(&Some(&name))?; // Return catalog_name as a schema
encoder.finish()
});

let row_stream = futures::stream::iter(rows);
Ok(QueryResponse::new(fields.clone(), Box::pin(row_stream)))
}

async fn try_respond_set_statements<'a>(
&self,
query_lower: &str,
Expand Down Expand Up @@ -189,39 +163,6 @@ impl DfSessionService {
Ok(None)
}
}

async fn try_respond_information_schema<'a>(
&self,
query_lower: &str,
) -> PgWireResult<Option<Response<'a>>> {
if query_lower.contains("information_schema.schemata") {
let df = schemata_df(&self.session_context)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let resp = datatypes::encode_dataframe(df, &Format::UnifiedText).await?;
return Ok(Some(Response::Query(resp)));
} else if query_lower.contains("information_schema.tables") {
let df = tables_df(&self.session_context)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let resp = datatypes::encode_dataframe(df, &Format::UnifiedText).await?;
return Ok(Some(Response::Query(resp)));
} else if query_lower.contains("information_schema.columns") {
let df = columns_df(&self.session_context)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let resp = datatypes::encode_dataframe(df, &Format::UnifiedText).await?;
return Ok(Some(Response::Query(resp)));
}

// Handle pg_catalog.pg_namespace for pgcli compatibility
if query_lower.contains("pg_catalog.pg_namespace") {
let resp = self.mock_pg_namespace().await?;
return Ok(Some(Response::Query(resp)));
}

Ok(None)
}
}

#[async_trait]
Expand All @@ -241,10 +182,6 @@ impl SimpleQueryHandler for DfSessionService {
return Ok(vec![resp]);
}

if let Some(resp) = self.try_respond_information_schema(&query_lower).await? {
return Ok(vec![resp]);
}

let df = self
.session_context
.sql(query)
Expand Down Expand Up @@ -361,11 +298,8 @@ impl ExtendedQueryHandler for DfSessionService {
return Ok(resp);
}

if let Some(resp) = self.try_respond_information_schema(&query).await? {
return Ok(resp);
}

let (_, plan) = &portal.statement.statement;

let param_types = plan
.get_parameter_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
Expand Down
134 changes: 0 additions & 134 deletions datafusion-postgres/src/information_schema.rs

This file was deleted.

1 change: 0 additions & 1 deletion datafusion-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod datatypes;
mod encoder;
mod handlers;
mod information_schema;

use std::sync::Arc;

Expand Down
Loading