diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 1110a0b..6067e37 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; @@ -138,9 +139,9 @@ impl ExtendedQueryHandler for DfSessionService { .map_err(|e| PgWireError::ApiError(Box::new(e)))?; let mut param_types = Vec::with_capacity(params.len()); - for param_type in params.into_values() { + for param_type in ordered_param_types(¶ms).iter() { if let Some(datatype) = param_type { - let pgtype = into_pg_type(&datatype)?; + let pgtype = into_pg_type(datatype)?; param_types.push(pgtype); } else { param_types.push(Type::UNKNOWN); @@ -177,15 +178,12 @@ impl ExtendedQueryHandler for DfSessionService { { let plan = &portal.statement.statement; - let param_values = datatypes::deserialize_parameters( - portal, - &plan - .get_parameter_types() - .map_err(|e| PgWireError::ApiError(Box::new(e)))? - .values() - .map(|v| v.as_ref()) - .collect::>>(), - )?; + let param_types = plan + .get_parameter_types() + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + + let param_values = + datatypes::deserialize_parameters(portal, &ordered_param_types(¶m_types))?; let plan = plan .clone() @@ -202,3 +200,12 @@ impl ExtendedQueryHandler for DfSessionService { Ok(Response::Query(resp)) } } + +fn ordered_param_types(types: &HashMap>) -> Vec> { + // Datafusion stores the parameters as a map. In our case, the keys will be + // `$1`, `$2` etc. The values will be the parameter types. + + let mut types = types.iter().collect::>(); + types.sort_by(|a, b| a.0.cmp(b.0)); + types.into_iter().map(|pt| pt.1.as_ref()).collect() +}