4040import java .io .Closeable ;
4141import java .io .IOException ;
4242import java .util .ArrayList ;
43- import java .util .Collection ;
4443import java .util .Collections ;
4544import java .util .List ;
4645import java .util .Map ;
4746import java .util .UUID ;
4847import java .util .concurrent .atomic .AtomicBoolean ;
4948import java .util .concurrent .locks .Lock ;
5049import java .util .concurrent .locks .ReentrantReadWriteLock ;
51- import java .util .stream .Collectors ;
5250
5351import static org .opensearch .index .BucketedCompositeDirectory .CHILD_DIRECTORY_PREFIX ;
5452
@@ -209,12 +207,12 @@ public CriteriaBasedIndexWriterLookup getLookupMap() {
209207 * each refresh cycle.
210208 *
211209 */
212- public static final class CriteriaBasedIndexWriterLookup implements Closeable {
210+ public static class CriteriaBasedIndexWriterLookup implements Closeable {
213211 private final Map <String , DisposableIndexWriter > criteriaBasedIndexWriterMap ;
214212 private final Map <BytesRef , DeleteEntry > lastDeleteEntrySet ;
215213 private final Map <BytesRef , String > criteria ;
216214 private final ReentrantReadWriteLock mapLock ;
217- private final CriteriaBasedWriterLock mapReadLock ;
215+ CriteriaBasedWriterLock mapReadLock ;
218216 private final ReleasableLock mapWriteLock ;
219217 private final long version ;
220218 private boolean closed ;
@@ -300,7 +298,7 @@ public boolean isClosed() {
300298 return closed ;
301299 }
302300
303- private static final class CriteriaBasedWriterLock implements Releasable {
301+ static class CriteriaBasedWriterLock implements Releasable {
304302 private final Lock lock ;
305303 // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
306304 private final ThreadLocal <Integer > holdingThreads ;
@@ -405,9 +403,9 @@ public Term getTerm() {
405403 *
406404 * @opensearch.internal
407405 */
408- final static class LiveIndexWriterDeletesMap {
406+ static class LiveIndexWriterDeletesMap {
409407 // All writes (adds and deletes) go into here:
410- final CriteriaBasedIndexWriterLookup current ;
408+ CriteriaBasedIndexWriterLookup current ;
411409
412410 // Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
413411 final CriteriaBasedIndexWriterLookup old ;
@@ -468,16 +466,31 @@ String getCriteriaForDoc(BytesRef key) {
468466 DisposableIndexWriter computeIndexWriterIfAbsentForCriteria (
469467 String criteria ,
470468 CheckedBiFunction <String , CriteriaBasedIndexWriterLookup , DisposableIndexWriter , IOException > indexWriterSupplier ,
471- ShardId shardId
469+ ShardId shardId ,
470+ int maxRetryOnLookupMapAcquisitionException
472471 ) {
473472 boolean success = false ;
474473 CriteriaBasedIndexWriterLookup current = null ;
475474 try {
476- current = getCurrentMap ();
475+ int counter = 0 ;
476+ while ((current == null || current .isClosed ()) && counter < maxRetryOnLookupMapAcquisitionException ) {
477+ // This function acquires a first read lock on a map which does not have any write lock present. Current keeps
478+ // on getting rotated during refresh, so there will be one current on which read lock can be obtained.
479+ // Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was
480+ // never applied on this map as write lock gets only during closing time. We are doing this instead of acquire,
481+ // because acquire can also apply a read lock in case refresh completed and map is closed.
482+ current = this .current .mapReadLock .tryAcquire ();
483+ if (current != null && current .isClosed () == true ) {
484+ current .mapReadLock .close ();
485+ current = null ;
486+ }
487+
488+ ++counter ;
489+ }
490+
477491 if (current == null || current .isClosed ()) {
478492 throw new LookupMapLockAcquisitionException (shardId , "Unable to obtain lock on the current Lookup map" , null );
479493 }
480-
481494 DisposableIndexWriter writer = current .computeIndexWriterIfAbsentForCriteria (criteria , indexWriterSupplier );
482495 success = true ;
483496 return writer ;
@@ -489,15 +502,6 @@ DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
489502 }
490503 }
491504
492- // This function acquires a first read lock on a map which does not have any write lock present. Current keeps
493- // on getting rotated during refresh, so there will be one current on which read lock can be obtained.
494- // Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was
495- // never applied on this map as write lock gets only during closing time. We are doing this instead of acquire,
496- // because acquire can also apply a read lock in case refresh completed and map is closed.
497- CriteriaBasedIndexWriterLookup getCurrentMap () {
498- return current .mapReadLock .tryAcquire ();
499- }
500-
501505 // Used for Test Case.
502506 ReleasableLock acquireCurrentWriteLock () {
503507 return current .mapWriteLock .acquire ();
@@ -672,7 +676,8 @@ DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
672676 return currentLiveIndexWriterDeletesMap .computeIndexWriterIfAbsentForCriteria (
673677 criteria ,
674678 indexWriterSupplier ,
675- engineConfig .getShardId ()
679+ engineConfig .getShardId (),
680+ engineConfig .getIndexSettings ().getMaxRetryOnLookupMapAcquisitionException ()
676681 );
677682 }
678683
@@ -684,12 +689,8 @@ public Map<String, DisposableIndexWriter> getMarkForRefreshIndexWriterMap() {
684689 public long getFlushingBytes () {
685690 ensureOpen ();
686691 long flushingBytes = 0 ;
687- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
688- .stream ()
689- .map (DisposableIndexWriter ::getIndexWriter )
690- .collect (Collectors .toSet ());
691- for (IndexWriter currentWriter : currentWriterSet ) {
692- flushingBytes += currentWriter .getFlushingBytes ();
692+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
693+ flushingBytes += disposableIndexWriter .getIndexWriter ().getFlushingBytes ();
693694 }
694695
695696 return flushingBytes + accumulatingIndexWriter .getFlushingBytes ();
@@ -699,13 +700,8 @@ public long getFlushingBytes() {
699700 public long getPendingNumDocs () {
700701 ensureOpen ();
701702 long pendingNumDocs = 0 ;
702- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
703- .stream ()
704- .map (DisposableIndexWriter ::getIndexWriter )
705- .collect (Collectors .toSet ());
706- ;
707- for (IndexWriter currentWriter : currentWriterSet ) {
708- pendingNumDocs += currentWriter .getPendingNumDocs ();
703+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
704+ pendingNumDocs += disposableIndexWriter .getIndexWriter ().getPendingNumDocs ();
709705 }
710706
711707 // TODO: Should we add docs for old writer as well?
@@ -733,24 +729,17 @@ public boolean hasUncommittedChanges() {
733729
734730 @ Override
735731 public Throwable getTragicException () {
736- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
737- .stream ()
738- .map (DisposableIndexWriter ::getIndexWriter )
739- .collect (Collectors .toSet ());
740- for (IndexWriter writer : currentWriterSet ) {
741- if (writer .isOpen () == false && writer .getTragicException () != null ) {
742- return writer .getTragicException ();
732+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
733+ if (disposableIndexWriter .getIndexWriter ().isOpen () == false
734+ && disposableIndexWriter .getIndexWriter ().getTragicException () != null ) {
735+ return disposableIndexWriter .getIndexWriter ().getTragicException ();
743736 }
744737 }
745738
746- Collection <IndexWriter > oldWriterSet = liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()
747- .stream ()
748- .map (DisposableIndexWriter ::getIndexWriter )
749- .collect (Collectors .toSet ());
750- ;
751- for (IndexWriter writer : oldWriterSet ) {
752- if (writer .isOpen () == false && writer .getTragicException () != null ) {
753- return writer .getTragicException ();
739+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()) {
740+ if (disposableIndexWriter .getIndexWriter ().isOpen () == false
741+ && disposableIndexWriter .getIndexWriter ().getTragicException () != null ) {
742+ return disposableIndexWriter .getIndexWriter ().getTragicException ();
754743 }
755744 }
756745
@@ -765,27 +754,19 @@ public Throwable getTragicException() {
765754 public final long ramBytesUsed () {
766755 ensureOpen ();
767756 long ramBytesUsed = 0 ;
768- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
769- .stream ()
770- .map (DisposableIndexWriter ::getIndexWriter )
771- .collect (Collectors .toSet ());
772757
773758 try (ReleasableLock ignore = liveIndexWriterDeletesMap .current .mapWriteLock .acquire ()) {
774- for (IndexWriter indexWriter : currentWriterSet ) {
775- if (indexWriter .isOpen () == true ) {
776- ramBytesUsed += indexWriter .ramBytesUsed ();
759+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap . current . criteriaBasedIndexWriterMap . values () ) {
760+ if (disposableIndexWriter . getIndexWriter () .isOpen () == true ) {
761+ ramBytesUsed += disposableIndexWriter . getIndexWriter () .ramBytesUsed ();
777762 }
778763 }
779764 }
780765
781- Collection <IndexWriter > oldWriterSet = liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()
782- .stream ()
783- .map (DisposableIndexWriter ::getIndexWriter )
784- .collect (Collectors .toSet ());
785766 try (ReleasableLock ignore = liveIndexWriterDeletesMap .old .mapWriteLock .acquire ()) {
786- for (IndexWriter indexWriter : oldWriterSet ) {
787- if (indexWriter .isOpen () == true ) {
788- ramBytesUsed += indexWriter .ramBytesUsed ();
767+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap . old . criteriaBasedIndexWriterMap . values () ) {
768+ if (disposableIndexWriter . getIndexWriter () .isOpen () == true ) {
769+ ramBytesUsed += disposableIndexWriter . getIndexWriter () .ramBytesUsed ();
789770 }
790771 }
791772 }
@@ -813,24 +794,15 @@ public final synchronized Iterable<Map.Entry<String, String>> getLiveCommitData(
813794
814795 public void rollback () throws IOException {
815796 if (shouldClose ()) {
816- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
817- .stream ()
818- .map (DisposableIndexWriter ::getIndexWriter )
819- .collect (Collectors .toSet ());
820-
821- for (IndexWriter indexWriter : currentWriterSet ) {
822- if (indexWriter .isOpen () == true ) {
823- indexWriter .rollback ();
797+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
798+ if (disposableIndexWriter .getIndexWriter ().isOpen () == true ) {
799+ disposableIndexWriter .getIndexWriter ().rollback ();
824800 }
825801 }
826802
827- Collection <IndexWriter > oldWriterSet = liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()
828- .stream ()
829- .map (DisposableIndexWriter ::getIndexWriter )
830- .collect (Collectors .toSet ());
831- for (IndexWriter indexWriter : oldWriterSet ) {
832- if (indexWriter .isOpen () == true ) {
833- indexWriter .rollback ();
803+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()) {
804+ if (disposableIndexWriter .getIndexWriter ().isOpen () == true ) {
805+ disposableIndexWriter .getIndexWriter ().rollback ();
834806 }
835807 }
836808
0 commit comments