Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/docs/ops/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,27 @@ The spec takes the following fields:
* `included_columns` (`list[str]`, optional): non-primary-key columns to include. If not specified, all non-PK columns are included.
* `ordinal_column` (`str`, optional): to specify a non-primary-key column used for change tracking and ordering, e.g. can be a modified timestamp or a monotonic version number. Supported types are integer-like (`bigint`/`integer`) and timestamps (`timestamp`, `timestamptz`).
`ordinal_column` must not be a primary key column.
* `notification` (`cocoindex.sources.PostgresNotification`, optional): when present, enable change capture based on Postgres LISTEN/NOTIFY. It has the following fields:
* `channel_name` (`str`, optional): the Postgres notification channel to listen on. CocoIndex will automatically create the channel with the given name. If omitted, CocoIndex uses `{flow_name}__{source_name}__cocoindex`.

:::info

If `notification` is provided, CocoIndex listens for row changes using Postgres LISTEN/NOTIFY and creates the required database objects on demand when the flow starts listening:

- Function to create notification message: `{channel_name}_n`.
- Trigger to react to table changes: `{channel_name}_t` on the specified `table_name`.

Creation is automatic when listening begins.

Currently CocoIndex doesn't automatically clean up these objects when the flow is dropped (unlike targets)
It's usually OK to leave them as they are, but if you want to clean them up, you can run the following SQL statements to manually drop them:

```sql
DROP TRIGGER IF EXISTS {channel_name}_t ON "{table_name}";
DROP FUNCTION IF EXISTS {channel_name}_n();
```

:::

### Schema

Expand Down
6 changes: 3 additions & 3 deletions src/ops/sources/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,9 @@ impl SourceFactoryBase for Factory {
.await?;

let notification_ctx = spec.notification.map(|spec| {
let channel_name = spec
.channel_name
.unwrap_or_else(|| format!("coco_{}__{}", context.flow_instance_name, source_name));
let channel_name = spec.channel_name.unwrap_or_else(|| {
format!("{}__{}__cocoindex", context.flow_instance_name, source_name)
});
NotificationContext {
function_name: format!("{channel_name}_n"),
trigger_name: format!("{channel_name}_t"),
Expand Down
Loading