Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions etl-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ sqlx = { workspace = true, features = [
] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-postgres = { workspace = true }
tracing = { workspace = true, default-features = false }
tracing-actix-web = { workspace = true, features = ["emit_event_on_error"] }
utoipa = { workspace = true, features = ["actix_extras"] }
Expand Down
69 changes: 69 additions & 0 deletions etl-api/src/db/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::Serialize;
use sqlx::{Executor, PgPool, Row};
use std::collections::HashMap;
use thiserror::Error;
use tokio_postgres::types::Type;
use utoipa::ToSchema;

use crate::db::tables::Table;
Expand All @@ -11,6 +12,9 @@ use crate::db::tables::Table;
pub enum PublicationsDbError {
#[error("Error while interacting with Postgres for publications: {0}")]
Database(#[from] sqlx::Error),

#[error("Publication contains tables with unsupported column types: {0}")]
UnsupportedColumnTypes(String),
}

#[derive(Serialize, ToSchema)]
Expand All @@ -19,6 +23,71 @@ pub struct Publication {
pub tables: Vec<Table>,
}

/// Validates that all columns in the publication tables have supported types.
///
/// Queries `pg_attribute` to get type OIDs for all columns in the specified tables.
/// Returns an error if any column has a type OID that is unknown to `tokio_postgres`.
/// These unknown types would silently fall back to TEXT during replication, causing
/// data corruption.
pub async fn validate_publication_column_types(
publication: &Publication,
pool: &PgPool,
) -> Result<(), PublicationsDbError> {
if publication.tables.is_empty() {
return Ok(());
}

let mut unsupported_columns = Vec::new();

for table in &publication.tables {
let quoted_schema = quote_literal(&table.schema);
let quoted_name = quote_literal(&table.name);

let query = format!(
r#"
select
a.attname as column_name,
a.atttypid::int as type_oid,
t.typname as type_name
from pg_attribute a
join pg_class c on a.attrelid = c.oid
join pg_namespace n on c.relnamespace = n.oid
join pg_type t on a.atttypid = t.oid
where n.nspname = {quoted_schema}
and c.relname = {quoted_name}
and a.attnum > 0
and not a.attisdropped
order by a.attnum
"#
);

let rows = pool.fetch_all(query.as_str()).await?;

for row in rows {
let column_name: String = row.get("column_name");
// OID type in Postgres is stored as i32 in sqlx but represents u32
let type_oid_raw: i32 = row.get("type_oid");
let type_oid = type_oid_raw as u32;
let type_name: String = row.get("type_name");

if Type::from_oid(type_oid).is_none() {
unsupported_columns.push(format!(
"{}.{}.{} (type: {}, oid: {})",
table.schema, table.name, column_name, type_name, type_oid
));
}
}
}

if !unsupported_columns.is_empty() {
return Err(PublicationsDbError::UnsupportedColumnTypes(
unsupported_columns.join(", "),
));
}

Ok(())
}

pub async fn create_publication(
publication: &Publication,
pool: &PgPool,
Expand Down
84 changes: 69 additions & 15 deletions etl-api/src/routes/sources/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ enum PublicationError {
#[error("The publication with name {0} was not found")]
PublicationNotFound(String),

#[error("Invalid publication request: {0}")]
InvalidPublication(String),

#[error(transparent)]
TenantId(#[from] TenantIdError),

Expand All @@ -46,6 +49,15 @@ impl PublicationError {
| PublicationError::PublicationsDb(PublicationsDbError::Database(_)) => {
"internal server error".to_string()
}
// Validation errors are safe to expose - they help users fix their input
PublicationError::PublicationsDb(PublicationsDbError::UnsupportedColumnTypes(
details,
)) => {
format!(
"Publication contains tables with unsupported column types: {}",
details
)
}
// Every other message is ok, as they do not divulge sensitive information
e => e.to_string(),
}
Expand All @@ -55,13 +67,18 @@ impl PublicationError {
impl ResponseError for PublicationError {
fn status_code(&self) -> StatusCode {
match self {
PublicationError::SourcesDb(_)
| PublicationError::PublicationsDb(_)
| PublicationError::Database(_) => StatusCode::INTERNAL_SERVER_ERROR,
PublicationError::SourcesDb(_) | PublicationError::Database(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
PublicationError::PublicationsDb(PublicationsDbError::Database(_)) => {
StatusCode::INTERNAL_SERVER_ERROR
}
PublicationError::PublicationsDb(PublicationsDbError::UnsupportedColumnTypes(_))
| PublicationError::InvalidPublication(_)
| PublicationError::TenantId(_) => StatusCode::BAD_REQUEST,
PublicationError::SourceNotFound(_) | PublicationError::PublicationNotFound(_) => {
StatusCode::NOT_FOUND
}
PublicationError::TenantId(_) => StatusCode::BAD_REQUEST,
}
}

Expand All @@ -77,18 +94,18 @@ impl ResponseError for PublicationError {
}
}

#[derive(Deserialize, ToSchema)]
#[derive(Deserialize, Serialize, ToSchema)]
pub struct CreatePublicationRequest {
#[schema(example = "my_publication", required = true)]
name: String,
pub name: String,
#[schema(required = true)]
tables: Vec<Table>,
pub tables: Vec<Table>,
}

#[derive(Deserialize, ToSchema)]
#[derive(Deserialize, Serialize, ToSchema)]
pub struct UpdatePublicationRequest {
#[schema(required = true)]
tables: Vec<Table>,
pub tables: Vec<Table>,
}

#[derive(Serialize, ToSchema)]
Expand Down Expand Up @@ -125,13 +142,18 @@ pub async fn create_publication(
.map(|s| s.config)
.ok_or(PublicationError::SourceNotFound(source_id))?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
let publication = publication.0;
let publication = publication.into_inner();
let publication = Publication {
name: publication.name,
tables: publication.tables,
};

validate_publication(&publication)?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;

db::publications::validate_publication_column_types(&publication, &source_pool).await?;
db::publications::create_publication(&publication, &source_pool).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -206,13 +228,18 @@ pub async fn update_publication(
.map(|s| s.config)
.ok_or(PublicationError::SourceNotFound(source_id))?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;
let publication = publication.0;
let publication = publication.into_inner();
let publication = Publication {
name: publication_name,
tables: publication.tables,
};

validate_publication(&publication)?;

let source_pool =
connect_to_source_database_with_defaults(&config.into_connection_config()).await?;

db::publications::validate_publication_column_types(&publication, &source_pool).await?;
db::publications::update_publication(&publication, &source_pool).await?;

Ok(HttpResponse::Ok().finish())
Expand Down Expand Up @@ -288,3 +315,30 @@ pub async fn read_all_publications(

Ok(Json(response))
}

fn validate_publication(publication: &Publication) -> Result<(), PublicationError> {
let mut errors: Vec<String> = Vec::new();

if publication.name.trim().is_empty() {
errors.push("name cannot be empty".to_string());
}

if publication.tables.is_empty() {
errors.push("tables cannot be empty".to_string());
} else {
for (i, table) in publication.tables.iter().enumerate() {
if table.schema.trim().is_empty() {
errors.push(format!("table[{}]: schema cannot be empty", i));
}
if table.name.trim().is_empty() {
errors.push(format!("table[{}]: name cannot be empty", i));
}
}
}

if !errors.is_empty() {
return Err(PublicationError::InvalidPublication(errors.join(", ")));
}

Ok(())
}
Loading