diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 528cd03c3b55c..40b8a3e4737fe 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -60,6 +60,7 @@ static TransportVersion def(int id) { public static final TransportVersion V_7_8_1 = def(7_08_01_99); public static final TransportVersion V_7_9_0 = def(7_09_00_99); public static final TransportVersion V_7_10_0 = def(7_10_00_99); + public static final TransportVersion V_8_0_0 = def(8_00_00_99); public static final TransportVersion V_8_8_0 = def(8_08_00_99); public static final TransportVersion V_8_8_1 = def(8_08_01_99); /* diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index f05e16bc07897..c0a3e568ffeeb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; @@ -17,11 +18,17 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -57,6 +64,12 @@ public TransportShardFlushAction( PrimaryActionExecution.RejectOnOverload, ReplicaActionExecution.SubjectToCircuitBreaker ); + transportService.registerRequestHandler( + PRE_SYNCED_FLUSH_ACTION_NAME, + threadPool.executor(ThreadPool.Names.FLUSH), + PreShardSyncedFlushRequest::new, + new PreSyncedFlushTransportHandler(indicesService) + ); } @Override @@ -83,4 +96,43 @@ protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard rep return new ReplicaResult(); })); } + + // TODO: Remove this transition in 9.0 + private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre"; + + private static class PreShardSyncedFlushRequest extends AbstractTransportRequest { + private final ShardId shardId; + + private PreShardSyncedFlushRequest(StreamInput in) throws IOException { + super(in); + assert in.getTransportVersion().before(TransportVersions.V_8_0_0) : "received pre_sync request from a new node"; + this.shardId = new ShardId(in); + } + + @Override + public String toString() { + return "PreShardSyncedFlushRequest{" + "shardId=" + shardId + '}'; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + assert false : "must not send pre_sync request from a new node"; + throw new UnsupportedOperationException(""); + } + } + + private static final class PreSyncedFlushTransportHandler implements TransportRequestHandler { + private final IndicesService indicesService; + + PreSyncedFlushTransportHandler(IndicesService indicesService) { + this.indicesService = indicesService; + } + + @Override + public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel, Task task) { + IndexShard indexShard = indicesService.indexServiceSafe(request.shardId.getIndex()).getShard(request.shardId.id()); + indexShard.flush(new FlushRequest().force(false).waitIfOngoing(true)); + throw new UnsupportedOperationException("Synced flush was removed and a normal flush was performed instead."); + } + } }