|
19 | 19 | import org.apache.lucene.store.IOContext; |
20 | 20 | import org.apache.lucene.store.IndexOutput; |
21 | 21 | import org.apache.lucene.store.RateLimitedIndexOutput; |
22 | | -import org.apache.lucene.util.SetOnce; |
23 | 22 | import org.elasticsearch.common.logging.Loggers; |
24 | 23 | import org.elasticsearch.common.settings.Setting; |
25 | 24 | import org.elasticsearch.common.util.concurrent.AbstractRunnable; |
@@ -231,15 +230,15 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti |
231 | 230 |
|
232 | 231 | final class MergeTask extends AbstractRunnable implements Comparable<MergeTask> { |
233 | 232 | private final String name; |
234 | | - private final SetOnce<Long> mergeStartTimeNS; |
| 233 | + private final AtomicLong mergeStartTimeNS; |
235 | 234 | private final MergeSource mergeSource; |
236 | 235 | private final OnGoingMerge onGoingMerge; |
237 | 236 | private final MergeRateLimiter rateLimiter; |
238 | 237 | private final boolean supportsIOThrottling; |
239 | 238 |
|
240 | 239 | MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) { |
241 | 240 | this.name = name; |
242 | | - this.mergeStartTimeNS = new SetOnce<>(); |
| 241 | + this.mergeStartTimeNS = new AtomicLong(); |
243 | 242 | this.mergeSource = mergeSource; |
244 | 243 | this.onGoingMerge = new OnGoingMerge(merge); |
245 | 244 | this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); |
@@ -268,15 +267,14 @@ public void setIORateLimit(double mbPerSec) { |
268 | 267 | } |
269 | 268 |
|
270 | 269 | public boolean isRunning() { |
271 | | - return mergeStartTimeNS.get() != null; |
| 270 | + return mergeStartTimeNS.get() > 0L; |
272 | 271 | } |
273 | 272 |
|
274 | 273 | @Override |
275 | 274 | public void doRun() throws Exception { |
276 | | - if (isRunning()) { |
| 275 | + if (!mergeStartTimeNS.compareAndSet(0L, System.nanoTime())) { |
277 | 276 | throw new IllegalStateException("Cannot run the same merge task multiple times"); |
278 | 277 | } |
279 | | - mergeStartTimeNS.set(System.nanoTime()); |
280 | 278 | try { |
281 | 279 | beforeMerge(onGoingMerge); |
282 | 280 | mergeTracking.mergeStarted(onGoingMerge); |
@@ -351,7 +349,6 @@ public void onFailure(Exception e) { |
351 | 349 | if (isRunning() == false) { |
352 | 350 | throw new IllegalStateException("onFailure must only be invoked after doRun"); |
353 | 351 | } |
354 | | - assert this.mergeStartTimeNS.get() != null : "onFailure should always be invoked after doRun"; |
355 | 352 | // most commonly the merge should've already be aborted by now, |
356 | 353 | // plus the engine is probably going to be failed when any merge fails, |
357 | 354 | // but keep this in case something believes calling `MergeTask#onFailure` is a sane way to abort a merge |
|
0 commit comments