Skip to content

Commit c0827ca

Browse files
authored
Feat/postgres schema support (#1138)
* feat: add PostgreSQL schema support * feat: add PostgreSQL schema support to Postgres target * fix/remove-quote * docs: add schema parameter
1 parent 158943f commit c0827ca

File tree

3 files changed

+44
-7
lines changed

3 files changed

+44
-7
lines changed

docs/docs/targets/postgres.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ The spec takes the following fields:
4747

4848
* `table_name` (`str`, optional): The name of the table to store to. If unspecified, will use the table name `[${AppNamespace}__]${FlowName}__${TargetName}`, e.g. `DemoFlow__doc_embeddings` or `Staging__DemoFlow__doc_embeddings`.
4949

50+
* `schema` (`str`, optional): The PostgreSQL schema to create the table in. If unspecified, the table will be created in the default schema (usually `public`). When specified, `table_name` must also be explicitly specified. CocoIndex will automatically create the schema if it doesn't exist.
51+
5052
## Example
5153
<ExampleButton
5254
href="https://github.com/cocoindex-io/cocoindex/tree/main/examples/text_embedding"

python/cocoindex/targets/_engine_builtin_specs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class Postgres(op.TargetSpec):
1414

1515
database: AuthEntryReference[DatabaseConnectionSpec] | None = None
1616
table_name: str | None = None
17+
schema: str | None = None
1718

1819

1920
class PostgresSqlAttachment(op.TargetAttachmentSpec):

src/ops/targets/postgres.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::ops::Bound;
1818
pub struct Spec {
1919
database: Option<spec::AuthEntryReference<DatabaseConnectionSpec>>,
2020
table_name: Option<String>,
21+
schema: Option<String>,
2122
}
2223
const BIND_LIMIT: usize = 65535;
2324

@@ -143,10 +144,12 @@ impl ExportContext {
143144
fn new(
144145
db_ref: Option<spec::AuthEntryReference<DatabaseConnectionSpec>>,
145146
db_pool: PgPool,
146-
table_name: String,
147+
table_id: &TableId,
147148
key_fields_schema: Box<[FieldSchema]>,
148149
value_fields_schema: Vec<FieldSchema>,
149150
) -> Result<Self> {
151+
let table_name = qualified_table_name(table_id);
152+
150153
let key_fields = key_fields_schema
151154
.iter()
152155
.map(|f| format!("\"{}\"", f.name))
@@ -254,12 +257,18 @@ struct TargetFactory;
254257
pub struct TableId {
255258
#[serde(skip_serializing_if = "Option::is_none")]
256259
database: Option<spec::AuthEntryReference<DatabaseConnectionSpec>>,
260+
#[serde(skip_serializing_if = "Option::is_none")]
261+
schema: Option<String>,
257262
table_name: String,
258263
}
259264

260265
impl std::fmt::Display for TableId {
261266
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262-
write!(f, "{}", self.table_name)?;
267+
if let Some(schema) = &self.schema {
268+
write!(f, "{}.{}", schema, self.table_name)?;
269+
} else {
270+
write!(f, "{}", self.table_name)?;
271+
}
263272
if let Some(database) = &self.database {
264273
write!(f, " (database: {database})")?;
265274
}
@@ -344,6 +353,13 @@ fn to_column_type_sql(column_type: &ValueType) -> String {
344353
}
345354
}
346355

356+
fn qualified_table_name(table_id: &TableId) -> String {
357+
match &table_id.schema {
358+
Some(schema) => format!("\"{}\".{}", schema, table_id.table_name),
359+
None => table_id.table_name.clone(),
360+
}
361+
}
362+
347363
impl<'a> From<&'a SetupState> for Cow<'a, TableColumnsSchema<String>> {
348364
fn from(val: &'a SetupState) -> Self {
349365
Cow::Owned(TableColumnsSchema {
@@ -553,7 +569,9 @@ impl setup::ResourceSetupChange for SetupChange {
553569
}
554570

555571
impl SetupChange {
556-
async fn apply_change(&self, db_pool: &PgPool, table_name: &str) -> Result<()> {
572+
async fn apply_change(&self, db_pool: &PgPool, table_id: &TableId) -> Result<()> {
573+
let table_name = qualified_table_name(table_id);
574+
557575
if self.actions.table_action.drop_existing {
558576
sqlx::query(&format!("DROP TABLE IF EXISTS {table_name}"))
559577
.execute(db_pool)
@@ -571,6 +589,12 @@ impl SetupChange {
571589
if let Some(table_upsertion) = &self.actions.table_action.table_upsertion {
572590
match table_upsertion {
573591
TableUpsertionAction::Create { keys, values } => {
592+
// Create schema if specified
593+
if let Some(schema) = &table_id.schema {
594+
let sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema);
595+
sqlx::query(&sql).execute(db_pool).await?;
596+
}
597+
574598
let mut fields = (keys
575599
.iter()
576600
.map(|(name, typ)| format!("\"{name}\" {typ} NOT NULL")))
@@ -637,8 +661,18 @@ impl TargetFactoryBase for TargetFactory {
637661
let data_coll_output = data_collections
638662
.into_iter()
639663
.map(|d| {
664+
// Validate: if schema is specified, table_name must be explicit
665+
if d.spec.schema.is_some() && d.spec.table_name.is_none() {
666+
bail!(
667+
"Postgres target '{}': when 'schema' is specified, 'table_name' must also be explicitly provided. \
668+
Auto-generated table names are not supported with custom schemas",
669+
d.name
670+
);
671+
}
672+
640673
let table_id = TableId {
641674
database: d.spec.database.clone(),
675+
schema: d.spec.schema.clone(),
642676
table_name: d.spec.table_name.unwrap_or_else(|| {
643677
utils::db::sanitize_identifier(&format!(
644678
"{}__{}",
@@ -652,15 +686,15 @@ impl TargetFactoryBase for TargetFactory {
652686
&d.value_fields_schema,
653687
&d.index_options,
654688
);
655-
let table_name = table_id.table_name.clone();
689+
let table_id_clone = table_id.clone();
656690
let db_ref = d.spec.database;
657691
let auth_registry = context.auth_registry.clone();
658692
let export_context = Box::pin(async move {
659693
let db_pool = get_db_pool(db_ref.as_ref(), &auth_registry).await?;
660694
let export_context = Arc::new(ExportContext::new(
661695
db_ref,
662696
db_pool.clone(),
663-
table_name,
697+
&table_id_clone,
664698
d.key_fields_schema,
665699
d.value_fields_schema,
666700
)?);
@@ -698,7 +732,7 @@ impl TargetFactoryBase for TargetFactory {
698732
}
699733

700734
fn describe_resource(&self, key: &TableId) -> Result<String> {
701-
Ok(format!("Postgres table {}", key.table_name))
735+
Ok(format!("Postgres table {}", key))
702736
}
703737

704738
async fn apply_mutation(
@@ -745,7 +779,7 @@ impl TargetFactoryBase for TargetFactory {
745779
let db_pool = get_db_pool(change.key.database.as_ref(), &context.auth_registry).await?;
746780
change
747781
.setup_change
748-
.apply_change(&db_pool, &change.key.table_name)
782+
.apply_change(&db_pool, &change.key)
749783
.await?;
750784
}
751785
Ok(())

0 commit comments

Comments
 (0)