@@ -2,11 +2,11 @@ use etl_config::shared::PipelineConfig;
2
2
use etl_postgres:: replication:: slots:: get_slot_name;
3
3
use etl_postgres:: replication:: worker:: WorkerType ;
4
4
use etl_postgres:: types:: TableId ;
5
- use futures:: { FutureExt , StreamExt } ;
5
+ use futures:: StreamExt ;
6
6
use metrics:: histogram;
7
7
use postgres_replication:: protocol;
8
8
use postgres_replication:: protocol:: { LogicalReplicationMessage , ReplicationMessage } ;
9
- use std:: future:: { Future , pending } ;
9
+ use std:: future:: Future ;
10
10
use std:: pin:: Pin ;
11
11
use std:: sync:: Arc ;
12
12
use std:: time:: { Duration , Instant } ;
@@ -552,9 +552,9 @@ where
552
552
553
553
// PRIORITY 3: Handle table synchronization coordination signals.
554
554
// Table sync workers signal when they complete initial data copying and are ready
555
- // to transition to continuous replication mode. We use map_or_else with pending()
556
- // to make this branch optional - if no signal receiver exists, this branch never fires .
557
- _ = force_syncing_tables_rx. as_mut( ) . map_or_else ( || pending ( ) . boxed ( ) , |rx| rx . changed ( ) . boxed ( ) ) => {
555
+ // to transition to continuous replication mode. Guard the branch so it stays
556
+ // dormant if no signal receiver was provided .
557
+ _ = force_syncing_tables_rx. as_mut( ) . unwrap ( ) . changed ( ) , if force_syncing_tables_rx . is_some ( ) => {
558
558
// Table state transitions can only occur at transaction boundaries to maintain consistency.
559
559
// If we're in the middle of processing a transaction (remote_final_lsn is set),
560
560
// we defer the sync processing until the current transaction completes.
0 commit comments