2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -100,6 +100,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fixed handling of property index in BulkRequest during deserialization ([#20132](https://github.com/opensearch-project/OpenSearch/pull/20132))
- Fix negative CPU usage values in node stats ([#19120](https://github.com/opensearch-project/OpenSearch/issues/19120))
- Fix duplicate registration of FieldDataCache dynamic setting ([20140](https://github.com/opensearch-project/OpenSearch/pull/20140))
- Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker ([#20105](https://github.com/opensearch-project/OpenSearch/pull/20105))
- Fix indexing regression and bugs in grouping criteria handling ([#20145](https://github.com/opensearch-project/OpenSearch/pull/20145))

### Dependencies
- Bump Apache Lucene from 10.3.1 to 10.3.2 ([#20026](https://github.com/opensearch-project/OpenSearch/pull/20026))
@@ -87,7 +87,6 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.LookupMapLockAcquisitionException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
import org.opensearch.index.mapper.MapperException;
@@ -728,15 +727,7 @@ && isConflictException(executionResult.getFailure().getCause())
&& context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) {
context.resetForExecutionForRetry();
return;
} else if (isFailed
&& context.getPrimary() != null
&& context.getPrimary().indexSettings() != null
&& context.getPrimary().indexSettings().isContextAwareEnabled()
&& isLookupMapLockAcquisitionException(executionResult.getFailure().getCause())
&& context.getRetryCounter() < context.getPrimary().indexSettings().getMaxRetryOnLookupMapAcquisitionException()) {
context.resetForExecutionForRetry();
return;
}
}
final BulkItemResponse response;
if (isUpdate) {
response = processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult);
@@ -765,10 +756,6 @@ private static boolean isConflictException(final Exception e) {
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
}

private static boolean isLookupMapLockAcquisitionException(final Exception e) {
return ExceptionsHelper.unwrapCause(e) instanceof LookupMapLockAcquisitionException;
}
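
With the lock-acquisition retry removed from the bulk layer, the only retry check that remains here is the version-conflict one, which relies on unwrapping the cause chain before the type test. A minimal, self-contained sketch of that pattern; the types and the unwrap helper are simplified stand-ins (the real ExceptionsHelper.unwrapCause only unwraps OpenSearch wrapper exceptions, not arbitrary causes):

```java
// Sketch of the unwrap-then-match pattern behind isConflictException.
public final class ConflictCheckSketch {
    static class VersionConflictEngineException extends RuntimeException {
        VersionConflictEngineException(String msg) { super(msg); }
    }

    // Simplified stand-in: walk the cause chain to its root. The real helper
    // only unwraps OpenSearchWrapperException instances.
    static Throwable unwrapCause(Throwable t) {
        Throwable result = t;
        while (result.getCause() != null && result.getCause() != result) {
            result = result.getCause();
        }
        return result;
    }

    static boolean isConflictException(final Exception e) {
        return unwrapCause(e) instanceof VersionConflictEngineException;
    }

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException(new VersionConflictEngineException("version conflict"));
        System.out.println(isConflictException(wrapped)); // true
    }
}
```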

/**
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
*/
@@ -498,9 +498,9 @@ public static IndexMergePolicy fromString(String text) {
*/
public static final Setting<Integer> INDEX_MAX_RETRY_ON_LOOKUP_MAP_LOCK_ACQUISITION_EXCEPTION = Setting.intSetting(
"index.context_aware.max_retry_on_lookup_map_acquisition_exception",
15,
5,
100,
5,
500,
Setting.Property.IndexScope,
Property.Dynamic
);
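
The positional arguments of Setting.intSetting are key, default, minimum, and maximum, so this change lowers the default from 15 to 5 and raises the ceiling from 100 to 500. A hedged sketch of the range check such a bounded setting performs at update time; the validate helper and its error messages are hypothetical, not the actual Setting implementation:

```java
// Hypothetical sketch: a bounded integer setting rejects values outside
// [min, max]; org.opensearch.common.settings.Setting performs an equivalent check.
public final class BoundedSettingSketch {
    static int validate(String key, int value, int min, int max) {
        if (value < min) {
            throw new IllegalArgumentException("value [" + value + "] for setting [" + key + "] must be >= " + min);
        }
        if (value > max) {
            throw new IllegalArgumentException("value [" + value + "] for setting [" + key + "] must be <= " + max);
        }
        return value;
    }

    public static void main(String[] args) {
        String key = "index.context_aware.max_retry_on_lookup_map_acquisition_exception";
        System.out.println(validate(key, 5, 5, 500)); // ok: within the new bounds
        // validate(key, 501, 5, 500) would throw: above the new maximum of 500
    }
}
```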
@@ -40,15 +40,13 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.opensearch.index.BucketedCompositeDirectory.CHILD_DIRECTORY_PREFIX;

@@ -209,12 +207,12 @@ public CriteriaBasedIndexWriterLookup getLookupMap() {
* each refresh cycle.
*
*/
public static final class CriteriaBasedIndexWriterLookup implements Closeable {
public static class CriteriaBasedIndexWriterLookup implements Closeable {
private final Map<String, DisposableIndexWriter> criteriaBasedIndexWriterMap;
private final Map<BytesRef, DeleteEntry> lastDeleteEntrySet;
private final Map<BytesRef, String> criteria;
private final ReentrantReadWriteLock mapLock;
private final CriteriaBasedWriterLock mapReadLock;
CriteriaBasedWriterLock mapReadLock;
private final ReleasableLock mapWriteLock;
private final long version;
private boolean closed;
@@ -300,7 +298,7 @@ public boolean isClosed() {
return closed;
}

private static final class CriteriaBasedWriterLock implements Releasable {
static class CriteriaBasedWriterLock implements Releasable {
private final Lock lock;
// a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
private final ThreadLocal<Integer> holdingThreads;
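
The per-thread count mentioned in the comment above is an assertion-only diagnostic. A self-contained sketch of that pattern under a hypothetical class name (the real CriteriaBasedWriterLock wraps a Releasable lock rather than a bare ReentrantLock):

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// A ThreadLocal tracks how many times the current thread has entered the lock,
// so balanced acquire/release can be asserted with zero overhead when
// assertions are disabled.
public final class CountingLockSketch {
    private final Lock lock = new ReentrantLock();
    private final ThreadLocal<Integer> holdingThreads = ThreadLocal.withInitial(() -> 0);

    void acquire() {
        lock.lock();
        assert addCurrentThread();
    }

    void release() {
        assert removeCurrentThread();
        lock.unlock();
    }

    private boolean addCurrentThread() {
        holdingThreads.set(holdingThreads.get() + 1);
        return true; // always true, so the assert only performs bookkeeping
    }

    private boolean removeCurrentThread() {
        int count = holdingThreads.get();
        assert count > 0 : "releasing a lock this thread does not hold";
        holdingThreads.set(count - 1);
        return true;
    }
}
```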
@@ -405,9 +403,9 @@ public Term getTerm() {
*
* @opensearch.internal
*/
final static class LiveIndexWriterDeletesMap {
static class LiveIndexWriterDeletesMap {
// All writes (adds and deletes) go into here:
final CriteriaBasedIndexWriterLookup current;
CriteriaBasedIndexWriterLookup current;

// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
final CriteriaBasedIndexWriterLookup old;
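
The current/old pair implements a double-buffered lookup in the spirit of Lucene's live version maps: writes land in current, a refresh rotates current into old, and reads consult both until the old generation can be dropped. A minimal hedged sketch of that rotation, with a hypothetical class simplified to a plain concurrent map:

```java
import java.util.concurrent.ConcurrentHashMap;

// All writes go to `current`; a refresh rotates `current` into `old`, and
// lookups read from both until the refresh finishes and `old` is dropped.
final class DoubleBufferedLookup<K, V> {
    private volatile ConcurrentHashMap<K, V> current = new ConcurrentHashMap<>();
    private volatile ConcurrentHashMap<K, V> old = new ConcurrentHashMap<>();

    void put(K key, V value) {
        current.put(key, value);
    }

    V get(K key) {
        V value = current.get(key);
        return value != null ? value : old.get(key); // read from both during refresh
    }

    synchronized void rotateForRefresh() {
        old = current;
        current = new ConcurrentHashMap<>();
    }

    synchronized void refreshCompleted() {
        old = new ConcurrentHashMap<>(); // the old generation is no longer needed
    }
}
```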
@@ -468,16 +466,31 @@ String getCriteriaForDoc(BytesRef key) {
DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
String criteria,
CheckedBiFunction<String, CriteriaBasedIndexWriterLookup, DisposableIndexWriter, IOException> indexWriterSupplier,
ShardId shardId
ShardId shardId,
int maxRetryOnLookupMapAcquisitionException
) {
boolean success = false;
CriteriaBasedIndexWriterLookup current = null;
try {
current = getCurrentMap();
int counter = 0;
while ((current == null || current.isClosed()) && counter < maxRetryOnLookupMapAcquisitionException) {
// Acquire a read lock on a map that does not currently hold the write lock. `current` keeps
// rotating during refresh, so there will eventually be a current map whose read lock can be
// obtained. Validate that no write lock is applied to the map and that the map is not closed;
// the write lock is only ever taken while the map is being closed. We use tryAcquire instead
// of acquire, because acquire can still hand out a read lock after a refresh has completed
// and the map has been closed.
current = this.current.mapReadLock.tryAcquire();
if (current != null && current.isClosed() == true) {
current.mapReadLock.close();
current = null;
}
Comment on lines +483 to +486

@Bukhtawar (Contributor), Dec 4, 2025:
Should we move this logic centrally into close()? Also, I didn't quite understand why the close operation done as part of write lock acquisition doesn't handle this logic.

Contributor Author:
This handles the scenario where tryAcquire succeeded in obtaining the lock on the current writer, but the map itself rotated and the writer got closed. In that case, we close the old writer as we retry to obtain the lock on the current one. This ensures the lock is correctly released on the old writer before we try acquiring the lock on the new writer.
++counter;
}

if (current == null || current.isClosed()) {
throw new LookupMapLockAcquisitionException(shardId, "Unable to obtain lock on the current Lookup map", null);
}

DisposableIndexWriter writer = current.computeIndexWriterIfAbsentForCriteria(criteria, indexWriterSupplier);
success = true;
return writer;
@@ -489,15 +502,6 @@ DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
}
}

// This function acquires a first read lock on a map which does not have any write lock present. Current keeps
// on getting rotated during refresh, so there will be one current on which read lock can be obtained.
// Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was
// never applied on this map as write lock gets only during closing time. We are doing this instead of acquire,
// because acquire can also apply a read lock in case refresh completed and map is closed.
CriteriaBasedIndexWriterLookup getCurrentMap() {
return current.mapReadLock.tryAcquire();
}
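
The bounded retry added above pairs tryAcquire with a staleness check: a lock obtained on a snapshot that was already closed by a concurrent rotation is released before the next attempt. A standalone hedged sketch of that control flow, with a hypothetical Snapshot type and a plain read lock standing in for CriteriaBasedWriterLock:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Keep trying to read-lock the current snapshot, releasing and retrying if the
// snapshot we locked was already closed by a concurrent rotation.
public final class LookupRetrySketch {
    static final class Snapshot {
        final Lock readLock = new ReentrantReadWriteLock().readLock();
        volatile boolean closed;

        // Returns the locked snapshot, or null if the read lock is unavailable.
        Snapshot tryAcquire() {
            return readLock.tryLock() ? this : null;
        }
    }

    volatile Snapshot current = new Snapshot();

    Snapshot acquireCurrent(int maxRetryOnLookupMapAcquisitionException) {
        Snapshot snapshot = null;
        int counter = 0;
        while ((snapshot == null || snapshot.closed) && counter < maxRetryOnLookupMapAcquisitionException) {
            snapshot = current.tryAcquire(); // re-reads `current`, which rotates during refresh
            if (snapshot != null && snapshot.closed) {
                snapshot.readLock.unlock(); // release the stale lock before retrying
                snapshot = null;
            }
            ++counter;
        }
        if (snapshot == null || snapshot.closed) {
            throw new IllegalStateException("unable to obtain lock on the current lookup map");
        }
        return snapshot;
    }
}
```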

// Used for Test Case.
ReleasableLock acquireCurrentWriteLock() {
return current.mapWriteLock.acquire();
@@ -672,7 +676,8 @@ DisposableIndexWriter computeIndexWriterIfAbsentForCriteria(
return currentLiveIndexWriterDeletesMap.computeIndexWriterIfAbsentForCriteria(
criteria,
indexWriterSupplier,
engineConfig.getShardId()
engineConfig.getShardId(),
engineConfig.getIndexSettings().getMaxRetryOnLookupMapAcquisitionException()
);
}

@@ -684,12 +689,8 @@ public Map<String, DisposableIndexWriter> getMarkForRefreshIndexWriterMap() {
public long getFlushingBytes() {
ensureOpen();
long flushingBytes = 0;
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());
for (IndexWriter currentWriter : currentWriterSet) {
flushingBytes += currentWriter.getFlushingBytes();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
flushingBytes += disposableIndexWriter.getIndexWriter().getFlushingBytes();
}

return flushingBytes + accumulatingIndexWriter.getFlushingBytes();
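
One subtlety of this refactor, which also applies to the getPendingNumDocs, getTragicException, ramBytesUsed, and rollback loops below: the old code collected writers into a Set (deduplicating shared instances) before summing, while the new code iterates the map values directly. The two are equivalent only if each IndexWriter appears under exactly one criteria key. A tiny sketch with a hypothetical Writer record makes the difference concrete:

```java
import java.util.Map;
import java.util.stream.Collectors;

// Summing deduplicated writers vs. summing per-criteria map values when one
// writer instance is shared by two criteria keys.
public final class SumSketch {
    record Writer(long flushingBytes) {}

    public static void main(String[] args) {
        Writer shared = new Writer(10);
        Map<String, Writer> byCriteria = Map.of("criteria-a", shared, "criteria-b", shared);

        long viaSet = byCriteria.values().stream()
            .collect(Collectors.toSet())
            .stream()
            .mapToLong(Writer::flushingBytes)
            .sum(); // 10: the shared writer is counted once

        long direct = byCriteria.values().stream()
            .mapToLong(Writer::flushingBytes)
            .sum(); // 20: counted once per criteria key

        System.out.println(viaSet + " vs " + direct);
    }
}
```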
@@ -699,13 +700,8 @@ public long getFlushingBytes() {
public long getPendingNumDocs() {
ensureOpen();
long pendingNumDocs = 0;
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());
;
for (IndexWriter currentWriter : currentWriterSet) {
pendingNumDocs += currentWriter.getPendingNumDocs();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
pendingNumDocs += disposableIndexWriter.getIndexWriter().getPendingNumDocs();
}

// TODO: Should we add docs for old writer as well?
@@ -733,24 +729,17 @@ public boolean hasUncommittedChanges() {

@Override
public Throwable getTragicException() {
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());
for (IndexWriter writer : currentWriterSet) {
if (writer.isOpen() == false && writer.getTragicException() != null) {
return writer.getTragicException();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
if (disposableIndexWriter.getIndexWriter().isOpen() == false
&& disposableIndexWriter.getIndexWriter().getTragicException() != null) {
return disposableIndexWriter.getIndexWriter().getTragicException();
}
}

Collection<IndexWriter> oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());
;
for (IndexWriter writer : oldWriterSet) {
if (writer.isOpen() == false && writer.getTragicException() != null) {
return writer.getTragicException();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) {
if (disposableIndexWriter.getIndexWriter().isOpen() == false
&& disposableIndexWriter.getIndexWriter().getTragicException() != null) {
return disposableIndexWriter.getIndexWriter().getTragicException();
}
}

@@ -765,27 +754,19 @@ public Throwable getTragicException() {
public final long ramBytesUsed() {
ensureOpen();
long ramBytesUsed = 0;
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());

try (ReleasableLock ignore = liveIndexWriterDeletesMap.current.mapWriteLock.acquire()) {
for (IndexWriter indexWriter : currentWriterSet) {
if (indexWriter.isOpen() == true) {
ramBytesUsed += indexWriter.ramBytesUsed();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
ramBytesUsed += disposableIndexWriter.getIndexWriter().ramBytesUsed();
}
}
}

Collection<IndexWriter> oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());
try (ReleasableLock ignore = liveIndexWriterDeletesMap.old.mapWriteLock.acquire()) {
for (IndexWriter indexWriter : oldWriterSet) {
if (indexWriter.isOpen() == true) {
ramBytesUsed += indexWriter.ramBytesUsed();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) {
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
ramBytesUsed += disposableIndexWriter.getIndexWriter().ramBytesUsed();
}
}
}
@@ -813,24 +794,15 @@ public final synchronized Iterable<Map.Entry<String, String>> getLiveCommitData(

public void rollback() throws IOException {
if (shouldClose()) {
Collection<IndexWriter> currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());

for (IndexWriter indexWriter : currentWriterSet) {
if (indexWriter.isOpen() == true) {
indexWriter.rollback();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) {
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
disposableIndexWriter.getIndexWriter().rollback();
}
}

Collection<IndexWriter> oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()
.stream()
.map(DisposableIndexWriter::getIndexWriter)
.collect(Collectors.toSet());
for (IndexWriter indexWriter : oldWriterSet) {
if (indexWriter.isOpen() == true) {
indexWriter.rollback();
for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) {
if (disposableIndexWriter.getIndexWriter().isOpen() == true) {
disposableIndexWriter.getIndexWriter().rollback();
Comment on lines +803 to +805

Contributor:
This might not be thread-safe; for instance, the index writer might be closed while we are doing a rollback?
}
}

@@ -84,6 +84,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@@ -690,7 +691,9 @@ public boolean isCompositeIndexPresent() {
}

public Set<CompositeMappedFieldType> getCompositeFieldTypes() {
return compositeMappedFieldTypes;
return compositeMappedFieldTypes.stream()
.filter(compositeMappedFieldType -> compositeMappedFieldType instanceof StarTreeMapper.StarTreeFieldType)
.collect(Collectors.toSet());
}
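
getCompositeFieldTypes() now narrows the returned set to star-tree field types with a stream filter. A tiny hedged sketch of the same subtype-filtering pattern, using stand-in types rather than the real CompositeMappedFieldType hierarchy:

```java
import java.util.Set;
import java.util.stream.Collectors;

// Only the star-tree subtype survives the filter, mirroring the instanceof
// check added to getCompositeFieldTypes().
public final class SubtypeFilterSketch {
    interface CompositeFieldType {}
    static final class StarTreeFieldType implements CompositeFieldType {}
    static final class OtherFieldType implements CompositeFieldType {}

    static Set<CompositeFieldType> starTreeOnly(Set<CompositeFieldType> all) {
        return all.stream()
            .filter(fieldType -> fieldType instanceof StarTreeFieldType)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<CompositeFieldType> all = Set.of(new StarTreeFieldType(), new OtherFieldType());
        System.out.println(starTreeOnly(all).size()); // 1
    }
}
```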

private Set<CompositeMappedFieldType> getCompositeFieldTypesFromMapper() {