|
67 | 67 | import org.elasticsearch.common.util.concurrent.ThreadContext; |
68 | 68 | import org.elasticsearch.core.Assertions; |
69 | 69 | import org.elasticsearch.core.Booleans; |
| 70 | +import org.elasticsearch.core.CheckedFunction; |
70 | 71 | import org.elasticsearch.core.IOUtils; |
71 | 72 | import org.elasticsearch.core.Nullable; |
72 | 73 | import org.elasticsearch.core.Releasable; |
@@ -1020,24 +1021,17 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th |
1020 | 1021 | VersionValue versionValue = getVersionFromMap(op.uid()); |
1021 | 1022 | if (versionValue == null) { |
1022 | 1023 | assert incrementIndexVersionLookup(); // used for asserting in tests |
1023 | | - final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion; |
1024 | | - try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { |
1025 | | - if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { |
1026 | | - assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; |
1027 | | - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion( |
1028 | | - searcher.getIndexReader(), |
1029 | | - op.uid(), |
1030 | | - op.id(), |
1031 | | - loadSeqNo |
1032 | | - ); |
1033 | | - } else { |
1034 | | - docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion( |
1035 | | - searcher.getIndexReader(), |
1036 | | - op.uid(), |
1037 | | - loadSeqNo |
1038 | | - ); |
| 1024 | + final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = performActionWithDirectoryReader( |
| 1025 | + SearcherScope.INTERNAL, |
| 1026 | + directoryReader -> { |
| 1027 | + if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) { |
| 1028 | + assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER; |
| 1029 | + return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), op.id(), loadSeqNo); |
| 1030 | + } else { |
| 1031 | + return VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(directoryReader, op.uid(), loadSeqNo); |
| 1032 | + } |
1039 | 1033 | } |
1040 | | - } |
| 1034 | + ); |
1041 | 1035 | if (docIdAndVersion != null) { |
1042 | 1036 | versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm); |
1043 | 1037 | } |
@@ -3454,4 +3448,26 @@ public LiveVersionMap getLiveVersionMap() { |
3454 | 3448 | protected long getPreCommitSegmentGeneration() { |
3455 | 3449 | return preCommitSegmentGeneration.get(); |
3456 | 3450 | } |
| 3451 | + |
| 3452 | + <T> T performActionWithDirectoryReader(SearcherScope scope, CheckedFunction<DirectoryReader, T, IOException> action) |
| 3453 | + throws EngineException { |
| 3454 | + assert scope == SearcherScope.INTERNAL : "performActionWithDirectoryReader(...) isn't prepared for external usage"; |
| 3455 | + assert store.hasReferences(); |
| 3456 | + try { |
| 3457 | + ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope); |
| 3458 | + ElasticsearchDirectoryReader acquire = referenceManager.acquire(); |
| 3459 | + try { |
| 3460 | + return action.apply(acquire); |
| 3461 | + } finally { |
| 3462 | + referenceManager.release(acquire); |
| 3463 | + } |
| 3464 | + } catch (AlreadyClosedException ex) { |
| 3465 | + throw ex; |
| 3466 | + } catch (Exception ex) { |
| 3467 | + maybeFailEngine("perform_action_directory_reader", ex); |
| 3468 | + ensureOpen(ex); // throw EngineCloseException here if we are already closed |
| 3469 | + logger.error("failed to perform action with directory reader", ex); |
| 3470 | + throw new EngineException(shardId, "failed to perform action with directory reader", ex); |
| 3471 | + } |
| 3472 | + } |
3457 | 3473 | } |
0 commit comments