1616 */
1717package org .apache .lucene .index ;
1818
19+ import com .carrotsearch .randomizedtesting .annotations .Timeout ;
1920import java .util .ArrayList ;
2021import java .util .HashSet ;
2122import java .util .concurrent .ConcurrentHashMap ;
2223import java .util .concurrent .ConcurrentLinkedQueue ;
24+ import java .util .concurrent .atomic .AtomicBoolean ;
2325import org .apache .lucene .document .Document ;
2426import org .apache .lucene .document .Field ;
2527import org .apache .lucene .store .Directory ;
@@ -63,6 +65,7 @@ public void sync(Directory directory) {
6365 assertTrue (directoriesToBeClosed .isEmpty ());
6466 }
6567
68+ @ Timeout (millis = 1000 * 60 * 2 )
6669 public void testCloseMultiple () throws Exception {
6770 // Write to many indexes and do merges on them.
6871 // The unit test is randomized but reproducible.
@@ -119,14 +122,21 @@ public void sync(Directory directory) {
119122
120123 // Randomly spread the documents across the indexes.
121124 // The merge ordering is calculated beforehand to make this unit test be reproducible.
125+ // TODO: why is this a concurrent linked queue if all accesses are under synchronized(lock)?!
122126 ConcurrentLinkedQueue <Integer > nextDirectoryToWrite = new ConcurrentLinkedQueue <>();
123127 for (int i = 0 ; i < DOCUMENT_COUNT ; ++i ) {
124128 int r = random ().nextInt (DIRECTORY_COUNT );
125129 nextDirectoryToWrite .add (r );
126130 ++documentCounts [r ];
127131 }
128132
133+ // Ensure all threads have at least one document to work with.
134+ for (int i = 0 ; i < documentCounts .length ; i ++) {
135+ documentCounts [i ] = Math .max (1 , documentCounts [i ]);
136+ }
137+
129138 // Write documents to each index and force some merges.
139+ final AtomicBoolean stop = new AtomicBoolean ();
130140 final ArrayList <Thread > threads = new ArrayList <>();
131141 final Object lock = new Object ();
132142 for (int schedulerIndex = 0 ; schedulerIndex < schedulers .size (); schedulerIndex ++) {
@@ -138,7 +148,7 @@ public void sync(Directory directory) {
138148 public void run () {
139149 try {
140150 IndexWriter writer = writers .get (threadID );
141- while (true ) {
151+ while (stop . get () == false ) {
142152 synchronized (lock ) {
143153 if (!nextDirectoryToWrite .isEmpty ()
144154 && nextDirectoryToWrite .peek () != threadID ) {
@@ -147,17 +157,19 @@ public void run() {
147157 && nextDirectoryToWrite .peek () != threadID ) {
148158 try {
149159 lock .wait ();
150- } catch (
151- @ SuppressWarnings ("unused" )
152- InterruptedException e ) {
153- Thread .currentThread ().interrupt ();
154- return ;
160+ } catch (InterruptedException e ) {
161+ stop .set (true );
162+ throw new RuntimeException ("subthread interrupted?" , e );
155163 }
156164 }
157165 // awaken
158166 continue ;
159167 }
160168
169+ if (documentCounts [threadID ] == 0 ) {
170+ break ;
171+ }
172+
161173 // write a doc
162174 Document doc = new Document ();
163175 Field idField = newStringField ("id" , "" , Field .Store .NO );
@@ -176,10 +188,8 @@ public void run() {
176188
177189 nextDirectoryToWrite .poll ();
178190 lock .notifyAll ();
179- if (documentCounts [threadID ] == 0 ) break ;
180191 }
181192 }
182-
183193 } catch (Exception e ) {
184194 throw new RuntimeException (e );
185195 }
0 commit comments