diff --git a/src/ops/sources/postgres.rs b/src/ops/sources/postgres.rs index ac467454e..10f42be3e 100644 --- a/src/ops/sources/postgres.rs +++ b/src/ops/sources/postgres.rs @@ -44,11 +44,17 @@ struct PostgresTableSchema { ordinal_field_schema: Option, } +struct NotificationContext { + channel_name: String, + function_name: String, + trigger_name: String, +} + struct PostgresSourceExecutor { db_pool: PgPool, table_name: String, table_schema: PostgresTableSchema, - notification: Option, + notification_ctx: Option, } impl PostgresSourceExecutor { @@ -495,32 +501,23 @@ impl SourceExecutor for PostgresSourceExecutor { async fn change_stream( &self, ) -> Result>>> { - let Some(notification_spec) = &self.notification else { + let Some(notification_ctx) = &self.notification_ctx else { return Ok(None); }; - - let channel_name = notification_spec - .channel_name - .as_ref() - .ok_or_else(|| anyhow::anyhow!("channel_name is required for change_stream"))?; - let function_name = format!("notify__{channel_name}"); - let trigger_name = format!("{function_name}__trigger"); - - // Create the notification function - self.create_notification_function(&function_name, channel_name, &trigger_name) - .await?; + // Create the notification channel + self.create_notification_function(notification_ctx).await?; // Set up listener let mut listener = PgListener::connect_with(&self.db_pool).await?; - listener.listen(&channel_name).await?; + listener.listen(¬ification_ctx.channel_name).await?; - let stream = try_stream! { + let stream = stream! { while let Ok(notification) = listener.recv().await { - let change = self.parse_notification_payload(¬ification)?; - yield SourceChangeMessage { + let change = self.parse_notification_payload(¬ification); + yield change.map(|change| SourceChangeMessage { changes: vec![change], ack_fn: None, - }; + }); } }; @@ -535,10 +532,12 @@ impl SourceExecutor for PostgresSourceExecutor { impl PostgresSourceExecutor { async fn create_notification_function( &self, - function_name: &str, - channel_name: &str, - trigger_name: &str, + notification_ctx: &NotificationContext, ) -> Result<()> { + let channel_name = ¬ification_ctx.channel_name; + let function_name = ¬ification_ctx.function_name; + let trigger_name = ¬ification_ctx.trigger_name; + let json_object_expr = |var: &str| { let mut fields = (self.table_schema.primary_key_columns.iter()) .chain(self.table_schema.ordinal_field_schema.iter()) @@ -774,7 +773,7 @@ impl SourceFactoryBase for Factory { async fn build_executor( self: Arc, - _source_name: &str, + source_name: &str, spec: Spec, context: Arc, ) -> Result> { @@ -789,11 +788,22 @@ impl SourceFactoryBase for Factory { ) .await?; + let notification_ctx = spec.notification.map(|spec| { + let channel_name = spec + .channel_name + .unwrap_or_else(|| format!("coco_{}__{}", context.flow_instance_name, source_name)); + NotificationContext { + function_name: format!("{channel_name}_n"), + trigger_name: format!("{channel_name}_t"), + channel_name, + } + }); + let executor = PostgresSourceExecutor { db_pool, table_name: spec.table_name.clone(), table_schema, - notification: spec.notification.clone(), + notification_ctx, }; Ok(Box::new(executor))