Skip to content

Commit 1e75da1

Browse files
author
Vladimir Kotal
authored
report duration of PendingFileCompleter (#2947)
1 parent 555fbbc commit 1e75da1

File tree

7 files changed

+324
-63
lines changed

7 files changed

+324
-63
lines changed

opengrok-indexer/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ Portions Copyright (c) 2017-2018, Chris Fraire <[email protected]>.
4646
<groupId>org.apache.bcel</groupId>
4747
<artifactId>bcel</artifactId>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.apache.commons</groupId>
51+
<artifactId>commons-lang3</artifactId>
52+
<version>${apache-commons-lang3.version}</version>
53+
</dependency>
4954
<dependency>
5055
<groupId>org.apache.lucene</groupId>
5156
<artifactId>lucene-core</artifactId>

opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDatabase.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.opengrok.indexer.util.ForbiddenSymlinkException;
100100
import org.opengrok.indexer.util.IOUtils;
101101
import org.opengrok.indexer.util.ObjectPool;
102+
import org.opengrok.indexer.util.Progress;
102103
import org.opengrok.indexer.util.Statistics;
103104
import org.opengrok.indexer.util.TandemPath;
104105
import org.opengrok.indexer.web.Util;
@@ -1026,27 +1027,6 @@ private boolean isLocal(String path) {
10261027
return local;
10271028
}
10281029

1029-
private void printProgress(String dir, int currentCount, int totalCount) {
1030-
if (totalCount > 0 && RuntimeEnvironment.getInstance().isPrintProgress()) {
1031-
Level currentLevel;
1032-
if (currentCount <= 1 || currentCount >= totalCount ||
1033-
currentCount % 100 == 0) {
1034-
currentLevel = Level.INFO;
1035-
} else if (currentCount % 50 == 0) {
1036-
currentLevel = Level.FINE;
1037-
} else if (currentCount % 10 == 0) {
1038-
currentLevel = Level.FINER;
1039-
} else {
1040-
currentLevel = Level.FINEST;
1041-
}
1042-
if (LOGGER.isLoggable(currentLevel)) {
1043-
LOGGER.log(currentLevel, "Progress: {0} ({1}%) for {2}",
1044-
new Object[]{currentCount, currentCount * 100.0f /
1045-
totalCount, dir});
1046-
}
1047-
}
1048-
}
1049-
10501030
/**
10511031
* Executes the first, serial stage of indexing, recursively.
10521032
* <p>Files at least are counted, and any deleted or updated files (based on
@@ -1180,7 +1160,7 @@ private void indexParallel(String dir, IndexDownArgs args) {
11801160
ObjectPool<Ctags> ctagsPool = parallelizer.getCtagsPool();
11811161

11821162
Map<Boolean, List<IndexFileWork>> bySuccess = null;
1183-
try {
1163+
try (Progress progress = new Progress(LOGGER, dir, worksCount)) {
11841164
bySuccess = parallelizer.getForkJoinPool().submit(() ->
11851165
args.works.parallelStream().collect(
11861166
Collectors.groupingByConcurrent((x) -> {
@@ -1225,8 +1205,7 @@ private void indexParallel(String dir, IndexDownArgs args) {
12251205
}
12261206
}
12271207

1228-
int ncount = currentCounter.incrementAndGet();
1229-
printProgress(dir, ncount, worksCount);
1208+
progress.increment();
12301209
return ret;
12311210
}
12321211
}))).get();

opengrok-indexer/src/main/java/org/opengrok/indexer/index/PendingFileCompleter.java

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.nio.file.SimpleFileVisitor;
3535
import java.nio.file.StandardCopyOption;
3636
import java.nio.file.attribute.BasicFileAttributes;
37+
import java.time.Duration;
38+
import java.time.Instant;
3739
import java.util.Comparator;
3840
import java.util.HashSet;
3941
import java.util.List;
@@ -43,7 +45,10 @@
4345
import java.util.logging.Level;
4446
import java.util.logging.Logger;
4547
import java.util.stream.Collectors;
48+
49+
import org.apache.commons.lang3.time.DurationFormatUtils;
4650
import org.opengrok.indexer.logger.LoggerFactory;
51+
import org.opengrok.indexer.util.Progress;
4752
import org.opengrok.indexer.util.TandemPath;
4853

4954
/**
@@ -166,12 +171,27 @@ public boolean add(PendingFileRenaming e) {
166171
* @throws java.io.IOException if an I/O error occurs
167172
*/
168173
public int complete() throws IOException {
174+
Instant start = Instant.now();
169175
int numDeletions = completeDeletions();
170-
LOGGER.log(Level.FINE, "deleted {0} file(s)", numDeletions);
176+
LOGGER.log(Level.FINE, "deleted {0} file(s) (took {1})",
177+
new Object[]{numDeletions, DurationFormatUtils.
178+
formatDurationWords(Duration.between(start, Instant.now()).toMillis(),
179+
true, true)});
180+
181+
start = Instant.now();
171182
int numRenamings = completeRenamings();
172-
LOGGER.log(Level.FINE, "renamed {0} file(s)", numRenamings);
183+
LOGGER.log(Level.FINE, "renamed {0} file(s) (took {1})",
184+
new Object[]{numRenamings, DurationFormatUtils.
185+
formatDurationWords(Duration.between(start, Instant.now()).toMillis(),
186+
true, true)});
187+
188+
start = Instant.now();
173189
int numLinkages = completeLinkages();
174-
LOGGER.log(Level.FINE, "affirmed links for {0} path(s)", numLinkages);
190+
LOGGER.log(Level.FINE, "affirmed links for {0} path(s) (took {1})",
191+
new Object[]{numLinkages, DurationFormatUtils.
192+
formatDurationWords(Duration.between(start, Instant.now()).toMillis(),
193+
true, true)});
194+
175195
return numDeletions + numRenamings + numLinkages;
176196
}
177197

@@ -194,18 +214,21 @@ private int completeRenamings() throws IOException {
194214
new PendingFileRenamingExec(f.getTransientPath(),
195215
f.getAbsolutePath())).collect(
196216
Collectors.toList());
197-
198-
Map<Boolean, List<PendingFileRenamingExec>> bySuccess =
199-
pendingExecs.parallelStream().collect(
200-
Collectors.groupingByConcurrent((x) -> {
201-
try {
202-
doRename(x);
203-
return true;
204-
} catch (IOException e) {
205-
x.exception = e;
206-
return false;
207-
}
208-
}));
217+
Map<Boolean, List<PendingFileRenamingExec>> bySuccess;
218+
219+
try (Progress progress = new Progress(LOGGER, "pending renames", numPending)) {
220+
bySuccess = pendingExecs.parallelStream().collect(
221+
Collectors.groupingByConcurrent((x) -> {
222+
progress.increment();
223+
try {
224+
doRename(x);
225+
return true;
226+
} catch (IOException e) {
227+
x.exception = e;
228+
return false;
229+
}
230+
}));
231+
}
209232
renames.clear();
210233

211234
List<PendingFileRenamingExec> failures = bySuccess.getOrDefault(
@@ -240,18 +263,21 @@ private int completeDeletions() throws IOException {
240263
parallelStream().map(f ->
241264
new PendingFileDeletionExec(f.getAbsolutePath())).collect(
242265
Collectors.toList());
243-
244-
Map<Boolean, List<PendingFileDeletionExec>> bySuccess =
245-
pendingExecs.parallelStream().collect(
246-
Collectors.groupingByConcurrent((x) -> {
247-
try {
248-
doDelete(x);
249-
return true;
250-
} catch (IOException e) {
251-
x.exception = e;
252-
return false;
253-
}
254-
}));
266+
Map<Boolean, List<PendingFileDeletionExec>> bySuccess;
267+
268+
try (Progress progress = new Progress(LOGGER, "pending deletions", numPending)) {
269+
bySuccess = pendingExecs.parallelStream().collect(
270+
Collectors.groupingByConcurrent((x) -> {
271+
progress.increment();
272+
try {
273+
doDelete(x);
274+
return true;
275+
} catch (IOException e) {
276+
x.exception = e;
277+
return false;
278+
}
279+
}));
280+
}
255281
deletions.clear();
256282

257283
List<PendingFileDeletionExec> successes = bySuccess.getOrDefault(
@@ -292,17 +318,20 @@ private int completeLinkages() throws IOException {
292318
new PendingSymlinkageExec(f.getSourcePath(),
293319
f.getTargetRelPath())).collect(Collectors.toList());
294320

295-
Map<Boolean, List<PendingSymlinkageExec>> bySuccess =
296-
pendingExecs.parallelStream().collect(
297-
Collectors.groupingByConcurrent((x) -> {
298-
try {
299-
doLink(x);
300-
return true;
301-
} catch (IOException e) {
302-
x.exception = e;
303-
return false;
304-
}
305-
}));
321+
Map<Boolean, List<PendingSymlinkageExec>> bySuccess;
322+
try (Progress progress = new Progress(LOGGER, "pending renames", numPending)) {
323+
bySuccess = pendingExecs.parallelStream().collect(
324+
Collectors.groupingByConcurrent((x) -> {
325+
progress.increment();
326+
try {
327+
doLink(x);
328+
return true;
329+
} catch (IOException e) {
330+
x.exception = e;
331+
return false;
332+
}
333+
}));
334+
}
306335
linkages.clear();
307336

308337
List<PendingSymlinkageExec> failures = bySuccess.getOrDefault(
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* CDDL HEADER START
3+
*
4+
* The contents of this file are subject to the terms of the
5+
* Common Development and Distribution License (the "License").
6+
* You may not use this file except in compliance with the License.
7+
*
8+
* See LICENSE.txt included in this distribution for the specific
9+
* language governing permissions and limitations under the License.
10+
*
11+
* When distributing Covered Code, include this CDDL HEADER in each
12+
* file and include the License file at LICENSE.txt.
13+
* If applicable, add the following below this CDDL HEADER, with the
14+
* fields enclosed by brackets "[]" replaced with your own identifying
15+
* information: Portions Copyright [yyyy] [name of copyright owner]
16+
*
17+
* CDDL HEADER END
18+
*/
19+
20+
/*
21+
* Copyright (c) 2007, 2019, Oracle and/or its affiliates. All rights reserved.
22+
* Portions Copyright (c) 2017-2018, Chris Fraire <[email protected]>.
23+
*/
24+
25+
package org.opengrok.indexer.util;
26+
27+
import org.opengrok.indexer.configuration.RuntimeEnvironment;
28+
29+
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.logging.Level;
31+
import java.util.logging.Logger;
32+
33+
public class Progress implements AutoCloseable {
34+
private final Logger logger;
35+
private final long totalCount;
36+
private final String suffix;
37+
38+
private AtomicLong currentCount = new AtomicLong();
39+
private Thread loggerThread = null;
40+
private volatile boolean run;
41+
42+
private final Object sync = new Object();
43+
44+
/**
45+
* @param logger logger instance
46+
* @param suffix string suffix to identify the operation
47+
* @param totalCount total count
48+
*/
49+
public Progress(Logger logger, String suffix, long totalCount) {
50+
this.logger = logger;
51+
this.suffix = suffix;
52+
this.totalCount = totalCount;
53+
54+
// Assuming printProgress configuration setting cannot be changed on the fly.
55+
if (totalCount > 0 && RuntimeEnvironment.getInstance().isPrintProgress()) {
56+
// spawn a logger thread.
57+
run = true;
58+
loggerThread = new Thread(this::logLoop,
59+
"progress-thread-" + suffix.replaceAll(" ", "_"));
60+
loggerThread.start();
61+
}
62+
}
63+
64+
// for testing
65+
Thread getLoggerThread() {
66+
return loggerThread;
67+
}
68+
69+
/**
70+
* Increment counter. The actual logging will be done eventually.
71+
*/
72+
public void increment() {
73+
this.currentCount.incrementAndGet();
74+
75+
if (loggerThread != null) {
76+
// nag the thread.
77+
synchronized (sync) {
78+
sync.notify();
79+
}
80+
}
81+
}
82+
83+
private void logLoop() {
84+
long cachedCount = 0;
85+
86+
while (true) {
87+
long currentCount = this.currentCount.get();
88+
Level currentLevel;
89+
90+
if (cachedCount < currentCount) {
91+
if (currentCount <= 1 || currentCount % 100 == 0) {
92+
currentLevel = Level.INFO;
93+
} else if (currentCount % 50 == 0) {
94+
currentLevel = Level.FINE;
95+
} else if (currentCount % 10 == 0) {
96+
currentLevel = Level.FINER;
97+
} else {
98+
currentLevel = Level.FINEST;
99+
}
100+
101+
// Do not log if there was no progress.
102+
if (logger.isLoggable(currentLevel)) {
103+
logger.log(currentLevel, "Progress: {0} ({1}%) for {2}",
104+
new Object[]{currentCount, currentCount * 100.0f /
105+
totalCount, suffix});
106+
}
107+
}
108+
109+
if (!run) {
110+
return;
111+
}
112+
113+
cachedCount = currentCount;
114+
115+
// wait for event
116+
try {
117+
synchronized (sync) {
118+
if (!run) {
119+
// Loop once more to do the final logging.
120+
continue;
121+
}
122+
sync.wait();
123+
}
124+
} catch (InterruptedException e) {
125+
logger.log(Level.WARNING, "logger thread interrupted");
126+
}
127+
}
128+
}
129+
130+
@Override
131+
public void close() {
132+
if (loggerThread == null) {
133+
return;
134+
}
135+
136+
try {
137+
run = false;
138+
synchronized (sync) {
139+
sync.notify();
140+
}
141+
loggerThread.join();
142+
} catch (InterruptedException e) {
143+
logger.log(Level.WARNING, "logger thread interrupted");
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)