Commit 32b3d28

Merge branch 'main' into fixKmeanstests
2 parents 9d1db8e + 547d4a4 commit 32b3d28

File tree

6 files changed: +134 -181 lines changed

docs/reference/elasticsearch/mapping-reference/sparse-vector.md

Lines changed: 6 additions & 6 deletions

@@ -26,7 +26,7 @@ PUT my-index
 
 ## Token pruning
 ```{applies_to}
-stack: preview 9.1
+stack: ga 9.1
 ```
 
 With any new indices created, token pruning will be turned on by default with appropriate defaults. You can control this behaviour using the optional `index_options` parameters for the field:
@@ -63,23 +63,23 @@ The following parameters are accepted by `sparse_vector` fields:
 * Exclude the field from [_source](/reference/elasticsearch/rest-apis/retrieve-selected-fields.md#source-filtering).
 * Use [synthetic `_source`](/reference/elasticsearch/mapping-reference/mapping-source-field.md#synthetic-source).
 
-index_options {applies_to}`stack: preview 9.1`
+index_options {applies_to}`stack: ga 9.1`
 : (Optional, object) You can set index options for your `sparse_vector` field to determine if you should prune tokens, and the parameter configurations for the token pruning. If pruning options are not set in your [`sparse_vector` query](/reference/query-languages/query-dsl/query-dsl-sparse-vector-query.md), Elasticsearch will use the default options configured for the field, if any.
 
 Parameters for `index_options` are:
 
-`prune` {applies_to}`stack: preview 9.1`
+`prune` {applies_to}`stack: ga 9.1`
 : (Optional, boolean) Whether to perform pruning, omitting the non-significant tokens from the query to improve query performance. If `prune` is true but the `pruning_config` is not specified, pruning will occur but default values will be used. Default: true.
 
-`pruning_config` {applies_to}`stack: preview 9.1`
+`pruning_config` {applies_to}`stack: ga 9.1`
 : (Optional, object) Optional pruning configuration. If enabled, this will omit non-significant tokens from the query in order to improve query performance. This is only used if `prune` is set to `true`. If `prune` is set to `true` but `pruning_config` is not specified, default values will be used. If `prune` is set to `false` but `pruning_config` is specified, an exception will occur.
 
 Parameters for `pruning_config` include:
 
-`tokens_freq_ratio_threshold` {applies_to}`stack: preview 9.1`
+`tokens_freq_ratio_threshold` {applies_to}`stack: ga 9.1`
 : (Optional, integer) Tokens whose frequency is more than `tokens_freq_ratio_threshold` times the average frequency of all tokens in the specified field are considered outliers and pruned. This value must be between 1 and 100. Default: `5`.
 
-`tokens_weight_threshold` {applies_to}`stack: preview 9.1`
+`tokens_weight_threshold` {applies_to}`stack: ga 9.1`
 : (Optional, float) Tokens whose weight is less than `tokens_weight_threshold` are considered insignificant and pruned. This value must be between 0 and 1. Default: `0.4`.
 
 ::::{note}
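
To make the two thresholds concrete, the sketch below walks through the pruning arithmetic they describe, using the documented defaults (`tokens_freq_ratio_threshold: 5`, `tokens_weight_threshold: 0.4`). The sample tokens, the field-wide average frequency, and the choice to prune only tokens that fail *both* tests are illustrative assumptions; the authoritative rule is Elasticsearch's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration of the two pruning thresholds documented above, with the defaults
// (tokens_freq_ratio_threshold = 5, tokens_weight_threshold = 0.4). Sample values
// and the both-conditions-must-hold combination are assumptions for this sketch.
class PruningThresholdSketch {
    public static void main(String[] args) {
        double avgFieldTokenFreq = 10.0;    // assumed average frequency of all tokens in the field
        int tokensFreqRatioThreshold = 5;   // default; must be between 1 and 100
        double tokensWeightThreshold = 0.4; // default; must be between 0 and 1

        Map<String, double[]> queryTokens = new LinkedHashMap<>(); // token -> {frequency, weight}
        queryTokens.put("the", new double[] { 90.0, 0.1 });   // frequent and low-weight -> pruned
        queryTokens.put("comfortable", new double[] { 12.0, 0.7 });
        queryTokens.put("shoe", new double[] { 70.0, 0.9 });  // frequent but high-weight -> kept

        for (var entry : queryTokens.entrySet()) {
            double freq = entry.getValue()[0];
            double weight = entry.getValue()[1];
            boolean freqOutlier = freq > tokensFreqRatioThreshold * avgFieldTokenFreq;
            boolean lowWeight = weight < tokensWeightThreshold;
            boolean pruned = freqOutlier && lowWeight; // assumed combination of the two tests
            System.out.printf("%-12s freqOutlier=%-5b lowWeight=%-5b pruned=%b%n",
                entry.getKey(), freqOutlier, lowWeight, pruned);
        }
    }
}
```

Under these assumptions, only a stop-word-like token that is both unusually frequent and low-weight is dropped.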

muted-tests.yml

Lines changed: 3 additions & 0 deletions

@@ -596,6 +596,9 @@ tests:
 - class: org.elasticsearch.repositories.blobstore.BlobStoreCorruptionIT
   method: testCorruptionDetection
   issue: https://github.com/elastic/elasticsearch/issues/130536
+- class: org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT
+  method: test {yaml=indices.resolve_index/10_basic_resolve_index/Resolve index with hidden and closed indices}
+  issue: https://github.com/elastic/elasticsearch/issues/130568
 
 # Examples:
 #

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 32 additions & 39 deletions

@@ -297,6 +297,7 @@ private void processBulkIndexIngestRequest(
     ) {
         final long ingestStartTimeInNanos = relativeTimeNanos();
         final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
+        final Thread originalThread = Thread.currentThread();
         getIngestService(original).executeBulkRequest(
             metadata.id(),
             original.numberOfActions(),
@@ -305,50 +306,42 @@ private void processBulkIndexIngestRequest(
             (indexName) -> resolveFailureStore(indexName, metadata, threadPool.absoluteTimeInMillis()),
             bulkRequestModifier::markItemForFailureStore,
             bulkRequestModifier::markItemAsFailed,
-            (originalThread, exception) -> {
-                if (exception != null) {
-                    logger.debug("failed to execute pipeline for a bulk request", exception);
-                    listener.onFailure(exception);
+            listener.delegateFailureAndWrap((l, unused) -> {
+                long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
+                BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
+                ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, l);
+                if (bulkRequest.requests().isEmpty()) {
+                    // at this stage, the transport bulk action can't deal with a bulk request with no requests,
+                    // so we stop and send an empty response back to the client.
+                    // (this will happen if pre-processing all items in the bulk failed)
+                    actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
                 } else {
-                    long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
-                    BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
-                    ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
-                        ingestTookInMillis,
-                        listener
-                    );
-                    if (bulkRequest.requests().isEmpty()) {
-                        // at this stage, the transport bulk action can't deal with a bulk request with no requests,
-                        // so we stop and send an empty response back to the client.
-                        // (this will happen if pre-processing all items in the bulk failed)
-                        actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
-                    } else {
-                        ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
-                            @Override
-                            protected void doRun() throws IOException {
-                                applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
-                            }
+                    ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
+                        @Override
+                        protected void doRun() throws IOException {
+                            applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
+                        }
 
-                            @Override
-                            public boolean isForceExecution() {
-                                // If we fork back to a coordination thread we should **not** fail because the tp queue is full.
-                                // (Otherwise the work done during ingest will be lost)
-                                // It is okay to force execution here. Throttling of write requests happens prior to
-                                // ingest when a node receives a bulk request.
-                                return true;
-                            }
-                        };
-                        // If a processor went async and returned a response on a different thread then
-                        // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
-                        // coordination steps on the write thread
-                        if (originalThread == Thread.currentThread()) {
-                            runnable.run();
-                        } else {
-                            executor.execute(runnable);
+                        @Override
+                        public boolean isForceExecution() {
+                            // If we fork back to a coordination thread we should **not** fail because the tp queue is full.
+                            // (Otherwise the work done during ingest will be lost)
+                            // It is okay to force execution here. Throttling of write requests happens prior to
+                            // ingest when a node receives a bulk request.
+                            return true;
                         }
+                    };
+                    // If a processor went async and returned a response on a different thread then
+                    // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
+                    // coordination steps on the write thread
+                    if (originalThread == Thread.currentThread()) {
+                        runnable.run();
+                    } else {
+                        executor.execute(runnable);
                     }
                 }
-            },
-            executor
+
+            })
         );
     }
 
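Taken together, the hunks above replace the hand-rolled `(originalThread, exception)` completion callback with `ActionListener.delegateFailureAndWrap`, so the `if (exception != null)` branch disappears from the call site and `originalThread` is now captured by the caller rather than reported back by the ingest service. Below is a minimal, self-contained sketch of that wrapping pattern; the `Listener` interface is a hypothetical stand-in for Elasticsearch's `ActionListener`, written so the example compiles on its own:

```java
import java.util.function.BiConsumer;

// Hypothetical stand-in for org.elasticsearch.action.ActionListener, used only to
// illustrate the delegateFailureAndWrap pattern this commit adopts.
interface Listener<T> {
    void onResponse(T value);

    void onFailure(Exception e);

    // Failures are forwarded to this listener unchanged; the lambda runs only on
    // success and receives this listener so it can complete it.
    default <U> Listener<U> delegateFailureAndWrap(BiConsumer<Listener<T>, U> onSuccess) {
        Listener<T> outer = this;
        return new Listener<U>() {
            @Override
            public void onResponse(U value) {
                onSuccess.accept(outer, value);
            }

            @Override
            public void onFailure(Exception e) {
                outer.onFailure(e); // no hand-written `if (exception != null)` branch needed
            }
        };
    }
}

class DelegateFailureSketch {
    public static void main(String[] args) {
        Listener<String> client = new Listener<>() {
            @Override
            public void onResponse(String value) {
                System.out.println("response: " + value);
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failure: " + e.getMessage());
            }
        };
        // Success path: the wrapped listener's lambda completes the outer listener.
        client.<Void>delegateFailureAndWrap((l, unused) -> l.onResponse("bulk response")).onResponse(null);
        // Failure path: propagated automatically, as in the refactored code above.
        client.<Void>delegateFailureAndWrap((l, unused) -> l.onResponse("unreachable")).onFailure(new RuntimeException("pipeline failed"));
    }
}
```

The design benefit is that failure propagation is defined once, in the wrapper, instead of being re-implemented in every completion callback.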

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 6 additions & 12 deletions

@@ -96,7 +96,6 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -854,10 +853,7 @@ private static IngestPipelinesExecutionResult failWithoutStoringIn(String index,
      * @param onFailure A callback executed when a document fails ingestion and does not need to be
      *                  persisted. Accepts the slot in the collection of requests that the document
      *                  occupies, and the exception that the document encountered.
-     * @param onCompletion A callback executed once all documents have been processed. Accepts the thread
-     *                     that ingestion completed on or an exception in the event that the entire operation
-     *                     has failed.
-     * @param executor Which executor the bulk request should be executed on.
+     * @param listener A callback executed once all documents have been processed.
      */
     public void executeBulkRequest(
         final ProjectId projectId,
@@ -867,25 +863,23 @@ public void executeBulkRequest(
         final Function<String, Boolean> resolveFailureStore,
         final TriConsumer<Integer, String, Exception> onStoreFailure,
         final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure,
-        final BiConsumer<Thread, Exception> onCompletion,
-        final Executor executor
+        final ActionListener<Void> listener
     ) {
         assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";
 
         // Adapt handler to ensure node features during ingest logic
         final Function<String, Boolean> adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);
 
-        executor.execute(new AbstractRunnable() {
+        new AbstractRunnable() {
 
             @Override
             public void onFailure(Exception e) {
-                onCompletion.accept(null, e);
+                listener.onFailure(e);
             }
 
             @Override
             protected void doRun() {
-                final Thread originalThread = Thread.currentThread();
-                try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) {
+                try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) {
                     int i = 0;
                     for (DocWriteRequest<?> actionRequest : actionRequests) {
                         IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
@@ -964,7 +958,7 @@ public void onFailure(Exception e) {
                     }
                 }
             }
-        });
+        }.run();
    }
 
     /**
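
With the `Executor` parameter gone, `executeBulkRequest` now runs its `AbstractRunnable` inline via `.run()`, leaving any thread-switching to the caller (as the TransportAbstractBulkAction diff above shows), and it signals completion solely through the listener: the `RefCountingRunnable` holds one reference for the `try` block plus one per in-flight document, and `listener.onResponse(null)` fires only after the last reference is released. Here is a self-contained sketch of that ref-counting idea; `RefCounter` is a hand-rolled illustration, not Elasticsearch's `RefCountingRunnable`:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hand-rolled ref counter illustrating the RefCountingRunnable pattern in
// executeBulkRequest above; a sketch, not Elasticsearch's class.
final class RefCountSketch {

    static final class RefCounter implements AutoCloseable {
        private final AtomicInteger refs = new AtomicInteger(1); // the try block holds the initial ref
        private final Runnable onAllReleased;

        RefCounter(Runnable onAllReleased) {
            this.onAllReleased = onAllReleased;
        }

        Runnable acquire() { // one extra ref per in-flight document
            refs.incrementAndGet();
            return this::release;
        }

        private void release() {
            if (refs.decrementAndGet() == 0) {
                onAllReleased.run(); // runs exactly once, after the last release
            }
        }

        @Override
        public void close() {
            release(); // releases the initial ref when the try block exits
        }
    }

    public static void main(String[] args) {
        // Mirrors `try (var refs = new RefCountingRunnable(() -> listener.onResponse(null)))`:
        // the callback fires only after the try block exits and every document ref is released.
        try (RefCounter refs = new RefCounter(() -> System.out.println("listener.onResponse(null)"))) {
            for (int doc = 0; doc < 3; doc++) {
                Runnable release = refs.acquire();
                release.run(); // in the real code this runs when the document's pipelines finish, possibly async
            }
        }
    }
}
```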
