Skip to content

Commit 3521f8e

Browse files
authored
Merge branch 'main' into adding-settings-to-data-streams
2 parents b1d5944 + bd1a638 commit 3521f8e

File tree

147 files changed

+4888
-2721
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

147 files changed

+4888
-2721
lines changed

docs/changelog/125570.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125570
2+
summary: ES|QL random sampling
3+
area: Machine Learning
4+
type: feature
5+
issues: []

modules/mapper-extras/src/test/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldBlockLoaderTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
package org.elasticsearch.index.mapper.extras;
1111

12+
import org.elasticsearch.datageneration.FieldType;
1213
import org.elasticsearch.index.mapper.NumberFieldBlockLoaderTestCase;
13-
import org.elasticsearch.logsdb.datageneration.FieldType;
1414
import org.elasticsearch.plugins.Plugin;
1515

1616
import java.util.Collection;

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
import org.elasticsearch.client.internal.node.NodeClient;
4848
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4949
import org.elasticsearch.cluster.node.DiscoveryNodes;
50+
import org.elasticsearch.common.Strings;
5051
import org.elasticsearch.common.bytes.ReleasableBytesReference;
52+
import org.elasticsearch.common.collect.Iterators;
5153
import org.elasticsearch.common.component.AbstractLifecycleComponent;
5254
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
5355
import org.elasticsearch.common.settings.ClusterSettings;
@@ -63,6 +65,7 @@
6365
import org.elasticsearch.http.HttpBodyTracer;
6466
import org.elasticsearch.http.HttpServerTransport;
6567
import org.elasticsearch.http.HttpTransportSettings;
68+
import org.elasticsearch.index.IndexingPressure;
6669
import org.elasticsearch.plugins.ActionPlugin;
6770
import org.elasticsearch.plugins.Plugin;
6871
import org.elasticsearch.rest.BaseRestHandler;
@@ -73,13 +76,17 @@
7376
import org.elasticsearch.rest.RestResponse;
7477
import org.elasticsearch.rest.RestStatus;
7578
import org.elasticsearch.tasks.Task;
79+
import org.elasticsearch.test.ClusterServiceUtils;
7680
import org.elasticsearch.test.ESIntegTestCase;
7781
import org.elasticsearch.test.MockLog;
7882
import org.elasticsearch.test.junit.annotations.TestLogging;
83+
import org.elasticsearch.test.rest.ObjectPath;
7984
import org.elasticsearch.transport.Transports;
8085
import org.elasticsearch.transport.netty4.Netty4Utils;
86+
import org.elasticsearch.xcontent.json.JsonXContent;
8187

8288
import java.nio.channels.ClosedChannelException;
89+
import java.nio.charset.StandardCharsets;
8390
import java.util.Collection;
8491
import java.util.List;
8592
import java.util.Map;
@@ -102,6 +109,7 @@
102109
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
103110
import static org.hamcrest.Matchers.anEmptyMap;
104111
import static org.hamcrest.Matchers.empty;
112+
import static org.hamcrest.Matchers.greaterThan;
105113
import static org.hamcrest.Matchers.instanceOf;
106114
import static org.hamcrest.Matchers.lessThanOrEqualTo;
107115

@@ -361,7 +369,12 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
361369

362370
// ensures that an oversized chunked-encoded request is subject to the maxContentLength limit and returns 413
363371
public void testOversizedChunkedEncoding() throws Exception {
364-
try (var clientContext = newClientContext(t -> {/* ignore exception from e.g. server closing socket */})) {
372+
try (
373+
var clientContext = newClientContext(
374+
internalCluster().getRandomNodeName(),
375+
t -> {/* ignore exception from e.g. server closing socket */}
376+
)
377+
) {
365378
var opaqueId = clientContext.newOpaqueId();
366379
final var requestBodyStream = new HttpChunkedInput(
367380
new ChunkedStream(new ByteBufInputStream(Unpooled.wrappedBuffer(randomByteArrayOfLength(MAX_CONTENT_LENGTH + 1)))),
@@ -501,6 +514,72 @@ private void assertHttpBodyLogging(Consumer<ClientContext> test) throws Exceptio
501514
}
502515
}
503516

517+
public void testBulkIndexingRequestSplitting() throws Exception {
518+
final var watermarkBytes = between(100, 200);
519+
final var tinyNode = internalCluster().startCoordinatingOnlyNode(
520+
Settings.builder()
521+
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), ByteSizeValue.ofBytes(watermarkBytes))
522+
.put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), ByteSizeValue.ofBytes(watermarkBytes))
523+
.build()
524+
);
525+
526+
try (var clientContext = newClientContext(tinyNode, cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)))) {
527+
final var request = new DefaultHttpRequest(HTTP_1_1, POST, "/_bulk");
528+
request.headers().add(CONTENT_TYPE, APPLICATION_JSON);
529+
HttpUtil.setTransferEncodingChunked(request, true);
530+
531+
final var channel = clientContext.channel();
532+
channel.writeAndFlush(request);
533+
534+
final var indexName = randomIdentifier();
535+
final var indexCreatedListener = ClusterServiceUtils.addTemporaryStateListener(
536+
cs -> Iterators.filter(
537+
cs.metadata().indicesAllProjects().iterator(),
538+
indexMetadata -> indexMetadata.getIndex().getName().equals(indexName)
539+
).hasNext()
540+
);
541+
542+
indexCreatedListener.addListener(ActionListener.running(() -> logger.info("--> index created")));
543+
544+
final var valueLength = between(10, 30);
545+
final var docSizeBytes = "{'field':''}".length() + valueLength;
546+
final var itemCount = between(watermarkBytes / docSizeBytes + 1, 300); // enough to split at least once
547+
assertThat(itemCount * docSizeBytes, greaterThan(watermarkBytes));
548+
for (int i = 0; i < itemCount; i++) {
549+
channel.write(new DefaultHttpContent(Unpooled.wrappedBuffer(Strings.format("""
550+
{"index":{"_index":"%s"}}
551+
{"field":"%s"}
552+
""", indexName, randomAlphaOfLength(valueLength)).getBytes(StandardCharsets.UTF_8))));
553+
}
554+
555+
channel.flush();
556+
safeAwait(indexCreatedListener); // index must be created before we finish sending the request
557+
558+
channel.writeAndFlush(new DefaultLastHttpContent());
559+
final var response = clientContext.getNextResponse();
560+
try {
561+
assertEquals(RestStatus.OK.getStatus(), response.status().code());
562+
final ObjectPath responseBody;
563+
final var copy = response.content().copy(); // Netty4Utils doesn't handle direct buffers, so copy to heap first
564+
try {
565+
responseBody = ObjectPath.createFromXContent(JsonXContent.jsonXContent, Netty4Utils.toBytesReference(copy));
566+
} finally {
567+
copy.release();
568+
}
569+
assertFalse(responseBody.evaluate("errors"));
570+
assertEquals(itemCount, responseBody.evaluateArraySize("items"));
571+
for (int i = 0; i < itemCount; i++) {
572+
assertEquals(
573+
RestStatus.CREATED.getStatus(),
574+
(int) asInstanceOf(int.class, responseBody.evaluateExact("items", Integer.toString(i), "index", "status"))
575+
);
576+
}
577+
} finally {
578+
response.release();
579+
}
580+
}
581+
}
582+
504583
static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
505584
var request = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
506585
request.headers().add(CONTENT_LENGTH, content.readableBytes());
@@ -539,11 +618,13 @@ protected boolean addMockHttpTransport() {
539618
private static final LongSupplier idGenerator = new AtomicLong()::getAndIncrement;
540619

541620
private ClientContext newClientContext() throws Exception {
542-
return newClientContext(cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)));
621+
return newClientContext(
622+
internalCluster().getRandomNodeName(),
623+
cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause))
624+
);
543625
}
544626

545-
private ClientContext newClientContext(Consumer<Throwable> exceptionHandler) throws Exception {
546-
var nodeName = internalCluster().getRandomNodeName();
627+
private ClientContext newClientContext(String nodeName, Consumer<Throwable> exceptionHandler) throws Exception {
547628
var clientResponseQueue = new LinkedBlockingDeque<FullHttpResponse>(16);
548629
final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, nodeName);
549630
var remoteAddr = randomFrom(httpServerTransport.boundAddress().boundAddresses());
@@ -556,7 +637,7 @@ private ClientContext newClientContext(Consumer<Throwable> exceptionHandler) thr
556637
protected void initChannel(SocketChannel ch) {
557638
var p = ch.pipeline();
558639
p.addLast(new HttpClientCodec());
559-
p.addLast(new HttpObjectAggregator(ByteSizeUnit.KB.toIntBytes(4)));
640+
p.addLast(new HttpObjectAggregator(ByteSizeUnit.MB.toIntBytes(4)));
560641
p.addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
561642
@Override
562643
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ static TransportVersion def(int id) {
226226
public static final TransportVersion SYNONYMS_REFRESH_PARAM = def(9_060_0_00);
227227
public static final TransportVersion DOC_FIELDS_AS_LIST = def(9_061_0_00);
228228
public static final TransportVersion DENSE_VECTOR_OFF_HEAP_STATS = def(9_062_00_0);
229-
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_063_00_0);
229+
public static final TransportVersion RANDOM_SAMPLER_QUERY_BUILDER = def(9_063_0_00);
230+
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_064_00_0);
230231

231232
/*
232233
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/search/SearchModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
import org.elasticsearch.search.aggregations.bucket.sampler.UnmappedSampler;
135135
import org.elasticsearch.search.aggregations.bucket.sampler.random.InternalRandomSampler;
136136
import org.elasticsearch.search.aggregations.bucket.sampler.random.RandomSamplerAggregationBuilder;
137+
import org.elasticsearch.search.aggregations.bucket.sampler.random.RandomSamplingQueryBuilder;
137138
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
138139
import org.elasticsearch.search.aggregations.bucket.terms.LongRareTerms;
139140
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
@@ -1186,6 +1187,9 @@ private void registerQueryParsers(List<SearchPlugin> plugins) {
11861187
registerQuery(new QuerySpec<>(ExactKnnQueryBuilder.NAME, ExactKnnQueryBuilder::new, parser -> {
11871188
throw new IllegalArgumentException("[exact_knn] queries cannot be provided directly");
11881189
}));
1190+
registerQuery(
1191+
new QuerySpec<>(RandomSamplingQueryBuilder.NAME, RandomSamplingQueryBuilder::new, RandomSamplingQueryBuilder::fromXContent)
1192+
);
11891193

11901194
registerFromPlugin(plugins, SearchPlugin::getQueries, this::registerQuery);
11911195
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplingQuery.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,34 @@ public final class RandomSamplingQuery extends Query {
4444
* can be generated
4545
*/
4646
public RandomSamplingQuery(double p, int seed, int hash) {
47-
if (p <= 0.0 || p >= 1.0) {
48-
throw new IllegalArgumentException("RandomSampling probability must be between 0.0 and 1.0, was [" + p + "]");
49-
}
47+
checkProbabilityRange(p);
5048
this.p = p;
5149
this.seed = seed;
5250
this.hash = hash;
5351
}
5452

53+
/**
54+
* Verifies that the probability is within the (0.0, 1.0) range.
55+
* @throws IllegalArgumentException in case of an invalid probability.
56+
*/
57+
public static void checkProbabilityRange(double p) throws IllegalArgumentException {
58+
if (p <= 0.0 || p >= 1.0) {
59+
throw new IllegalArgumentException("RandomSampling probability must be strictly between 0.0 and 1.0, was [" + p + "]");
60+
}
61+
}
62+
63+
public double probability() {
64+
return p;
65+
}
66+
67+
public int seed() {
68+
return seed;
69+
}
70+
71+
public int hash() {
72+
return hash;
73+
}
74+
5575
@Override
5676
public String toString(String field) {
5777
return "RandomSamplingQuery{" + "p=" + p + ", seed=" + seed + ", hash=" + hash + '}';
@@ -98,13 +118,13 @@ public void visit(QueryVisitor visitor) {
98118
/**
99119
* A {@link DocIdSetIterator} that skips a geometrically distributed random number of documents
100120
*/
101-
static class RandomSamplingIterator extends DocIdSetIterator {
121+
public static class RandomSamplingIterator extends DocIdSetIterator {
102122
private final int maxDoc;
103123
private final double p;
104124
private final FastGeometric distribution;
105125
private int doc = -1;
106126

107-
RandomSamplingIterator(int maxDoc, double p, IntSupplier rng) {
127+
public RandomSamplingIterator(int maxDoc, double p, IntSupplier rng) {
108128
this.maxDoc = maxDoc;
109129
this.p = p;
110130
this.distribution = new FastGeometric(rng, p);

0 commit comments

Comments
 (0)