diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 76c229ff9088..aebfd6013632 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -52,6 +52,8 @@ private static final class DateFormatInfo { private ArrayList inputFiles = new ArrayList<>(); private int nextFile = 0; private int iteration = 0; + private int[] threadIndex; + private volatile boolean threadIndexCreated; @Override public void setConfig(Config config) { @@ -102,19 +104,33 @@ public void close() throws IOException { public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException { Path f = null; String name = null; - synchronized (this) { - if (nextFile >= inputFiles.size()) { - // exhausted files, start a new round, unless forever set to false. - if (!forever) { - throw new NoMoreDataException(); - } - nextFile = 0; - iteration++; - } - f = inputFiles.get(nextFile++); - name = f.toRealPath() + "_" + iteration; + int inputFilesSize = inputFiles.size(); + + if (threadIndexCreated == false) { + createThreadIndex(); + } + + // Getting file index value which is set for each thread + int index = Integer.parseInt(Thread.currentThread().getName().substring(12)); + int fIndex = index + threadIndex[index] * threadIndex.length; + threadIndex[index]++; + + // Sanity check, if # threads is greater than # input files, wrap index + if (index >= inputFilesSize) { + index %= inputFilesSize; } + // Check if this thread has exhausted its files + if (fIndex >= inputFilesSize) { + threadIndex[index] = 0; + fIndex = index + threadIndex[index] * threadIndex.length; + threadIndex[index]++; + iteration++; + } + + f = inputFiles.get(fIndex); + name = f.toRealPath() + "_" + iteration; + try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) { // First line is the date, 3rd is the title, rest is body String dateStr = reader.readLine(); @@ -146,4 +162,11 @@ public synchronized void resetInputs() throws IOException { nextFile = 0; iteration = 0; } + + private synchronized void createThreadIndex() { + if (threadIndexCreated == false) { + threadIndex = new int[getConfig().getNumThreads()]; + threadIndexCreated = true; + } + } } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index d87f724d1a0c..37d89c996b3d 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -334,12 +334,16 @@ private int doParallelTasks() throws Exception { initTasksArray(); ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()]; + this.getRunData().getConfig().setNumThreads(t.length); // prepare threads int index = 0; for (int k = 0; k < repetitions; k++) { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); - t[index++] = new ParallelTask(task); + t[index] = new ParallelTask(task); + // Set the thread name for guaranteed file index while processing. + t[index].setName("IndexThread-" + index); + index++; } } // run threads diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java index 74709156c9a5..5eafb553fcf7 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java @@ -54,6 +54,7 @@ public class Config { private HashMap valByRound = new HashMap<>(); private HashMap colForValByRound = new HashMap<>(); private String algorithmText; + private int numThreads = 1; /** * Read both algorithm and config properties. @@ -113,6 +114,14 @@ public Config(Properties props) { } } + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public int getNumThreads() { + return numThreads; + } + @SuppressWarnings({"unchecked", "rawtypes"}) private void printProps() { System.out.println("------------> config properties:");