Skip to content

Commit dbcae67

Browse files
committed
Fix indexing regression and bug fixes for grouping criteria
Signed-off-by: RS146BIJAY <[email protected]>
1 parent b7f013f commit dbcae67

File tree

8 files changed

+102
-243
lines changed

8 files changed

+102
-243
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
100100
- Fixed handling of property index in BulkRequest during deserialization ([#20132](https://github.com/opensearch-project/OpenSearch/pull/20132))
101101
- Fix negative CPU usage values in node stats ([#19120](https://github.com/opensearch-project/OpenSearch/issues/19120))
102102
- Fix duplicate registration of FieldDataCache dynamic setting ([20140](https://github.com/opensearch-project/OpenSearch/pull/20140))
103+
- Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker([20105](https://github.com/opensearch-project/OpenSearch/pull/20105))
104+
- Fix indexing regression and bug fixes for grouping criteria. ([20145](https://github.com/opensearch-project/OpenSearch/pull/20145))
103105

104106
### Dependencies
105107
- Bump Apache Lucene from 10.3.1 to 10.3.2 ([#20026](https://github.com/opensearch-project/OpenSearch/pull/20026))

server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@
8787
import org.opensearch.index.IndexingPressureService;
8888
import org.opensearch.index.SegmentReplicationPressureService;
8989
import org.opensearch.index.engine.Engine;
90-
import org.opensearch.index.engine.LookupMapLockAcquisitionException;
9190
import org.opensearch.index.engine.VersionConflictEngineException;
9291
import org.opensearch.index.get.GetResult;
9392
import org.opensearch.index.mapper.MapperException;
@@ -728,15 +727,7 @@ && isConflictException(executionResult.getFailure().getCause())
728727
&& context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) {
729728
context.resetForExecutionForRetry();
730729
return;
731-
} else if (isFailed
732-
&& context.getPrimary() != null
733-
&& context.getPrimary().indexSettings() != null
734-
&& context.getPrimary().indexSettings().isContextAwareEnabled()
735-
&& isLookupMapLockAcquisitionException(executionResult.getFailure().getCause())
736-
&& context.getRetryCounter() < context.getPrimary().indexSettings().getMaxRetryOnLookupMapAcquisitionException()) {
737-
context.resetForExecutionForRetry();
738-
return;
739-
}
730+
}
740731
final BulkItemResponse response;
741732
if (isUpdate) {
742733
response = processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult);
@@ -765,10 +756,6 @@ private static boolean isConflictException(final Exception e) {
765756
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
766757
}
767758

768-
private static boolean isLookupMapLockAcquisitionException(final Exception e) {
769-
return ExceptionsHelper.unwrapCause(e) instanceof LookupMapLockAcquisitionException;
770-
}
771-
772759
/**
773760
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
774761
*/

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,9 +498,9 @@ public static IndexMergePolicy fromString(String text) {
498498
*/
499499
public static final Setting<Integer> INDEX_MAX_RETRY_ON_LOOKUP_MAP_LOCK_ACQUISITION_EXCEPTION = Setting.intSetting(
500500
"index.context_aware.max_retry_on_lookup_map_acquisition_exception",
501-
15,
502-
5,
503501
100,
502+
5,
503+
500,
504504
Setting.Property.IndexScope,
505505
Property.Dynamic
506506
);

server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java

Lines changed: 50 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,13 @@
4040
import java.io.Closeable;
4141
import java.io.IOException;
4242
import java.util.ArrayList;
43-
import java.util.Collection;
4443
import java.util.Collections;
4544
import java.util.List;
4645
import java.util.Map;
4746
import java.util.UUID;
4847
import java.util.concurrent.atomic.AtomicBoolean;
4948
import java.util.concurrent.locks.Lock;
5049
import java.util.concurrent.locks.ReentrantReadWriteLock;
51-
import java.util.stream.Collectors;
5250

5351
import static org.opensearch.index.BucketedCompositeDirectory.CHILD_DIRECTORY_PREFIX;
5452

@@ -209,12 +207,12 @@ public CriteriaBasedIndexWriterLookup getLookupMap() {
209207
* each refresh cycle.
210208
*
211209
*/
212-
public static final class CriteriaBasedIndexWriterLookup implements Closeable {
210+
public static class CriteriaBasedIndexWriterLookup implements Closeable {
213211
private final Map<String, DisposableIndexWriter> criteriaBasedIndexWriterMap;
214212
private final Map<BytesRef, DeleteEntry> lastDeleteEntrySet;
215213
private final Map<BytesRef, String> criteria;
216214
private final ReentrantReadWriteLock mapLock;
217-
private final CriteriaBasedWriterLock mapReadLock;
215+
CriteriaBasedWriterLock mapReadLock;
218216
private final ReleasableLock mapWriteLock;
219217
private final long version;
220218
private boolean closed;
@@ -300,7 +298,7 @@ public boolean isClosed() {
300298
return closed;
301299
}
302300

303-
private static final class CriteriaBasedWriterLock implements Releasable {
301+
static class CriteriaBasedWriterLock implements Releasable {
304302
private final Lock lock;
305303
// a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
306304
private final ThreadLocal<Integer> holdingThreads;
@@ -405,9 +403,9 @@ public Term getTerm() {
405403
*
406404
* @opensearch.internal
407405
*/
408-
final static class LiveIndexWriterDeletesMap {
406+
static class LiveIndexWriterDeletesMap {
409407
// All writes (adds and deletes) go into here:
410-
final CriteriaBasedIndexWriterLookup current;
408+
CriteriaBasedIndexWriterLookup current;
411409

412410
// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
413411
final CriteriaBasedIndexWriterLookup old;
@@ -468,36 +466,41 @@ String getCriteriaForDoc(BytesRef key) {
468466
DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
469467
String criteria,
470468
CheckedBiFunction<String, CriteriaBasedIndexWriterLookup, DisposableIndexWriter, IOException> indexWriterSupplier,
471-
ShardId shardId
469+
ShardId shardId,
470+
int maxRetryOnLookupMapAcquisitionException
472471
) {
473472
boolean success = false;
474473
CriteriaBasedIndexWriterLookup current = null;
475474
try {
476-
current = getCurrentMap();
475+
int counter = 0;
476+
while ((current == null || current.isClosed()) && counter < maxRetryOnLookupMapAcquisitionException) {
477+
// This function acquires a first read lock on a map which does not have any write lock present. Current keeps
478+
// on getting rotated during refresh, so there will be one current on which read lock can be obtained.
479+
// Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was
480+
// never applied on this map as write lock gets only during closing time. We are doing this instead of acquire,
481+
// because acquire can also apply a read lock in case refresh completed and map is closed.
482+
current = this.current.mapReadLock.tryAcquire();
483+
if (current != null && current.isClosed() == true) {
484+
current.mapReadLock.close();
485+
current = null;
486+
}
487+
488+
++counter;
489+
}
490+
477491
if (current == null || current.isClosed()) {
478492
throw new LookupMapLockAcquisitionException(shardId, "Unable to obtain lock on the current Lookup map", null);
479493
}
480-
481494
DisposableIndexWriter writer = current.computeIndexWriterIfAbsentForCriteria(criteria, indexWriterSupplier);
482495
success = true;
483496
return writer;
484497
} finally {
485-
if (success == false && current != null) {
486-
assert current.mapReadLock.isHeldByCurrentThread() == true;
498+
if (success == false && current != null && current.mapReadLock.isHeldByCurrentThread() == true) {
487499
current.mapReadLock.close();
488500
}
489501
}
490502
}
491503

492-
// This function acquires a first read lock on a map which does not have any write lock present. Current keeps
493-
// on getting rotated during refresh, so there will be one current on which read lock can be obtained.
494-
// Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was
495-
// never applied on this map as write lock gets only during closing time. We are doing this instead of acquire,
496-
// because acquire can also apply a read lock in case refresh completed and map is closed.
497-
CriteriaBasedIndexWriterLookup getCurrentMap() {
498-
return current.mapReadLock.tryAcquire();
499-
}
500-
501504
// Used for Test Case.
502505
ReleasableLock acquireCurrentWriteLock() {
503506
return current.mapWriteLock.acquire();
@@ -672,7 +675,8 @@ DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
672675
return currentLiveIndexWriterDeletesMap.computeIndexWriterIfAbsentForCriteria(
673676
criteria,
674677
indexWriterSupplier,
675-
engineConfig.getShardId()
678+
engineConfig.getShardId(),
679+
engineConfig.getIndexSettings().getMaxRetryOnLookupMapAcquisitionException()
676680
);
677681
}
678682

@@ -684,12 +688,8 @@ public Map<String, DisposableIndexWriter> getMarkForRefreshIndexWriterMap() {
684688
public long getFlushingBytes() {
685689
ensureOpen();
686690
long flushingBytes = 0;
687-
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
688-
.stream()
689-
.map(DisposableIndexWriter::getIndexWriter)
690-
.collect(Collectors.toSet());
691-
for (IndexWriter currentWriter : currentWriterSet) {
692-
flushingBytes += currentWriter.getFlushingBytes();
691+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
692+
flushingBytes += disposableIndexWriter.getIndexWriter().getFlushingBytes();
693693
}
694694

695695
return flushingBytes + accumulatingIndexWriter.getFlushingBytes();
@@ -699,13 +699,8 @@ public long getFlushingBytes() {
699699
public long getPendingNumDocs() {
700700
ensureOpen();
701701
long pendingNumDocs = 0;
702-
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
703-
.stream()
704-
.map(DisposableIndexWriter::getIndexWriter)
705-
.collect(Collectors.toSet());
706-
;
707-
for (IndexWriter currentWriter : currentWriterSet) {
708-
pendingNumDocs += currentWriter.getPendingNumDocs();
702+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
703+
pendingNumDocs += disposableIndexWriter.getIndexWriter().getPendingNumDocs();
709704
}
710705

711706
// TODO: Should we add docs for old writer as well?
@@ -733,24 +728,17 @@ public boolean hasUncommittedChanges() {
733728

734729
@Override
735730
public Throwable getTragicException() {
736-
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
737-
.stream()
738-
.map(DisposableIndexWriter::getIndexWriter)
739-
.collect(Collectors.toSet());
740-
for (IndexWriter writer : currentWriterSet) {
741-
if (writer.isOpen() == false && writer.getTragicException() != null) {
742-
return writer.getTragicException();
731+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
732+
if (disposableIndexWriter.getIndexWriter().isOpen() == false
733+
&& disposableIndexWriter.getIndexWriter().getTragicException() != null) {
734+
return disposableIndexWriter.getIndexWriter().getTragicException();
743735
}
744736
}
745737

746-
Collection<IndexWriter> oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()
747-
.stream()
748-
.map(DisposableIndexWriter::getIndexWriter)
749-
.collect(Collectors.toSet());
750-
;
751-
for (IndexWriter writer : oldWriterSet) {
752-
if (writer.isOpen() == false && writer.getTragicException() != null) {
753-
return writer.getTragicException();
738+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) {
739+
if (disposableIndexWriter.getIndexWriter().isOpen() == false
740+
&& disposableIndexWriter.getIndexWriter().getTragicException() != null) {
741+
return disposableIndexWriter.getIndexWriter().getTragicException();
754742
}
755743
}
756744

@@ -765,27 +753,19 @@ public Throwable getTragicException() {
765753
public final long ramBytesUsed() {
766754
ensureOpen();
767755
long ramBytesUsed = 0;
768-
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
769-
.stream()
770-
.map(DisposableIndexWriter::getIndexWriter)
771-
.collect(Collectors.toSet());
772756

773757
try (ReleasableLock ignore = liveIndexWriterDeletesMap.current.mapWriteLock.acquire()) {
774-
for (IndexWriter indexWriter : currentWriterSet) {
775-
if (indexWriter.isOpen() == true) {
776-
ramBytesUsed += indexWriter.ramBytesUsed();
758+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
759+
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
760+
ramBytesUsed += disposableIndexWriter.getIndexWriter().ramBytesUsed();
777761
}
778762
}
779763
}
780764

781-
Collection<IndexWriter> oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()
782-
.stream()
783-
.map(DisposableIndexWriter::getIndexWriter)
784-
.collect(Collectors.toSet());
785765
try (ReleasableLock ignore = liveIndexWriterDeletesMap.old.mapWriteLock.acquire()) {
786-
for (IndexWriter indexWriter : oldWriterSet) {
787-
if (indexWriter.isOpen() == true) {
788-
ramBytesUsed += indexWriter.ramBytesUsed();
766+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) {
767+
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
768+
ramBytesUsed += disposableIndexWriter.getIndexWriter().ramBytesUsed();
789769
}
790770
}
791771
}
@@ -813,24 +793,15 @@ public final synchronized Iterable<Map.Entry<String, String>> getLiveCommitData(
813793

814794
public void rollback() throws IOException {
815795
if (shouldClose()) {
816-
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
817-
.stream()
818-
.map(DisposableIndexWriter::getIndexWriter)
819-
.collect(Collectors.toSet());
820-
821-
for (IndexWriter indexWriter : currentWriterSet) {
822-
if (indexWriter.isOpen() == true) {
823-
indexWriter.rollback();
796+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
797+
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
798+
disposableIndexWriter.getIndexWriter().rollback();
824799
}
825800
}
826801

827-
Collection<IndexWriter> oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()
828-
.stream()
829-
.map(DisposableIndexWriter::getIndexWriter)
830-
.collect(Collectors.toSet());
831-
for (IndexWriter indexWriter : oldWriterSet) {
832-
if (indexWriter.isOpen() == true) {
833-
indexWriter.rollback();
802+
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) {
803+
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
804+
disposableIndexWriter.getIndexWriter().rollback();
834805
}
835806
}
836807

server/src/main/java/org/opensearch/index/mapper/MapperService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import java.util.function.BooleanSupplier;
8585
import java.util.function.Function;
8686
import java.util.function.Supplier;
87+
import java.util.stream.Collectors;
8788

8889
import static java.util.Collections.emptyMap;
8990
import static java.util.Collections.unmodifiableMap;
@@ -690,7 +691,9 @@ public boolean isCompositeIndexPresent() {
690691
}
691692

692693
public Set<CompositeMappedFieldType> getCompositeFieldTypes() {
693-
return compositeMappedFieldTypes;
694+
return compositeMappedFieldTypes.stream()
695+
.filter(compositeMappedFieldType -> compositeMappedFieldType instanceof StarTreeMapper.StarTreeFieldType)
696+
.collect(Collectors.toSet());
694697
}
695698

696699
private Set<CompositeMappedFieldType> getCompositeFieldTypesFromMapper() {

0 commit comments

Comments
 (0)