Skip to content

Commit d4c6319

Browse files
authored
Fix TestIndexWriterWithThreads exceeding max file handles. (#15437)
1 parent e2687ad commit d4c6319

File tree

4 files changed

+70
-73
lines changed

4 files changed

+70
-73
lines changed

lucene/core/src/java/org/apache/lucene/util/ThreadInterruptedException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
/**
2020
* Thrown by lucene on detecting that Thread.interrupt() had been called. Unlike Java's
21-
* InterruptedException, this exception is not checked..
21+
* InterruptedException, this exception is not checked.
2222
*/
2323
public final class ThreadInterruptedException extends RuntimeException {
2424
public ThreadInterruptedException(InterruptedException ie) {

lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java

Lines changed: 67 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.lucene.index;
1818

19+
import com.carrotsearch.randomizedtesting.RandomizedTest;
1920
import java.io.IOException;
2021
import java.util.concurrent.BrokenBarrierException;
2122
import java.util.concurrent.CyclicBarrier;
@@ -37,23 +38,25 @@
3738
import org.apache.lucene.tests.analysis.MockAnalyzer;
3839
import org.apache.lucene.tests.index.RandomIndexWriter;
3940
import org.apache.lucene.tests.index.SuppressingConcurrentMergeScheduler;
41+
import org.apache.lucene.tests.mockfile.HandleLimitFS;
4042
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
4143
import org.apache.lucene.tests.store.MockDirectoryWrapper;
4244
import org.apache.lucene.tests.util.LineFileDocs;
4345
import org.apache.lucene.tests.util.LuceneTestCase;
4446
import org.apache.lucene.tests.util.TestUtil;
4547
import org.apache.lucene.util.Bits;
4648
import org.apache.lucene.util.BytesRef;
47-
import org.apache.lucene.util.SuppressForbidden;
4849
import org.apache.lucene.util.ThreadInterruptedException;
4950

5051
/** MultiThreaded IndexWriter tests */
5152
@LuceneTestCase.SuppressCodecs("SimpleText")
53+
// Some of these tests spin uncoordinated threads that generate lots of documents and have very
54+
// small limits on in-memory buffers, etc. They occasionally can exceed the default
55+
// max handles limit...
56+
@HandleLimitFS.MaxOpenHandles(limit = HandleLimitFS.MaxOpenHandles.MAX_OPEN_FILES * 4)
5257
public class TestIndexWriterWithThreads extends LuceneTestCase {
53-
5458
// Used by test cases below
5559
private static class IndexerThread extends Thread {
56-
5760
private final CyclicBarrier syncStart;
5861
Throwable error;
5962
IndexWriter writer;
@@ -66,7 +69,6 @@ public IndexerThread(IndexWriter writer, boolean noErrors, CyclicBarrier syncSta
6669
this.syncStart = syncStart;
6770
}
6871

69-
@SuppressForbidden(reason = "Thread sleep")
7072
@Override
7173
public void run() {
7274
try {
@@ -97,14 +99,11 @@ public void run() {
9799
System.out.println("TEST: expected exc:");
98100
ioe.printStackTrace(System.out);
99101
}
100-
// System.out.println(Thread.currentThread().getName() + ": hit exc");
101-
// ioe.printStackTrace(System.out);
102+
102103
if (ioe.getMessage().startsWith("fake disk full at")
103104
|| ioe.getMessage().equals("now failing on purpose")) {
104-
try {
105-
Thread.sleep(1);
106-
} catch (InterruptedException ie) {
107-
throw new ThreadInterruptedException(ie);
105+
if (Thread.currentThread().isInterrupted()) {
106+
throw new ThreadInterruptedException(new InterruptedException());
108107
}
109108
if (fullCount++ >= 5) break;
110109
} else {
@@ -188,77 +187,76 @@ public void testImmediateDiskFullWithThreads() throws Exception {
188187
// threads are trying to add documents. Strictly
189188
// speaking, this isn't valid us of Lucene's APIs, but we
190189
// still want to be robust to this case:
191-
@SuppressForbidden(reason = "Thread sleep")
192190
public void testCloseWithThreads() throws Exception {
193191
int NUM_THREADS = 3;
194192
int numIterations = TEST_NIGHTLY ? 7 : 3;
195193
for (int iter = 0; iter < numIterations; iter++) {
196194
if (VERBOSE) {
197195
System.out.println("\nTEST: iter=" + iter);
198196
}
199-
Directory dir = newDirectory();
197+
try (Directory dir = newDirectory()) {
198+
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
199+
try (IndexWriter writer =
200+
new IndexWriter(
201+
dir,
202+
newIndexWriterConfig(new MockAnalyzer(random()))
203+
.setMaxBufferedDocs(10)
204+
.setMergeScheduler(new ConcurrentMergeScheduler())
205+
.setMergePolicy(newLogMergePolicy(4))
206+
.setCommitOnClose(false))) {
207+
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler())
208+
.setSuppressExceptions();
209+
210+
CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
211+
for (int i = 0; i < NUM_THREADS; i++) {
212+
threads[i] = new IndexerThread(writer, false, syncStart);
213+
threads[i].start();
214+
}
215+
syncStart.await();
200216

201-
IndexWriter writer =
202-
new IndexWriter(
203-
dir,
204-
newIndexWriterConfig(new MockAnalyzer(random()))
205-
.setMaxBufferedDocs(10)
206-
.setMergeScheduler(new ConcurrentMergeScheduler())
207-
.setMergePolicy(newLogMergePolicy(4))
208-
.setCommitOnClose(false));
209-
((ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler()).setSuppressExceptions();
217+
// wait for at least this many documents to be added, then resume.
218+
int minDocsAdded = RandomizedTest.randomIntBetween(1, 50);
219+
while (true) {
220+
Thread.yield(); // intentional, spin loop instead of wall-clock wait.
210221

211-
CyclicBarrier syncStart = new CyclicBarrier(NUM_THREADS + 1);
212-
IndexerThread[] threads = new IndexerThread[NUM_THREADS];
213-
for (int i = 0; i < NUM_THREADS; i++) {
214-
threads[i] = new IndexerThread(writer, false, syncStart);
215-
threads[i].start();
216-
}
217-
syncStart.await();
222+
int docsAdded = 0;
223+
for (int i = 0; i < NUM_THREADS; i++) {
224+
docsAdded += threads[i].addCount;
218225

219-
boolean done = false;
220-
while (!done) {
221-
Thread.sleep(100);
222-
for (int i = 0; i < NUM_THREADS; i++)
223-
// only stop when at least one thread has added a doc
224-
if (threads[i].addCount > 0) {
225-
done = true;
226-
break;
227-
} else if (!threads[i].isAlive()) {
228-
fail("thread failed before indexing a single document");
229-
}
230-
}
226+
if (!threads[i].isAlive()) {
227+
fail("thread failed before indexing a single document: " + threads[i]);
228+
}
229+
}
231230

232-
if (VERBOSE) {
233-
System.out.println("\nTEST: now close");
234-
}
235-
try {
236-
writer.commit();
237-
} finally {
238-
writer.close();
239-
}
231+
if (docsAdded > minDocsAdded) {
232+
break;
233+
}
234+
}
240235

241-
// Make sure threads that are adding docs are not hung:
242-
for (int i = 0; i < NUM_THREADS; i++) {
243-
// Without fix for LUCENE-1130: one of the
244-
// threads will hang
245-
threads[i].join();
236+
// now commit and then close the index writer, while background threads still keep
237+
// adding documents.
238+
writer.commit();
239+
}
246240

247-
// [DW] this is unreachable once join() returns a thread cannot be alive.
248-
if (threads[i].isAlive()) fail("thread seems to be hung");
249-
}
241+
// Make sure threads that are adding docs are not hung.
242+
for (int i = 0; i < NUM_THREADS; i++) {
243+
// Without fix for LUCENE-1130: one of the threads will hang
244+
if (threads[i] != null) {
245+
threads[i].join();
246+
}
247+
}
250248

251-
// Quick test to make sure index is not corrupt:
252-
IndexReader reader = DirectoryReader.open(dir);
253-
PostingsEnum tdocs = TestUtil.docs(random(), reader, "field", new BytesRef("aaa"), null, 0);
254-
int count = 0;
255-
while (tdocs.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
256-
count++;
249+
// Quick test to make sure index is not corrupt.
250+
try (IndexReader reader = DirectoryReader.open(dir)) {
251+
PostingsEnum tdocs =
252+
TestUtil.docs(random(), reader, "field", new BytesRef("aaa"), null, 0);
253+
int count = 0;
254+
while (tdocs.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
255+
count++;
256+
}
257+
assertTrue(count > 0);
258+
}
257259
}
258-
assertTrue(count > 0);
259-
reader.close();
260-
261-
dir.close();
262260
}
263261
}
264262

@@ -564,7 +562,7 @@ public void testRollbackAndCommitWithThreads() throws Exception {
564562
writerRef.get().commit();
565563
final LineFileDocs docs = new LineFileDocs(random());
566564
final Thread[] threads = new Thread[threadCount];
567-
final int iters = atLeast(100);
565+
final int iters = atLeast(25);
568566
final AtomicBoolean failed = new AtomicBoolean();
569567
final Lock rollbackLock = new ReentrantLock();
570568
final Lock commitLock = new ReentrantLock();
@@ -574,7 +572,6 @@ public void testRollbackAndCommitWithThreads() throws Exception {
574572
@Override
575573
public void run() {
576574
for (int iter = 0; iter < iters && !failed.get(); iter++) {
577-
// final int x = random().nextInt(5);
578575
final int x = random().nextInt(3);
579576
try {
580577
switch (x) {
@@ -637,8 +634,8 @@ public void run() {
637634
threads[threadID].start();
638635
}
639636

640-
for (int threadID = 0; threadID < threadCount; threadID++) {
641-
threads[threadID].join();
637+
for (var t : threads) {
638+
t.join();
642639
}
643640

644641
assertTrue(!failed.get());

lucene/test-framework/src/java/org/apache/lucene/tests/util/LuceneTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,7 @@ public static IndexWriterConfig newIndexWriterConfig(Analyzer a) {
981981
public static IndexWriterConfig newIndexWriterConfig(Random r, Analyzer a) {
982982
IndexWriterConfig c = new IndexWriterConfig(a);
983983
c.setSimilarity(classEnvRule.similarity);
984-
if (VERBOSE) {
984+
if (INFOSTREAM) {
985985
// Even though TestRuleSetupAndRestoreClassEnv calls
986986
// InfoStream.setDefault, we do it again here so that
987987
// the PrintStreamInfoStream.messageID increments so

lucene/test-framework/src/java/org/apache/lucene/tests/util/TestRuleTemporaryFilesCleanup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ private FileSystem initializeFileSystem() {
117117
avoid.addAll(Arrays.asList(a.value()));
118118
}
119119
FileSystem fs = FileSystems.getDefault();
120-
if (LuceneTestCase.VERBOSE && allowed(avoid, VerboseFS.class)) {
120+
if (LuceneTestCase.INFOSTREAM && allowed(avoid, VerboseFS.class)) {
121121
fs =
122122
new VerboseFS(
123123
fs,

0 commit comments

Comments
 (0)