Skip to content

Commit 0297cce

Browse files
Merge branch 'main' into threadpool-merge-scheduler
2 parents 0e714a1 + 110b206 commit 0297cce

File tree

15 files changed

+221
-167
lines changed

15 files changed

+221
-167
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,15 @@ public void addTracingHandler(ChunkHandler chunkHandler) {
7575

7676
@Override
7777
public void next() {
78-
assert closing == false : "cannot request next chunk on closing stream";
7978
assert handler != null : "handler must be set before requesting next chunk";
8079
requestContext = threadContext.newStoredContext();
8180
channel.eventLoop().submit(() -> {
8281
activityTracker.startActivity();
8382
requested = true;
8483
try {
84+
if (closing) {
85+
return;
86+
}
8587
if (buf == null) {
8688
channel.read();
8789
} else {

muted-tests.yml

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,6 @@ tests:
178178
issue: https://github.com/elastic/elasticsearch/issues/118374
179179
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
180180
issue: https://github.com/elastic/elasticsearch/issues/118238
181-
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
182-
method: testInvalidJSON
183-
issue: https://github.com/elastic/elasticsearch/issues/116521
184181
- class: org.elasticsearch.xpack.ccr.rest.ShardChangesRestIT
185182
method: testShardChangesNoOperation
186183
issue: https://github.com/elastic/elasticsearch/issues/118800
@@ -244,11 +241,17 @@ tests:
244241
- class: org.elasticsearch.search.ccs.CrossClusterIT
245242
method: testCancel
246243
issue: https://github.com/elastic/elasticsearch/issues/108061
247-
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
248-
method: testOversizedChunkedEncoding
249-
issue: https://github.com/elastic/elasticsearch/issues/120444
250244
- class: org.elasticsearch.xpack.logsdb.seqno.RetentionLeaseRestIT
251245
issue: https://github.com/elastic/elasticsearch/issues/120434
246+
- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedIT
247+
method: testCheckActionWithPolicyPass {pathPrefix=allowed actionName=create_ldap_cert_store}
248+
issue: https://github.com/elastic/elasticsearch/issues/120422
249+
- class: org.elasticsearch.entitlement.qa.EntitlementsAllowedIT
250+
method: testCheckActionWithPolicyPass {pathPrefix=allowed_nonmodular actionName=create_ldap_cert_store}
251+
issue: https://github.com/elastic/elasticsearch/issues/120423
252+
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
253+
method: testInvalidJSON
254+
issue: https://github.com/elastic/elasticsearch/issues/120482
252255

253256
# Examples:
254257
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ static TransportVersion def(int id) {
156156
public static final TransportVersion ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION = def(8_822_00_0);
157157
public static final TransportVersion KQL_QUERY_TECH_PREVIEW = def(8_823_00_0);
158158
public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0);
159-
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES = def(8_825_00_0);
159+
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_825_00_0);
160+
public static final TransportVersion REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_826_00_0);
160161

161162
/*
162163
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.common.unit;
1111

1212
import org.elasticsearch.ElasticsearchParseException;
13+
import org.elasticsearch.TransportVersion;
1314
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.common.io.stream.StreamInput;
1516
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -24,7 +25,8 @@
2425
import java.util.Locale;
2526
import java.util.Objects;
2627

27-
import static org.elasticsearch.TransportVersions.BYTE_SIZE_VALUE_ALWAYS_USES_BYTES;
28+
import static org.elasticsearch.TransportVersions.BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1;
29+
import static org.elasticsearch.TransportVersions.REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1;
2830
import static org.elasticsearch.common.unit.ByteSizeUnit.BYTES;
2931
import static org.elasticsearch.common.unit.ByteSizeUnit.GB;
3032
import static org.elasticsearch.common.unit.ByteSizeUnit.KB;
@@ -111,7 +113,8 @@ static ByteSizeValue newByteSizeValue(long sizeInBytes, ByteSizeUnit desiredUnit
111113
public static ByteSizeValue readFrom(StreamInput in) throws IOException {
112114
long size = in.readZLong();
113115
ByteSizeUnit unit = ByteSizeUnit.readFrom(in);
114-
if (in.getTransportVersion().onOrAfter(BYTE_SIZE_VALUE_ALWAYS_USES_BYTES)) {
116+
TransportVersion tv = in.getTransportVersion();
117+
if (tv.onOrAfter(BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1) && tv.before(REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1)) {
115118
return newByteSizeValue(size, unit);
116119
} else {
117120
return of(size, unit);
@@ -120,7 +123,8 @@ public static ByteSizeValue readFrom(StreamInput in) throws IOException {
120123

121124
@Override
122125
public void writeTo(StreamOutput out) throws IOException {
123-
if (out.getTransportVersion().onOrAfter(BYTE_SIZE_VALUE_ALWAYS_USES_BYTES)) {
126+
TransportVersion tv = out.getTransportVersion();
127+
if (tv.onOrAfter(BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1) && tv.before(REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1)) {
124128
out.writeZLong(sizeInBytes);
125129
} else {
126130
out.writeZLong(Math.divideExact(sizeInBytes, desiredUnit.toBytes(1)));

server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Iterator;
2929
import java.util.List;
3030
import java.util.function.Consumer;
31+
import java.util.function.Supplier;
3132

3233
import static org.elasticsearch.core.Strings.format;
3334

@@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
349350
}
350351
}
351352
}
353+
354+
@Override
355+
public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {
356+
for (IndexEventListener listener : listeners) {
357+
try {
358+
listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier);
359+
} catch (Exception e) {
360+
logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e);
361+
throw e;
362+
}
363+
}
364+
}
352365
}

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.index.IndexSettings;
1818
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
1919

20+
import java.util.function.Supplier;
21+
2022
/**
2123
* An index event listener is the primary extension point for plugins and build-in services
2224
* to react / listen to per-index and per-shard events. These listeners are registered per-index
@@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
190192
* @param indexShard the shard that is recovering
191193
*/
192194
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}
195+
196+
/**
197+
* Called when a single primary permit is acquired for the given shard (see
198+
* {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}).
199+
*
200+
* @param indexShard the shard of which a primary permit is requested
201+
* @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be
202+
* completed in order for the permit to be given to the acquiring operation.
203+
*/
204+
default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {}
193205
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 32 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3535
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
3636
import org.elasticsearch.action.support.PlainActionFuture;
37+
import org.elasticsearch.action.support.RefCountingListener;
3738
import org.elasticsearch.action.support.SubscribableListener;
3839
import org.elasticsearch.action.support.replication.PendingReplicationActions;
3940
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -189,7 +190,6 @@
189190
import static org.elasticsearch.core.Strings.format;
190191
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
191192
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
192-
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE;
193193

194194
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
195195

@@ -779,28 +779,10 @@ public void relocated(
779779
final String targetAllocationId,
780780
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
781781
final ActionListener<Void> listener
782-
) throws IllegalIndexShardStateException, IllegalStateException {
783-
relocated(targetNodeId, targetAllocationId, consumer, listener, null);
784-
}
785-
786-
/**
787-
* Provides an variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option
788-
* to relocate the shard under externally acquired primary permits.
789-
*
790-
* @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately
791-
*/
792-
public void relocated(
793-
final String targetNodeId,
794-
final String targetAllocationId,
795-
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
796-
final ActionListener<Void> listener,
797-
@Nullable final Releasable acquiredPrimaryPermits
798782
) throws IllegalIndexShardStateException, IllegalStateException {
799783
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
800-
assert acquiredPrimaryPermits == null || indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
801-
: "external primary permits are provided but not held by the shard";
802784
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
803-
ActionListener<Releasable> onAcquired = new ActionListener<>() {
785+
indexShardOperationPermits.blockOperations(new ActionListener<>() {
804786
@Override
805787
public void onResponse(Releasable releasable) {
806788
boolean success = false;
@@ -878,13 +860,8 @@ public void onFailure(Exception e) {
878860
listener.onFailure(e);
879861
}
880862
}
881-
};
882-
if (acquiredPrimaryPermits == null) {
883-
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
884-
indexShardOperationPermits.blockOperations(onAcquired, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE);
885-
} else {
886-
ActionListener.completeWith(onAcquired, () -> acquiredPrimaryPermits);
887-
}
863+
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
864+
// CancellableThreads and we want to be able to interrupt it
888865
}
889866
}
890867

@@ -3592,100 +3569,69 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
35923569
);
35933570
}
35943571

3595-
/**
3596-
* Check to run before running the primary permit operation
3597-
*/
3598-
public enum PrimaryPermitCheck {
3599-
CHECK_PRIMARY_MODE,
3600-
/**
3601-
* IMPORTANT: Currently intented to be used only for acquiring primary permits during the recovery of hollow shards.
3602-
* Don't disable primary mode checks unless you're really sure.
3603-
*/
3604-
NONE
3605-
}
3606-
36073572
/**
36083573
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
36093574
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
36103575
* ActionListener will then be called using the provided executor.
36113576
*/
36123577
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
3613-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
3578+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
36143579
}
36153580

36163581
public void acquirePrimaryOperationPermit(
36173582
ActionListener<Releasable> onPermitAcquired,
36183583
Executor executorOnDelay,
36193584
boolean forceExecution
3620-
) {
3621-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE);
3622-
}
3623-
3624-
public void acquirePrimaryOperationPermit(
3625-
ActionListener<Releasable> onPermitAcquired,
3626-
Executor executorOnDelay,
3627-
boolean forceExecution,
3628-
PrimaryPermitCheck primaryPermitCheck
36293585
) {
36303586
verifyNotClosed();
36313587
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
3632-
indexShardOperationPermits.acquire(
3633-
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3634-
executorOnDelay,
3635-
forceExecution
3636-
);
3588+
3589+
ActionListener<Releasable> onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> {
3590+
final ActionListener<Releasable> wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener(
3591+
delegate,
3592+
executorOnDelay,
3593+
forceExecution
3594+
);
3595+
try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) {
3596+
indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire());
3597+
}
3598+
});
3599+
3600+
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution);
36373601
}
36383602

36393603
public boolean isPrimaryMode() {
36403604
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
36413605
return replicationTracker.isPrimaryMode();
36423606
}
36433607

3644-
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
3645-
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE);
3646-
}
3647-
36483608
/**
36493609
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
36503610
* It is the responsibility of the caller to close the {@link Releasable}.
36513611
*/
3652-
public void acquireAllPrimaryOperationsPermits(
3653-
final ActionListener<Releasable> onPermitAcquired,
3654-
final TimeValue timeout,
3655-
final PrimaryPermitCheck primaryPermitCheck
3656-
) {
3612+
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
36573613
verifyNotClosed();
36583614
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
36593615

3660-
asyncBlockOperations(
3661-
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
3662-
timeout.duration(),
3663-
timeout.timeUnit()
3664-
);
3616+
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
36653617
}
36663618

36673619
/**
3668-
* Wraps the action to run on a primary after acquiring permit.
3620+
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
3621+
* executing the action.
36693622
*
3670-
* @param primaryPermitCheck check to run before the primary mode operation
36713623
* @param listener the listener to wrap
36723624
* @return the wrapped listener
36733625
*/
3674-
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
3675-
final PrimaryPermitCheck primaryPermitCheck,
3676-
final ActionListener<Releasable> listener
3677-
) {
3678-
return switch (primaryPermitCheck) {
3679-
case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
3680-
if (isPrimaryMode()) {
3681-
l.onResponse(r);
3682-
} else {
3683-
r.close();
3684-
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3685-
}
3686-
});
3687-
case NONE -> listener;
3688-
};
3626+
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
3627+
return listener.delegateFailure((l, r) -> {
3628+
if (isPrimaryMode()) {
3629+
l.onResponse(r);
3630+
} else {
3631+
r.close();
3632+
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
3633+
}
3634+
});
36893635
}
36903636

36913637
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
@@ -3723,7 +3669,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
37233669
runnable.run();
37243670
}
37253671
}, onFailure);
3726-
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
3672+
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
37273673
}
37283674

37293675
private <E extends Exception> void bumpPrimaryTerm(

0 commit comments

Comments
 (0)