Skip to content

Commit 2b91e7a

Browse files
authored
Remove ValidateJoinRequest (#117225)
Since 8.3.0 (#85380) we have sent join-validation requests as a `BytesTransportRequest` to facilitate sharing these large messages (and the work needed to create them) amongst all nodes that join the cluster at around the same time. For BwC with versions earlier than 8.3.0 we use a `ValidateJoinRequest` class to represent the received data, whichever scheme it uses. We no longer need to maintain this compatibility, so we can use a bare `BytesTransportRequest` on both sender and receiver, and therefore drop the `ValidateJoinRequest` adapter and the special-cased assertion in `MockTransportService`. Relates #114808 which was reverted in #117200.
1 parent 8e6d5e2 commit 2b91e7a

File tree

6 files changed

+78
-123
lines changed

6 files changed

+78
-123
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ public Coordinator(
246246
this.joinValidationService = new JoinValidationService(
247247
settings,
248248
transportService,
249+
namedWriteableRegistry,
249250
this::getStateForJoinValidationService,
250251
() -> getLastAcceptedState().metadata(),
251252
this.onJoinValidators

server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2222
import org.elasticsearch.common.compress.CompressorFactory;
2323
import org.elasticsearch.common.io.Streams;
24+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
25+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2426
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
2527
import org.elasticsearch.common.settings.Setting;
2628
import org.elasticsearch.common.settings.Settings;
@@ -106,6 +108,7 @@ public class JoinValidationService {
106108
public JoinValidationService(
107109
Settings settings,
108110
TransportService transportService,
111+
NamedWriteableRegistry namedWriteableRegistry,
109112
Supplier<ClusterState> clusterStateSupplier,
110113
Supplier<Metadata> metadataSupplier,
111114
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators
@@ -120,9 +123,9 @@ public JoinValidationService(
120123
transportService.registerRequestHandler(
121124
JoinValidationService.JOIN_VALIDATE_ACTION_NAME,
122125
this.responseExecutor,
123-
ValidateJoinRequest::new,
126+
BytesTransportRequest::new,
124127
(request, channel, task) -> {
125-
final var remoteState = request.getOrReadState();
128+
final var remoteState = readClusterState(namedWriteableRegistry, request);
126129
final var remoteMetadata = remoteState.metadata();
127130
final var localMetadata = metadataSupplier.get();
128131
if (localMetadata.clusterUUIDCommitted() && localMetadata.clusterUUID().equals(remoteMetadata.clusterUUID()) == false) {
@@ -145,6 +148,20 @@ public JoinValidationService(
145148
);
146149
}
147150

151+
private static ClusterState readClusterState(NamedWriteableRegistry namedWriteableRegistry, BytesTransportRequest request)
152+
throws IOException {
153+
try (
154+
var bytesStreamInput = request.bytes().streamInput();
155+
var in = new NamedWriteableAwareStreamInput(
156+
CompressorFactory.COMPRESSOR.threadLocalStreamInput(bytesStreamInput),
157+
namedWriteableRegistry
158+
)
159+
) {
160+
in.setTransportVersion(request.version());
161+
return ClusterState.readFrom(in, null);
162+
}
163+
}
164+
148165
public void validateJoin(DiscoveryNode discoveryNode, ActionListener<Void> listener) {
149166
// This node isn't in the cluster yet so ClusterState#getMinTransportVersion() doesn't apply, we must obtain a specific connection
150167
// so we can check its transport version to decide how to proceed.

server/src/main/java/org/elasticsearch/cluster/coordination/ValidateJoinRequest.java

Lines changed: 0 additions & 92 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@
1414
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1515
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
17-
import org.elasticsearch.core.RefCounted;
1817

1918
import java.io.IOException;
2019

2120
/**
2221
* A specialized, bytes only request, that can potentially be optimized on the network
2322
* layer, specifically for the same large buffer send to several nodes.
2423
*/
25-
public class BytesTransportRequest extends TransportRequest implements RefCounted {
24+
public class BytesTransportRequest extends TransportRequest {
2625

2726
final ReleasableBytesReference bytes;
2827
private final TransportVersion version;

server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,14 @@
2222
import org.elasticsearch.cluster.node.DiscoveryNode;
2323
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
2424
import org.elasticsearch.cluster.node.DiscoveryNodes;
25+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2526
import org.elasticsearch.common.component.Lifecycle;
27+
import org.elasticsearch.common.compress.CompressorFactory;
28+
import org.elasticsearch.common.io.Streams;
2629
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2730
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2831
import org.elasticsearch.common.io.stream.NamedWriteableRegistryTests;
32+
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
2933
import org.elasticsearch.common.io.stream.StreamOutput;
3034
import org.elasticsearch.common.settings.Settings;
3135
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -38,6 +42,7 @@
3842
import org.elasticsearch.test.transport.MockTransport;
3943
import org.elasticsearch.threadpool.TestThreadPool;
4044
import org.elasticsearch.threadpool.ThreadPool;
45+
import org.elasticsearch.transport.BytesTransportRequest;
4146
import org.elasticsearch.transport.CloseableConnection;
4247
import org.elasticsearch.transport.RemoteTransportException;
4348
import org.elasticsearch.transport.TestTransportChannel;
@@ -49,6 +54,7 @@
4954
import org.elasticsearch.transport.TransportService;
5055
import org.elasticsearch.xcontent.ToXContent;
5156

57+
import java.io.IOException;
5258
import java.util.ArrayList;
5359
import java.util.Collections;
5460
import java.util.Iterator;
@@ -155,6 +161,7 @@ public void doRun() {
155161
final var joinValidationService = new JoinValidationService(
156162
settings,
157163
transportService,
164+
writableRegistry(),
158165
() -> usually() ? clusterState : null,
159166
clusterState::metadata,
160167
List.of()
@@ -286,7 +293,14 @@ public void writeTo(StreamOutput out) {}
286293
);
287294

288295
// registers request handler
289-
new JoinValidationService(Settings.EMPTY, joiningNodeTransportService, () -> clusterState, clusterState::metadata, List.of());
296+
new JoinValidationService(
297+
Settings.EMPTY,
298+
joiningNodeTransportService,
299+
writableRegistry(),
300+
() -> clusterState,
301+
clusterState::metadata,
302+
List.of()
303+
);
290304

291305
joiningNodeTransportService.start();
292306
joiningNodeTransportService.acceptIncomingRequests();
@@ -325,6 +339,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
325339
final var joinValidationService = new JoinValidationService(
326340
Settings.EMPTY,
327341
masterTransportService,
342+
writableRegistry(),
328343
() -> clusterState,
329344
clusterState::metadata,
330345
List.of()
@@ -349,7 +364,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
349364
}
350365
}
351366

352-
public void testJoinValidationRejectsMismatchedClusterUUID() {
367+
public void testJoinValidationRejectsMismatchedClusterUUID() throws IOException {
353368
final var deterministicTaskQueue = new DeterministicTaskQueue();
354369
final var mockTransport = new MockTransport();
355370
final var localNode = DiscoveryNodeUtils.create("node0");
@@ -371,7 +386,14 @@ public void testJoinValidationRejectsMismatchedClusterUUID() {
371386
final var settings = Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), dataPath).build();
372387

373388
// registers request handler
374-
new JoinValidationService(settings, transportService, () -> localClusterState, localClusterState::metadata, List.of());
389+
new JoinValidationService(
390+
settings,
391+
transportService,
392+
writableRegistry(),
393+
() -> localClusterState,
394+
localClusterState::metadata,
395+
List.of()
396+
);
375397

376398
transportService.start();
377399
transportService.acceptIncomingRequests();
@@ -384,7 +406,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() {
384406
transportService.sendRequest(
385407
localNode,
386408
JoinValidationService.JOIN_VALIDATE_ACTION_NAME,
387-
new ValidateJoinRequest(otherClusterState),
409+
serializeClusterState(otherClusterState),
388410
new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER)
389411
);
390412
deterministicTaskQueue.runAllTasks();
@@ -401,6 +423,22 @@ public void testJoinValidationRejectsMismatchedClusterUUID() {
401423
);
402424
}
403425

426+
private static BytesTransportRequest serializeClusterState(ClusterState clusterState) {
427+
try (
428+
var bytesStream = new BytesStreamOutput();
429+
var compressedStream = new OutputStreamStreamOutput(
430+
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
431+
)
432+
) {
433+
compressedStream.setTransportVersion(TransportVersion.current());
434+
clusterState.writeTo(compressedStream);
435+
compressedStream.flush();
436+
return new BytesTransportRequest(ReleasableBytesReference.wrap(bytesStream.bytes()), TransportVersion.current());
437+
} catch (Exception e) {
438+
throw new AssertionError(e);
439+
}
440+
}
441+
404442
public void testJoinValidationRunsJoinValidators() {
405443
final var deterministicTaskQueue = new DeterministicTaskQueue();
406444
final var mockTransport = new MockTransport();
@@ -420,11 +458,12 @@ public void testJoinValidationRunsJoinValidators() {
420458
new JoinValidationService(
421459
Settings.EMPTY,
422460
transportService,
461+
writableRegistry(),
423462
() -> localClusterState,
424463
localClusterState::metadata,
425464
List.of((node, state) -> {
426465
assertSame(node, localNode);
427-
assertSame(state, stateForValidation);
466+
assertEquals(state.stateUUID(), stateForValidation.stateUUID());
428467
throw new IllegalStateException("simulated validation failure");
429468
})
430469
); // registers request handler
@@ -435,7 +474,7 @@ public void testJoinValidationRunsJoinValidators() {
435474
transportService.sendRequest(
436475
localNode,
437476
JoinValidationService.JOIN_VALIDATE_ACTION_NAME,
438-
new ValidateJoinRequest(stateForValidation),
477+
serializeClusterState(stateForValidation),
439478
new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER)
440479
);
441480
deterministicTaskQueue.runAllTasks();
@@ -467,9 +506,16 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
467506
null,
468507
Collections.emptySet()
469508
);
470-
final var joinValidationService = new JoinValidationService(Settings.EMPTY, masterTransportService, () -> null, () -> {
471-
throw new AssertionError("should not be called");
472-
}, List.of());
509+
final var joinValidationService = new JoinValidationService(
510+
Settings.EMPTY,
511+
masterTransportService,
512+
writableRegistry(),
513+
() -> null,
514+
() -> {
515+
throw new AssertionError("should not be called");
516+
},
517+
List.of()
518+
);
473519
masterTransportService.start();
474520
masterTransportService.acceptIncomingRequests();
475521

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.core.RefCounted;
4040
import org.elasticsearch.core.Strings;
4141
import org.elasticsearch.core.TimeValue;
42-
import org.elasticsearch.core.UpdateForV9;
4342
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
4443
import org.elasticsearch.node.Node;
4544
import org.elasticsearch.plugins.Plugin;
@@ -50,7 +49,6 @@
5049
import org.elasticsearch.test.ESTestCase;
5150
import org.elasticsearch.test.tasks.MockTaskManager;
5251
import org.elasticsearch.threadpool.ThreadPool;
53-
import org.elasticsearch.transport.BytesTransportRequest;
5452
import org.elasticsearch.transport.ClusterConnectionManager;
5553
import org.elasticsearch.transport.ConnectTransportException;
5654
import org.elasticsearch.transport.ConnectionProfile;
@@ -586,13 +584,8 @@ public void sendRequest(
586584
// poor mans request cloning...
587585
BytesStreamOutput bStream = new BytesStreamOutput();
588586
request.writeTo(bStream);
589-
final TransportRequest clonedRequest;
590-
if (request instanceof BytesTransportRequest) {
591-
clonedRequest = copyRawBytesForBwC(bStream);
592-
} else {
593-
RequestHandlerRegistry<?> reg = MockTransportService.this.getRequestHandler(action);
594-
clonedRequest = reg.newRequest(bStream.bytes().streamInput());
595-
}
587+
RequestHandlerRegistry<?> reg = MockTransportService.this.getRequestHandler(action);
588+
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());
596589
assert clonedRequest.getClass().equals(MasterNodeRequestHelper.unwrapTermOverride(request).getClass())
597590
: clonedRequest + " vs " + request;
598591

@@ -640,15 +633,6 @@ protected void doRun() throws IOException {
640633
}
641634
}
642635

643-
// Some request handlers read back a BytesTransportRequest
644-
// into a different class that cannot be re-serialized (i.e. JOIN_VALIDATE_ACTION_NAME),
645-
// in those cases we just copy the raw bytes back to a BytesTransportRequest.
646-
// This is only needed for the BwC for JOIN_VALIDATE_ACTION_NAME and can be removed in the next major
647-
@UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION)
648-
private static TransportRequest copyRawBytesForBwC(BytesStreamOutput bStream) throws IOException {
649-
return new BytesTransportRequest(bStream.bytes().streamInput());
650-
}
651-
652636
@Override
653637
public void clearCallback() {
654638
synchronized (this) {

0 commit comments

Comments
 (0)