Skip to content

Commit 803d131

Browse files
committed
LUCENE-9535: Try to do larger flushes.
DWPTPool currently always returns the last DWPT that was added to the pool. By returning the largest DWPT instead, we could try to do larger flushes by finishing DWPTs that are close to being full instead of the last one that was added to the pool, which might be close to being empty. When indexing wikimediumall, this change did not seem to improve the indexing rate significantly, but it didn't slow things down either and the number of flushes went from 224-226 to 216, about 4% less. My expectation is that our nightly benchmarks are a best-case scenario for DWPTPool as the same number of threads is dedicated to indexing over time, but in the case when you have e.g. a single fixed threadpool that is responsible for indexing into several indices, the number of indexing threads that contribute to a given index might greatly vary over time.
1 parent b7b834b commit 803d131

File tree

4 files changed

+254
-17
lines changed

4 files changed

+254
-17
lines changed

lucene/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,9 @@ Improvements
257257
* LUCENE-9944: Allow DrillSideways users to provide their own CollectorManager without also requiring
258258
them to provide an ExecutorService. (Greg Miller)
259259

260+
* LUCENE-9535: Improve DocumentsWriterPerThreadPool to prefer larger instances.
261+
(Adrien Grand)
262+
260263
Bug fixes
261264

262265
* LUCENE-9686: Fix read past EOF handling in DirectIODirectory. (Zach Chen,
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.lucene.index;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.ListIterator;
22+
import java.util.function.Predicate;
23+
24+
/**
25+
* An approximate priority queue, which attempts to poll items by decreasing log of the weight,
26+
* though exact ordering is not guaranteed. This class doesn't support null elements.
27+
*/
28+
final class ApproximatePriorityQueue<T> {
29+
30+
// Indexes between 0 and 63 are sparsely populated, and indexes that are
31+
// greater than or equal to 64 are densely populated
32+
// Items close to the beginning of this list are more likely to have a
33+
// higher weight.
34+
private final List<T> slots = new ArrayList<>(Long.SIZE);
35+
36+
// A bitset where ones indicate that the corresponding index in `slots` is taken.
37+
private long usedSlots = 0L;
38+
39+
ApproximatePriorityQueue() {
40+
for (int i = 0; i < Long.SIZE; ++i) {
41+
slots.add(null);
42+
}
43+
}
44+
45+
/** Add an entry to this queue that has the provided weight. */
46+
void add(T entry, long weight) {
47+
assert entry != null;
48+
49+
// The expected slot of an item is the number of leading zeros of its weight,
50+
// ie. the larger the weight, the closer an item is to the start of the array.
51+
final int expectedSlot = Long.numberOfLeadingZeros(weight);
52+
53+
// If the slot is already taken, we look for the next one that is free.
54+
// The above bitwise operation is equivalent to looping over slots until finding one that is
55+
// free.
56+
final long freeSlots = ~usedSlots;
57+
final int destinationSlot =
58+
expectedSlot + Long.numberOfTrailingZeros(freeSlots >>> expectedSlot);
59+
assert destinationSlot >= expectedSlot;
60+
if (destinationSlot < Long.SIZE) {
61+
usedSlots |= 1L << destinationSlot;
62+
T previous = slots.set(destinationSlot, entry);
63+
assert previous == null;
64+
} else {
65+
slots.add(entry);
66+
}
67+
}
68+
69+
/**
70+
* Return an entry matching the predicate. This will usually be one of the available entries that
71+
* have the highest weight, though this is not guaranteed. This method returns {@code null} if no
72+
* free entries are available.
73+
*/
74+
T poll(Predicate<T> predicate) {
75+
// Look at indexes 0..63 first, which are sparsely populated.
76+
int nextSlot = 0;
77+
do {
78+
final int nextUsedSlot = nextSlot + Long.numberOfTrailingZeros(usedSlots >>> nextSlot);
79+
if (nextUsedSlot >= Long.SIZE) {
80+
break;
81+
}
82+
final T entry = slots.get(nextUsedSlot);
83+
if (predicate.test(entry)) {
84+
usedSlots &= ~(1L << nextUsedSlot);
85+
slots.set(nextUsedSlot, null);
86+
return entry;
87+
} else {
88+
nextSlot = nextUsedSlot + 1;
89+
}
90+
} while (nextSlot < Long.SIZE);
91+
92+
// Then look at indexes 64.. which are densely populated.
93+
// Poll in descending order so that if the number of indexing threads
94+
// decreases, we keep using the same entry over and over again.
95+
// Resizing operations are also less costly on lists when items are closer
96+
// to the end of the list.
97+
for (ListIterator<T> lit = slots.listIterator(slots.size());
98+
lit.previousIndex() >= Long.SIZE; ) {
99+
final T entry = lit.previous();
100+
if (predicate.test(entry)) {
101+
lit.remove();
102+
return entry;
103+
}
104+
}
105+
106+
// No entry matching the predicate was found.
107+
return null;
108+
}
109+
110+
// Only used for assertions
111+
boolean contains(Object o) {
112+
if (o == null) {
113+
throw new NullPointerException();
114+
}
115+
return slots.contains(o);
116+
}
117+
118+
boolean isEmpty() {
119+
return usedSlots == 0 && slots.size() == Long.SIZE;
120+
}
121+
122+
boolean remove(Object o) {
123+
if (o == null) {
124+
throw new NullPointerException();
125+
}
126+
int index = slots.indexOf(o);
127+
if (index == -1) {
128+
return false;
129+
}
130+
if (index >= Long.SIZE) {
131+
slots.remove(index);
132+
} else {
133+
usedSlots &= ~(1L << index);
134+
slots.set(index, null);
135+
}
136+
return true;
137+
}
138+
}

lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717
package org.apache.lucene.index;
1818

1919
import java.io.Closeable;
20-
import java.util.ArrayDeque;
2120
import java.util.ArrayList;
2221
import java.util.Collections;
23-
import java.util.Deque;
2422
import java.util.IdentityHashMap;
2523
import java.util.Iterator;
2624
import java.util.List;
@@ -46,7 +44,8 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
4644

4745
private final Set<DocumentsWriterPerThread> dwpts =
4846
Collections.newSetFromMap(new IdentityHashMap<>());
49-
private final Deque<DocumentsWriterPerThread> freeList = new ArrayDeque<>();
47+
private final ApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
48+
new ApproximatePriorityQueue<>();
5049
private final Supplier<DocumentsWriterPerThread> dwptFactory;
5150
private int takenWriterPermits = 0;
5251
private boolean closed;
@@ -116,21 +115,12 @@ private synchronized DocumentsWriterPerThread newWriter() {
116115
DocumentsWriterPerThread getAndLock() {
117116
synchronized (this) {
118117
ensureOpen();
119-
// Important that we are LIFO here! This way if number of concurrent indexing threads was once
120-
// high,
121-
// but has now reduced, we only use a limited number of DWPTs. This also guarantees that if we
122-
// have suddenly
123-
// a single thread indexing
124-
final Iterator<DocumentsWriterPerThread> descendingIterator = freeList.descendingIterator();
125-
while (descendingIterator.hasNext()) {
126-
DocumentsWriterPerThread perThread = descendingIterator.next();
127-
if (perThread.tryLock()) {
128-
descendingIterator.remove();
129-
return perThread;
130-
}
118+
DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
119+
if (dwpt == null) {
120+
dwpt = newWriter();
131121
}
132122
// DWPT is already locked before return by this method:
133-
return newWriter();
123+
return dwpt;
134124
}
135125
}
136126

@@ -141,10 +131,11 @@ private void ensureOpen() {
141131
}
142132

143133
void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
134+
final long ramBytesUsed = state.ramBytesUsed();
144135
synchronized (this) {
145136
assert dwpts.contains(state)
146137
: "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
147-
freeList.add(state);
138+
freeList.add(state, ramBytesUsed);
148139
}
149140
state.unlock();
150141
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.lucene.index;
18+
19+
import org.apache.lucene.util.LuceneTestCase;
20+
21+
public class TestApproximatePriorityQueue extends LuceneTestCase {
22+
23+
public void testBasics() {
24+
ApproximatePriorityQueue<Long> pq = new ApproximatePriorityQueue<>();
25+
pq.add(8L, 8L);
26+
pq.add(32L, 32L);
27+
pq.add(0L, 0L);
28+
assertFalse(pq.isEmpty());
29+
assertEquals(Long.valueOf(32L), pq.poll(x -> true));
30+
assertFalse(pq.isEmpty());
31+
assertEquals(Long.valueOf(8L), pq.poll(x -> true));
32+
assertFalse(pq.isEmpty());
33+
assertEquals(Long.valueOf(0L), pq.poll(x -> true));
34+
assertTrue(pq.isEmpty());
35+
assertNull(pq.poll(x -> true));
36+
}
37+
38+
public void testPollThenAdd() {
39+
ApproximatePriorityQueue<Long> pq = new ApproximatePriorityQueue<>();
40+
pq.add(8L, 8L);
41+
assertEquals(Long.valueOf(8L), pq.poll(x -> true));
42+
assertNull(pq.poll(x -> true));
43+
pq.add(0L, 0L);
44+
assertEquals(Long.valueOf(0L), pq.poll(x -> true));
45+
assertNull(pq.poll(x -> true));
46+
pq.add(0L, 0L);
47+
assertEquals(Long.valueOf(0L), pq.poll(x -> true));
48+
assertNull(pq.poll(x -> true));
49+
}
50+
51+
public void testCollision() {
52+
ApproximatePriorityQueue<Long> pq = new ApproximatePriorityQueue<>();
53+
pq.add(2L, 2L);
54+
pq.add(1L, 1L);
55+
pq.add(0L, 0L);
56+
pq.add(3L, 3L); // Same nlz as 2
57+
assertFalse(pq.isEmpty());
58+
assertEquals(Long.valueOf(2L), pq.poll(x -> true));
59+
assertFalse(pq.isEmpty());
60+
assertEquals(Long.valueOf(1L), pq.poll(x -> true));
61+
assertFalse(pq.isEmpty());
62+
assertEquals(Long.valueOf(3L), pq.poll(x -> true));
63+
assertFalse(pq.isEmpty());
64+
assertEquals(Long.valueOf(0L), pq.poll(x -> true));
65+
assertTrue(pq.isEmpty());
66+
assertNull(pq.poll(x -> true));
67+
}
68+
69+
public void testPollWithPredicate() {
70+
ApproximatePriorityQueue<Long> pq = new ApproximatePriorityQueue<>();
71+
pq.add(8L, 8L);
72+
pq.add(32L, 32L);
73+
pq.add(0L, 0L);
74+
assertEquals(Long.valueOf(8L), pq.poll(x -> x == 8));
75+
assertNull(pq.poll(x -> x == 8));
76+
assertFalse(pq.isEmpty());
77+
}
78+
79+
public void testCollisionPollWithPredicate() {
80+
ApproximatePriorityQueue<Long> pq = new ApproximatePriorityQueue<>();
81+
pq.add(2L, 2L);
82+
pq.add(1L, 1L);
83+
pq.add(0L, 0L);
84+
pq.add(3L, 3L); // Same nlz as 2
85+
assertEquals(Long.valueOf(1L), pq.poll(x -> x % 2 == 1));
86+
assertEquals(Long.valueOf(3L), pq.poll(x -> x % 2 == 1));
87+
assertNull(pq.poll(x -> x % 2 == 1));
88+
assertFalse(pq.isEmpty());
89+
}
90+
91+
public void testRemove() {
92+
ApproximatePriorityQueue<Long> pq = new ApproximatePriorityQueue<>();
93+
pq.add(8L, 8L);
94+
pq.add(32L, 32L);
95+
pq.add(0L, 0L);
96+
97+
assertFalse(pq.remove(16L));
98+
assertFalse(pq.remove(9L));
99+
assertTrue(pq.remove(8L));
100+
assertTrue(pq.remove(0L));
101+
assertFalse(pq.remove(0L));
102+
assertTrue(pq.remove(32L));
103+
assertTrue(pq.isEmpty());
104+
}
105+
}

0 commit comments

Comments
 (0)