Skip to content

Commit cf85f04

Browse files
authored
Revert "Remove PreShardSyncedFlush (#136409)" (#136503)
This reverts commit 5c3d5ae.
1 parent 4290a8e commit cf85f04

File tree

2 files changed

+53
-0
lines changed

2 files changed

+53
-0
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ static TransportVersion def(int id) {
6060
public static final TransportVersion V_7_8_1 = def(7_08_01_99);
6161
public static final TransportVersion V_7_9_0 = def(7_09_00_99);
6262
public static final TransportVersion V_7_10_0 = def(7_10_00_99);
63+
public static final TransportVersion V_8_0_0 = def(8_00_00_99);
6364
public static final TransportVersion V_8_8_0 = def(8_08_00_99);
6465
public static final TransportVersion V_8_8_1 = def(8_08_01_99);
6566
/*

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.admin.indices.flush;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.ActionType;
1415
import org.elasticsearch.action.support.ActionFilters;
@@ -17,11 +18,17 @@
1718
import org.elasticsearch.cluster.action.shard.ShardStateAction;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.io.stream.StreamInput;
21+
import org.elasticsearch.common.io.stream.StreamOutput;
2022
import org.elasticsearch.common.settings.Settings;
2123
import org.elasticsearch.index.shard.IndexShard;
24+
import org.elasticsearch.index.shard.ShardId;
2225
import org.elasticsearch.indices.IndicesService;
2326
import org.elasticsearch.injection.guice.Inject;
27+
import org.elasticsearch.tasks.Task;
2428
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.transport.AbstractTransportRequest;
30+
import org.elasticsearch.transport.TransportChannel;
31+
import org.elasticsearch.transport.TransportRequestHandler;
2532
import org.elasticsearch.transport.TransportService;
2633

2734
import java.io.IOException;
@@ -57,6 +64,12 @@ public TransportShardFlushAction(
5764
PrimaryActionExecution.RejectOnOverload,
5865
ReplicaActionExecution.SubjectToCircuitBreaker
5966
);
67+
transportService.registerRequestHandler(
68+
PRE_SYNCED_FLUSH_ACTION_NAME,
69+
threadPool.executor(ThreadPool.Names.FLUSH),
70+
PreShardSyncedFlushRequest::new,
71+
new PreSyncedFlushTransportHandler(indicesService)
72+
);
6073
}
6174

6275
@Override
@@ -83,4 +96,43 @@ protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard rep
8396
return new ReplicaResult();
8497
}));
8598
}
99+
100+
// TODO: Remove this transition in 9.0
101+
private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
102+
103+
private static class PreShardSyncedFlushRequest extends AbstractTransportRequest {
104+
private final ShardId shardId;
105+
106+
private PreShardSyncedFlushRequest(StreamInput in) throws IOException {
107+
super(in);
108+
assert in.getTransportVersion().before(TransportVersions.V_8_0_0) : "received pre_sync request from a new node";
109+
this.shardId = new ShardId(in);
110+
}
111+
112+
@Override
113+
public String toString() {
114+
return "PreShardSyncedFlushRequest{" + "shardId=" + shardId + '}';
115+
}
116+
117+
@Override
118+
public void writeTo(StreamOutput out) throws IOException {
119+
assert false : "must not send pre_sync request from a new node";
120+
throw new UnsupportedOperationException("");
121+
}
122+
}
123+
124+
private static final class PreSyncedFlushTransportHandler implements TransportRequestHandler<PreShardSyncedFlushRequest> {
125+
private final IndicesService indicesService;
126+
127+
PreSyncedFlushTransportHandler(IndicesService indicesService) {
128+
this.indicesService = indicesService;
129+
}
130+
131+
@Override
132+
public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel, Task task) {
133+
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId.getIndex()).getShard(request.shardId.id());
134+
indexShard.flush(new FlushRequest().force(false).waitIfOngoing(true));
135+
throw new UnsupportedOperationException("Synced flush was removed and a normal flush was performed instead.");
136+
}
137+
}
86138
}

0 commit comments

Comments
 (0)