@@ -53,14 +53,14 @@ export interface LogicalReplicationClientOptions {
5353 leaderLockExtendIntervalMs ?: number ;
5454
5555 /**
56- * The number of times to retry acquiring the leader lock (default: 120 )
56+ * The interval in ms to retry acquiring the leader lock (default: 500 )
5757 */
58- leaderLockRetryCount ?: number ;
58+ leaderLockRetryIntervalMs ?: number ;
5959
6060 /**
61- * The interval in ms to retry acquiring the leader lock (default: 500 )
61+ * The additional time in ms to retry acquiring the leader lock (default: 1000ms )
6262 */
63- leaderLockRetryIntervalMs ?: number ;
63+ leaderLockAcquireAdditionalTimeMs ?: number ;
6464
6565 /**
6666 * The interval in seconds to automatically acknowledge the last LSN if no ack has been sent (default: 10)
@@ -97,7 +97,7 @@ export class LogicalReplicationClient {
9797 private lastAcknowledgedLsn : string | null = null ;
9898 private leaderLockTimeoutMs : number ;
9999 private leaderLockExtendIntervalMs : number ;
100- private leaderLockRetryCount : number ;
100+ private leaderLockAcquireAdditionalTimeMs : number ;
101101 private leaderLockRetryIntervalMs : number ;
102102 private leaderLockHeartbeatTimer : NodeJS . Timeout | null = null ;
103103 private ackIntervalSeconds : number ;
@@ -124,7 +124,7 @@ export class LogicalReplicationClient {
124124
125125 this . leaderLockTimeoutMs = options . leaderLockTimeoutMs ?? 30000 ;
126126 this . leaderLockExtendIntervalMs = options . leaderLockExtendIntervalMs ?? 10000 ;
127- this . leaderLockRetryCount = options . leaderLockRetryCount ?? 120 ;
127+ this . leaderLockAcquireAdditionalTimeMs = options . leaderLockAcquireAdditionalTimeMs ?? 1000 ;
128128 this . leaderLockRetryIntervalMs = options . leaderLockRetryIntervalMs ?? 500 ;
129129 this . ackIntervalSeconds = options . ackIntervalSeconds ?? 10 ;
130130
@@ -578,34 +578,74 @@ export class LogicalReplicationClient {
578578 }
579579
580580 async #acquireLeaderLock( ) : Promise < boolean > {
581- try {
582- this . leaderLock = await this . redlock . acquire (
583- [ `logical-replication-client:${ this . options . name } ` ] ,
584- this . leaderLockTimeoutMs ,
585- {
586- retryCount : this . leaderLockRetryCount ,
587- retryDelay : this . leaderLockRetryIntervalMs ,
588- }
589- ) ;
590- } catch ( err ) {
591- this . logger . error ( "Leader election failed" , {
592- name : this . options . name ,
593- table : this . options . table ,
594- slotName : this . options . slotName ,
595- publicationName : this . options . publicationName ,
596- retryCount : this . leaderLockRetryCount ,
597- retryIntervalMs : this . leaderLockRetryIntervalMs ,
598- error : err ,
599- } ) ;
581+ const startTime = Date . now ( ) ;
582+ const maxWaitTime = this . leaderLockTimeoutMs + this . leaderLockAcquireAdditionalTimeMs ;
600583
601- return false ;
584+ this . logger . debug ( "Acquiring leader lock" , {
585+ name : this . options . name ,
586+ slotName : this . options . slotName ,
587+ publicationName : this . options . publicationName ,
588+ maxWaitTime,
589+ } ) ;
590+
591+ let attempt = 0 ;
592+
593+ while ( Date . now ( ) - startTime < maxWaitTime ) {
594+ try {
595+ this . leaderLock = await this . redlock . acquire (
596+ [ `logical-replication-client:${ this . options . name } ` ] ,
597+ this . leaderLockTimeoutMs
598+ ) ;
599+
600+ this . logger . debug ( "Acquired leader lock" , {
601+ name : this . options . name ,
602+ slotName : this . options . slotName ,
603+ publicationName : this . options . publicationName ,
604+ lockTimeoutMs : this . leaderLockTimeoutMs ,
605+ lockExtendIntervalMs : this . leaderLockExtendIntervalMs ,
606+ lock : this . leaderLock ,
607+ attempt,
608+ } ) ;
609+ return true ;
610+ } catch ( err ) {
611+ attempt ++ ;
612+
613+ this . logger . debug ( "Failed to acquire leader lock, retrying" , {
614+ name : this . options . name ,
615+ slotName : this . options . slotName ,
616+ publicationName : this . options . publicationName ,
617+ attempt,
618+ retryIntervalMs : this . leaderLockRetryIntervalMs ,
619+ error : err ,
620+ } ) ;
621+
622+ await new Promise ( ( resolve ) => setTimeout ( resolve , this . leaderLockRetryIntervalMs ) ) ;
623+ }
602624 }
603625
604- return true ;
626+ this . logger . error ( "Leader election failed after retries" , {
627+ name : this . options . name ,
628+ table : this . options . table ,
629+ slotName : this . options . slotName ,
630+ publicationName : this . options . publicationName ,
631+ totalAttempts : attempt ,
632+ totalWaitTimeMs : Date . now ( ) - startTime ,
633+ } ) ;
634+ return false ;
605635 }
606636
607637 async #releaseLeaderLock( ) {
608638 if ( ! this . leaderLock ) return ;
639+
640+ this . logger . debug ( "Releasing leader lock" , {
641+ name : this . options . name ,
642+ slotName : this . options . slotName ,
643+ publicationName : this . options . publicationName ,
644+ lockTimeoutMs : this . leaderLockTimeoutMs ,
645+ lockExtendIntervalMs : this . leaderLockExtendIntervalMs ,
646+ lock : this . leaderLock ,
647+ } ) ;
648+
609649 const [ releaseError ] = await tryCatch ( this . leaderLock . release ( ) ) ;
610650 this . leaderLock = null ;
611651
@@ -631,13 +671,19 @@ export class LogicalReplicationClient {
631671 name : this . options . name ,
632672 slotName : this . options . slotName ,
633673 publicationName : this . options . publicationName ,
674+ lockTimeoutMs : this . leaderLockTimeoutMs ,
675+ lockExtendIntervalMs : this . leaderLockExtendIntervalMs ,
676+ lock : this . leaderLock ,
634677 } ) ;
635678 } catch ( err ) {
636679 this . logger . error ( "Failed to extend leader lock" , {
637680 name : this . options . name ,
638681 slotName : this . options . slotName ,
639682 publicationName : this . options . publicationName ,
640683 error : err ,
684+ lockTimeoutMs : this . leaderLockTimeoutMs ,
685+ lockExtendIntervalMs : this . leaderLockExtendIntervalMs ,
686+ lock : this . leaderLock ,
641687 } ) ;
642688 // Optionally emit an error or handle loss of leadership
643689 this . events . emit ( "error" , err instanceof Error ? err : new Error ( String ( err ) ) ) ;
0 commit comments