@@ -2209,83 +2209,94 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
22092209            throw  new  IllegalArgumentException (message );
22102210        }
22112211        final  long  generation ;
2212-         if  (flushLock .tryLock () == false ) {
2213-             // if we can't get the lock right away we block if needed otherwise barf 
2214-             if  (waitIfOngoing  == false ) {
2215-                 logger .trace ("detected an in-flight flush, not blocking to wait for it's completion" );
2216-                 listener .onResponse (FlushResult .NO_FLUSH );
2217-                 return ;
2218-             }
2219-             logger .trace ("waiting for in-flight flush to finish" );
2220-             flushLock .lock ();
2221-             logger .trace ("acquired flush lock after blocking" );
2222-         } else  {
2223-             logger .trace ("acquired flush lock immediately" );
2224-         }
22252212
2226-         final  long  startTime  = System .nanoTime ();
2213+         // Acquire an engine read lock before the flush lock. If we were not acquiring a read lock here, a concurrent engine reset could 
2214+         // hold the engine write lock and later be blocked waiting for the flush lock (still holding the write lock), while the current 
2215+         // thread could be blocked waiting for the write lock to be released (and therefore never release the flush lock). 
2216+         final  var  engineReadLock  = engineConfig .getEngineResetLock ().readLock ();
2217+         engineReadLock .lock ();
22272218        try  {
2228-             // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the 
2229-             // newly created commit points to a different translog generation (can free translog), 
2230-             // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. 
2231-             boolean  hasUncommittedChanges  = hasUncommittedChanges ();
2232-             if  (hasUncommittedChanges 
2233-                 || force 
2234-                 || shouldPeriodicallyFlush ()
2235-                 || getProcessedLocalCheckpoint () > Long .parseLong (
2236-                     lastCommittedSegmentInfos .userData .get (SequenceNumbers .LOCAL_CHECKPOINT_KEY )
2237-                 )) {
2238-                 ensureCanFlush ();
2239-                 Translog .Location  commitLocation  = getTranslogLastWriteLocation ();
2240-                 try  {
2241-                     translog .rollGeneration ();
2242-                     logger .trace ("starting commit for flush; commitTranslog=true" );
2243-                     long  lastFlushTimestamp  = relativeTimeInNanosSupplier .getAsLong ();
2244-                     // Pre-emptively recording the upcoming segment generation so that the live version map archive records 
2245-                     // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise, 
2246-                     // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive, 
2247-                     // we clear them from the archive once we see that segment generation on the search shards, but those changes 
2248-                     // were not included in the commit since they happened right after it. 
2249-                     preCommitSegmentGeneration .set (lastCommittedSegmentInfos .getGeneration () + 1 );
2250-                     commitIndexWriter (indexWriter , translog );
2251-                     logger .trace ("finished commit for flush" );
2252-                     // we need to refresh in order to clear older version values 
2253-                     refresh ("version_table_flush" , SearcherScope .INTERNAL , true );
2254-                     translog .trimUnreferencedReaders ();
2255-                     // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and 
2256-                     // (2) indexWriter has committed all the changes (checks must be done in this order). 
2257-                     // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended. 
2258-                     final  Translog .Location  writeLocationAfterFlush  = translog .getLastWriteLocation ();
2259-                     if  (writeLocationAfterFlush .equals (commitLocation ) == false  && hasUncommittedChanges () == false ) {
2260-                         assert  writeLocationAfterFlush .compareTo (commitLocation ) > 0  : writeLocationAfterFlush  + " <= "  + commitLocation ;
2261-                         commitLocation  = writeLocationAfterFlush ;
2262-                     }
2263-                     // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in 
2264-                     // the above lines would not lead the engine to think that it recently flushed, when it did not. 
2265-                     this .lastFlushTimestamp  = lastFlushTimestamp ;
2266-                 } catch  (AlreadyClosedException  e ) {
2267-                     failOnTragicEvent (e );
2268-                     throw  e ;
2269-                 } catch  (Exception  e ) {
2270-                     throw  new  FlushFailedEngineException (shardId , e );
2219+             if  (flushLock .tryLock () == false ) {
2220+                 // if we can't get the lock right away we block if needed otherwise barf 
2221+                 if  (waitIfOngoing  == false ) {
2222+                     logger .trace ("detected an in-flight flush, not blocking to wait for it's completion" );
2223+                     listener .onResponse (FlushResult .NO_FLUSH );
2224+                     return ;
22712225                }
2272-                 refreshLastCommittedSegmentInfos ( );
2273-                 generation  =  lastCommittedSegmentInfos . getGeneration ();
2274-                 flushListener . afterFlush ( generation ,  commitLocation );
2226+                 logger . trace ( "waiting for in-flight flush to finish" );
2227+                 flushLock . lock ();
2228+                 logger . trace ( "acquired flush lock after blocking" );
22752229            } else  {
2276-                 generation  = lastCommittedSegmentInfos .getGeneration ();
2230+                 logger .trace ("acquired flush lock immediately" );
2231+             }
2232+ 
2233+             final  long  startTime  = System .nanoTime ();
2234+             try  {
2235+                 // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the 
2236+                 // newly created commit points to a different translog generation (can free translog), 
2237+                 // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. 
2238+                 boolean  hasUncommittedChanges  = hasUncommittedChanges ();
2239+                 if  (hasUncommittedChanges 
2240+                     || force 
2241+                     || shouldPeriodicallyFlush ()
2242+                     || getProcessedLocalCheckpoint () > Long .parseLong (
2243+                         lastCommittedSegmentInfos .userData .get (SequenceNumbers .LOCAL_CHECKPOINT_KEY )
2244+                     )) {
2245+                     ensureCanFlush ();
2246+                     Translog .Location  commitLocation  = getTranslogLastWriteLocation ();
2247+                     try  {
2248+                         translog .rollGeneration ();
2249+                         logger .trace ("starting commit for flush; commitTranslog=true" );
2250+                         long  lastFlushTimestamp  = relativeTimeInNanosSupplier .getAsLong ();
2251+                         // Pre-emptively recording the upcoming segment generation so that the live version map archive records 
2252+                         // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise, 
2253+                         // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive, 
2254+                         // we clear them from the archive once we see that segment generation on the search shards, but those changes 
2255+                         // were not included in the commit since they happened right after it. 
2256+                         preCommitSegmentGeneration .set (lastCommittedSegmentInfos .getGeneration () + 1 );
2257+                         commitIndexWriter (indexWriter , translog );
2258+                         logger .trace ("finished commit for flush" );
2259+                         // we need to refresh in order to clear older version values 
2260+                         refresh ("version_table_flush" , SearcherScope .INTERNAL , true );
2261+                         translog .trimUnreferencedReaders ();
2262+                         // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and 
2263+                         // (2) indexWriter has committed all the changes (checks must be done in this order). 
2264+                         // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended. 
2265+                         final  Translog .Location  writeLocationAfterFlush  = translog .getLastWriteLocation ();
2266+                         if  (writeLocationAfterFlush .equals (commitLocation ) == false  && hasUncommittedChanges () == false ) {
2267+                             assert  writeLocationAfterFlush .compareTo (commitLocation ) > 0 
2268+                                 : writeLocationAfterFlush  + " <= "  + commitLocation ;
2269+                             commitLocation  = writeLocationAfterFlush ;
2270+                         }
2271+                         // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in 
2272+                         // the above lines would not lead the engine to think that it recently flushed, when it did not. 
2273+                         this .lastFlushTimestamp  = lastFlushTimestamp ;
2274+                     } catch  (AlreadyClosedException  e ) {
2275+                         failOnTragicEvent (e );
2276+                         throw  e ;
2277+                     } catch  (Exception  e ) {
2278+                         throw  new  FlushFailedEngineException (shardId , e );
2279+                     }
2280+                     refreshLastCommittedSegmentInfos ();
2281+                     generation  = lastCommittedSegmentInfos .getGeneration ();
2282+                     flushListener .afterFlush (generation , commitLocation );
2283+                 } else  {
2284+                     generation  = lastCommittedSegmentInfos .getGeneration ();
2285+                 }
2286+             } catch  (FlushFailedEngineException  ex ) {
2287+                 maybeFailEngine ("flush" , ex );
2288+                 listener .onFailure (ex );
2289+                 return ;
2290+             } catch  (Exception  e ) {
2291+                 listener .onFailure (e );
2292+                 return ;
2293+             } finally  {
2294+                 totalFlushTimeExcludingWaitingOnLock .inc (System .nanoTime () - startTime );
2295+                 flushLock .unlock ();
2296+                 logger .trace ("released flush lock" );
22772297            }
2278-         } catch  (FlushFailedEngineException  ex ) {
2279-             maybeFailEngine ("flush" , ex );
2280-             listener .onFailure (ex );
2281-             return ;
2282-         } catch  (Exception  e ) {
2283-             listener .onFailure (e );
2284-             return ;
22852298        } finally  {
2286-             totalFlushTimeExcludingWaitingOnLock .inc (System .nanoTime () - startTime );
2287-             flushLock .unlock ();
2288-             logger .trace ("released flush lock" );
2299+             engineReadLock .unlock ();
22892300        }
22902301
22912302        afterFlush (generation );
0 commit comments