|
18 | 18 | import org.apache.lucene.index.IndexWriter; |
19 | 19 | import org.apache.lucene.index.IndexableField; |
20 | 20 | import org.apache.lucene.index.Term; |
| 21 | +import org.apache.lucene.search.ReferenceManager; |
21 | 22 | import org.apache.lucene.search.TermQuery; |
22 | 23 | import org.apache.lucene.search.TopDocs; |
23 | 24 | import org.apache.lucene.store.AlreadyClosedException; |
|
188 | 189 | import static org.hamcrest.Matchers.closeTo; |
189 | 190 | import static org.hamcrest.Matchers.containsInAnyOrder; |
190 | 191 | import static org.hamcrest.Matchers.containsString; |
| 192 | +import static org.hamcrest.Matchers.containsStringIgnoringCase; |
191 | 193 | import static org.hamcrest.Matchers.either; |
192 | 194 | import static org.hamcrest.Matchers.equalTo; |
193 | 195 | import static org.hamcrest.Matchers.everyItem; |
@@ -5191,7 +5193,7 @@ public void prepareForEngineReset() throws IOException { |
5191 | 5193 | assertThat(engine, notNullValue()); |
5192 | 5194 | EngineTestCase.ensureOpen(engine); |
5193 | 5195 | hold.onResponse(engine); |
5194 | | - safeAwait(release, TimeValue.ONE_HOUR); |
| 5196 | + safeAwait(release); |
5195 | 5197 | return null; |
5196 | 5198 | }); |
5197 | 5199 | }); |
@@ -5226,12 +5228,215 @@ public void prepareForEngineReset() throws IOException { |
5226 | 5228 | release.countDown(); |
5227 | 5229 | safeGet(reset); |
5228 | 5230 |
|
| 5231 | + assertThat(preparedForReset.get(), equalTo(true)); |
| 5232 | + |
5229 | 5233 | holdEngineThread.join(); |
5230 | 5234 | resetEngineThread.join(); |
5231 | 5235 |
|
5232 | 5236 | closeShards(shard); |
5233 | 5237 | } |
5234 | 5238 |
|
| 5239 | + public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception { |
| 5240 | + final var lazyShard = new AtomicReference<IndexShard>(); |
| 5241 | + final var lazyEngineConfig = new AtomicReference<EngineConfig>(); |
| 5242 | + |
| 5243 | + final var refreshStarted = new CountDownLatch(1); |
| 5244 | + final var blockRefresh = new AtomicBoolean(); |
| 5245 | + final var unblockRefresh = new CountDownLatch(1); |
| 5246 | + |
| 5247 | + final var getFromTranslogStarted = new CountDownLatch(1); |
| 5248 | + final var getFromTranslogResult = new PlainActionFuture<Boolean>(); |
| 5249 | + |
| 5250 | + final var resetStarted = new CountDownLatch(1); |
| 5251 | + |
| 5252 | + // Refresh listener that blocks on purpose (so it holds the refresh lock) and acquires the engine read lock in a reentrant manner |
| 5253 | + final var blockingRefreshListener = new ReferenceManager.RefreshListener() { |
| 5254 | + @Override |
| 5255 | + public void beforeRefresh() throws IOException { |
| 5256 | + if (blockRefresh.get()) { |
| 5257 | + try { |
| 5258 | + var shard = lazyShard.get(); |
| 5259 | + assertThat(shard, notNullValue()); |
| 5260 | + |
| 5261 | + // Asserts that the refresh is triggered by a test thread (its description contains the test class name) and not something else |
| 5262 | + assertThat(Thread.currentThread().toString(), containsStringIgnoringCase(getTestClass().getSimpleName())); |
| 5263 | + |
| 5264 | + // Asserts the current thread holds the engine read lock |
| 5265 | + var engineResetLock = lazyEngineConfig.get().getEngineResetLock(); |
| 5266 | + assertThat(engineResetLock.isReadLockedByCurrentThread(), equalTo(true)); |
| 5267 | + |
| 5268 | + refreshStarted.countDown(); |
| 5269 | + safeAwait(getFromTranslogStarted); |
| 5270 | + |
| 5271 | + // At this stage, getThread is blocked on the refresh lock held by the current thread |
| 5272 | + assertBusy(() -> assertThat(engineResetLock.getReadLockCount(), greaterThanOrEqualTo(2))); |
| 5273 | + assertThat(getFromTranslogResult.isDone(), equalTo(false)); |
| 5274 | + |
| 5275 | + // Waits for the resetThread |
| 5276 | + safeAwait(resetStarted); |
| 5277 | + |
| 5278 | + // The resetThread waits for the engine write lock, blocking new non-reentrant engine read lock acquisitions |
| 5279 | + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasSize(1))); |
| 5280 | + |
| 5281 | + // Ensure that accessing the engine from a refresh listener works, even if another thread (like resetThread) is |
| 5282 | + // waiting for the engine write lock. If we were not acquiring the engine read lock when refreshing the reader, |
| 5283 | + // we would deadlock here. |
| 5284 | + var localCheckpoint = shard.withEngine(engine -> engine.getProcessedLocalCheckpoint()); |
| 5285 | + assertThat(localCheckpoint, greaterThan(SequenceNumbers.NO_OPS_PERFORMED)); |
| 5286 | + |
| 5287 | + // Also test `getEngine` |
| 5288 | + var internalEngine = asInstanceOf(InternalEngine.class, shard.getEngine()); |
| 5289 | + assertThat(internalEngine.getTranslogStats().getUncommittedOperations(), equalTo(1)); |
| 5290 | + |
| 5291 | + // Don't block refresh again (it will flush and refresh later in prepareForEngineReset) |
| 5292 | + blockRefresh.set(false); |
| 5293 | + |
| 5294 | + safeAwait(unblockRefresh); |
| 5295 | + } catch (Exception e) { |
| 5296 | + throw new AssertionError(e); |
| 5297 | + } |
| 5298 | + } |
| 5299 | + } |
| 5300 | + |
| 5301 | + @Override |
| 5302 | + public void afterRefresh(boolean didRefresh) throws IOException {} |
| 5303 | + }; |
| 5304 | + |
| 5305 | + final var preparedForReset = new AtomicBoolean(); |
| 5306 | + final var shard = newShard(true, Settings.EMPTY, config -> { |
| 5307 | + if (preparedForReset.get()) { |
| 5308 | + return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true); |
| 5309 | + } else { |
| 5310 | + var internalRefreshListeners = new ArrayList<ReferenceManager.RefreshListener>(); |
| 5311 | + internalRefreshListeners.add(blockingRefreshListener); |
| 5312 | + internalRefreshListeners.addAll(config.getInternalRefreshListener()); |
| 5313 | + |
| 5314 | + var engineConfigWithBlockingRefreshListener = new EngineConfig( |
| 5315 | + config.getShardId(), |
| 5316 | + config.getThreadPool(), |
| 5317 | + config.getThreadPoolMergeExecutorService(), |
| 5318 | + config.getIndexSettings(), |
| 5319 | + config.getWarmer(), |
| 5320 | + config.getStore(), |
| 5321 | + config.getMergePolicy(), |
| 5322 | + config.getAnalyzer(), |
| 5323 | + config.getSimilarity(), |
| 5324 | + new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE), |
| 5325 | + config.getEventListener(), |
| 5326 | + config.getQueryCache(), |
| 5327 | + config.getQueryCachingPolicy(), |
| 5328 | + config.getTranslogConfig(), |
| 5329 | + config.getFlushMergesAfter(), |
| 5330 | + config.getExternalRefreshListener(), |
| 5331 | + internalRefreshListeners, |
| 5332 | + config.getIndexSort(), |
| 5333 | + config.getCircuitBreakerService(), |
| 5334 | + config.getGlobalCheckpointSupplier(), |
| 5335 | + config.retentionLeasesSupplier(), |
| 5336 | + config.getPrimaryTermSupplier(), |
| 5337 | + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, |
| 5338 | + config.getLeafSorter(), |
| 5339 | + config.getRelativeTimeInNanosSupplier(), |
| 5340 | + config.getIndexCommitListener(), |
| 5341 | + config.isPromotableToPrimary(), |
| 5342 | + config.getMapperService(), |
| 5343 | + config.getEngineResetLock() |
| 5344 | + ); |
| 5345 | + lazyEngineConfig.set(engineConfigWithBlockingRefreshListener); |
| 5346 | + return new InternalEngine(engineConfigWithBlockingRefreshListener) { |
| 5347 | + @Override |
| 5348 | + public void prepareForEngineReset() throws IOException { |
| 5349 | + flush(true, true); |
| 5350 | + assertTrue(preparedForReset.compareAndSet(false, true)); |
| 5351 | + } |
| 5352 | + }; |
| 5353 | + } |
| 5354 | + }); |
| 5355 | + try { |
| 5356 | + recoverShardFromStore(shard); |
| 5357 | + blockRefresh.set(true); |
| 5358 | + lazyShard.set(shard); |
| 5359 | + |
| 5360 | + // Indexing a doc with an auto-generated id makes the version map unsafe, so the realtime get will have to refresh |
| 5361 | + var index = indexDoc(shard, "_doc", null /* auto-generated id */); |
| 5362 | + assertThat(index.isCreated(), equalTo(true)); |
| 5363 | + |
| 5364 | + // Trigger a refresh from a dedicated thread (the blocking refresh listener will hold it in beforeRefresh) |
| 5365 | + var refreshThread = new Thread(() -> shard.refresh("test")); |
| 5366 | + refreshThread.start(); |
| 5367 | + |
| 5368 | + // Wait for the refresh listener to hold the refresh lock and the engine read lock |
| 5369 | + safeAwait(refreshStarted); |
| 5370 | + |
| 5371 | + // While refresh is blocked holding the locks, trigger a get in another thread that will block on the refresh |
| 5372 | + var getThread = new Thread(() -> { |
| 5373 | + shard.withEngine(engine -> { |
| 5374 | + getFromTranslogStarted.countDown(); |
| 5375 | + try ( |
| 5376 | + // Will block on the refresh lock |
| 5377 | + var getResult = engine.get( |
| 5378 | + new Engine.Get(true, false, index.getId()), |
| 5379 | + shard.mapperService().mappingLookup(), |
| 5380 | + shard.mapperService().documentParser(), |
| 5381 | + searcher -> searcher |
| 5382 | + ) |
| 5383 | + ) { |
| 5384 | + assertThat(getResult, notNullValue()); |
| 5385 | + getFromTranslogResult.onResponse(getResult.exists()); |
| 5386 | + return null; |
| 5387 | + } |
| 5388 | + }); |
| 5389 | + }); |
| 5390 | + getThread.start(); |
| 5391 | + |
| 5392 | + final var engineResetLock = lazyEngineConfig.get().getEngineResetLock(); |
| 5393 | + safeAwait(getFromTranslogStarted); |
| 5394 | + |
| 5395 | + // Resets the engine to have a thread waiting for the engine write lock (this will block non-reentrant read lock acquisitions) |
| 5396 | + final var reset = new PlainActionFuture<Void>(); |
| 5397 | + final var resetEngineThread = new Thread(() -> { |
| 5398 | + resetStarted.countDown(); |
| 5399 | + try { |
| 5400 | + shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> { |
| 5401 | + try (permit) { |
| 5402 | + shard.resetEngine(newEngine -> { |
| 5403 | + assertThat(newEngine.getEngineConfig().getEngineResetLock(), sameInstance(engineResetLock)); |
| 5404 | + assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true)); |
| 5405 | + assertThat(newEngine, instanceOf(ReadOnlyEngine.class)); |
| 5406 | + assertThat(getFromTranslogResult.isDone(), equalTo(true)); |
| 5407 | + }); |
| 5408 | + assertThat(preparedForReset.get(), equalTo(true)); |
| 5409 | + l.onResponse(null); |
| 5410 | + } |
| 5411 | + }), EsExecutors.DIRECT_EXECUTOR_SERVICE); |
| 5412 | + } catch (Exception e) { |
| 5413 | + reset.onFailure(e); |
| 5414 | + } |
| 5415 | + }); |
| 5416 | + resetEngineThread.start(); |
| 5417 | + |
| 5418 | + safeAwait(resetStarted); |
| 5419 | + |
| 5420 | + // At this stage, getThread is blocked by refreshThread, and both threads block resetEngineThread |
| 5421 | + assertThat(getFromTranslogResult.isDone(), equalTo(false)); |
| 5422 | + |
| 5423 | + assertBusy(() -> assertThat(engineResetLock.getReadLockCount(), greaterThanOrEqualTo(2))); |
| 5424 | + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasItem(resetEngineThread))); |
| 5425 | + assertThat(engineResetLock.isWriteLocked(), equalTo(false)); |
| 5426 | + assertThat(engineResetLock.isReadLocked(), equalTo(true)); |
| 5427 | + |
| 5428 | + unblockRefresh.countDown(); |
| 5429 | + |
| 5430 | + safeGet(reset); |
| 5431 | + |
| 5432 | + resetEngineThread.join(); |
| 5433 | + refreshThread.join(); |
| 5434 | + getThread.join(); |
| 5435 | + } finally { |
| 5436 | + closeShards(shard); |
| 5437 | + } |
| 5438 | + } |
| 5439 | + |
5235 | 5440 | public void testShardExposesWriteLoadStats() throws Exception { |
5236 | 5441 | final IndexShard primary = newStartedShard( |
5237 | 5442 | true, |
|
0 commit comments