diff --git a/docs/docs/ops/sources.md b/docs/docs/ops/sources.md index 1a936b93c..bce063e93 100644 --- a/docs/docs/ops/sources.md +++ b/docs/docs/ops/sources.md @@ -313,6 +313,27 @@ The spec takes the following fields: * `included_columns` (`list[str]`, optional): non-primary-key columns to include. If not specified, all non-PK columns are included. * `ordinal_column` (`str`, optional): to specify a non-primary-key column used for change tracking and ordering, e.g. can be a modified timestamp or a monotonic version number. Supported types are integer-like (`bigint`/`integer`) and timestamps (`timestamp`, `timestamptz`). `ordinal_column` must not be a primary key column. +* `notification` (`cocoindex.sources.PostgresNotification`, optional): when present, enable change capture based on Postgres LISTEN/NOTIFY. It has the following fields: + * `channel_name` (`str`, optional): the Postgres notification channel to listen on. CocoIndex will automatically create the channel with the given name. If omitted, CocoIndex uses `{flow_name}__{source_name}__cocoindex`. + + :::info + + If `notification` is provided, CocoIndex listens for row changes using Postgres LISTEN/NOTIFY and creates the required database objects on demand when the flow starts listening: + + - Function to create notification message: `{channel_name}_n`. + - Trigger to react to table changes: `{channel_name}_t` on the specified `table_name`. + + Creation is automatic when listening begins. + + Currently CocoIndex doesn't automatically clean up these objects when the flow is dropped (unlike targets) + It's usually OK to leave them as they are, but if you want to clean them up, you can run the following SQL statements to manually drop them: + + ```sql + DROP TRIGGER IF EXISTS {channel_name}_t ON "{table_name}"; + DROP FUNCTION IF EXISTS {channel_name}_n(); + ``` + + ::: ### Schema diff --git a/src/ops/sources/postgres.rs b/src/ops/sources/postgres.rs index 10f42be3e..303cbe9d7 100644 --- a/src/ops/sources/postgres.rs +++ b/src/ops/sources/postgres.rs @@ -789,9 +789,9 @@ impl SourceFactoryBase for Factory { .await?; let notification_ctx = spec.notification.map(|spec| { - let channel_name = spec - .channel_name - .unwrap_or_else(|| format!("coco_{}__{}", context.flow_instance_name, source_name)); + let channel_name = spec.channel_name.unwrap_or_else(|| { + format!("{}__{}__cocoindex", context.flow_instance_name, source_name) + }); NotificationContext { function_name: format!("{channel_name}_n"), trigger_name: format!("{channel_name}_t"),