Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Fixed

- Support composite and array arguments and columns when creating native queries via the cli. We used to assume scalars

## [v2.1.1] - 2025-03-12

### Changed
Expand Down
133 changes: 93 additions & 40 deletions crates/configuration/src/version4/native_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::path::Path;
use query_engine_sql::sql;

use ndc_models as models;
use sqlx::postgres::types::Oid;
use sqlx::postgres::PgTypeKind;
use sqlx::Connection;
use sqlx::Executor;
use sqlx::{Column, PgConnection};
Expand All @@ -20,6 +22,37 @@ pub enum Kind {
Mutation,
}

/// Recursively figure out the underlying type oid for a given type
/// Figure out the underlying type oid for an argument or column
/// Usually we just want the oid, but for arrays we need the oid for the underlying (non-array) type
fn underlying_oid(kind: &PgTypeKind, oid: Option<Oid>) -> Option<Oid> {
match kind {
PgTypeKind::Simple
| PgTypeKind::Pseudo
| PgTypeKind::Domain(_)
| PgTypeKind::Enum(_)
| PgTypeKind::Range(_)
| PgTypeKind::Composite(_) => oid,
PgTypeKind::Array(pg_type_info) => underlying_oid(pg_type_info.kind(), pg_type_info.oid()),
}
}

/// Convert a kind and a resolved underlying type name to a metadata type
fn to_metadata_type(kind: &PgTypeKind, underlying_type_name: &str) -> metadata::Type {
match kind {
PgTypeKind::Simple
| PgTypeKind::Pseudo
| PgTypeKind::Domain(_)
| PgTypeKind::Enum(_)
| PgTypeKind::Range(_) => metadata::Type::ScalarType(underlying_type_name.into()),
PgTypeKind::Composite(_) => metadata::Type::CompositeType(underlying_type_name.into()),
PgTypeKind::Array(pg_type_info) => metadata::Type::ArrayType(Box::new(to_metadata_type(
pg_type_info.kind(),
underlying_type_name,
))),
}
}

/// Take a SQL file containing a Native Operation, check against the database that it is valid,
/// and add it to the configuration if it is.
pub async fn create(
Expand Down Expand Up @@ -60,21 +93,23 @@ pub async fn create(
anyhow::bail!("Internal error: Native operation parameter was not a variable.")
};

let the_oid = result_param
.oid()
// get the oid for the underlying type. If the parameter is an array, get the array item's oid
let the_oid = underlying_oid(result_param.kind(), result_param.oid())
.ok_or(anyhow::anyhow!(
"Internal error: All sqlx TypeInfos should have an oid."
))?
.0;

arguments_to_oids.insert(param_name, i64::from(the_oid));
arguments_to_oids.insert(
param_name,
(i64::from(the_oid), result_param.kind().to_owned()),
);
}

// Fill the columns list.
for (index, column) in result.columns.iter().enumerate() {
let the_oid = column
.type_info()
.oid()
// get the oid for the underlying type. If the parameter is an array, get the array item's oid
let the_oid = underlying_oid(column.type_info().kind(), column.type_info().oid())
.ok_or(anyhow::anyhow!(
"Internal error: All sqlx TypeInfos should have an oid."
))?
Expand All @@ -84,44 +119,59 @@ pub async fn create(
true,
);

columns_to_oids.insert(column.name().to_string(), (i64::from(the_oid), is_nullable));
columns_to_oids.insert(
column.name().to_string(),
(
i64::from(the_oid),
column.type_info().kind().to_owned(),
is_nullable,
),
);
}

let mut oids: BTreeSet<i64> = arguments_to_oids.values().copied().collect();
oids.extend::<BTreeSet<i64>>(columns_to_oids.values().copied().map(|x| x.0).collect());
let mut oids: BTreeSet<i64> = arguments_to_oids
.values()
.map(|(oid, _)| oid)
.copied()
.collect();
oids.extend::<BTreeSet<i64>>(
columns_to_oids
.values()
.map(|(oid, _, _)| oid)
.copied()
.collect(),
);
let oids_vec: Vec<_> = oids.into_iter().collect();
let oids_map = oids_to_typenames(configuration, connection_string, &oids_vec).await?;

let mut arguments = BTreeMap::new();
for (name, oid) in arguments_to_oids {
for (name, (oid, kind)) in arguments_to_oids {
let underlying_type_name = oids_map
.get(&oid)
.ok_or_else(|| anyhow::anyhow!("Internal error: oid not found in map."))?;

arguments.insert(
name.clone().into(),
metadata::ReadOnlyColumnInfo {
name: name.clone(),
r#type: metadata::Type::ScalarType(
oids_map
.get(&oid)
.ok_or_else(|| anyhow::anyhow!("Internal error: oid not found in map."))?
.clone(),
),
r#type: to_metadata_type(&kind, &underlying_type_name.to_string()),
description: None,
// we don't have this information, so we assume not nullable.
nullable: metadata::Nullable::NonNullable,
},
);
}
let mut columns = BTreeMap::new();
for (name, (oid, is_nullable)) in columns_to_oids {
for (name, (oid, kind, is_nullable)) in columns_to_oids {
let underlying_type_name = oids_map
.get(&oid)
.ok_or_else(|| anyhow::anyhow!("Internal error: oid not found in map."))?;

columns.insert(
name.clone().into(),
metadata::ReadOnlyColumnInfo {
name: name.clone(),
r#type: metadata::Type::ScalarType(
oids_map
.get(&oid)
.ok_or_else(|| anyhow::anyhow!("Internal error: oid not found in map."))?
.clone(),
),
r#type: to_metadata_type(&kind, &underlying_type_name.to_string()),
description: None,
nullable: if is_nullable {
metadata::Nullable::Nullable
Expand Down Expand Up @@ -155,7 +205,7 @@ pub async fn oids_to_typenames(
configuration: &super::ParsedConfiguration,
connection_string: &str,
oids: &Vec<i64>,
) -> Result<BTreeMap<i64, models::ScalarTypeName>, sqlx::Error> {
) -> Result<BTreeMap<i64, models::TypeName>, sqlx::Error> {
let mut connection = PgConnection::connect(connection_string)
.instrument(info_span!("Connect to database"))
.await?;
Expand All @@ -166,37 +216,40 @@ pub async fn oids_to_typenames(
.instrument(info_span!("Run oid lookup query"))
.await?;

let mut oids_map: BTreeMap<i64, models::ScalarTypeName> = BTreeMap::new();
let mut oids_map: BTreeMap<i64, models::TypeName> = BTreeMap::new();

// Reverse lookup the schema.typename and find the ndc type name,
// if we find all we can just add the nq and call it a day.
for row in rows {
'rows: for row in rows {
let schema_name: String = row.schema_name;
let type_name: String = row.type_name;
let oid: i64 = row.oid;

let mut found = false;
for (scalar_type_name, info) in &configuration.metadata.scalar_types.0 {
if info.schema_name == schema_name && info.type_name == type_name {
oids_map.insert(oid, scalar_type_name.clone());
found = true;
continue;
oids_map.insert(oid, scalar_type_name.inner().clone());
continue 'rows;
}
}

for (composite_type_name, info) in &configuration.metadata.composite_types.0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section was missing, and since v4 does have composite types I think this was a mistake

if info.schema_name == schema_name && info.type_name == type_name {
oids_map.insert(oid, composite_type_name.clone());
continue 'rows;
}
}

// If we don't find it we generate a name which is either schema_typename
// or just typename depending if the schema is in the unqualified list or not,
// then add the nq and run the introspection.
if !found {
if configuration
.introspection_options
.unqualified_schemas_for_types_and_procedures
.contains(&schema_name)
{
oids_map.insert(oid, type_name.into());
} else {
oids_map.insert(oid, format!("{schema_name}_{type_name}").into());
}
if configuration
.introspection_options
.unqualified_schemas_for_types_and_procedures
.contains(&schema_name)
{
oids_map.insert(oid, type_name.into());
} else {
oids_map.insert(oid, format!("{schema_name}_{type_name}").into());
}
}

Expand Down
Loading
Loading