Skip to content

Commit cfb407b

Browse files
authored
MultiIndexMergeScheduler: a production multi-tenant merge scheduler (#15015)
1 parent cefc6c7 commit cfb407b

File tree

4 files changed

+451
-1
lines changed

4 files changed

+451
-1
lines changed

lucene/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ API Changes
123123

124124
New Features
125125
---------------------
126+
* GITHUB#15015: MultiIndexMergeScheduler: a production multi-tenant merge scheduler (Shawn Yarbrough)
127+
126128
* GITHUB#14404: Introducing DocValuesMultiRangeQuery.SortedNumericStabbingBuilder into sandbox.
127129
(Mikhail Khludnev)
128130

lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,6 @@ public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) th
583583
// OK to spawn a new merge thread to handle this
584584
// merge:
585585
final MergeThread newMergeThread = getMergeThread(mergeSource, merge);
586-
mergeThreads.add(newMergeThread);
587586

588587
updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter);
589588

@@ -671,6 +670,7 @@ protected synchronized MergeThread getMergeThread(MergeSource mergeSource, OneMe
671670
final MergeThread thread = new MergeThread(mergeSource, merge);
672671
thread.setDaemon(true);
673672
thread.setName("Lucene Merge Thread #" + mergeThreadCounter++);
673+
mergeThreads.add(thread);
674674
return thread;
675675
}
676676

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.lucene.index;
18+
19+
import java.io.IOException;
20+
import org.apache.lucene.store.Directory;
21+
import org.apache.lucene.util.InfoStream;
22+
23+
/**
24+
* Multi-index or multi-tenant merge scheduling.
25+
*
26+
* <p>MultiIndexMergeScheduler builds on existing functionality in ConcurrentMergeScheduler by
27+
* automatically tracking merge sources and merge threads and the index Directory that each applies
28+
* to, and then shunting all of them into a single ConcurrentMergeScheduler instance.
29+
*
30+
* <p>The multi-tenant merge scheduling can be used easily by creating a MultiIndexMergeScheduler
31+
* instance for each index and then using each instance normally, the same way you would use a lone
32+
* ConcurrentMergeScheduler.
33+
*
34+
* @lucene.experimental
35+
*/
36+
class MultiIndexMergeScheduler extends MergeScheduler {
37+
private final Directory directory;
38+
private final CombinedMergeScheduler combinedMergeScheduler;
39+
private final boolean manageSingleton;
40+
41+
/** The main MultiIndexMergeScheduler constructor -- use this one. */
42+
public MultiIndexMergeScheduler(Directory directory) {
43+
this.directory = directory;
44+
this.combinedMergeScheduler = CombinedMergeScheduler.acquireSingleton();
45+
this.manageSingleton = true;
46+
}
47+
48+
/** Alternate MultiIndexMergeScheduler constructor for unit testing. Does not close() the CMS. */
49+
MultiIndexMergeScheduler(Directory directory, CombinedMergeScheduler combinedMergeScheduler) {
50+
this.directory = directory;
51+
this.combinedMergeScheduler = combinedMergeScheduler;
52+
this.manageSingleton = false;
53+
}
54+
55+
public Directory getDirectory() {
56+
return directory;
57+
}
58+
59+
public CombinedMergeScheduler getCombinedMergeScheduler() {
60+
return combinedMergeScheduler;
61+
}
62+
63+
@Override
64+
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
65+
var taggedMergeSource =
66+
new CombinedMergeScheduler.TaggedMergeSource(mergeSource, this.directory);
67+
this.combinedMergeScheduler.merge(taggedMergeSource, trigger);
68+
}
69+
70+
@Override
71+
public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) {
72+
return this.combinedMergeScheduler.wrapForMerge(merge, in);
73+
}
74+
75+
/** Close this scheduler for one directory/index. Called automatically by IndexWriter. */
76+
@Override
77+
public void close() throws IOException {
78+
this.combinedMergeScheduler.sync(this.directory);
79+
if (this.manageSingleton) {
80+
CombinedMergeScheduler.releaseSingleton();
81+
}
82+
}
83+
84+
// We created this method because we cannot easily override the initialize() method
85+
// in ConcurrentMergeScheduler. We don't need the initDynamicDefaults() part in the
86+
// initialize() method, and only need the setInfoStream().
87+
public void setInfoStream(InfoStream infoStream) {
88+
this.combinedMergeScheduler.setInfoStream(infoStream);
89+
}
90+
91+
/**
92+
* CombinedMergeScheduler is used internally by MultiIndexMergeScheduler to balance resources
93+
* across multiple indices. Normally you don't need to use this.
94+
*
95+
* <p>For testing purposes, or if partitioning of tenants into groups is needed for some reason, a
96+
* CombinedMergeScheduler can be provided to the MultiIndexMergeScheduler constructor.
97+
*
98+
* <p>CombinedMergeScheduler should <b><i>not</i></b> be passed directly to IndexWriter.
99+
*/
100+
static class CombinedMergeScheduler extends ConcurrentMergeScheduler {
101+
@SuppressWarnings("NonFinalStaticField")
102+
private static CombinedMergeScheduler singleton = null;
103+
104+
@SuppressWarnings("NonFinalStaticField")
105+
private static int singletonRefCount = 0;
106+
107+
private static synchronized CombinedMergeScheduler acquireSingleton() {
108+
if (singleton == null) {
109+
singleton = new CombinedMergeScheduler();
110+
}
111+
singletonRefCount++;
112+
return singleton;
113+
}
114+
115+
private static synchronized void releaseSingleton() throws IOException {
116+
if (singletonRefCount < 1) {
117+
throw new IllegalStateException("decrementSingletonReference() called too many times");
118+
}
119+
singletonRefCount--;
120+
if (singletonRefCount == 0) {
121+
singleton.close();
122+
singleton = null;
123+
}
124+
}
125+
126+
public static synchronized CombinedMergeScheduler peekSingleton() {
127+
return singleton;
128+
}
129+
130+
// A filter pattern
131+
static class TaggedMergeSource implements MergeScheduler.MergeSource {
132+
private final MergeScheduler.MergeSource in;
133+
private final Directory directory;
134+
135+
TaggedMergeSource(MergeScheduler.MergeSource in, Directory directory) {
136+
this.in = in;
137+
this.directory = directory;
138+
}
139+
140+
public Directory getDirectory() {
141+
return this.directory;
142+
}
143+
144+
@Override
145+
public MergePolicy.OneMerge getNextMerge() {
146+
return this.in.getNextMerge();
147+
}
148+
149+
@Override
150+
public void onMergeFinished(MergePolicy.OneMerge merge) {
151+
this.in.onMergeFinished(merge);
152+
}
153+
154+
@Override
155+
public boolean hasPendingMerges() {
156+
return this.in.hasPendingMerges();
157+
}
158+
159+
@Override
160+
public void merge(MergePolicy.OneMerge merge) throws IOException {
161+
this.in.merge(merge);
162+
}
163+
}
164+
165+
public void setInfoStream(InfoStream infoStream) {
166+
this.infoStream = infoStream;
167+
}
168+
169+
public void sync(Directory directory) {
170+
boolean interrupted = false;
171+
try {
172+
while (true) {
173+
MergeThread toSync = null;
174+
synchronized (this) {
175+
for (MergeThread t : this.mergeThreads) {
176+
// In case a merge thread is calling us, don't try to sync on
177+
// itself, since that will never finish!
178+
if (t.isAlive()
179+
&& t != Thread.currentThread()
180+
// Only wait for merge threads for the current index to finish
181+
&& t.mergeSource instanceof TaggedMergeSource
182+
&& ((TaggedMergeSource) t.mergeSource).getDirectory().equals(directory)) {
183+
toSync = t;
184+
break;
185+
}
186+
}
187+
}
188+
if (toSync != null) {
189+
try {
190+
toSync.join();
191+
} catch (
192+
@SuppressWarnings("unused")
193+
InterruptedException ie) {
194+
// ignore this Exception, we will retry until all threads are dead
195+
interrupted = true;
196+
}
197+
} else {
198+
break;
199+
}
200+
}
201+
} finally {
202+
// finally, restore interrupt status:
203+
if (interrupted) {
204+
Thread.currentThread().interrupt();
205+
}
206+
}
207+
}
208+
}
209+
}

0 commit comments

Comments
 (0)