From 1a771bb3f621240b1c3de15d63ca44ffdec7509f Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Wed, 10 Feb 2021 06:13:52 +0000 Subject: [PATCH 1/6] Added the code to read input files from multiple parallel threads for better resource utilizations --- .../byTask/feeds/ReutersContentSource.java | 48 ++++++++++++++----- .../benchmark/byTask/tasks/TaskSequence.java | 1 + .../lucene/benchmark/byTask/utils/Config.java | 9 ++++ 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 76c229ff9088..6b87262edf88 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -52,6 +52,8 @@ private static final class DateFormatInfo { private ArrayList inputFiles = new ArrayList<>(); private int nextFile = 0; private int iteration = 0; + private int[] threadIndex; + private volatile boolean threadIndexCreated; @Override public void setConfig(Config config) { @@ -102,19 +104,36 @@ public void close() throws IOException { public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException { Path f = null; String name = null; - synchronized (this) { - if (nextFile >= inputFiles.size()) { - // exhausted files, start a new round, unless forever set to false. - if (!forever) { - throw new NoMoreDataException(); - } - nextFile = 0; - iteration++; - } - f = inputFiles.get(nextFile++); - name = f.toRealPath() + "_" + iteration; + int inputFilesSize = inputFiles.size(); + + /** + * synchronized (this) { if (nextFile >= inputFiles.size()) { // exhausted files, start a new + * round, unless forever set to false. if (!forever) { throw new NoMoreDataException(); } + * nextFile = 0; iteration++; } f = inputFiles.get(nextFile++); name = f.toRealPath() + "_" + + * iteration; }* + */ + if (!threadIndexCreated) { + createThreadIndex(); + } + + int index = (int) Thread.currentThread().getId() % threadIndex.length; + int fIndex = index + threadIndex[index] * threadIndex.length; + threadIndex[index]++; + + // Sanity check, if # threads is greater than # input files, wrap index + if (index >= inputFilesSize) index %= inputFilesSize; + + // Check if this thread has exhausted its files + if (fIndex >= inputFilesSize) { + threadIndex[index] = 0; + fIndex = index + threadIndex[index] * threadIndex.length; + threadIndex[index]++; + iteration++; } + f = inputFiles.get(fIndex); + name = f.toRealPath() + "_" + iteration; + try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) { // First line is the date, 3rd is the title, rest is body String dateStr = reader.readLine(); @@ -146,4 +165,11 @@ public synchronized void resetInputs() throws IOException { nextFile = 0; iteration = 0; } + + private synchronized void createThreadIndex() { + if (!threadIndexCreated) { + threadIndex = new int[getConfig().getNumThreads()]; + threadIndexCreated = true; + } + } } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index d87f724d1a0c..1ac09b44a5a4 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -334,6 +334,7 @@ private int doParallelTasks() throws Exception { initTasksArray(); ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()]; + this.getRunData().getConfig().setNumThreads(t.length); // prepare threads int index = 0; for (int k = 0; k < repetitions; k++) { diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java index 74709156c9a5..5eafb553fcf7 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java @@ -54,6 +54,7 @@ public class Config { private HashMap valByRound = new HashMap<>(); private HashMap colForValByRound = new HashMap<>(); private String algorithmText; + private int numThreads = 1; /** * Read both algorithm and config properties. @@ -113,6 +114,14 @@ public Config(Properties props) { } } + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public int getNumThreads() { + return numThreads; + } + @SuppressWarnings({"unchecked", "rawtypes"}) private void printProps() { System.out.println("------------> config properties:"); From 1b653aab66737d64c7b68837b6eecf36b5b19e9a Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Wed, 10 Feb 2021 08:27:55 +0000 Subject: [PATCH 2/6] Adding comments --- .../byTask/feeds/ReutersContentSource.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 6b87262edf88..edf7b0b2d7b8 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -106,11 +106,18 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc String name = null; int inputFilesSize = inputFiles.size(); - /** - * synchronized (this) { if (nextFile >= inputFiles.size()) { // exhausted files, start a new - * round, unless forever set to false. if (!forever) { throw new NoMoreDataException(); } - * nextFile = 0; iteration++; } f = inputFiles.get(nextFile++); name = f.toRealPath() + "_" + - * iteration; }* + /* + * synchronized (this) { + * if (nextFile >= inputFiles.size()) { // exhausted files, start a new round, unless forever set to false. + * if (!forever) { + * throw new NoMoreDataException(); + * } + * nextFile = 0; + * iteration++; + * } + * f = inputFiles.get(nextFile++); + * name = f.toRealPath() + "_" +iteration; + * }* */ if (!threadIndexCreated) { createThreadIndex(); From 3e5bffdfc79aa22af380fa702d05ecfbbaac12cb Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Wed, 10 Feb 2021 16:36:54 +0000 Subject: [PATCH 3/6] Fixed the formating issue --- .../byTask/feeds/ReutersContentSource.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index edf7b0b2d7b8..1c12f20b4564 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -107,17 +107,17 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc int inputFilesSize = inputFiles.size(); /* - * synchronized (this) { - * if (nextFile >= inputFiles.size()) { // exhausted files, start a new round, unless forever set to false. - * if (!forever) { - * throw new NoMoreDataException(); + * synchronized (this) { + * if (nextFile >= inputFiles.size()) { // exhausted files, start a new round, unless forever set to false. + * if (!forever) { + * throw new NoMoreDataException(); + * } + * nextFile = 0; + * iteration++; + * } + * f = inputFiles.get(nextFile++); + * name = f.toRealPath() + "_" +iteration; * } - * nextFile = 0; - * iteration++; - * } - * f = inputFiles.get(nextFile++); - * name = f.toRealPath() + "_" +iteration; - * }* */ if (!threadIndexCreated) { createThreadIndex(); From 517bdea9cbbed3c233bb99df2311bb60aa4e0438 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Fri, 5 Mar 2021 08:38:23 +0000 Subject: [PATCH 4/6] Incorporating formate issue and some changes for guaranteed file index --- .../byTask/feeds/ReutersContentSource.java | 25 ++++++------------- .../benchmark/byTask/tasks/TaskSequence.java | 5 +++- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 1c12f20b4564..00bb7752948b 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -106,29 +106,18 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc String name = null; int inputFilesSize = inputFiles.size(); - /* - * synchronized (this) { - * if (nextFile >= inputFiles.size()) { // exhausted files, start a new round, unless forever set to false. - * if (!forever) { - * throw new NoMoreDataException(); - * } - * nextFile = 0; - * iteration++; - * } - * f = inputFiles.get(nextFile++); - * name = f.toRealPath() + "_" +iteration; - * } - */ - if (!threadIndexCreated) { + if (threadIndexCreated == false) { createThreadIndex(); } - - int index = (int) Thread.currentThread().getId() % threadIndex.length; + + int index = Integer.parseInt(Thread.currentThread().getName().substring(12)); int fIndex = index + threadIndex[index] * threadIndex.length; threadIndex[index]++; // Sanity check, if # threads is greater than # input files, wrap index - if (index >= inputFilesSize) index %= inputFilesSize; + if (index >= inputFilesSize) { + index %= inputFilesSize; + } // Check if this thread has exhausted its files if (fIndex >= inputFilesSize) { @@ -174,7 +163,7 @@ public synchronized void resetInputs() throws IOException { } private synchronized void createThreadIndex() { - if (!threadIndexCreated) { + if (threadIndexCreated == false) { threadIndex = new int[getConfig().getNumThreads()]; threadIndexCreated = true; } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index 1ac09b44a5a4..63ba404704e0 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -340,7 +340,10 @@ private int doParallelTasks() throws Exception { for (int k = 0; k < repetitions; k++) { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); - t[index++] = new ParallelTask(task); + t[index] = new ParallelTask(task); + //Set the thread name for guaranteed file index while processing. + t[index].setName("IndexThread-"+index); + index++; } } // run threads From 84566c13377a03ffa1da7a41b9ede036a4255c14 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Fri, 5 Mar 2021 08:38:23 +0000 Subject: [PATCH 5/6] Incorporating formate issue and some changes for guaranteed file index --- .../byTask/feeds/ReutersContentSource.java | 25 ++++++------------- .../benchmark/byTask/tasks/TaskSequence.java | 5 +++- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 1c12f20b4564..00bb7752948b 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -106,29 +106,18 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc String name = null; int inputFilesSize = inputFiles.size(); - /* - * synchronized (this) { - * if (nextFile >= inputFiles.size()) { // exhausted files, start a new round, unless forever set to false. - * if (!forever) { - * throw new NoMoreDataException(); - * } - * nextFile = 0; - * iteration++; - * } - * f = inputFiles.get(nextFile++); - * name = f.toRealPath() + "_" +iteration; - * } - */ - if (!threadIndexCreated) { + if (threadIndexCreated == false) { createThreadIndex(); } - - int index = (int) Thread.currentThread().getId() % threadIndex.length; + + int index = Integer.parseInt(Thread.currentThread().getName().substring(12)); int fIndex = index + threadIndex[index] * threadIndex.length; threadIndex[index]++; // Sanity check, if # threads is greater than # input files, wrap index - if (index >= inputFilesSize) index %= inputFilesSize; + if (index >= inputFilesSize) { + index %= inputFilesSize; + } // Check if this thread has exhausted its files if (fIndex >= inputFilesSize) { @@ -174,7 +163,7 @@ public synchronized void resetInputs() throws IOException { } private synchronized void createThreadIndex() { - if (!threadIndexCreated) { + if (threadIndexCreated == false) { threadIndex = new int[getConfig().getNumThreads()]; threadIndexCreated = true; } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index 1ac09b44a5a4..63ba404704e0 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -340,7 +340,10 @@ private int doParallelTasks() throws Exception { for (int k = 0; k < repetitions; k++) { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); - t[index++] = new ParallelTask(task); + t[index] = new ParallelTask(task); + //Set the thread name for guaranteed file index while processing. + t[index].setName("IndexThread-"+index); + index++; } } // run threads From 0c43c866ad584e7c8ae594dd6c98fe334cc29bb5 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Fri, 5 Mar 2021 09:32:56 +0000 Subject: [PATCH 6/6] Resolved some gradle build issues and added the comments --- .../lucene/benchmark/byTask/feeds/ReutersContentSource.java | 5 +++-- .../apache/lucene/benchmark/byTask/tasks/TaskSequence.java | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 00bb7752948b..aebfd6013632 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -109,14 +109,15 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc if (threadIndexCreated == false) { createThreadIndex(); } - + + // Getting file index value which is set for each thread int index = Integer.parseInt(Thread.currentThread().getName().substring(12)); int fIndex = index + threadIndex[index] * threadIndex.length; threadIndex[index]++; // Sanity check, if # threads is greater than # input files, wrap index if (index >= inputFilesSize) { - index %= inputFilesSize; + index %= inputFilesSize; } // Check if this thread has exhausted its files diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index 63ba404704e0..37d89c996b3d 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -341,8 +341,8 @@ private int doParallelTasks() throws Exception { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); t[index] = new ParallelTask(task); - //Set the thread name for guaranteed file index while processing. - t[index].setName("IndexThread-"+index); + // Set the thread name for guaranteed file index while processing. + t[index].setName("IndexThread-" + index); index++; } }