4040import java .io .Closeable ;
4141import java .io .IOException ;
4242import java .util .ArrayList ;
43- import java .util .Collection ;
4443import java .util .Collections ;
4544import java .util .List ;
4645import java .util .Map ;
4746import java .util .UUID ;
4847import java .util .concurrent .atomic .AtomicBoolean ;
4948import java .util .concurrent .locks .Lock ;
5049import java .util .concurrent .locks .ReentrantReadWriteLock ;
51- import java .util .stream .Collectors ;
5250
5351import static org .opensearch .index .BucketedCompositeDirectory .CHILD_DIRECTORY_PREFIX ;
5452
@@ -209,12 +207,12 @@ public CriteriaBasedIndexWriterLookup getLookupMap() {
209207 * each refresh cycle.
210208 *
211209 */
212- public static final class CriteriaBasedIndexWriterLookup implements Closeable {
210+ public static class CriteriaBasedIndexWriterLookup implements Closeable {
213211 private final Map <String , DisposableIndexWriter > criteriaBasedIndexWriterMap ;
214212 private final Map <BytesRef , DeleteEntry > lastDeleteEntrySet ;
215213 private final Map <BytesRef , String > criteria ;
216214 private final ReentrantReadWriteLock mapLock ;
217- private final CriteriaBasedWriterLock mapReadLock ;
215+ CriteriaBasedWriterLock mapReadLock ;
218216 private final ReleasableLock mapWriteLock ;
219217 private final long version ;
220218 private boolean closed ;
@@ -300,7 +298,7 @@ public boolean isClosed() {
300298 return closed ;
301299 }
302300
303- private static final class CriteriaBasedWriterLock implements Releasable {
301+ static class CriteriaBasedWriterLock implements Releasable {
304302 private final Lock lock ;
305303 // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
306304 private final ThreadLocal <Integer > holdingThreads ;
@@ -405,9 +403,9 @@ public Term getTerm() {
405403 *
406404 * @opensearch.internal
407405 */
408- final static class LiveIndexWriterDeletesMap {
406+ static class LiveIndexWriterDeletesMap {
409407 // All writes (adds and deletes) go into here:
410- final CriteriaBasedIndexWriterLookup current ;
408+ CriteriaBasedIndexWriterLookup current ;
411409
412410 // Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
413411 final CriteriaBasedIndexWriterLookup old ;
@@ -468,36 +466,41 @@ String getCriteriaForDoc(BytesRef key) {
468466 DisposableIndexWriter computeIndexWriterIfAbsentForCriteria (
469467 String criteria ,
470468 CheckedBiFunction <String , CriteriaBasedIndexWriterLookup , DisposableIndexWriter , IOException > indexWriterSupplier ,
471- ShardId shardId
469+ ShardId shardId ,
470+ int maxRetryOnLookupMapAcquisitionException
472471 ) {
473472 boolean success = false ;
474473 CriteriaBasedIndexWriterLookup current = null ;
475474 try {
476- current = getCurrentMap ();
475+ int counter = 0 ;
476+ while ((current == null || current .isClosed ()) && counter < maxRetryOnLookupMapAcquisitionException ) {
477+ // This function acquires a first read lock on a map which does not have any write lock present. Current keeps
478+ // on getting rotated during refresh, so there will be one current on which read lock can be obtained.
479+ // Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was
480+ // never applied on this map as write lock gets only during closing time. We are doing this instead of acquire,
481+ // because acquire can also apply a read lock in case refresh completed and map is closed.
482+ current = this .current .mapReadLock .tryAcquire ();
483+ if (current != null && current .isClosed () == true ) {
484+ current .mapReadLock .close ();
485+ current = null ;
486+ }
487+
488+ ++counter ;
489+ }
490+
477491 if (current == null || current .isClosed ()) {
478492 throw new LookupMapLockAcquisitionException (shardId , "Unable to obtain lock on the current Lookup map" , null );
479493 }
480-
481494 DisposableIndexWriter writer = current .computeIndexWriterIfAbsentForCriteria (criteria , indexWriterSupplier );
482495 success = true ;
483496 return writer ;
484497 } finally {
485- if (success == false && current != null ) {
486- assert current .mapReadLock .isHeldByCurrentThread () == true ;
498+ if (success == false && current != null && current .mapReadLock .isHeldByCurrentThread () == true ) {
487499 current .mapReadLock .close ();
488500 }
489501 }
490502 }
491503
492- // This function acquires a first read lock on a map which does not have any write lock present. Current keeps
493- // on getting rotated during refresh, so there will be one current on which read lock can be obtained.
494- // Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was
495- // never applied on this map as write lock gets only during closing time. We are doing this instead of acquire,
496- // because acquire can also apply a read lock in case refresh completed and map is closed.
497- CriteriaBasedIndexWriterLookup getCurrentMap () {
498- return current .mapReadLock .tryAcquire ();
499- }
500-
501504 // Used for Test Case.
502505 ReleasableLock acquireCurrentWriteLock () {
503506 return current .mapWriteLock .acquire ();
@@ -672,7 +675,8 @@ DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
672675 return currentLiveIndexWriterDeletesMap .computeIndexWriterIfAbsentForCriteria (
673676 criteria ,
674677 indexWriterSupplier ,
675- engineConfig .getShardId ()
678+ engineConfig .getShardId (),
679+ engineConfig .getIndexSettings ().getMaxRetryOnLookupMapAcquisitionException ()
676680 );
677681 }
678682
@@ -684,12 +688,8 @@ public Map<String, DisposableIndexWriter> getMarkForRefreshIndexWriterMap() {
684688 public long getFlushingBytes () {
685689 ensureOpen ();
686690 long flushingBytes = 0 ;
687- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
688- .stream ()
689- .map (DisposableIndexWriter ::getIndexWriter )
690- .collect (Collectors .toSet ());
691- for (IndexWriter currentWriter : currentWriterSet ) {
692- flushingBytes += currentWriter .getFlushingBytes ();
691+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
692+ flushingBytes += disposableIndexWriter .getIndexWriter ().getFlushingBytes ();
693693 }
694694
695695 return flushingBytes + accumulatingIndexWriter .getFlushingBytes ();
@@ -699,13 +699,8 @@ public long getFlushingBytes() {
699699 public long getPendingNumDocs () {
700700 ensureOpen ();
701701 long pendingNumDocs = 0 ;
702- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
703- .stream ()
704- .map (DisposableIndexWriter ::getIndexWriter )
705- .collect (Collectors .toSet ());
706- ;
707- for (IndexWriter currentWriter : currentWriterSet ) {
708- pendingNumDocs += currentWriter .getPendingNumDocs ();
702+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
703+ pendingNumDocs += disposableIndexWriter .getIndexWriter ().getPendingNumDocs ();
709704 }
710705
711706 // TODO: Should we add docs for old writer as well?
@@ -733,24 +728,17 @@ public boolean hasUncommittedChanges() {
733728
734729 @ Override
735730 public Throwable getTragicException () {
736- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
737- .stream ()
738- .map (DisposableIndexWriter ::getIndexWriter )
739- .collect (Collectors .toSet ());
740- for (IndexWriter writer : currentWriterSet ) {
741- if (writer .isOpen () == false && writer .getTragicException () != null ) {
742- return writer .getTragicException ();
731+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
732+ if (disposableIndexWriter .getIndexWriter ().isOpen () == false
733+ && disposableIndexWriter .getIndexWriter ().getTragicException () != null ) {
734+ return disposableIndexWriter .getIndexWriter ().getTragicException ();
743735 }
744736 }
745737
746- Collection <IndexWriter > oldWriterSet = liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()
747- .stream ()
748- .map (DisposableIndexWriter ::getIndexWriter )
749- .collect (Collectors .toSet ());
750- ;
751- for (IndexWriter writer : oldWriterSet ) {
752- if (writer .isOpen () == false && writer .getTragicException () != null ) {
753- return writer .getTragicException ();
738+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()) {
739+ if (disposableIndexWriter .getIndexWriter ().isOpen () == false
740+ && disposableIndexWriter .getIndexWriter ().getTragicException () != null ) {
741+ return disposableIndexWriter .getIndexWriter ().getTragicException ();
754742 }
755743 }
756744
@@ -765,27 +753,19 @@ public Throwable getTragicException() {
765753 public final long ramBytesUsed () {
766754 ensureOpen ();
767755 long ramBytesUsed = 0 ;
768- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
769- .stream ()
770- .map (DisposableIndexWriter ::getIndexWriter )
771- .collect (Collectors .toSet ());
772756
773757 try (ReleasableLock ignore = liveIndexWriterDeletesMap .current .mapWriteLock .acquire ()) {
774- for (IndexWriter indexWriter : currentWriterSet ) {
775- if (indexWriter .isOpen () == true ) {
776- ramBytesUsed += indexWriter .ramBytesUsed ();
758+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap . current . criteriaBasedIndexWriterMap . values () ) {
759+ if (disposableIndexWriter . getIndexWriter () .isOpen () == true ) {
760+ ramBytesUsed += disposableIndexWriter . getIndexWriter () .ramBytesUsed ();
777761 }
778762 }
779763 }
780764
781- Collection <IndexWriter > oldWriterSet = liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()
782- .stream ()
783- .map (DisposableIndexWriter ::getIndexWriter )
784- .collect (Collectors .toSet ());
785765 try (ReleasableLock ignore = liveIndexWriterDeletesMap .old .mapWriteLock .acquire ()) {
786- for (IndexWriter indexWriter : oldWriterSet ) {
787- if (indexWriter .isOpen () == true ) {
788- ramBytesUsed += indexWriter .ramBytesUsed ();
766+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap . old . criteriaBasedIndexWriterMap . values () ) {
767+ if (disposableIndexWriter . getIndexWriter () .isOpen () == true ) {
768+ ramBytesUsed += disposableIndexWriter . getIndexWriter () .ramBytesUsed ();
789769 }
790770 }
791771 }
@@ -813,24 +793,15 @@ public final synchronized Iterable<Map.Entry<String, String>> getLiveCommitData(
813793
814794 public void rollback () throws IOException {
815795 if (shouldClose ()) {
816- Collection <IndexWriter > currentWriterSet = liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()
817- .stream ()
818- .map (DisposableIndexWriter ::getIndexWriter )
819- .collect (Collectors .toSet ());
820-
821- for (IndexWriter indexWriter : currentWriterSet ) {
822- if (indexWriter .isOpen () == true ) {
823- indexWriter .rollback ();
796+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .current .criteriaBasedIndexWriterMap .values ()) {
797+ if (disposableIndexWriter .getIndexWriter ().isOpen () == true ) {
798+ disposableIndexWriter .getIndexWriter ().rollback ();
824799 }
825800 }
826801
827- Collection <IndexWriter > oldWriterSet = liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()
828- .stream ()
829- .map (DisposableIndexWriter ::getIndexWriter )
830- .collect (Collectors .toSet ());
831- for (IndexWriter indexWriter : oldWriterSet ) {
832- if (indexWriter .isOpen () == true ) {
833- indexWriter .rollback ();
802+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap .old .criteriaBasedIndexWriterMap .values ()) {
803+ if (disposableIndexWriter .getIndexWriter ().isOpen () == true ) {
804+ disposableIndexWriter .getIndexWriter ().rollback ();
834805 }
835806 }
836807
0 commit comments