70 changes: 54 additions & 16 deletions docs/docs/sources/postgres.md
@@ -8,11 +8,11 @@ The `Postgres` source imports rows from a PostgreSQL table.

### Setup for PostgreSQL

* Ensure the table exists and has a primary key. Tables without a primary key are not supported.
* Grant the connecting user read permissions on the target table (e.g. `SELECT`).
* Provide a database connection. You can:
  * Use CocoIndex's default database connection, or
  * Provide an explicit connection via a transient auth entry referencing a `DatabaseConnectionSpec` with a `url`, for example:

```python
cocoindex.add_transient_auth_entry(
@@ -26,23 +26,38 @@ The `Postgres` source imports rows from a PostgreSQL table.

The spec takes the following fields:

* `table_name` (`str`): the PostgreSQL table to read from.
* `database` (`cocoindex.TransientAuthEntryReference[DatabaseConnectionSpec]`, optional): database connection reference. If not provided, the default CocoIndex database is used.
* `included_columns` (`list[str]`, optional): non-primary-key columns to include. If not specified, all non-PK columns are included.
* `ordinal_column` (`str`, optional): a non-primary-key column used for change tracking and ordering, such as a modification timestamp or a monotonically increasing version number. Supported types are integer-like (`bigint`/`integer`) and timestamps (`timestamp`, `timestamptz`).
  `ordinal_column` must not be a primary key column.

* `filter` (`str`, optional): an arbitrary SQL boolean expression used to filter rows. Only rows satisfying this condition are included. For example: `"age > 18"`, `"status = 'active'"`, or `"created_at > '2023-01-01'"`. The expression is added to the `WHERE` clause of the SQL queries.

:::info

The `filter` expression is inserted directly into SQL queries. Ensure that:
* The expression uses valid PostgreSQL syntax
* Column names and values are properly quoted if needed
* The expression evaluates to a boolean result
* You trust the source of the filter expression to avoid SQL injection

:::

* `notification` (`cocoindex.sources.PostgresNotification`, optional): when present, enables change capture based on Postgres LISTEN/NOTIFY. It has the following fields:
  * `channel_name` (`str`, optional): the Postgres notification channel to listen on. CocoIndex will automatically create the channel with the given name. If omitted, CocoIndex uses `{flow_name}__{source_name}__cocoindex`.

:::info

If `notification` is provided, CocoIndex listens for row changes using Postgres LISTEN/NOTIFY and creates the required database objects on demand when the flow starts listening:

* Function to create notification message: `{channel_name}_n`.
* Trigger to react to table changes: `{channel_name}_t` on the specified `table_name`.

Creation is automatic when listening begins.

:::info

Currently, CocoIndex doesn't automatically clean up these objects when the flow is dropped (unlike targets).
It's usually fine to leave them in place, but if you want to clean them up, you can run the following SQL statements to drop them manually:

@@ -57,10 +72,33 @@ The spec takes the following fields:

The output is a [*KTable*](/docs/core/data_types#ktable) with a straightforward one-to-one mapping from Postgres table columns to CocoIndex table fields:

* Key fields: All primary key columns in the Postgres table will be included automatically as key fields.
* Value fields: All non-primary-key columns in the Postgres table (included by `included_columns` or all when not specified) appear as value fields.
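
As a rough illustration of the mapping described above, the sketch below splits a table's columns into key fields and value fields. The function name `split_fields` and its arguments are hypothetical and not part of the CocoIndex API; the real mapping happens inside the engine.

```python
def split_fields(columns, primary_key_columns, included_columns=None):
    """Split table columns into (key_fields, value_fields).

    Hypothetical helper for illustration only; not a CocoIndex function.
    """
    pk = set(primary_key_columns)
    # Primary key columns always become key fields.
    key_fields = [c for c in columns if c in pk]
    # Non-PK columns become value fields, optionally narrowed by included_columns.
    value_fields = [
        c
        for c in columns
        if c not in pk and (included_columns is None or c in included_columns)
    ]
    return key_fields, value_fields


keys, values = split_fields(
    columns=["id", "name", "amount", "modified_time"],
    primary_key_columns=["id"],
    included_columns=["name", "amount"],
)
```

Here `keys` holds the primary key column and `values` holds only the columns named in `included_columns`.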

### Example

An example of using `filter` to filter rows:

```python
data_scope["products"] = flow_builder.add_source(
    cocoindex.sources.Postgres(
        table_name="source_products",
        # Optional. Use the default CocoIndex database if not specified.
        database=cocoindex.add_transient_auth_entry(
            cocoindex.DatabaseConnectionSpec(
                url=os.environ["SOURCE_DATABASE_URL"],
            )
        ),
        # Optional
        ordinal_column="modified_time",
        # Optional
        filter="amount > 0",
        # Optional
        notification=cocoindex.sources.PostgresNotification(),
    ),
)
```
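
Because the `filter` string is inserted into SQL as-is, one way to keep it trustworthy is to choose it from a fixed set of expressions defined in code rather than assembling it from external input. A minimal sketch, assuming a hypothetical `ALLOWED_FILTERS` mapping and `pick_filter` helper (neither is part of CocoIndex):

```python
# Hypothetical allow-list of filter expressions; names are illustrative only.
ALLOWED_FILTERS = {
    "positive_amount": "amount > 0",
    "active_only": "status = 'active'",
}


def pick_filter(name):
    """Return a predefined SQL boolean expression, or raise for unknown names."""
    try:
        return ALLOWED_FILTERS[name]
    except KeyError:
        raise ValueError(f"unknown filter: {name!r}") from None


row_filter = pick_filter("positive_amount")
```

The resulting string can then be passed as `filter=row_filter` in the `Postgres` spec.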

You can find an end-to-end example using the Postgres source at:

* [examples/postgres_source](https://github.com/cocoindex-io/cocoindex/tree/main/examples/postgres_source)
3 changes: 3 additions & 0 deletions python/cocoindex/sources/_engine_builtin_specs.py
@@ -100,3 +100,6 @@ class Postgres(op.SourceSpec):

    # Optional: when set, supports change capture from PostgreSQL notification.
    notification: PostgresNotification | None = None

    # Optional: SQL expression filter for rows (arbitrary SQL boolean expression)
    filter: str | None = None
52 changes: 34 additions & 18 deletions src/ops/sources/postgres.rs
@@ -8,6 +8,7 @@ use indoc::formatdoc;
use sqlx::postgres::types::PgInterval;
use sqlx::postgres::{PgListener, PgNotification};
use sqlx::{PgPool, Row};
use std::fmt::Write;

type PgValueDecoder = fn(&sqlx::postgres::PgRow, usize) -> Result<Value>;

@@ -34,6 +35,8 @@ pub struct Spec {
ordinal_column: Option<String>,
/// Optional: notification for change capture
notification: Option<NotificationSpec>,
/// Optional: WHERE clause filter for rows (arbitrary SQL boolean expression)
filter: Option<String>,
}

#[derive(Clone)]
@@ -55,6 +58,7 @@ struct PostgresSourceExecutor {
table_name: String,
table_schema: PostgresTableSchema,
notification_ctx: Option<NotificationContext>,
filter: Option<String>,
}

impl PostgresSourceExecutor {
@@ -400,25 +404,29 @@ impl SourceExecutor for PostgresSourceExecutor {
&self,
options: &SourceExecutorReadOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
let stream = try_stream! {
// Build selection including PKs (for keys), and optionally values and ordinal
let pk_columns: Vec<String> = self
.table_schema
.primary_key_columns
.iter()
.map(|col| format!("\"{}\"", col.schema.name))
.collect();
let pk_count = pk_columns.len();
let mut select_parts = pk_columns;
let ordinal_col_index = self.build_selected_columns(&mut select_parts, options);

let mut query = format!("SELECT {} FROM \"{}\"", select_parts.join(", "), self.table_name);

// Add ordering by ordinal column if specified
if let Some(ord_schema) = &self.table_schema.ordinal_field_schema {
query.push_str(&format!(" ORDER BY \"{}\"", ord_schema.schema.name));
}
// Build selection including PKs (for keys), and optionally values and ordinal
let pk_columns: Vec<String> = self
.table_schema
.primary_key_columns
.iter()
.map(|col| format!("\"{}\"", col.schema.name))
.collect();
let pk_count = pk_columns.len();
let mut select_parts = pk_columns;
let ordinal_col_index = self.build_selected_columns(&mut select_parts, options);

let mut query = format!(
"SELECT {} FROM \"{}\"",
select_parts.join(", "),
self.table_name
);

// Add WHERE filter if specified
if let Some(where_clause) = &self.filter {
write!(&mut query, " WHERE {}", where_clause)?;
}

let stream = try_stream! {
let mut rows = sqlx::query(&query).fetch(&self.db_pool);
while let Some(row) = rows.try_next().await? {
// Decode key from PKs (selected first)
@@ -485,6 +493,13 @@ impl SourceExecutor for PostgresSourceExecutor {
bind_key_field(&mut qb, key_value)?;
}

// Add WHERE filter if specified
if let Some(where_clause) = &self.filter {
qb.push(" AND (");
qb.push(where_clause);
qb.push(")");
}

let row_opt = qb.build().fetch_optional(&self.db_pool).await?;
let data = match &row_opt {
Some(row) => self.decode_row_data(&row, options, ordinal_col_index, 0)?,
@@ -804,6 +819,7 @@ impl SourceFactoryBase for Factory {
table_name: spec.table_name.clone(),
table_schema,
notification_ctx,
filter: spec.filter,
};

Ok(Box::new(executor))
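
To make the two query shapes in this change easier to compare, here is a minimal Python sketch of how the filter is appended in each path. It mirrors the string building visible in the diff, but it is not the engine code: the function names are hypothetical, and the `$1`-style placeholders for the key lookup are an assumption about what the query builder emits.

```python
def list_query(table_name, select_parts, row_filter=None):
    """Full-scan path: the filter becomes the whole WHERE clause."""
    query = 'SELECT {} FROM "{}"'.format(", ".join(select_parts), table_name)
    if row_filter is not None:
        query += " WHERE {}".format(row_filter)
    return query


def lookup_query(table_name, select_parts, pk_columns, row_filter=None):
    """Point-lookup path: the filter is AND-ed onto the primary key match."""
    # Placeholder numbering ($1, $2, ...) is assumed for illustration.
    conditions = " AND ".join(
        '"{}" = ${}'.format(col, i) for i, col in enumerate(pk_columns, start=1)
    )
    query = 'SELECT {} FROM "{}" WHERE {}'.format(
        ", ".join(select_parts), table_name, conditions
    )
    if row_filter is not None:
        # Parentheses keep an OR inside the filter from escaping the key match.
        query += " AND ({})".format(row_filter)
    return query


scan_sql = list_query("source_products", ['"id"', '"name"'], "amount > 0")
get_sql = lookup_query("source_products", ['"name"'], ["id"], "amount > 0")
```

The parentheses in the lookup path matter because `AND` binds more tightly than `OR` in SQL; without them, a filter such as `a OR b` would match rows regardless of the key condition.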