Commit 5e231f7

SOLR-17150: Create MemAllowedLimit (#2708)
1 parent d40a000 commit 5e231f7

File tree

6 files changed: +386 -1 lines changed

solr/CHANGES.txt

Lines changed: 2 additions & 0 deletions
@@ -111,6 +111,8 @@ New Features
   It will now be released as a fully supported Solr feature.
   This feature closes SIP-13: Cross Data Center Replication. (Mark Miller, Andrzej Bialecki, Jason Gerlowski, Houston Putman)
 
+* SOLR-17150: Implement `memAllowed` parameter to limit per-thread memory allocations during request processing. (Andrzej Bialecki, Gus Heck)
+
 Improvements
 ---------------------
 * SOLR-17158: Users using query limits (timeAllowed, cpuTimeAllowed) for whom partial results are uninteresting
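
For reference, a minimal SolrJ sketch of how a client could pass the new memAllowed parameter described in the entry above; the base URL, collection name, and the 10.5 MiB value are illustrative and not part of this commit:

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

public class MemAllowedExample {
  public static void main(String[] args) throws Exception {
    try (Http2SolrClient client =
        new Http2SolrClient.Builder("http://localhost:8983/solr").build()) {
      SolrQuery query = new SolrQuery("*:*");
      // memAllowed is expressed in MiB of per-thread allocations, per the CHANGES entry above
      query.set("memAllowed", "10.5");
      QueryResponse rsp = client.query("techproducts", query);
      // requests that trip a query limit are flagged as partial in the response header
      System.out.println("partialResults: " + rsp.getHeader().get("partialResults"));
    }
  }
}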
solr/core/src/java/org/apache/solr/search/MemAllowedLimit.java

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.search;

import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.request.SolrQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Enforces a memory-based limit on a given SolrQueryRequest, as specified by the {@code memAllowed}
 * query parameter.
 *
 * <p>This class tracks per-thread memory allocations during a request using its own ThreadLocal. It
 * records the current thread allocation when the instance was created (typically at the start of
 * SolrQueryRequest processing) as a starting point, and then on every call to {@link #shouldExit()}
 * it accumulates the amount of reported allocated memory since the previous call, and compares the
 * accumulated amount to the configured threshold, expressed in mebi-bytes.
 *
 * <p>NOTE: this class accesses {@code
 * com.sun.management.ThreadMXBean#getCurrentThreadAllocatedBytes} using reflection. On JVM-s where
 * this implementation is not available an exception will be thrown when attempting to use the
 * {@code memAllowed} parameter.
 */
public class MemAllowedLimit implements QueryLimit {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private static final double MEBI = 1024.0 * 1024.0;
  private static final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
  private static final Method GET_BYTES_METHOD;
  private static final boolean supported;

  static {
    boolean testSupported;
    Method getBytesMethod = null;
    try {
      Class<?> sunThreadBeanClz = Class.forName("com.sun.management.ThreadMXBean");
      if (sunThreadBeanClz.isAssignableFrom(threadBean.getClass())) {
        Method m = sunThreadBeanClz.getMethod("isThreadAllocatedMemorySupported");
        Boolean supported = (Boolean) m.invoke(threadBean);
        if (supported) {
          m = sunThreadBeanClz.getMethod("setThreadAllocatedMemoryEnabled", boolean.class);
          m.invoke(threadBean, Boolean.TRUE);
          testSupported = true;
          getBytesMethod = sunThreadBeanClz.getMethod("getCurrentThreadAllocatedBytes");
        } else {
          testSupported = false;
        }
      } else {
        testSupported = false;
      }
    } catch (Exception e) {
      testSupported = false;
    }
    supported = testSupported;
    GET_BYTES_METHOD = getBytesMethod;
  }

  private static final ThreadLocal<AtomicLong> threadLocalMem =
      ThreadLocal.withInitial(() -> new AtomicLong(-1L));

  private long limitBytes;
  private final AtomicLong accumulatedMem = new AtomicLong();
  private long exitedAt = 0;

  public MemAllowedLimit(SolrQueryRequest req) {
    if (!supported) {
      throw new IllegalArgumentException(
          "Per-thread memory allocation monitoring not available in this JVM.");
    }
    float reqMemLimit = req.getParams().getFloat(CommonParams.MEM_ALLOWED, -1.0f);
    if (reqMemLimit <= 0.0f) {
      throw new IllegalArgumentException(
          "Check for limit with hasMemLimit(req) before creating a MemAllowedLimit!");
    }
    limitBytes = Math.round(reqMemLimit * MEBI);
    // init the thread-local
    init();
  }

  @VisibleForTesting
  MemAllowedLimit(float memLimit) {
    if (!supported) {
      throw new IllegalArgumentException(
          "Per-thread memory allocation monitoring not available in this JVM.");
    }
    limitBytes = Math.round(memLimit * MEBI);
    // init the thread-local
    init();
  }

  private final void init() {
    long currentAllocatedBytes;
    try {
      currentAllocatedBytes = (Long) GET_BYTES_METHOD.invoke(threadBean);
    } catch (Exception e) {
      throw new IllegalArgumentException("Unexpected error checking thread allocation!", e);
    }
    AtomicLong threadMem = threadLocalMem.get();
    threadMem.compareAndSet(-1L, currentAllocatedBytes);
  }

  private long getCurrentAllocatedBytes() {
    try {
      return (Long) GET_BYTES_METHOD.invoke(threadBean);
    } catch (Exception e) {
      throw new IllegalArgumentException("Unexpected error checking thread allocation!", e);
    }
  }

  @VisibleForTesting
  static boolean isSupported() {
    return supported;
  }

  static boolean hasMemLimit(SolrQueryRequest req) {
    return req.getParams().getFloat(CommonParams.MEM_ALLOWED, -1.0f) > 0.0f;
  }

  @Override
  public boolean shouldExit() {
    if (exitedAt > 0L) {
      return true;
    }

    try {
      long currentAllocatedBytes = getCurrentAllocatedBytes();
      AtomicLong threadMem = threadLocalMem.get();
      long lastAllocatedBytes = threadMem.get();
      accumulatedMem.addAndGet(currentAllocatedBytes - lastAllocatedBytes);
      threadMem.set(currentAllocatedBytes);
      if (log.isDebugEnabled()) {
        log.debug(
            "mem limit thread {} remaining delta {}",
            Thread.currentThread().getName(),
            (limitBytes - accumulatedMem.get()));
      }
      if (limitBytes < accumulatedMem.get()) {
        exitedAt = accumulatedMem.get();
        return true;
      }
      return false;
    } catch (Exception e) {
      throw new IllegalArgumentException("Unexpected error checking thread allocation!", e);
    }
  }

  @Override
  public Object currentValue() {
    return exitedAt > 0 ? exitedAt : accumulatedMem.get();
  }
}
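
As background for the reflection NOTE in the class javadoc above: the per-thread allocation counter that MemAllowedLimit reaches via reflection can be exercised directly on HotSpot-style JVMs. A standalone sketch, assuming the JVM exposes com.sun.management.ThreadMXBean and the getCurrentThreadAllocatedBytes() method that this commit looks up reflectively:

import java.lang.management.ManagementFactory;

public class AllocationProbe {
  public static void main(String[] args) {
    // On HotSpot-derived JVMs the platform ThreadMXBean also implements com.sun.management.ThreadMXBean.
    com.sun.management.ThreadMXBean bean =
        (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();
    if (!bean.isThreadAllocatedMemorySupported()) {
      System.err.println("Thread allocation measurement not supported on this JVM");
      return;
    }
    bean.setThreadAllocatedMemoryEnabled(true);
    long before = bean.getCurrentThreadAllocatedBytes();
    byte[][] chunks = new byte[200][];
    for (int i = 0; i < chunks.length; i++) {
      chunks[i] = new byte[10_000]; // roughly 2 MB allocated on this thread, kept reachable
    }
    long after = bean.getCurrentThreadAllocatedBytes();
    System.out.println("Allocated on this thread: " + (after - before) + " bytes");
  }
}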

solr/core/src/java/org/apache/solr/search/QueryLimits.java

Lines changed: 3 additions & 0 deletions
@@ -69,6 +69,9 @@ public QueryLimits(SolrQueryRequest req, SolrQueryResponse rsp) {
       if (hasCpuLimit(req)) {
         limits.add(new CpuAllowedLimit(req));
       }
+      if (MemAllowedLimit.hasMemLimit(req)) {
+        limits.add(new MemAllowedLimit(req));
+      }
     }
     // for testing
     if (TestInjection.queryTimeout != null) {
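
The wiring above follows the guard-then-construct pattern the constructor demands ("Check for limit with hasMemLimit(req) before creating a MemAllowedLimit!"). A hedged sketch of the same pattern used in isolation, relying only on methods visible in this commit; the surrounding work loop is illustrative, and the class sits in org.apache.solr.search because hasMemLimit is package-private:

package org.apache.solr.search;

import org.apache.solr.request.SolrQueryRequest;

class MemLimitPollingSketch {
  /** Returns true if the work loop stopped early because the memAllowed budget was exhausted. */
  static boolean runWithMemLimit(SolrQueryRequest req, Runnable unitOfWork, int iterations) {
    if (!MemAllowedLimit.hasMemLimit(req)) {
      // no memAllowed parameter on the request: just do the work
      for (int i = 0; i < iterations; i++) {
        unitOfWork.run();
      }
      return false;
    }
    MemAllowedLimit limit = new MemAllowedLimit(req);
    for (int i = 0; i < iterations; i++) {
      if (limit.shouldExit()) {
        // currentValue() reports the bytes accumulated so far (or at the moment the limit tripped)
        System.err.println("memAllowed exceeded after " + limit.currentValue() + " bytes");
        return true;
      }
      unitOfWork.run();
    }
    return false;
  }
}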
solr/core/src/test/org/apache/solr/search/TestMemAllowedLimit.java

Lines changed: 194 additions & 0 deletions
@@ -0,0 +1,194 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.search;

import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.index.NoMergePolicyFactory;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.ThreadCpuTimer;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LogLevel("org.apache.solr.search.MemAllowedLimit=DEBUG")
public class TestMemAllowedLimit extends SolrCloudTestCase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private static final String COLLECTION = "test";

  private static Path createConfigSet() throws Exception {
    Path configSet = createTempDir();
    copyMinConf(configSet.toFile());
    // insert an expensive search component
    Path solrConfig = configSet.resolve("conf/solrconfig.xml");
    Files.writeString(
        solrConfig,
        Files.readString(solrConfig)
            .replace(
                "<requestHandler",
                "<searchComponent name=\"expensiveSearchComponent\"\n"
                    + " class=\"org.apache.solr.search.ExpensiveSearchComponent\"/>\n"
                    + "\n"
                    + " <requestHandler")
            .replace(
                "class=\"solr.SearchHandler\">",
                "class=\"solr.SearchHandler\">\n"
                    + " <arr name=\"first-components\">\n"
                    + " <str>expensiveSearchComponent</str>\n"
                    + " </arr>\n"));
    return configSet.resolve("conf");
  }

  @BeforeClass
  public static void setup() throws Exception {
    // Using NoMergePolicy and 100 commits we should get 100 segments (across all shards).
    // At this point of writing MAX_SEGMENTS_PER_SLICE in lucene is 5, so we should be
    // ensured that any multithreaded testing will create 20 executable tasks for the
    // executor that was provided to index-searcher.
    systemSetPropertySolrTestsMergePolicyFactory(NoMergePolicyFactory.class.getName());
    System.setProperty(ThreadCpuTimer.ENABLE_CPU_TIME, "true");
    System.setProperty("metricsEnabled", "true");
    Path configset = createConfigSet();
    configureCluster(1).addConfig("conf", configset).configure();
    SolrClient solrClient = cluster.getSolrClient();
    CollectionAdminRequest.Create create =
        CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 2);
    create.process(solrClient);
    waitForState("active", COLLECTION, clusterShape(3, 6));
    for (int j = 0; j < 100; j++) {
      solrClient.add(COLLECTION, sdoc("id", "id-" + j, "val_i", j % 5));
      solrClient.commit(COLLECTION); // need to commit every doc to create many segments.
    }
  }

  @AfterClass
  public static void tearDownClass() {
    TestInjection.cpuTimerDelayInjectedNS = null;
    systemClearPropertySolrTestsMergePolicyFactory();
  }

  @Test
  public void testLimit() throws Exception {
    Assume.assumeTrue("Thread memory monitoring is not available", MemAllowedLimit.isSupported());
    long limitMs = 100000;
    // 1 MiB
    MemAllowedLimit memLimit = new MemAllowedLimit(1f);
    ArrayList<byte[]> data = new ArrayList<>();
    long startNs = System.nanoTime();
    int wakeups = 0;
    while (!memLimit.shouldExit()) {
      Thread.sleep(100);
      // allocate memory
      for (int i = 0; i < 20; i++) {
        data.add(new byte[5000]);
      }
      wakeups++;
    }
    long endNs = System.nanoTime();
    assertTrue(data.size() > 1);
    long wallTimeDeltaMs = TimeUnit.MILLISECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS);
    log.info(
        "Time limit: {} ms, elapsed wall-clock: {} ms, wakeups: {}",
        limitMs,
        wallTimeDeltaMs,
        wakeups);
    assertTrue("Number of wakeups should be smaller than 100 but was " + wakeups, wakeups < 100);
    assertTrue(
        "Elapsed wall-clock time expected much smaller than 100ms but was " + wallTimeDeltaMs,
        limitMs > wallTimeDeltaMs);
  }

  @Test
  public void testDistribLimit() throws Exception {
    Assume.assumeTrue("Thread memory monitoring is not available", MemAllowedLimit.isSupported());
    SolrClient solrClient = cluster.getSolrClient();
    // no limits set - should complete
    long dataSize = 150; // 150 KiB
    QueryResponse rsp =
        solrClient.query(
            COLLECTION,
            params("q", "id:*", "sort", "id desc", "dataSize", String.valueOf(dataSize)));
    assertEquals(rsp.getHeader().get("status"), 0);
    assertNull("should not have partial results", rsp.getHeader().get("partialResults"));

    // memAllowed set with large value, should return full results
    rsp =
        solrClient.query(
            COLLECTION,
            params(
                "q",
                "id:*",
                "sort",
                "id asc",
                "memLoadCount",
                String.valueOf(dataSize),
                "stages",
                "prepare,process",
                "memAllowed",
                "1.5"));
    assertNull("should have full results", rsp.getHeader().get("partialResults"));

    // memAllowed set, should return partial results
    rsp =
        solrClient.query(
            COLLECTION,
            params(
                "q",
                "id:*",
                "sort",
                "id asc",
                "memLoadCount",
                String.valueOf(dataSize),
                "stages",
                "prepare,process",
                "memAllowed",
                "0.2"));
    assertNotNull("should have partial results", rsp.getHeader().get("partialResults"));

    // multi-threaded search
    // memAllowed set, should return partial results
    rsp =
        solrClient.query(
            COLLECTION,
            params(
                "q",
                "id:*",
                "sort",
                "id asc",
                "memLoadCount",
                String.valueOf(dataSize),
                "stages",
                "prepare,process",
                "multiThreaded",
                "true",
                "memAllowed",
                "0.2"));
    assertNotNull("should have partial results", rsp.getHeader().get("partialResults"));
  }
}
