Skip to content

Commit 3b642e2

Browse files
committed
feat: add PostgreSQL schema support to Postgres target
1 parent 3b25c9f commit 3b642e2

File tree

1 file changed

+41
-7
lines changed

1 file changed

+41
-7
lines changed

src/ops/targets/postgres.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::ops::Bound;
1818
pub struct Spec {
1919
database: Option<spec::AuthEntryReference<DatabaseConnectionSpec>>,
2020
table_name: Option<String>,
21+
schema: Option<String>,
2122
}
2223
const BIND_LIMIT: usize = 65535;
2324

@@ -143,10 +144,12 @@ impl ExportContext {
143144
fn new(
144145
db_ref: Option<spec::AuthEntryReference<DatabaseConnectionSpec>>,
145146
db_pool: PgPool,
146-
table_name: String,
147+
table_id: &TableId,
147148
key_fields_schema: Box<[FieldSchema]>,
148149
value_fields_schema: Vec<FieldSchema>,
149150
) -> Result<Self> {
151+
let table_name = qualified_table_name(table_id);
152+
150153
let key_fields = key_fields_schema
151154
.iter()
152155
.map(|f| format!("\"{}\"", f.name))
@@ -255,12 +258,18 @@ pub struct Factory {}
255258
pub struct TableId {
256259
#[serde(skip_serializing_if = "Option::is_none")]
257260
database: Option<spec::AuthEntryReference<DatabaseConnectionSpec>>,
261+
#[serde(skip_serializing_if = "Option::is_none")]
262+
schema: Option<String>,
258263
table_name: String,
259264
}
260265

261266
impl std::fmt::Display for TableId {
262267
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263-
write!(f, "{}", self.table_name)?;
268+
if let Some(schema) = &self.schema {
269+
write!(f, "{}.{}", schema, self.table_name)?;
270+
} else {
271+
write!(f, "{}", self.table_name)?;
272+
}
264273
if let Some(database) = &self.database {
265274
write!(f, " (database: {database})")?;
266275
}
@@ -345,6 +354,13 @@ fn to_column_type_sql(column_type: &ValueType) -> String {
345354
}
346355
}
347356

357+
fn qualified_table_name(table_id: &TableId) -> String {
358+
match &table_id.schema {
359+
Some(schema) => format!("\"{}\".\"{}\"", schema, table_id.table_name),
360+
None => format!("\"{}\"", table_id.table_name),
361+
}
362+
}
363+
348364
impl<'a> From<&'a SetupState> for Cow<'a, TableColumnsSchema<String>> {
349365
fn from(val: &'a SetupState) -> Self {
350366
Cow::Owned(TableColumnsSchema {
@@ -554,7 +570,15 @@ impl setup::ResourceSetupChange for SetupChange {
554570
}
555571

556572
impl SetupChange {
557-
async fn apply_change(&self, db_pool: &PgPool, table_name: &str) -> Result<()> {
573+
async fn apply_change(&self, db_pool: &PgPool, table_id: &TableId) -> Result<()> {
574+
// Create schema if specified
575+
if let Some(schema) = &table_id.schema {
576+
let sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema);
577+
sqlx::query(&sql).execute(db_pool).await?;
578+
}
579+
580+
let table_name = qualified_table_name(table_id);
581+
558582
if self.actions.table_action.drop_existing {
559583
sqlx::query(&format!("DROP TABLE IF EXISTS {table_name}"))
560584
.execute(db_pool)
@@ -638,8 +662,18 @@ impl TargetFactoryBase for Factory {
638662
let data_coll_output = data_collections
639663
.into_iter()
640664
.map(|d| {
665+
// Validate: if schema is specified, table_name must be explicit
666+
if d.spec.schema.is_some() && d.spec.table_name.is_none() {
667+
bail!(
668+
"Postgres target '{}': when 'schema' is specified, 'table_name' must also be explicitly provided. \
669+
Auto-generated table names are not supported with custom schemas",
670+
d.name
671+
);
672+
}
673+
641674
let table_id = TableId {
642675
database: d.spec.database.clone(),
676+
schema: d.spec.schema.clone(),
643677
table_name: d.spec.table_name.unwrap_or_else(|| {
644678
utils::db::sanitize_identifier(&format!(
645679
"{}__{}",
@@ -653,15 +687,15 @@ impl TargetFactoryBase for Factory {
653687
&d.value_fields_schema,
654688
&d.index_options,
655689
);
656-
let table_name = table_id.table_name.clone();
690+
let table_id_clone = table_id.clone();
657691
let db_ref = d.spec.database;
658692
let auth_registry = context.auth_registry.clone();
659693
let export_context = Box::pin(async move {
660694
let db_pool = get_db_pool(db_ref.as_ref(), &auth_registry).await?;
661695
let export_context = Arc::new(ExportContext::new(
662696
db_ref,
663697
db_pool.clone(),
664-
table_name,
698+
&table_id_clone,
665699
d.key_fields_schema,
666700
d.value_fields_schema,
667701
)?);
@@ -699,7 +733,7 @@ impl TargetFactoryBase for Factory {
699733
}
700734

701735
fn describe_resource(&self, key: &TableId) -> Result<String> {
702-
Ok(format!("Postgres table {}", key.table_name))
736+
Ok(format!("Postgres table {}", key))
703737
}
704738

705739
async fn apply_mutation(
@@ -746,7 +780,7 @@ impl TargetFactoryBase for Factory {
746780
let db_pool = get_db_pool(change.key.database.as_ref(), &context.auth_registry).await?;
747781
change
748782
.setup_change
749-
.apply_change(&db_pool, &change.key.table_name)
783+
.apply_change(&db_pool, &change.key)
750784
.await?;
751785
}
752786
Ok(())

0 commit comments

Comments
 (0)