Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions qa/serverless-system-properties/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import org.apache.tools.ant.filters.ReplaceTokens

apply plugin: 'elasticsearch.build'

dependencies {
testImplementation project(':test:framework')
testImplementation project(':server')
}

tasks.named("test").configure {
systemProperty 'es.stateless.allow.index.refresh_interval.override', 'true'
// systemProperty 'es.serverless_transport', 'true'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need this, right? I wonder why the CI didn't fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh hmm that's a good point - I commented that out for a test but then forgot I'd done so. CI passes because we weakened this to a warning in #128589 but did not adjust the es.serverless_transport test to assert the lack of a warning. Fixed in 7c9ffc2.

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static IndexMetadata newIndexMeta(String name, Settings indexSettings) {
}

public void testStatelessMinRefreshIntervalOverride() {
assumeTrue(
assertTrue(
"This test depends on system property configured in build.gradle",
Booleans.parseBoolean(
System.getProperty(IndexSettings.RefreshIntervalValidator.STATELESS_ALLOW_INDEX_REFRESH_INTERVAL_OVERRIDE, "false")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.VersionInformation;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.transport.netty4.SharedGroupFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptySet;
import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;

public class ServerlessTransportHandshakeTests extends ESTestCase {

private static ThreadPool threadPool;

@BeforeClass
public static void startThreadPool() {
threadPool = new TestThreadPool(ServerlessTransportHandshakeTests.class.getSimpleName());
}

private final List<TransportService> transportServices = new ArrayList<>();

private TransportService startServices(String nodeNameAndId, Settings settings, TransportInterceptor transportInterceptor) {
TcpTransport transport = new Netty4Transport(
settings,
TransportVersion.current(),
threadPool,
new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
);
TransportService transportService = new MockTransportService(
settings,
transport,
threadPool,
transportInterceptor,
(boundAddress) -> DiscoveryNodeUtils.builder(nodeNameAndId)
.name(nodeNameAndId)
.address(boundAddress.publishAddress())
.roles(emptySet())
.version(VersionInformation.CURRENT)
.build(),
null,
Collections.emptySet(),
nodeNameAndId
);
transportService.start();
transportService.acceptIncomingRequests();
transportServices.add(transportService);
return transportService;
}

@After
public void tearDown() throws Exception {
for (TransportService transportService : transportServices) {
transportService.close();
}
super.tearDown();
}

@AfterClass
public static void terminateThreadPool() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
// since static must set to null to be eligible for collection
threadPool = null;
}

public void testAcceptsMismatchedServerlessBuildHash() {
assumeTrue("Current build needs to be a snapshot", Build.current().isSnapshot());
final var transportInterceptorA = new BuildHashModifyingTransportInterceptor();
final var transportInterceptorB = new BuildHashModifyingTransportInterceptor();
final Settings settings = Settings.builder()
.put("cluster.name", "a")
.put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true) // suppress assertions to test production error-handling
.build();
final TransportService transportServiceA = startServices("TS_A", settings, transportInterceptorA);
final TransportService transportServiceB = startServices("TS_B", settings, transportInterceptorB);
AbstractSimpleTransportTestCase.connectToNode(transportServiceA, transportServiceB.getLocalNode(), TestProfiles.LIGHT_PROFILE);
assertTrue(transportServiceA.nodeConnected(transportServiceB.getLocalNode()));
}

}
24 changes: 0 additions & 24 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -151,30 +151,6 @@ if (buildParams.snapshotBuild == false) {

tasks.named("test").configure {
systemProperty 'es.insecure_network_trace_enabled', 'true'
filter {
excludeTestsMatching("*.TransportServiceHandshakeTests.testAcceptsMismatchedServerlessBuildHash")
}
excludes << '**/IndexSettingsOverrideTests.class'
}

// There are tests rely on system properties to be configured differently. They must run in a separate test job
// since the default does not work for them and configuring the system properties inside the test class/method
// is too late because fields based on the system properties are often initialized statically.
TaskProvider<Test> systemPropertiesOverrideTest = tasks.register("systemPropertiesOverrideTest", Test) {
include '**/IndexSettingsOverrideTests.class'
include '**/TransportServiceHandshakeTests.class'
filter {
includeTestsMatching("*.TransportServiceHandshakeTests.testAcceptsMismatchedServerlessBuildHash")
includeTestsMatching("*.IndexSettingsOverrideTests.*")
}
systemProperty 'es.stateless.allow.index.refresh_interval.override', 'true'
systemProperty 'es.serverless_transport', 'true'
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
}

tasks.named("check").configure {
dependsOn(systemPropertiesOverrideTest)
}

tasks.named("thirdPartyAudit").configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.transport;

import org.apache.logging.log4j.Level;
import org.elasticsearch.Build;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -41,7 +40,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -323,10 +321,8 @@ public void testNodeConnectWithDifferentNodeId() {
}

public void testRejectsMismatchedBuildHash() {
final DisruptingTransportInterceptor transportInterceptorA = new DisruptingTransportInterceptor();
final DisruptingTransportInterceptor transportInterceptorB = new DisruptingTransportInterceptor();
transportInterceptorA.setModifyBuildHash(true);
transportInterceptorB.setModifyBuildHash(true);
final var transportInterceptorA = new BuildHashModifyingTransportInterceptor();
final var transportInterceptorB = new BuildHashModifyingTransportInterceptor();
final Settings settings = Settings.builder()
.put("cluster.name", "a")
.put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true) // suppress assertions to test production error-handling
Expand Down Expand Up @@ -375,39 +371,9 @@ public void testRejectsMismatchedBuildHash() {
assertFalse(transportServiceA.nodeConnected(discoveryNode));
}

public void testAcceptsMismatchedServerlessBuildHash() {
assumeTrue("Current build needs to be a snapshot", Build.current().isSnapshot());
final DisruptingTransportInterceptor transportInterceptorA = new DisruptingTransportInterceptor();
final DisruptingTransportInterceptor transportInterceptorB = new DisruptingTransportInterceptor();
transportInterceptorA.setModifyBuildHash(true);
transportInterceptorB.setModifyBuildHash(true);
final Settings settings = Settings.builder()
.put("cluster.name", "a")
.put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true) // suppress assertions to test production error-handling
.build();
final TransportService transportServiceA = startServices(
"TS_A",
settings,
TransportVersion.current(),
VersionInformation.CURRENT,
transportInterceptorA
);
final TransportService transportServiceB = startServices(
"TS_B",
settings,
TransportVersion.current(),
VersionInformation.CURRENT,
transportInterceptorB
);
AbstractSimpleTransportTestCase.connectToNode(transportServiceA, transportServiceB.getLocalNode(), TestProfiles.LIGHT_PROFILE);
assertTrue(transportServiceA.nodeConnected(transportServiceB.getLocalNode()));
}

public void testAcceptsMismatchedBuildHashFromDifferentVersion() {
final DisruptingTransportInterceptor transportInterceptorA = new DisruptingTransportInterceptor();
final DisruptingTransportInterceptor transportInterceptorB = new DisruptingTransportInterceptor();
transportInterceptorA.setModifyBuildHash(true);
transportInterceptorB.setModifyBuildHash(true);
final var transportInterceptorA = new BuildHashModifyingTransportInterceptor();
final var transportInterceptorB = new BuildHashModifyingTransportInterceptor();
final TransportService transportServiceA = startServices(
"TS_A",
Settings.builder().put("cluster.name", "a").build(),
Expand All @@ -425,58 +391,4 @@ public void testAcceptsMismatchedBuildHashFromDifferentVersion() {
AbstractSimpleTransportTestCase.connectToNode(transportServiceA, transportServiceB.getLocalNode(), TestProfiles.LIGHT_PROFILE);
assertTrue(transportServiceA.nodeConnected(transportServiceB.getLocalNode()));
}

private static class DisruptingTransportInterceptor implements TransportInterceptor {

private boolean modifyBuildHash;

public void setModifyBuildHash(boolean modifyBuildHash) {
this.modifyBuildHash = modifyBuildHash;
}

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
Executor executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler
) {

if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
return (request, channel, task) -> actualHandler.messageReceived(request, new TransportChannel() {
@Override
public String getProfileName() {
return channel.getProfileName();
}

@Override
public void sendResponse(TransportResponse response) {
assertThat(response, instanceOf(TransportService.HandshakeResponse.class));
if (modifyBuildHash) {
final TransportService.HandshakeResponse handshakeResponse = (TransportService.HandshakeResponse) response;
channel.sendResponse(
new TransportService.HandshakeResponse(
handshakeResponse.getVersion(),
handshakeResponse.getBuildHash() + "-modified",
handshakeResponse.getDiscoveryNode(),
handshakeResponse.getClusterName()
)
);
} else {
channel.sendResponse(response);
}
}

@Override
public void sendResponse(Exception exception) {
channel.sendResponse(exception);

}
}, task);
} else {
return actualHandler;
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.transport;

import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.Executor;

import static org.hamcrest.Matchers.instanceOf;

class BuildHashModifyingTransportInterceptor implements TransportInterceptor {

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
Executor executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler
) {

if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
return (request, channel, task) -> actualHandler.messageReceived(request, new TransportChannel() {
@Override
public String getProfileName() {
return channel.getProfileName();
}

@Override
public void sendResponse(TransportResponse response) {
ESTestCase.assertThat(response, instanceOf(TransportService.HandshakeResponse.class));
final TransportService.HandshakeResponse handshakeResponse = (TransportService.HandshakeResponse) response;
channel.sendResponse(
new TransportService.HandshakeResponse(
handshakeResponse.getVersion(),
handshakeResponse.getBuildHash() + "-modified",
handshakeResponse.getDiscoveryNode(),
handshakeResponse.getClusterName()
)
);
}

@Override
public void sendResponse(Exception exception) {
channel.sendResponse(exception);

}
}, task);
} else {
return actualHandler;
}
}
}