Skip to content

Commit e09979c

Browse files
committed
Enhance Lucene benchmark module's ReutersContentSource to use more concurrency while indexing
1 parent 22c1c78 commit e09979c

File tree

5 files changed

+63
-20
lines changed

5 files changed

+63
-20
lines changed

lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ log.queries=true
5353

5454
{ "Populate"
5555
CreateIndex
56-
[{ "MAddDocs" AddDoc } : 5000] : 4
56+
[{ {{"MAddDocs" AddDoc } : 5000} FlushIndex } ] : 8
5757
ForceMerge(1)
5858
CloseIndex
5959
}

lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ public class Constants {
2828
public static Boolean[] BOOLEANS = new Boolean[] { Boolean.FALSE, Boolean.TRUE };
2929

3030
public static final int DEFAULT_MAXIMUM_DOCUMENTS = Integer.MAX_VALUE;
31+
32+
public static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";
3133
}

lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Date;
3131
import java.util.Locale;
3232

33+
import org.apache.lucene.benchmark.Constants;
3334
import org.apache.lucene.benchmark.byTask.utils.Config;
3435

3536
/**
@@ -52,8 +53,8 @@ private static final class DateFormatInfo {
5253
private ThreadLocal<DateFormatInfo> dateFormat = new ThreadLocal<>();
5354
private Path dataDir = null;
5455
private ArrayList<Path> inputFiles = new ArrayList<>();
55-
private int nextFile = 0;
56-
private int iteration = 0;
56+
private int[] docCountArr;
57+
private volatile boolean docCountArrCreated;
5758

5859
@Override
5960
public void setConfig(Config config) {
@@ -103,21 +104,35 @@ public void close() throws IOException {
103104

104105
@Override
105106
public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
106-
Path f = null;
107-
String name = null;
108-
synchronized (this) {
109-
if (nextFile >= inputFiles.size()) {
110-
// exhausted files, start a new round, unless forever set to false.
111-
if (!forever) {
112-
throw new NoMoreDataException();
113-
}
114-
nextFile = 0;
115-
iteration++;
116-
}
117-
f = inputFiles.get(nextFile++);
118-
name = f.toRealPath() + "_" + iteration;
107+
if (docCountArrCreated == false) {
108+
docCountArrInit();
119109
}
120110

111+
int threadIndexSize = Thread.currentThread().getName().length();
112+
int parallelTaskThreadSize = Constants.PARALLEL_TASK_THREAD_NAME_PREFIX.length();
113+
114+
// Extract ThreadIndex from unique ThreadName which is set with '"ParallelTaskThread-"+index',
115+
// in TaskSequence.java's doParallelTasks()
116+
int threadIndex =
117+
Integer.parseInt(
118+
Thread.currentThread()
119+
.getName()
120+
.substring(parallelTaskThreadSize + 1, threadIndexSize));
121+
122+
assert (threadIndex >= 0 && threadIndex < docCountArr.length)
123+
: "Please check threadIndex or docCountArr length";
124+
int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length;
125+
int inFileSize = inputFiles.size();
126+
127+
// Modulo Operator covers all three possible scenarios i.e. 1. If inputFiles.size() < Num Of
128+
// Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads
129+
int fileIndex = stride % inFileSize;
130+
int iteration = stride / inFileSize;
131+
docCountArr[threadIndex]++;
132+
133+
Path f = inputFiles.get(fileIndex);
134+
String name = f.toRealPath() + "_" + iteration;
135+
121136
try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
122137
// First line is the date, 3rd is the title, rest is body
123138
String dateStr = reader.readLine();
@@ -146,8 +161,13 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc
146161
@Override
147162
public synchronized void resetInputs() throws IOException {
148163
super.resetInputs();
149-
nextFile = 0;
150-
iteration = 0;
164+
}
165+
166+
private synchronized void docCountArrInit() {
167+
if (docCountArrCreated == false) {
168+
docCountArr = new int[getConfig().getNumThreads()];
169+
docCountArrCreated = true;
170+
}
151171
}
152172

153173
}

lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
package org.apache.lucene.benchmark.byTask.tasks;
1818

1919

20+
import java.text.NumberFormat;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.Locale;
23-
import java.text.NumberFormat;
2424

25+
import org.apache.lucene.benchmark.Constants;
2526
import org.apache.lucene.benchmark.byTask.PerfRunData;
2627
import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
2728
import org.apache.lucene.benchmark.byTask.stats.TaskStats;
@@ -345,12 +346,23 @@ private int doParallelTasks() throws Exception {
345346

346347
initTasksArray();
347348
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
349+
// Get number of parallel threads from algo file and set it to use in ReutersContentSource.java's
350+
// docCountArrInit()
351+
this.getRunData().getConfig().setNumThreads(t.length);
348352
// prepare threads
349353
int index = 0;
350354
for (int k=0; k<repetitions; k++) {
351355
for (int i = 0; i < tasksArray.length; i++) {
352356
final PerfTask task = tasksArray[i].clone();
353-
t[index++] = new ParallelTask(task);
357+
t[index] = new ParallelTask(task);
358+
// Setting unique ThreadName with index value which is used in ReutersContentSource.java's
359+
// getNextDocData(). Please make changes
360+
// in ReutersContentSource.java's getNextDocData() for
361+
// Integer.parseInt(Thread.currentThread().getName().substring(parallelTaskThreadSize + 1,
362+
// threadIndexSize))
363+
// before making any modifications here
364+
t[index].setName(Constants.PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index);
365+
index++;
354366
}
355367
}
356368
// run threads

lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class Config {
5454
private HashMap<String, Object> valByRound = new HashMap<>();
5555
private HashMap<String, String> colForValByRound = new HashMap<>();
5656
private String algorithmText;
57+
private int numThreads = 1;
5758

5859
/**
5960
* Read both algorithm and config properties.
@@ -112,6 +113,14 @@ public Config (Properties props) {
112113
}
113114
}
114115

116+
public void setNumThreads(int numThreads) {
117+
this.numThreads = numThreads;
118+
}
119+
120+
public int getNumThreads() {
121+
return numThreads;
122+
}
123+
115124
@SuppressWarnings({"unchecked", "rawtypes"})
116125
private void printProps() {
117126
System.out.println("------------> config properties:");

0 commit comments

Comments
 (0)