Skip to content

Commit 1fceab9

Browse files
Change downsampling thread pool settings (#96821) (#96916)
Here we change the settings for the rollup thread pool like follows: * we use a fixed number of threads (>= 1) * we use a limited queue for tasks waiting execution One one side we parallelise rollup operations, on the other side, if too many rollup requests are received, rollup tasks will be rejected to avoid overloading the node. Resolves #96758
1 parent 79b9fd3 commit 1fceab9

File tree

4 files changed

+103
-5
lines changed

4 files changed

+103
-5
lines changed

docs/changelog/96821.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 96821
2+
summary: Change rollup thread pool settings
3+
area: TSDB
4+
type: enhancement
5+
issues:
6+
- 96758

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@
3030
import org.elasticsearch.xpack.core.ClientHelper;
3131
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
3232
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;
33+
import org.elasticsearch.xpack.rollup.Rollup;
3334

3435
import java.io.IOException;
3536
import java.util.Arrays;
3637
import java.util.concurrent.atomic.AtomicReferenceArray;
3738

38-
import static org.elasticsearch.xpack.rollup.Rollup.TASK_THREAD_POOL_NAME;
39-
4039
/**
4140
* A {@link TransportBroadcastAction} that rollups all the shards of a source index into a new rollup index.
4241
*
@@ -69,7 +68,7 @@ public TransportDownsampleIndexerAction(
6968
indexNameExpressionResolver,
7069
DownsampleIndexerAction.Request::new,
7170
DownsampleIndexerAction.ShardDownsampleRequest::new,
72-
TASK_THREAD_POOL_NAME
71+
Rollup.DOWSAMPLE_TASK_THREAD_POOL_NAME
7372
);
7473
this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN);
7574
this.clusterService = clusterService;

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.settings.SettingsFilter;
2323
import org.elasticsearch.common.settings.SettingsModule;
24+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2425
import org.elasticsearch.env.Environment;
2526
import org.elasticsearch.env.NodeEnvironment;
2627
import org.elasticsearch.persistent.PersistentTasksExecutor;
@@ -90,6 +91,8 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
9091
public static final int CURRENT_ROLLUP_VERSION = ROLLUP_VERSION_V2;
9192

9293
public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
94+
public static final String DOWSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing";
95+
public static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256;
9396

9497
public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version";
9598

@@ -162,7 +165,7 @@ public List<RestHandler> getRestHandlers(
162165

163166
@Override
164167
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
165-
FixedExecutorBuilder indexing = new FixedExecutorBuilder(
168+
final FixedExecutorBuilder rollup = new FixedExecutorBuilder(
166169
settingsToUse,
167170
Rollup.TASK_THREAD_POOL_NAME,
168171
1,
@@ -171,7 +174,16 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
171174
false
172175
);
173176

174-
return Collections.singletonList(indexing);
177+
final FixedExecutorBuilder downsample = new FixedExecutorBuilder(
178+
settingsToUse,
179+
Rollup.DOWSAMPLE_TASK_THREAD_POOL_NAME,
180+
ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(settingsToUse)),
181+
Rollup.DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE,
182+
"xpack.downsample.thread_pool",
183+
false
184+
);
185+
186+
return List.of(rollup, downsample);
175187
}
176188

177189
@Override

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import java.util.Locale;
100100
import java.util.Map;
101101
import java.util.Optional;
102+
import java.util.concurrent.CountDownLatch;
102103
import java.util.concurrent.ExecutionException;
103104
import java.util.concurrent.TimeUnit;
104105
import java.util.stream.Collectors;
@@ -1148,4 +1149,84 @@ private String createDataStream() throws Exception {
11481149
assertAcked(client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).get());
11491150
return dataStreamName;
11501151
}
1152+
1153+
public void testConcurrentRollup() throws IOException, InterruptedException {
1154+
final DownsampleConfig config = new DownsampleConfig(randomInterval());
1155+
SourceSupplier sourceSupplier = () -> {
1156+
String ts = randomDateForInterval(config.getInterval());
1157+
double labelDoubleValue = DATE_FORMATTER.parseMillis(ts);
1158+
int labelIntegerValue = randomInt();
1159+
long labelLongValue = randomLong();
1160+
String labelIpv4Address = NetworkAddress.format(randomIp(true));
1161+
String labelIpv6Address = NetworkAddress.format(randomIp(false));
1162+
Date labelDateValue = randomDate();
1163+
int keywordArraySize = randomIntBetween(2, 5);
1164+
String[] keywordArray = new String[keywordArraySize];
1165+
for (int i = 0; i < keywordArraySize; ++i) {
1166+
keywordArray[i] = randomAlphaOfLength(10);
1167+
}
1168+
int doubleArraySize = randomIntBetween(3, 10);
1169+
double[] doubleArray = new double[doubleArraySize];
1170+
for (int i = 0; i < doubleArraySize; ++i) {
1171+
doubleArray[i] = randomDouble();
1172+
}
1173+
return XContentFactory.jsonBuilder()
1174+
.startObject()
1175+
.field(FIELD_TIMESTAMP, ts)
1176+
.field(FIELD_DIMENSION_1, randomFrom(dimensionValues))
1177+
.field(FIELD_DIMENSION_2, randomIntBetween(1, 10))
1178+
.field(FIELD_NUMERIC_1, randomInt())
1179+
.field(FIELD_NUMERIC_2, DATE_FORMATTER.parseMillis(ts))
1180+
.startObject(FIELD_AGG_METRIC)
1181+
.field("min", randomDoubleBetween(-2000, -1001, true))
1182+
.field("max", randomDoubleBetween(-1000, 1000, true))
1183+
.field("sum", randomIntBetween(100, 10000))
1184+
.field("value_count", randomIntBetween(100, 1000))
1185+
.endObject()
1186+
.field(FIELD_LABEL_DOUBLE, labelDoubleValue)
1187+
.field(FIELD_METRIC_LABEL_DOUBLE, labelDoubleValue)
1188+
.field(FIELD_LABEL_INTEGER, labelIntegerValue)
1189+
.field(FIELD_LABEL_KEYWORD, ts)
1190+
.field(FIELD_LABEL_UNMAPPED, randomBoolean() ? labelLongValue : labelDoubleValue)
1191+
.field(FIELD_LABEL_TEXT, ts)
1192+
.field(FIELD_LABEL_BOOLEAN, randomBoolean())
1193+
.field(FIELD_LABEL_IPv4_ADDRESS, labelIpv4Address)
1194+
.field(FIELD_LABEL_IPv6_ADDRESS, labelIpv6Address)
1195+
.field(FIELD_LABEL_DATE, labelDateValue)
1196+
.field(FIELD_LABEL_KEYWORD_ARRAY, keywordArray)
1197+
.field(FIELD_LABEL_DOUBLE_ARRAY, doubleArray)
1198+
.startObject(FIELD_LABEL_AGG_METRIC)
1199+
.field("min", randomDoubleBetween(-2000, -1001, true))
1200+
.field("max", randomDoubleBetween(-1000, 1000, true))
1201+
.field("sum", Double.valueOf(randomIntBetween(100, 10000)))
1202+
.field("value_count", randomIntBetween(100, 1000))
1203+
.endObject()
1204+
.endObject();
1205+
};
1206+
docCount = 512; // Hard code to have 512 documents in the source index, otherwise running this test take too long.
1207+
bulkIndex(sourceIndex, sourceSupplier);
1208+
prepareSourceIndex(sourceIndex);
1209+
1210+
int n = randomIntBetween(3, 6);
1211+
final CountDownLatch rollupComplete = new CountDownLatch(n);
1212+
final List<String> targets = new ArrayList<>();
1213+
final List<Thread> threads = new ArrayList<>();
1214+
for (int i = 0; i < n; i++) {
1215+
final String targetIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
1216+
targets.add(targetIndex);
1217+
threads.add(new Thread(() -> {
1218+
rollup(sourceIndex, targetIndex, config);
1219+
rollupComplete.countDown();
1220+
}));
1221+
}
1222+
for (int i = 0; i < n; i++) {
1223+
threads.get(i).start();
1224+
}
1225+
1226+
assertTrue(rollupComplete.await(30, TimeUnit.SECONDS));
1227+
1228+
for (int i = 0; i < n; i++) {
1229+
assertRollupIndex(sourceIndex, targets.get(i), config);
1230+
}
1231+
}
11511232
}

0 commit comments

Comments
 (0)