Skip to content

Commit 19c9dfe

Browse files
authored
[8.x] Future-proof transport handshake testing (elastic#120140) (elastic#120197)
* Future-proof transport handshake testing (elastic#120140) The testing around `TransportHandshaker` is rather specific about the transport versions in use. In fact, these tests are mostly independent of transport version choices. With this commit we move away from hard-coding specific transport versions to using randomness to indicate that the specific choice of transport version does not really matter. This has the benefit that these tests will need minimal further changes to migrate to a new transport handshake version. * UpdateForV9 not needed in backport
1 parent e39937b commit 19c9dfe

File tree

4 files changed

+48
-34
lines changed

4 files changed

+48
-34
lines changed

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,12 @@ final class TransportHandshaker {
121121
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
122122
*/
123123

124-
static final TransportVersion EARLIEST_HANDSHAKE_VERSION = TransportVersion.fromId(6080099);
125-
static final TransportVersion REQUEST_HANDSHAKE_VERSION = TransportVersions.MINIMUM_COMPATIBLE;
124+
static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion.fromId(6_08_00_99);
125+
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
126126
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
127127
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(
128-
EARLIEST_HANDSHAKE_VERSION,
129-
REQUEST_HANDSHAKE_VERSION,
128+
V7_HANDSHAKE_VERSION,
129+
V8_HANDSHAKE_VERSION,
130130
V9_HANDSHAKE_VERSION
131131
);
132132

@@ -166,7 +166,7 @@ void sendHandshake(
166166
);
167167
boolean success = false;
168168
try {
169-
handshakeRequestSender.sendRequest(node, channel, requestId, REQUEST_HANDSHAKE_VERSION);
169+
handshakeRequestSender.sendRequest(node, channel, requestId, V8_HANDSHAKE_VERSION);
170170

171171
threadPool.schedule(
172172
() -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")),

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,10 @@ public void testDecode() throws IOException {
126126
}
127127

128128
public void testDecodePreHeaderSizeVariableInt() throws IOException {
129-
// TODO: Can delete test on 9.0
130129
Compression.Scheme compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.DEFLATE, null);
131130
String action = "test-request";
132131
long requestId = randomNonNegativeLong();
133-
final TransportVersion preHeaderVariableInt = TransportHandshaker.EARLIEST_HANDSHAKE_VERSION;
132+
final TransportVersion preHeaderVariableInt = TransportHandshaker.V7_HANDSHAKE_VERSION;
134133
final String contentValue = randomAlphaOfLength(100);
135134
// 8.0 is only compatible with handshakes on a pre-variable int version
136135
final OutboundMessage message = new OutboundMessage.Request(
@@ -189,7 +188,7 @@ public void testDecodeHandshakeV7Compatibility() throws IOException {
189188
final String headerKey = randomAlphaOfLength(10);
190189
final String headerValue = randomAlphaOfLength(20);
191190
threadContext.putHeader(headerKey, headerValue);
192-
TransportVersion handshakeCompat = TransportHandshaker.EARLIEST_HANDSHAKE_VERSION;
191+
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
193192
OutboundMessage message = new OutboundMessage.Request(
194193
threadContext,
195194
new TestRequest(randomAlphaOfLength(100)),
@@ -225,8 +224,8 @@ public void testDecodeHandshakeV7Compatibility() throws IOException {
225224
}
226225

227226
public void testDecodeHandshakeV8Compatibility() throws IOException {
228-
doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, null);
229-
doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
227+
doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, null);
228+
doHandshakeCompatibilityTest(TransportHandshaker.V8_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
230229
}
231230

232231
public void testDecodeHandshakeV9Compatibility() throws IOException {
@@ -286,13 +285,18 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
286285
}
287286
}
288287
// a request
288+
final var isHandshake = randomBoolean();
289+
final var version = isHandshake
290+
? randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS)
291+
: TransportVersionUtils.randomCompatibleVersion(random());
292+
logger.info("--> version = {}", version);
289293
OutboundMessage message = new OutboundMessage.Request(
290294
threadContext,
291295
new TestRequest(randomAlphaOfLength(100)),
292-
TransportHandshaker.REQUEST_HANDSHAKE_VERSION,
296+
version,
293297
action,
294298
requestId,
295-
randomBoolean(),
299+
isHandshake,
296300
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null)
297301
);
298302

@@ -309,9 +313,9 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
309313
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.SERVER, ChannelType.MIX))) {
310314
final ArrayList<Object> fragments = new ArrayList<>();
311315
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
312-
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + bytes.getInt(
313-
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
314-
);
316+
int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0)
317+
? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION)
318+
: 0);
315319
assertEquals(totalHeaderSize, bytesConsumed);
316320
final Header header = (Header) fragments.get(0);
317321
assertEquals(requestId, header.getRequestId());
@@ -331,12 +335,16 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
331335
}
332336
}
333337
// a response
338+
final var isHandshake = randomBoolean();
339+
final var version = isHandshake
340+
? randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS)
341+
: TransportVersionUtils.randomCompatibleVersion(random());
334342
OutboundMessage message = new OutboundMessage.Response(
335343
threadContext,
336344
new TestResponse(randomAlphaOfLength(100)),
337-
TransportHandshaker.REQUEST_HANDSHAKE_VERSION,
345+
version,
338346
requestId,
339-
randomBoolean(),
347+
isHandshake,
340348
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null)
341349
);
342350

@@ -351,9 +359,9 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
351359
try (InboundDecoder decoder = new InboundDecoder(recycler, randomFrom(ChannelType.CLIENT, ChannelType.MIX))) {
352360
final ArrayList<Object> fragments = new ArrayList<>();
353361
int bytesConsumed = decoder.decode(wrapAsReleasable(bytes), fragments::add);
354-
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + bytes.getInt(
355-
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
356-
);
362+
int totalHeaderSize = TcpHeader.headerSize(version) + (version.onOrAfter(TransportVersions.V_7_6_0)
363+
? bytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION)
364+
: 0);
357365
assertEquals(totalHeaderSize, bytesConsumed);
358366
final Header header = (Header) fragments.get(0);
359367
assertEquals(requestId, header.getRequestId());
@@ -449,7 +457,7 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException {
449457
final String headerKey = randomAlphaOfLength(10);
450458
final String headerValue = randomAlphaOfLength(20);
451459
threadContext.putHeader(headerKey, headerValue);
452-
TransportVersion handshakeCompat = TransportHandshaker.EARLIEST_HANDSHAKE_VERSION;
460+
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
453461
OutboundMessage message = new OutboundMessage.Request(
454462
threadContext,
455463
new TestRequest(randomAlphaOfLength(100)),

server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Level;
1313
import org.elasticsearch.ElasticsearchException;
1414
import org.elasticsearch.TransportVersion;
15+
import org.elasticsearch.TransportVersions;
1516
import org.elasticsearch.action.ActionListener;
1617
import org.elasticsearch.action.support.PlainActionFuture;
1718
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -133,11 +134,13 @@ public void testSendRawBytes() {
133134

134135
public void testSendRequest() throws IOException {
135136
ThreadContext threadContext = threadPool.getThreadContext();
136-
TransportVersion version = TransportHandshaker.REQUEST_HANDSHAKE_VERSION;
137137
String action = "handshake";
138138
long requestId = randomLongBetween(0, 300);
139139
boolean isHandshake = randomBoolean();
140-
boolean compress = randomBoolean();
140+
TransportVersion version = isHandshake
141+
? randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS)
142+
: TransportVersionUtils.randomCompatibleVersion(random());
143+
boolean compress = version.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE) && randomBoolean();
141144
String value = "message";
142145
threadContext.putHeader("header", "header_value");
143146
TestRequest request = new TestRequest(value);
@@ -204,11 +207,13 @@ public void onRequestSent(
204207

205208
public void testSendResponse() throws IOException {
206209
ThreadContext threadContext = threadPool.getThreadContext();
207-
TransportVersion version = TransportHandshaker.REQUEST_HANDSHAKE_VERSION;
208210
String action = "handshake";
209211
long requestId = randomLongBetween(0, 300);
210212
boolean isHandshake = randomBoolean();
211-
boolean compress = randomBoolean();
213+
TransportVersion version = isHandshake
214+
? randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS)
215+
: TransportVersionUtils.randomCompatibleVersion(random());
216+
boolean compress = version.onOrAfter(TransportVersions.MINIMUM_COMPATIBLE) && randomBoolean();
212217

213218
String value = "message";
214219
threadContext.putHeader("header", "header_value");
@@ -269,8 +274,8 @@ public void onResponseSent(long requestId, String action, TransportResponse resp
269274

270275
public void testErrorResponse() throws IOException {
271276
ThreadContext threadContext = threadPool.getThreadContext();
272-
TransportVersion version = TransportHandshaker.REQUEST_HANDSHAKE_VERSION;
273-
String action = "handshake";
277+
TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random());
278+
String action = "not-a-handshake";
274279
long requestId = randomLongBetween(0, 300);
275280
threadContext.putHeader("header", "header_value");
276281
ElasticsearchException error = new ElasticsearchException("boom");
@@ -322,7 +327,7 @@ public void onResponseSent(long requestId, String action, Exception error) {
322327
}
323328

324329
public void testSendErrorAfterFailToSendResponse() throws Exception {
325-
TransportVersion version = TransportHandshaker.REQUEST_HANDSHAKE_VERSION;
330+
TransportVersion version = TransportVersionUtils.randomCompatibleVersion(random());
326331
String action = randomAlphaOfLength(10);
327332
long requestId = randomLongBetween(0, 300);
328333
var response = new ReleasbleTestResponse(randomAlphaOfLength(10)) {

server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class TransportHandshakerTests extends ESTestCase {
3737
private TestThreadPool threadPool;
3838
private TransportHandshaker.HandshakeRequestSender requestSender;
3939

40+
private static final TransportVersion HANDSHAKE_REQUEST_VERSION = TransportHandshaker.V8_HANDSHAKE_VERSION;
41+
4042
@Override
4143
public void setUp() throws Exception {
4244
super.setUp();
@@ -64,7 +66,7 @@ public void testHandshakeRequestAndResponse() throws IOException {
6466
long reqId = randomLongBetween(1, 10);
6567
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
6668

67-
verify(requestSender).sendRequest(node, channel, reqId, TransportHandshaker.REQUEST_HANDSHAKE_VERSION);
69+
verify(requestSender).sendRequest(node, channel, reqId, HANDSHAKE_REQUEST_VERSION);
6870

6971
assertFalse(versionFuture.isDone());
7072

@@ -87,7 +89,7 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException
8789
long reqId = randomLongBetween(1, 10);
8890
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), new PlainActionFuture<>());
8991

90-
verify(requestSender).sendRequest(node, channel, reqId, TransportHandshaker.REQUEST_HANDSHAKE_VERSION);
92+
verify(requestSender).sendRequest(node, channel, reqId, HANDSHAKE_REQUEST_VERSION);
9193

9294
TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(TransportVersion.current());
9395
BytesStreamOutput currentHandshakeBytes = new BytesStreamOutput();
@@ -123,7 +125,7 @@ public void testHandshakeError() throws IOException {
123125
long reqId = randomLongBetween(1, 10);
124126
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
125127

126-
verify(requestSender).sendRequest(node, channel, reqId, TransportHandshaker.REQUEST_HANDSHAKE_VERSION);
128+
verify(requestSender).sendRequest(node, channel, reqId, HANDSHAKE_REQUEST_VERSION);
127129

128130
assertFalse(versionFuture.isDone());
129131

@@ -138,8 +140,7 @@ public void testHandshakeError() throws IOException {
138140
public void testSendRequestThrowsException() throws IOException {
139141
PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
140142
long reqId = randomLongBetween(1, 10);
141-
doThrow(new IOException("boom")).when(requestSender)
142-
.sendRequest(node, channel, reqId, TransportHandshaker.REQUEST_HANDSHAKE_VERSION);
143+
doThrow(new IOException("boom")).when(requestSender).sendRequest(node, channel, reqId, HANDSHAKE_REQUEST_VERSION);
143144

144145
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture);
145146

@@ -154,7 +155,7 @@ public void testHandshakeTimeout() throws IOException {
154155
long reqId = randomLongBetween(1, 10);
155156
handshaker.sendHandshake(reqId, node, channel, new TimeValue(100, TimeUnit.MILLISECONDS), versionFuture);
156157

157-
verify(requestSender).sendRequest(node, channel, reqId, TransportHandshaker.REQUEST_HANDSHAKE_VERSION);
158+
verify(requestSender).sendRequest(node, channel, reqId, HANDSHAKE_REQUEST_VERSION);
158159

159160
ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet);
160161
assertThat(cte.getMessage(), containsString("handshake_timeout"));

0 commit comments

Comments
 (0)