@@ -44,11 +44,17 @@ struct PostgresTableSchema {
4444 ordinal_field_schema : Option < FieldSchemaInfo > ,
4545}
4646
47+ struct NotificationContext {
48+ channel_name : String ,
49+ function_name : String ,
50+ trigger_name : String ,
51+ }
52+
4753struct PostgresSourceExecutor {
4854 db_pool : PgPool ,
4955 table_name : String ,
5056 table_schema : PostgresTableSchema ,
51- notification : Option < NotificationSpec > ,
57+ notification_ctx : Option < NotificationContext > ,
5258}
5359
5460impl PostgresSourceExecutor {
@@ -495,32 +501,23 @@ impl SourceExecutor for PostgresSourceExecutor {
495501 async fn change_stream (
496502 & self ,
497503 ) -> Result < Option < BoxStream < ' async_trait , Result < SourceChangeMessage > > > > {
498- let Some ( notification_spec ) = & self . notification else {
504+ let Some ( notification_ctx ) = & self . notification_ctx else {
499505 return Ok ( None ) ;
500506 } ;
501-
502- let channel_name = notification_spec
503- . channel_name
504- . as_ref ( )
505- . ok_or_else ( || anyhow:: anyhow!( "channel_name is required for change_stream" ) ) ?;
506- let function_name = format ! ( "notify__{channel_name}" ) ;
507- let trigger_name = format ! ( "{function_name}__trigger" ) ;
508-
509- // Create the notification function
510- self . create_notification_function ( & function_name, channel_name, & trigger_name)
511- . await ?;
507+ // Create the notification channel
508+ self . create_notification_function ( notification_ctx) . await ?;
512509
513510 // Set up listener
514511 let mut listener = PgListener :: connect_with ( & self . db_pool ) . await ?;
515- listener. listen ( & channel_name) . await ?;
512+ listener. listen ( & notification_ctx . channel_name ) . await ?;
516513
517- let stream = try_stream ! {
514+ let stream = stream ! {
518515 while let Ok ( notification) = listener. recv( ) . await {
519- let change = self . parse_notification_payload( & notification) ? ;
520- yield SourceChangeMessage {
516+ let change = self . parse_notification_payload( & notification) ;
517+ yield change . map ( |change| SourceChangeMessage {
521518 changes: vec![ change] ,
522519 ack_fn: None ,
523- } ;
520+ } ) ;
524521 }
525522 } ;
526523
@@ -535,10 +532,12 @@ impl SourceExecutor for PostgresSourceExecutor {
535532impl PostgresSourceExecutor {
536533 async fn create_notification_function (
537534 & self ,
538- function_name : & str ,
539- channel_name : & str ,
540- trigger_name : & str ,
535+ notification_ctx : & NotificationContext ,
541536 ) -> Result < ( ) > {
537+ let channel_name = & notification_ctx. channel_name ;
538+ let function_name = & notification_ctx. function_name ;
539+ let trigger_name = & notification_ctx. trigger_name ;
540+
542541 let json_object_expr = |var : & str | {
543542 let mut fields = ( self . table_schema . primary_key_columns . iter ( ) )
544543 . chain ( self . table_schema . ordinal_field_schema . iter ( ) )
@@ -774,7 +773,7 @@ impl SourceFactoryBase for Factory {
774773
775774 async fn build_executor (
776775 self : Arc < Self > ,
777- _source_name : & str ,
776+ source_name : & str ,
778777 spec : Spec ,
779778 context : Arc < FlowInstanceContext > ,
780779 ) -> Result < Box < dyn SourceExecutor > > {
@@ -789,11 +788,22 @@ impl SourceFactoryBase for Factory {
789788 )
790789 . await ?;
791790
791+ let notification_ctx = spec. notification . map ( |spec| {
792+ let channel_name = spec
793+ . channel_name
794+ . unwrap_or_else ( || format ! ( "coco_{}__{}" , context. flow_instance_name, source_name) ) ;
795+ NotificationContext {
796+ function_name : format ! ( "{channel_name}_n" ) ,
797+ trigger_name : format ! ( "{channel_name}_t" ) ,
798+ channel_name,
799+ }
800+ } ) ;
801+
792802 let executor = PostgresSourceExecutor {
793803 db_pool,
794804 table_name : spec. table_name . clone ( ) ,
795805 table_schema,
796- notification : spec . notification . clone ( ) ,
806+ notification_ctx ,
797807 } ;
798808
799809 Ok ( Box :: new ( executor) )
0 commit comments