Skip to content

Commit 40c92a0

Browse files
committed
Record action name for all outbound messages
On outbound messages we know the action name whether it's a request or response, so we can report it in logs rather than just relying on the payload's type.
1 parent 1d6a77c commit 40c92a0

File tree

6 files changed

+92
-115
lines changed

6 files changed

+92
-115
lines changed

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ void sendRequest(
116116
assert assertValidTransportVersion(transportVersion);
117117
sendMessage(
118118
channel,
119+
MessageDirection.REQUEST,
119120
action,
120121
request,
121122
requestId,
@@ -148,7 +149,8 @@ void sendResponse(
148149
try {
149150
sendMessage(
150151
channel,
151-
null,
152+
MessageDirection.RESPONSE,
153+
action,
152154
response,
153155
requestId,
154156
isHandshake,
@@ -190,7 +192,8 @@ void sendErrorResponse(
190192
try {
191193
sendMessage(
192194
channel,
193-
null,
195+
MessageDirection.RESPONSE_ERROR,
196+
action,
194197
msg,
195198
requestId,
196199
false,
@@ -206,29 +209,36 @@ void sendErrorResponse(
206209
}
207210
}
208211

212+
public enum MessageDirection {
213+
REQUEST,
214+
RESPONSE,
215+
RESPONSE_ERROR
216+
}
217+
209218
private void sendMessage(
210219
TcpChannel channel,
211-
@Nullable String requestAction,
220+
MessageDirection messageDirection,
221+
String action,
212222
Writeable writeable,
213223
long requestId,
214224
boolean isHandshake,
215-
Compression.Scheme compressionScheme,
225+
Compression.Scheme possibleCompressionScheme,
216226
TransportVersion version,
217227
ResponseStatsConsumer responseStatsConsumer,
218228
Releasable onAfter
219229
) throws IOException {
220-
compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme;
230+
assert action != null;
231+
final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
221232
final BytesReference message;
222233
boolean serializeSuccess = false;
223-
final boolean isError = writeable instanceof RemoteTransportException;
224234
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
225235
try {
226236
message = serialize(
227-
requestAction,
237+
messageDirection,
238+
action,
228239
requestId,
229240
isHandshake,
230241
version,
231-
isError,
232242
compressionScheme,
233243
writeable,
234244
threadPool.getThreadContext(),
@@ -244,14 +254,23 @@ private void sendMessage(
244254
}
245255
}
246256
responseStatsConsumer.addResponseStats(message.length());
247-
final var responseType = writeable.getClass();
248-
final boolean compress = compressionScheme != null;
257+
final var messageType = writeable.getClass();
249258
internalSend(
250259
channel,
251260
message,
252-
requestAction == null
253-
? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
254-
: () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}",
261+
() -> (messageDirection == MessageDirection.REQUEST ? "Request{" : "Response{")
262+
+ action
263+
+ "}{id="
264+
+ requestId
265+
+ "}{err="
266+
+ (messageDirection == MessageDirection.RESPONSE_ERROR)
267+
+ "}{cs="
268+
+ compressionScheme
269+
+ "}{hs="
270+
+ isHandshake
271+
+ "}{t="
272+
+ messageType
273+
+ "}",
255274
ActionListener.releasing(
256275
message instanceof ReleasableBytesReference r
257276
? Releasables.wrap(byteStreamOutput, onAfter, r)
@@ -262,38 +281,39 @@ private void sendMessage(
262281

263282
// public for tests
264283
public static BytesReference serialize(
265-
@Nullable String requestAction,
284+
MessageDirection messageDirection,
285+
String action,
266286
long requestId,
267287
boolean isHandshake,
268288
TransportVersion version,
269-
boolean isError,
270289
Compression.Scheme compressionScheme,
271290
Writeable writeable,
272291
ThreadContext threadContext,
273292
RecyclerBytesStreamOutput byteStreamOutput
274293
) throws IOException {
294+
assert action != null;
275295
assert byteStreamOutput.position() == 0;
276296
byteStreamOutput.setTransportVersion(version);
277297
byteStreamOutput.skip(TcpHeader.HEADER_SIZE);
278298
threadContext.writeTo(byteStreamOutput);
279-
if (requestAction != null) {
299+
if (messageDirection == MessageDirection.REQUEST) {
280300
if (version.before(TransportVersions.V_8_0_0)) {
281301
// empty features array
282302
byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY);
283303
}
284-
byteStreamOutput.writeString(requestAction);
304+
byteStreamOutput.writeString(action);
285305
}
286306

287307
final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE);
288308
BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput);
289309
byte status = 0;
290-
if (requestAction == null) {
310+
if (messageDirection != MessageDirection.REQUEST) {
291311
status = TransportStatus.setResponse(status);
292312
}
293313
if (isHandshake) {
294314
status = TransportStatus.setHandshake(status);
295315
}
296-
if (isError) {
316+
if (messageDirection == MessageDirection.RESPONSE_ERROR) {
297317
status = TransportStatus.setError(status);
298318
}
299319
if (compressionScheme != null) {
@@ -316,6 +336,8 @@ private static BytesReference serializeMessageBody(
316336
try {
317337
stream.setTransportVersion(version);
318338
if (writeable instanceof BytesTransportRequest bRequest) {
339+
assert stream == byteStreamOutput;
340+
assert compressionScheme == null;
319341
bRequest.writeThin(stream);
320342
zeroCopyBuffer = bRequest.bytes;
321343
} else if (writeable instanceof RemoteTransportException remoteTransportException) {

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -59,23 +59,23 @@ public void testDecode() throws IOException {
5959
final BytesReference totalBytes;
6060
if (isRequest) {
6161
totalBytes = OutboundHandler.serialize(
62+
isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE,
6263
action,
6364
requestId,
6465
false,
6566
TransportVersion.current(),
66-
false,
6767
null,
6868
new TestRequest(randomAlphaOfLength(100)),
6969
threadContext,
7070
os
7171
);
7272
} else {
7373
totalBytes = OutboundHandler.serialize(
74-
null,
74+
isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE,
75+
action,
7576
requestId,
7677
false,
7778
TransportVersion.current(),
78-
false,
7979
null,
8080
new TestResponse(randomAlphaOfLength(100)),
8181
threadContext,
@@ -144,11 +144,11 @@ private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Com
144144

145145
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
146146
final BytesReference bytes = OutboundHandler.serialize(
147+
OutboundHandler.MessageDirection.REQUEST,
147148
action,
148149
requestId,
149150
true,
150151
transportVersion,
151-
false,
152152
compressionScheme,
153153
new TestRequest(randomAlphaOfLength(100)),
154154
threadContext,
@@ -195,11 +195,11 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
195195

196196
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
197197
final BytesReference bytes = OutboundHandler.serialize(
198+
OutboundHandler.MessageDirection.REQUEST,
198199
action,
199200
requestId,
200201
isHandshake,
201202
version,
202-
false,
203203
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null),
204204
new TestRequest(randomAlphaOfLength(100)),
205205
threadContext,
@@ -243,11 +243,11 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
243243

244244
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
245245
final BytesReference bytes = OutboundHandler.serialize(
246-
null,
246+
OutboundHandler.MessageDirection.RESPONSE,
247+
"test:action",
247248
requestId,
248249
isHandshake,
249250
version,
250-
false,
251251
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null),
252252
new TestRequest(randomAlphaOfLength(100)),
253253
threadContext,
@@ -281,38 +281,23 @@ public void testCompressedDecode() throws IOException {
281281
} else {
282282
threadContext.addResponseHeader(headerKey, headerValue);
283283
}
284-
final BytesReference totalBytes;
285-
TransportMessage transportMessage;
286284
Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
287285

288286
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
289-
if (isRequest) {
290-
transportMessage = new TestRequest(randomAlphaOfLength(100));
291-
totalBytes = OutboundHandler.serialize(
292-
action,
293-
requestId,
294-
false,
295-
TransportVersion.current(),
296-
false,
297-
scheme,
298-
transportMessage,
299-
threadContext,
300-
os
301-
);
302-
} else {
303-
transportMessage = new TestResponse(randomAlphaOfLength(100));
304-
totalBytes = OutboundHandler.serialize(
305-
null,
306-
requestId,
307-
false,
308-
TransportVersion.current(),
309-
false,
310-
scheme,
311-
transportMessage,
312-
threadContext,
313-
os
314-
);
315-
}
287+
final TransportMessage transportMessage = isRequest
288+
? new TestRequest(randomAlphaOfLength(100))
289+
: new TestResponse(randomAlphaOfLength(100));
290+
final BytesReference totalBytes = OutboundHandler.serialize(
291+
isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE,
292+
action,
293+
requestId,
294+
false,
295+
TransportVersion.current(),
296+
scheme,
297+
transportMessage,
298+
threadContext,
299+
os
300+
);
316301
final BytesStreamOutput out = new BytesStreamOutput();
317302
transportMessage.writeTo(out);
318303
final BytesReference uncompressedBytes = out.bytes();
@@ -373,11 +358,11 @@ public void testVersionIncompatibilityDecodeException() throws IOException {
373358
final ReleasableBytesReference releasable1;
374359
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
375360
final BytesReference bytes = OutboundHandler.serialize(
361+
OutboundHandler.MessageDirection.REQUEST,
376362
action,
377363
requestId,
378364
false,
379365
incompatibleVersion,
380-
false,
381366
Compression.Scheme.DEFLATE,
382367
new TestRequest(randomAlphaOfLength(100)),
383368
threadContext,

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,11 @@ public TestResponse read(StreamInput in) throws IOException {
172172
String requestValue = randomAlphaOfLength(10);
173173
BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
174174
BytesReference fullRequestBytes = OutboundHandler.serialize(
175+
OutboundHandler.MessageDirection.REQUEST,
175176
action,
176177
requestId,
177178
false,
178179
TransportVersion.current(),
179-
false,
180180
null,
181181
new TestRequest(requestValue),
182182
threadPool.getThreadContext(),

0 commit comments

Comments
 (0)