Skip to content

Commit 7b9bdf9

Browse files
committed
Merge branch 'master' into test/pgcli-startup-queries
2 parents de97ba6 + cb841ea commit 7b9bdf9

File tree

2 files changed

+54
-16
lines changed

2 files changed

+54
-16
lines changed

arrow-pg/src/datatypes/df.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use futures::{stream, StreamExt};
1111
use pgwire::api::portal::{Format, Portal};
1212
use pgwire::api::results::QueryResponse;
1313
use pgwire::api::Type;
14-
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
14+
use pgwire::error::{PgWireError, PgWireResult};
1515
use pgwire::messages::data::DataRow;
1616
use rust_decimal::prelude::ToPrimitive;
1717
use rust_decimal::Decimal;
@@ -262,11 +262,10 @@ where
262262
}
263263
// TODO: add more advanced types (composite types, ranges, etc.)
264264
_ => {
265-
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
266-
"FATAL".to_string(),
267-
"XX000".to_string(),
268-
format!("Unsupported parameter type: {pg_type}"),
269-
))));
265+
// Default to string/text for unsupported parameter types
266+
// This allows graceful degradation instead of fatal errors
267+
let value = portal.parameter::<String>(i, &pg_type)?;
268+
deserialized_params.push(ScalarValue::Utf8(value));
270269
}
271270
}
272271
}

datafusion-postgres/src/handlers.rs

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,15 @@ impl DfSessionService {
338338
match query_lower.trim() {
339339
"begin" | "begin transaction" | "begin work" | "start transaction" => {
340340
match client.transaction_status() {
341-
TransactionStatus::Idle | TransactionStatus::Transaction => {
341+
TransactionStatus::Idle => {
342342
Ok(Some(Response::TransactionStart(Tag::new("BEGIN"))))
343343
}
344+
TransactionStatus::Transaction => {
345+
// PostgreSQL behavior: ignore nested BEGIN, just return SUCCESS
346+
// This matches PostgreSQL's handling of nested transaction blocks
347+
log::warn!("BEGIN command ignored: already in transaction block");
348+
Ok(Some(Response::Execution(Tag::new("BEGIN"))))
349+
}
344350
TransactionStatus::Error => {
345351
// Can't start new transaction from failed state
346352
Err(PgWireError::UserError(Box::new(
@@ -435,6 +441,16 @@ impl SimpleQueryHandler for DfSessionService {
435441
C: ClientInfo + Unpin + Send + Sync,
436442
{
437443
log::debug!("Received query: {query}"); // Log the query for debugging
444+
445+
// Check for transaction commands early to avoid SQL parsing issues with ABORT
446+
let query_lower = query.to_lowercase().trim().to_string();
447+
if let Some(resp) = self
448+
.try_respond_transaction_statements(client, &query_lower)
449+
.await?
450+
{
451+
return Ok(vec![resp]);
452+
}
453+
438454
let mut statements = parse(query).map_err(|e| PgWireError::ApiError(Box::new(e)))?;
439455

440456
// TODO: deal with multiple statements
@@ -467,13 +483,6 @@ impl SimpleQueryHandler for DfSessionService {
467483
return Ok(vec![resp]);
468484
}
469485

470-
if let Some(resp) = self
471-
.try_respond_transaction_statements(client, &query_lower)
472-
.await?
473-
{
474-
return Ok(vec![resp]);
475-
}
476-
477486
if let Some(resp) = self
478487
.try_respond_show_statements(client, &query_lower)
479488
.await?
@@ -716,8 +725,38 @@ impl QueryParser for Parser {
716725
sql: &str,
717726
_types: &[Type],
718727
) -> PgWireResult<Self::Statement> {
719-
log::debug!("Received parse extended query: {sql}"); // Log for
720-
// debugging
728+
log::debug!("Received parse extended query: {sql}"); // Log for debugging
729+
730+
// Check for transaction commands that shouldn't be parsed by DataFusion
731+
let sql_lower = sql.to_lowercase();
732+
let sql_trimmed = sql_lower.trim();
733+
if matches!(
734+
sql_trimmed,
735+
"begin"
736+
| "begin transaction"
737+
| "begin work"
738+
| "start transaction"
739+
| "commit"
740+
| "commit transaction"
741+
| "commit work"
742+
| "end"
743+
| "end transaction"
744+
| "rollback"
745+
| "rollback transaction"
746+
| "rollback work"
747+
| "abort"
748+
) {
749+
// Return a dummy plan for transaction commands - they'll be handled by transaction handler
750+
let dummy_schema = datafusion::common::DFSchema::empty();
751+
let dummy_plan = datafusion::logical_expr::LogicalPlan::EmptyRelation(
752+
datafusion::logical_expr::EmptyRelation {
753+
produce_one_row: false,
754+
schema: std::sync::Arc::new(dummy_schema),
755+
},
756+
);
757+
return Ok((sql.to_string(), dummy_plan));
758+
}
759+
721760
let mut statements = parse(sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?;
722761
let mut statement = statements.remove(0);
723762

0 commit comments

Comments
 (0)