Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions src/ops/sources/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@ struct PostgresTableSchema {
ordinal_field_schema: Option<FieldSchemaInfo>,
}

struct NotificationContext {
channel_name: String,
function_name: String,
trigger_name: String,
}

struct PostgresSourceExecutor {
db_pool: PgPool,
table_name: String,
table_schema: PostgresTableSchema,
notification: Option<NotificationSpec>,
notification_ctx: Option<NotificationContext>,
}

impl PostgresSourceExecutor {
Expand Down Expand Up @@ -495,32 +501,23 @@ impl SourceExecutor for PostgresSourceExecutor {
async fn change_stream(
&self,
) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
let Some(notification_spec) = &self.notification else {
let Some(notification_ctx) = &self.notification_ctx else {
return Ok(None);
};

let channel_name = notification_spec
.channel_name
.as_ref()
.ok_or_else(|| anyhow::anyhow!("channel_name is required for change_stream"))?;
let function_name = format!("notify__{channel_name}");
let trigger_name = format!("{function_name}__trigger");

// Create the notification function
self.create_notification_function(&function_name, channel_name, &trigger_name)
.await?;
// Create the notification channel
self.create_notification_function(notification_ctx).await?;

// Set up listener
let mut listener = PgListener::connect_with(&self.db_pool).await?;
listener.listen(&channel_name).await?;
listener.listen(&notification_ctx.channel_name).await?;

let stream = try_stream! {
let stream = stream! {
while let Ok(notification) = listener.recv().await {
let change = self.parse_notification_payload(&notification)?;
yield SourceChangeMessage {
let change = self.parse_notification_payload(&notification);
yield change.map(|change| SourceChangeMessage {
changes: vec![change],
ack_fn: None,
};
});
}
};

Expand All @@ -535,10 +532,12 @@ impl SourceExecutor for PostgresSourceExecutor {
impl PostgresSourceExecutor {
async fn create_notification_function(
&self,
function_name: &str,
channel_name: &str,
trigger_name: &str,
notification_ctx: &NotificationContext,
) -> Result<()> {
let channel_name = &notification_ctx.channel_name;
let function_name = &notification_ctx.function_name;
let trigger_name = &notification_ctx.trigger_name;

let json_object_expr = |var: &str| {
let mut fields = (self.table_schema.primary_key_columns.iter())
.chain(self.table_schema.ordinal_field_schema.iter())
Expand Down Expand Up @@ -774,7 +773,7 @@ impl SourceFactoryBase for Factory {

async fn build_executor(
self: Arc<Self>,
_source_name: &str,
source_name: &str,
spec: Spec,
context: Arc<FlowInstanceContext>,
) -> Result<Box<dyn SourceExecutor>> {
Expand All @@ -789,11 +788,22 @@ impl SourceFactoryBase for Factory {
)
.await?;

let notification_ctx = spec.notification.map(|spec| {
let channel_name = spec
.channel_name
.unwrap_or_else(|| format!("coco_{}__{}", context.flow_instance_name, source_name));
NotificationContext {
function_name: format!("{channel_name}_n"),
trigger_name: format!("{channel_name}_t"),
channel_name,
}
});

let executor = PostgresSourceExecutor {
db_pool,
table_name: spec.table_name.clone(),
table_schema,
notification: spec.notification.clone(),
notification_ctx,
};

Ok(Box::new(executor))
Expand Down
Loading