Skip to content

Commit 6cc09c2

Browse files
committed
Record action name for all outbound messages
On outbound messages we know the action name whether it's a request or response, so we can report it in logs rather than just relying on the payload's type. Backport of elastic#125399 to `8.x`
1 parent 5157a7f commit 6cc09c2

File tree

6 files changed

+104
-143
lines changed

6 files changed

+104
-143
lines changed

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.recycler.Recycler;
3232
import org.elasticsearch.common.transport.NetworkExceptionHelper;
3333
import org.elasticsearch.common.util.concurrent.ThreadContext;
34-
import org.elasticsearch.core.Nullable;
3534
import org.elasticsearch.core.RefCounted;
3635
import org.elasticsearch.core.Releasable;
3736
import org.elasticsearch.core.Releasables;
@@ -114,6 +113,7 @@ void sendRequest(
114113
assert assertValidTransportVersion(transportVersion);
115114
sendMessage(
116115
channel,
116+
MessageDirection.REQUEST,
117117
action,
118118
request,
119119
requestId,
@@ -146,7 +146,8 @@ void sendResponse(
146146
try {
147147
sendMessage(
148148
channel,
149-
null,
149+
MessageDirection.RESPONSE,
150+
action,
150151
response,
151152
requestId,
152153
isHandshake,
@@ -188,7 +189,8 @@ void sendErrorResponse(
188189
try {
189190
sendMessage(
190191
channel,
191-
null,
192+
MessageDirection.RESPONSE_ERROR,
193+
action,
192194
msg,
193195
requestId,
194196
false,
@@ -204,29 +206,36 @@ void sendErrorResponse(
204206
}
205207
}
206208

209+
public enum MessageDirection {
210+
REQUEST,
211+
RESPONSE,
212+
RESPONSE_ERROR
213+
}
214+
207215
private void sendMessage(
208216
TcpChannel channel,
209-
@Nullable String requestAction,
217+
MessageDirection messageDirection,
218+
String action,
210219
Writeable writeable,
211220
long requestId,
212221
boolean isHandshake,
213-
Compression.Scheme compressionScheme,
222+
Compression.Scheme possibleCompressionScheme,
214223
TransportVersion version,
215224
ResponseStatsConsumer responseStatsConsumer,
216225
Releasable onAfter
217226
) throws IOException {
218-
compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme;
227+
assert action != null;
228+
final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
219229
final BytesReference message;
220230
boolean serializeSuccess = false;
221-
final boolean isError = writeable instanceof RemoteTransportException;
222231
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
223232
try {
224233
message = serialize(
225-
requestAction,
234+
messageDirection,
235+
action,
226236
requestId,
227237
isHandshake,
228238
version,
229-
isError,
230239
compressionScheme,
231240
writeable,
232241
threadPool.getThreadContext(),
@@ -242,14 +251,23 @@ private void sendMessage(
242251
}
243252
}
244253
responseStatsConsumer.addResponseStats(message.length());
245-
final var responseType = writeable.getClass();
246-
final boolean compress = compressionScheme != null;
254+
final var messageType = writeable.getClass();
247255
internalSend(
248256
channel,
249257
message,
250-
requestAction == null
251-
? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
252-
: () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}",
258+
() -> (messageDirection == MessageDirection.REQUEST ? "Request{" : "Response{")
259+
+ action
260+
+ "}{id="
261+
+ requestId
262+
+ "}{err="
263+
+ (messageDirection == MessageDirection.RESPONSE_ERROR)
264+
+ "}{cs="
265+
+ compressionScheme
266+
+ "}{hs="
267+
+ isHandshake
268+
+ "}{t="
269+
+ messageType
270+
+ "}",
253271
ActionListener.releasing(
254272
message instanceof ReleasableBytesReference r
255273
? Releasables.wrap(byteStreamOutput, onAfter, r)
@@ -260,11 +278,11 @@ private void sendMessage(
260278

261279
// public for tests
262280
public static BytesReference serialize(
263-
@Nullable String requestAction,
281+
MessageDirection messageDirection,
282+
String action,
264283
long requestId,
265284
boolean isHandshake,
266285
TransportVersion version,
267-
boolean isError,
268286
Compression.Scheme compressionScheme,
269287
Writeable writeable,
270288
ThreadContext threadContext,
@@ -273,19 +291,20 @@ public static BytesReference serialize(
273291
compressionScheme = compressionScheme == Compression.Scheme.LZ4 && version.before(Compression.Scheme.LZ4_VERSION)
274292
? null
275293
: compressionScheme;
294+
assert action != null;
276295
assert byteStreamOutput.position() == 0;
277296
byteStreamOutput.setTransportVersion(version);
278297
final int headerSize = TcpHeader.headerSize(version);
279298
byteStreamOutput.skip(headerSize);
280299
final int variableHeaderLength;
281300
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
282301
threadContext.writeTo(byteStreamOutput);
283-
if (requestAction != null) {
302+
if (messageDirection == MessageDirection.REQUEST) {
284303
if (version.before(TransportVersions.V_8_0_0)) {
285304
// empty features array
286305
byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY);
287306
}
288-
byteStreamOutput.writeString(requestAction);
307+
byteStreamOutput.writeString(action);
289308
}
290309
variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - headerSize);
291310
} else {
@@ -298,16 +317,16 @@ public static BytesReference serialize(
298317
byteStreamOutput,
299318
variableHeaderLength,
300319
threadContext,
301-
requestAction
320+
action
302321
);
303322
byte status = 0;
304-
if (requestAction == null) {
323+
if (messageDirection != MessageDirection.REQUEST) {
305324
status = TransportStatus.setResponse(status);
306325
}
307326
if (isHandshake) {
308327
status = TransportStatus.setHandshake(status);
309328
}
310-
if (isError) {
329+
if (messageDirection == MessageDirection.RESPONSE_ERROR) {
311330
status = TransportStatus.setError(status);
312331
}
313332
if (compressionScheme != null) {
@@ -340,6 +359,8 @@ private static BytesReference serializeMessageBody(
340359
}
341360
}
342361
if (writeable instanceof BytesTransportRequest bRequest) {
362+
assert stream == byteStreamOutput;
363+
assert compressionScheme == null;
343364
bRequest.writeThin(stream);
344365
zeroCopyBuffer = bRequest.bytes;
345366
} else if (writeable instanceof RemoteTransportException remoteTransportException) {

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 33 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -56,32 +56,17 @@ public void testDecode() throws IOException {
5656
}
5757

5858
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
59-
final BytesReference totalBytes;
60-
if (isRequest) {
61-
totalBytes = OutboundHandler.serialize(
62-
action,
63-
requestId,
64-
false,
65-
TransportVersion.current(),
66-
false,
67-
null,
68-
new TestRequest(randomAlphaOfLength(100)),
69-
threadContext,
70-
os
71-
);
72-
} else {
73-
totalBytes = OutboundHandler.serialize(
74-
null,
75-
requestId,
76-
false,
77-
TransportVersion.current(),
78-
false,
79-
null,
80-
new TestResponse(randomAlphaOfLength(100)),
81-
threadContext,
82-
os
83-
);
84-
}
59+
final BytesReference totalBytes = OutboundHandler.serialize(
60+
isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE,
61+
action,
62+
requestId,
63+
false,
64+
TransportVersion.current(),
65+
null,
66+
isRequest ? new TestRequest(randomAlphaOfLength(100)) : new TestResponse(randomAlphaOfLength(100)),
67+
threadContext,
68+
os
69+
);
8570
int totalHeaderSize = TcpHeader.headerSize(TransportVersion.current()) + totalBytes.getInt(
8671
TcpHeader.VARIABLE_HEADER_SIZE_POSITION
8772
);
@@ -138,11 +123,11 @@ public void testDecodePreHeaderSizeVariableInt() throws IOException {
138123
// 8.0 is only compatible with handshakes on a pre-variable int version
139124
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
140125
final BytesReference totalBytes = OutboundHandler.serialize(
126+
OutboundHandler.MessageDirection.REQUEST,
141127
action,
142128
requestId,
143129
true,
144130
preHeaderVariableInt,
145-
false,
146131
compressionScheme,
147132
new TestRequest(contentValue),
148133
threadContext,
@@ -195,11 +180,11 @@ public void testDecodeHandshakeV7Compatibility() throws IOException {
195180
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
196181
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
197182
BytesReference bytes = OutboundHandler.serialize(
183+
OutboundHandler.MessageDirection.REQUEST,
198184
action,
199185
requestId,
200186
true,
201187
handshakeCompat,
202-
false,
203188
null,
204189
new TestRequest(randomAlphaOfLength(100)),
205190
threadContext,
@@ -247,11 +232,11 @@ private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Com
247232
int totalHeaderSize = TcpHeader.headerSize(transportVersion);
248233
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
249234
final BytesReference bytes = OutboundHandler.serialize(
235+
OutboundHandler.MessageDirection.REQUEST,
250236
action,
251237
requestId,
252238
true,
253239
transportVersion,
254-
false,
255240
compressionScheme,
256241
new TestRequest(randomAlphaOfLength(100)),
257242
threadContext,
@@ -298,11 +283,11 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
298283

299284
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
300285
final BytesReference bytes = OutboundHandler.serialize(
286+
OutboundHandler.MessageDirection.REQUEST,
301287
action,
302288
requestId,
303289
isHandshake,
304290
version,
305-
false,
306291
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null),
307292
new TestRequest(randomAlphaOfLength(100)),
308293
threadContext,
@@ -348,11 +333,11 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
348333

349334
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
350335
final BytesReference bytes = OutboundHandler.serialize(
351-
null,
336+
OutboundHandler.MessageDirection.RESPONSE,
337+
"test:action",
352338
requestId,
353339
isHandshake,
354340
version,
355-
false,
356341
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null),
357342
new TestRequest(randomAlphaOfLength(100)),
358343
threadContext,
@@ -388,38 +373,23 @@ public void testCompressedDecode() throws IOException {
388373
} else {
389374
threadContext.addResponseHeader(headerKey, headerValue);
390375
}
391-
final BytesReference totalBytes;
392-
TransportMessage transportMessage;
393376
Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
394377

395378
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
396-
if (isRequest) {
397-
transportMessage = new TestRequest(randomAlphaOfLength(100));
398-
totalBytes = OutboundHandler.serialize(
399-
action,
400-
requestId,
401-
false,
402-
TransportVersion.current(),
403-
false,
404-
scheme,
405-
transportMessage,
406-
threadContext,
407-
os
408-
);
409-
} else {
410-
transportMessage = new TestResponse(randomAlphaOfLength(100));
411-
totalBytes = OutboundHandler.serialize(
412-
null,
413-
requestId,
414-
false,
415-
TransportVersion.current(),
416-
false,
417-
scheme,
418-
transportMessage,
419-
threadContext,
420-
os
421-
);
422-
}
379+
final TransportMessage transportMessage = isRequest
380+
? new TestRequest(randomAlphaOfLength(100))
381+
: new TestResponse(randomAlphaOfLength(100));
382+
final BytesReference totalBytes = OutboundHandler.serialize(
383+
isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE,
384+
action,
385+
requestId,
386+
false,
387+
TransportVersion.current(),
388+
scheme,
389+
transportMessage,
390+
threadContext,
391+
os
392+
);
423393
final BytesStreamOutput out = new BytesStreamOutput();
424394
transportMessage.writeTo(out);
425395
final BytesReference uncompressedBytes = out.bytes();
@@ -479,11 +449,11 @@ public void testCompressedDecodeHandshakeCompatibility() throws IOException {
479449
TransportVersion handshakeCompat = TransportHandshaker.V7_HANDSHAKE_VERSION;
480450
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
481451
final BytesReference bytes = OutboundHandler.serialize(
452+
OutboundHandler.MessageDirection.REQUEST,
482453
action,
483454
requestId,
484455
true,
485456
handshakeCompat,
486-
false,
487457
Compression.Scheme.DEFLATE,
488458
new TestRequest(randomAlphaOfLength(100)),
489459
threadContext,
@@ -517,11 +487,11 @@ public void testVersionIncompatibilityDecodeException() throws IOException {
517487
final ReleasableBytesReference releasable1;
518488
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
519489
final BytesReference bytes = OutboundHandler.serialize(
490+
OutboundHandler.MessageDirection.REQUEST,
520491
action,
521492
requestId,
522493
false,
523494
incompatibleVersion,
524-
false,
525495
Compression.Scheme.DEFLATE,
526496
new TestRequest(randomAlphaOfLength(100)),
527497
threadContext,

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,11 @@ public TestResponse read(StreamInput in) throws IOException {
173173
String requestValue = randomAlphaOfLength(10);
174174
BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
175175
BytesReference fullRequestBytes = OutboundHandler.serialize(
176+
OutboundHandler.MessageDirection.REQUEST,
176177
action,
177178
requestId,
178179
false,
179180
TransportVersion.current(),
180-
false,
181181
null,
182182
new TestRequest(requestValue),
183183
threadPool.getThreadContext(),

0 commit comments

Comments
 (0)