38
38
import java .nio .file .Path ;
39
39
import java .time .Duration ;
40
40
import java .time .Instant ;
41
+ import java .util .ArrayList ;
41
42
import java .util .Collection ;
42
43
import java .util .Collections ;
43
44
import java .util .HashSet ;
44
45
import java .util .List ;
45
46
import java .util .Map ;
46
47
import java .util .Map .Entry ;
47
48
import java .util .Set ;
49
+ import java .util .concurrent .Callable ;
48
50
import java .util .concurrent .ConcurrentHashMap ;
49
51
import java .util .concurrent .ExecutorService ;
50
52
import java .util .concurrent .Executors ;
@@ -61,6 +63,8 @@ public final class Suggester implements Closeable {
61
63
62
64
private static final String PROJECTS_DISABLED_KEY = "" ;
63
65
66
+ private static final int MAX_TIME_MS = 1000 ;
67
+
64
68
private static final Logger logger = Logger .getLogger (Suggester .class .getName ());
65
69
66
70
private final Map <String , SuggesterProjectData > projectData = new ConcurrentHashMap <>();
@@ -79,6 +83,8 @@ public final class Suggester implements Closeable {
79
83
80
84
private final Set <String > allowedFields ;
81
85
86
+ private final ExecutorService executorService = Executors .newWorkStealingPool ();
87
+
82
88
/**
83
89
* @param suggesterDir directory under which the suggester data should be created
84
90
* @param resultSize maximum number of suggestions that should be returned
@@ -129,13 +135,13 @@ public void init(final Collection<NamedIndexDir> luceneIndexes) {
129
135
synchronized (lock ) {
130
136
logger .log (Level .INFO , "Initializing suggester" );
131
137
132
- ExecutorService executorService = Executors .newFixedThreadPool ( Runtime . getRuntime (). availableProcessors () );
138
+ ExecutorService executor = Executors .newWorkStealingPool ( );
133
139
134
140
for (NamedIndexDir indexDir : luceneIndexes ) {
135
- submitInitIfIndexExists (executorService , indexDir );
141
+ submitInitIfIndexExists (executor , indexDir );
136
142
}
137
143
138
- shutdownAndAwaitTermination (executorService , "Suggester successfully initialized" );
144
+ shutdownAndAwaitTermination (executor , "Suggester successfully initialized" );
139
145
}
140
146
}
141
147
@@ -195,6 +201,7 @@ private void shutdownAndAwaitTermination(final ExecutorService executorService,
195
201
logger .log (Level .INFO , logMessageOnSuccess );
196
202
} catch (InterruptedException e ) {
197
203
logger .log (Level .SEVERE , "Interrupted while building suggesters" , e );
204
+ Thread .currentThread ().interrupt ();
198
205
}
199
206
}
200
207
@@ -211,18 +218,18 @@ public void rebuild(final Collection<NamedIndexDir> indexDirs) {
211
218
synchronized (lock ) {
212
219
logger .log (Level .INFO , "Rebuilding following suggesters: {0}" , indexDirs );
213
220
214
- ExecutorService executorService = Executors .newFixedThreadPool ( Runtime . getRuntime (). availableProcessors () );
221
+ ExecutorService executor = Executors .newWorkStealingPool ( );
215
222
216
223
for (NamedIndexDir indexDir : indexDirs ) {
217
224
SuggesterProjectData data = this .projectData .get (indexDir .name );
218
225
if (data != null ) {
219
- executorService .submit (getRebuildRunnable (data ));
226
+ executor .submit (getRebuildRunnable (data ));
220
227
} else {
221
- submitInitIfIndexExists (executorService , indexDir );
228
+ submitInitIfIndexExists (executor , indexDir );
222
229
}
223
230
}
224
231
225
- shutdownAndAwaitTermination (executorService , "Suggesters for " + indexDirs + " were successfully rebuilt" );
232
+ shutdownAndAwaitTermination (executor , "Suggesters for " + indexDirs + " were successfully rebuilt" );
226
233
}
227
234
}
228
235
@@ -287,8 +294,21 @@ public List<LookupResultItem> search(
287
294
indexReaders .get (0 ).getReader ()));
288
295
}
289
296
290
- List <LookupResultItem > results = readers .parallelStream ().flatMap (namedIndexReader -> {
297
+ List <LookupResultItem > results ;
298
+ if (!SuggesterUtils .isComplexQuery (query , suggesterQuery )) { // use WFST for lone prefix
299
+ results = prefixLookup (readers , (SuggesterPrefixQuery ) suggesterQuery );
300
+ } else {
301
+ results = complexLookup (readers , suggesterQuery , query );
302
+ }
303
+
304
+ return SuggesterUtils .combineResults (results , resultSize );
305
+ }
291
306
307
+ private List <LookupResultItem > prefixLookup (
308
+ final List <NamedIndexReader > readers ,
309
+ final SuggesterPrefixQuery suggesterQuery
310
+ ) {
311
+ return readers .parallelStream ().flatMap (namedIndexReader -> {
292
312
SuggesterProjectData data = projectData .get (namedIndexReader .name );
293
313
if (data == null ) {
294
314
logger .log (Level .FINE , "{0} not yet initialized" , namedIndexReader .name );
@@ -300,26 +320,45 @@ public List<LookupResultItem> search(
300
320
}
301
321
302
322
try {
303
- if (!SuggesterUtils .isComplexQuery (query , suggesterQuery )) { // use WFST for lone prefix
304
- String prefix = ((SuggesterPrefixQuery ) suggesterQuery ).getPrefix ().text ();
305
-
306
- return data .lookup (suggesterQuery .getField (), prefix , resultSize )
307
- .stream ()
308
- .map (item -> new LookupResultItem (item .key .toString (), namedIndexReader .name , item .value ));
309
- } else {
310
- SuggesterSearcher searcher = new SuggesterSearcher (namedIndexReader .reader , resultSize );
323
+ String prefix = suggesterQuery .getPrefix ().text ();
311
324
312
- List <LookupResultItem > resultItems = searcher .suggest (query , namedIndexReader .name , suggesterQuery ,
313
- data .getSearchCounts (suggesterQuery .getField ()));
314
-
315
- return resultItems .stream ();
316
- }
325
+ return data .lookup (suggesterQuery .getField (), prefix , resultSize )
326
+ .stream ()
327
+ .map (item -> new LookupResultItem (item .key .toString (), namedIndexReader .name , item .value ));
317
328
} finally {
318
329
data .unlock ();
319
330
}
320
331
}).collect (Collectors .toList ());
332
+ }
321
333
322
- return SuggesterUtils .combineResults (results , resultSize );
334
+ private List <LookupResultItem > complexLookup (
335
+ final List <NamedIndexReader > readers ,
336
+ final SuggesterQuery suggesterQuery ,
337
+ final Query query
338
+ ) {
339
+ List <LookupResultItem > results = new ArrayList <>(readers .size () * resultSize );
340
+ List <SuggesterSearchTask > searchTasks = new ArrayList <>(readers .size ());
341
+ for (NamedIndexReader ir : readers ) {
342
+ searchTasks .add (new SuggesterSearchTask (ir , query , suggesterQuery , results ));
343
+ }
344
+
345
+ try {
346
+ executorService .invokeAll (searchTasks , MAX_TIME_MS , TimeUnit .MILLISECONDS );
347
+ } catch (InterruptedException e ) {
348
+ logger .log (Level .WARNING , "Interrupted while invoking suggester search" , e );
349
+ Thread .currentThread ().interrupt ();
350
+ }
351
+
352
+ // wait for tasks to finish
353
+ for (SuggesterSearchTask searchTask : searchTasks ) {
354
+ if (!searchTask .started ) {
355
+ continue ;
356
+ }
357
+ // "spin lock" – should be fast since all the tasks either finished or were interrupted
358
+ while (!searchTask .finished ) {
359
+ }
360
+ }
361
+ return results ;
323
362
}
324
363
325
364
/**
@@ -422,6 +461,7 @@ public List<Entry<BytesRef, Integer>> getSearchCounts(
422
461
*/
423
462
@ Override
424
463
public void close () {
464
+ executorService .shutdownNow ();
425
465
projectData .values ().forEach (f -> {
426
466
try {
427
467
f .close ();
@@ -431,6 +471,62 @@ public void close() {
431
471
});
432
472
}
433
473
474
+ private class SuggesterSearchTask implements Callable <Void > {
475
+
476
+ private final NamedIndexReader namedIndexReader ;
477
+ private final Query query ;
478
+ private final SuggesterQuery suggesterQuery ;
479
+ private final List <LookupResultItem > results ;
480
+
481
+ private volatile boolean finished = false ;
482
+ private volatile boolean started = false ;
483
+
484
+ SuggesterSearchTask (
485
+ final NamedIndexReader namedIndexReader ,
486
+ final Query query ,
487
+ final SuggesterQuery suggesterQuery ,
488
+ final List <LookupResultItem > results
489
+ ) {
490
+ this .namedIndexReader = namedIndexReader ;
491
+ this .query = query ;
492
+ this .suggesterQuery = suggesterQuery ;
493
+ this .results = results ;
494
+ }
495
+
496
+ @ Override
497
+ public Void call () {
498
+ try {
499
+ started = true ;
500
+
501
+ SuggesterProjectData data = projectData .get (namedIndexReader .name );
502
+ if (data == null ) {
503
+ logger .log (Level .FINE , "{0} not yet initialized" , namedIndexReader .name );
504
+ return null ;
505
+ }
506
+ boolean gotLock = data .tryLock ();
507
+ if (!gotLock ) { // do not wait for rebuild
508
+ return null ;
509
+ }
510
+
511
+ try {
512
+ SuggesterSearcher searcher = new SuggesterSearcher (namedIndexReader .reader , resultSize );
513
+
514
+ List <LookupResultItem > resultItems = searcher .suggest (query , namedIndexReader .name , suggesterQuery ,
515
+ data .getSearchCounts (suggesterQuery .getField ()));
516
+
517
+ synchronized (results ) {
518
+ results .addAll (resultItems );
519
+ }
520
+ } finally {
521
+ data .unlock ();
522
+ }
523
+ } finally {
524
+ finished = true ;
525
+ }
526
+ return null ;
527
+ }
528
+ }
529
+
434
530
/**
435
531
* Model classes for holding project name and path to ist index directory.
436
532
*/
0 commit comments