Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions etl-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ sqlx = { workspace = true, features = [
] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-postgres = { workspace = true }
tracing = { workspace = true, default-features = false }
tracing-actix-web = { workspace = true, features = ["emit_event_on_error"] }
utoipa = { workspace = true, features = ["actix_extras"] }
Expand Down
69 changes: 69 additions & 0 deletions etl-api/src/db/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::Serialize;
use sqlx::{Executor, PgPool, Row};
use std::collections::HashMap;
use thiserror::Error;
use tokio_postgres::types::Type;
use utoipa::ToSchema;

use crate::db::tables::Table;
Expand All @@ -11,6 +12,9 @@ use crate::db::tables::Table;
pub enum PublicationsDbError {
/// A query against Postgres failed; wraps the underlying `sqlx::Error`, which is
/// converted automatically through `#[from]` (so `?` works on sqlx results).
#[error("Error while interacting with Postgres for publications: {0}")]
Database(#[from] sqlx::Error),

/// At least one column of the published tables has an unsupported type.
/// The payload is a human-readable, comma-separated description of the
/// offending columns, as built by `validate_publication_column_types`.
#[error("Publication contains tables with unsupported column types: {0}")]
UnsupportedColumnTypes(String),
}

#[derive(Serialize, ToSchema)]
Expand All @@ -19,6 +23,71 @@ pub struct Publication {
pub tables: Vec<Table>,
}

/// Validates that all columns in the publication tables have supported types.
///
/// For every table of `publication`, queries `pg_attribute` (joined with `pg_class`,
/// `pg_namespace` and `pg_type`) to get the type OID and type name of each live column
/// (`attnum > 0` and not dropped). A column is reported as unsupported when
/// [`Type::from_oid`] does not know its OID. These unknown types would silently fall
/// back to TEXT during replication, causing data corruption.
///
/// A publication without tables is trivially valid: no query is issued.
///
/// # Errors
///
/// - [`PublicationsDbError::Database`] if a catalog query fails or one of the result
///   columns cannot be decoded.
/// - [`PublicationsDbError::UnsupportedColumnTypes`] if at least one column has an
///   unknown type; the payload lists every offending column as
///   `schema.table.column (type: <name>, oid: <oid>)`, separated by `", "`.
pub async fn validate_publication_column_types(
    publication: &Publication,
    pool: &PgPool,
) -> Result<(), PublicationsDbError> {
    let mut unsupported_columns = Vec::new();

    for table in &publication.tables {
        // The schema and table names are embedded in the SQL text, so they go through
        // `quote_literal` first (helper defined elsewhere in this file; presumably it
        // escapes the value as a SQL string literal - verify before changing).
        let quoted_schema = quote_literal(&table.schema);
        let quoted_name = quote_literal(&table.name);

        let query = format!(
            r#"
select
a.attname as column_name,
a.atttypid::int as type_oid,
t.typname as type_name
from pg_attribute a
join pg_class c on a.attrelid = c.oid
join pg_namespace n on c.relnamespace = n.oid
join pg_type t on a.atttypid = t.oid
where n.nspname = {quoted_schema}
and c.relname = {quoted_name}
and a.attnum > 0
and not a.attisdropped
order by a.attnum
"#
        );

        let rows = pool.fetch_all(query.as_str()).await?;

        for row in rows {
            // `try_get` surfaces a missing column or a decode mismatch as a
            // `sqlx::Error` (mapped to `PublicationsDbError::Database`) instead of
            // panicking inside the request path like `get` would.
            let column_name: String = row.try_get("column_name")?;
            // The query casts the OID to `int`, so it is read as `i32`; the `as u32`
            // cast reinterprets those bits as the unsigned OID value.
            let type_oid_raw: i32 = row.try_get("type_oid")?;
            let type_oid = type_oid_raw as u32;
            let type_name: String = row.try_get("type_name")?;

            if Type::from_oid(type_oid).is_none() {
                unsupported_columns.push(format!(
                    "{}.{}.{} (type: {}, oid: {})",
                    table.schema, table.name, column_name, type_name, type_oid
                ));
            }
        }
    }

    if unsupported_columns.is_empty() {
        Ok(())
    } else {
        Err(PublicationsDbError::UnsupportedColumnTypes(
            unsupported_columns.join(", "),
        ))
    }
}

pub async fn create_publication(
publication: &Publication,
pool: &PgPool,
Expand Down
84 changes: 69 additions & 15 deletions etl-api/src/routes/sources/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ enum PublicationError {
#[error("The publication with name {0} was not found")]
PublicationNotFound(String),

#[error("Invalid publication request: {0}")]
InvalidPublication(String),

#[error(transparent)]
TenantId(#[from] TenantIdError),

Expand All @@ -46,6 +49,15 @@ impl PublicationError {
| PublicationError::PublicationsDb(PublicationsDbError::Database(_)) => {
"internal server error".to_string()
}
// Validation errors are safe to expose - they help users fix their input
PublicationError::PublicationsDb(PublicationsDbError::UnsupportedColumnTypes(
details,
)) => {
format!(
"Publication contains tables with unsupported column types: {}",
details
)
}
// Every other message is ok, as they do not divulge sensitive information
e => e.to_string(),
}
Expand All @@ -55,13 +67,18 @@ impl PublicationError {
impl ResponseError for PublicationError {
fn status_code(&self) -> StatusCode {
match self {
PublicationError::SourcesDb(_)
| PublicationError::PublicationsDb(_)
| PublicationError::Database(_) => StatusCode::INTERNAL_SERVER_ERROR,
PublicationError::SourcesDb(_) | PublicationError::Database(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
PublicationError::PublicationsDb(PublicationsDbError::Database(_)) => {
StatusCode::INTERNAL_SERVER_ERROR
}
PublicationError::PublicationsDb(PublicationsDbError::UnsupportedColumnTypes(_))
| PublicationError::InvalidPublication(_)
| PublicationError::TenantId(_) => StatusCode::BAD_REQUEST,
PublicationError::SourceNotFound(_) | PublicationError::PublicationNotFound(_) => {
StatusCode::NOT_FOUND
}
PublicationError::TenantId(_) => StatusCode::BAD_REQUEST,
}
}

Expand All @@ -77,18 +94,18 @@ impl ResponseError for PublicationError {
}
}

#[derive(Deserialize, ToSchema)]
#[derive(Deserialize, Serialize, ToSchema)]
pub struct CreatePublicationRequest {
#[schema(example = "my_publication", required = true)]
name: String,
pub name: String,
#[schema(required = true)]
tables: Vec<Table>,
pub tables: Vec<Table>,
}

#[derive(Deserialize, ToSchema)]
#[derive(Deserialize, Serialize, ToSchema)]
pub struct UpdatePublicationRequest {
#[schema(required = true)]
tables: Vec<Table>,
pub tables: Vec<Table>,
}

#[derive(Serialize, ToSchema)]
Expand Down Expand Up @@ -125,13 +142,18 @@ pub async fn create_publication(
.map(|s| s.config)
.ok_or(PublicationError::SourceNotFound(source_id))?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
let publication = publication.0;
let publication = publication.into_inner();
let publication = Publication {
name: publication.name,
tables: publication.tables,
};

validate_publication(&publication)?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;

db::publications::validate_publication_column_types(&publication, &source_pool).await?;
db::publications::create_publication(&publication, &source_pool).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -206,13 +228,18 @@ pub async fn update_publication(
.map(|s| s.config)
.ok_or(PublicationError::SourceNotFound(source_id))?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
let publication = publication.0;
let publication = publication.into_inner();
let publication = Publication {
name: publication_name,
tables: publication.tables,
};

validate_publication(&publication)?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;

db::publications::validate_publication_column_types(&publication, &source_pool).await?;
db::publications::update_publication(&publication, &source_pool).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -288,3 +315,30 @@ pub async fn read_all_publications(

Ok(Json(response))
}

/// Checks the shape of a publication request before any database work is done.
///
/// Collects every problem found rather than stopping at the first one:
/// a blank (empty or whitespace-only) publication name, an empty table list,
/// and, for each table, a blank schema or a blank table name (reported with the
/// table's zero-based position in the list).
///
/// Returns `Ok(())` when nothing is wrong, otherwise
/// `PublicationError::InvalidPublication` carrying all messages joined by `", "`.
fn validate_publication(publication: &Publication) -> Result<(), PublicationError> {
    let mut problems: Vec<String> = Vec::new();

    if publication.name.trim().is_empty() {
        problems.push("name cannot be empty".to_string());
    }

    if publication.tables.is_empty() {
        problems.push("tables cannot be empty".to_string());
    }

    // With an empty table list this loop simply does not run, so the per-table
    // checks never add anything on top of the "tables cannot be empty" message.
    for (position, entry) in publication.tables.iter().enumerate() {
        if entry.schema.trim().is_empty() {
            problems.push(format!("table[{}]: schema cannot be empty", position));
        }
        if entry.name.trim().is_empty() {
            problems.push(format!("table[{}]: name cannot be empty", position));
        }
    }

    if problems.is_empty() {
        Ok(())
    } else {
        Err(PublicationError::InvalidPublication(problems.join(", ")))
    }
}
Loading