Skip to content

Commit 617cbd0

Browse files
DaveCTurneromricohenn
authored andcommitted
Record action name for all outbound messages (elastic#125399)
On outbound messages we know the action name whether it's a request or response, so we can report it in logs rather than just relying on the payload's type.
1 parent ed8701d commit 617cbd0

File tree

6 files changed

+100
-139
lines changed

6 files changed

+100
-139
lines changed

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.recycler.Recycler;
3232
import org.elasticsearch.common.transport.NetworkExceptionHelper;
3333
import org.elasticsearch.common.util.concurrent.ThreadContext;
34-
import org.elasticsearch.core.Nullable;
3534
import org.elasticsearch.core.RefCounted;
3635
import org.elasticsearch.core.Releasable;
3736
import org.elasticsearch.core.Releasables;
@@ -116,6 +115,7 @@ void sendRequest(
116115
assert assertValidTransportVersion(transportVersion);
117116
sendMessage(
118117
channel,
118+
MessageDirection.REQUEST,
119119
action,
120120
request,
121121
requestId,
@@ -148,7 +148,8 @@ void sendResponse(
148148
try {
149149
sendMessage(
150150
channel,
151-
null,
151+
MessageDirection.RESPONSE,
152+
action,
152153
response,
153154
requestId,
154155
isHandshake,
@@ -190,7 +191,8 @@ void sendErrorResponse(
190191
try {
191192
sendMessage(
192193
channel,
193-
null,
194+
MessageDirection.RESPONSE_ERROR,
195+
action,
194196
msg,
195197
requestId,
196198
false,
@@ -206,29 +208,36 @@ void sendErrorResponse(
206208
}
207209
}
208210

211+
public enum MessageDirection {
212+
REQUEST,
213+
RESPONSE,
214+
RESPONSE_ERROR
215+
}
216+
209217
private void sendMessage(
210218
TcpChannel channel,
211-
@Nullable String requestAction,
219+
MessageDirection messageDirection,
220+
String action,
212221
Writeable writeable,
213222
long requestId,
214223
boolean isHandshake,
215-
Compression.Scheme compressionScheme,
224+
Compression.Scheme possibleCompressionScheme,
216225
TransportVersion version,
217226
ResponseStatsConsumer responseStatsConsumer,
218227
Releasable onAfter
219228
) throws IOException {
220-
compressionScheme = writeable instanceof BytesTransportRequest ? null : compressionScheme;
229+
assert action != null;
230+
final var compressionScheme = writeable instanceof BytesTransportRequest ? null : possibleCompressionScheme;
221231
final BytesReference message;
222232
boolean serializeSuccess = false;
223-
final boolean isError = writeable instanceof RemoteTransportException;
224233
final RecyclerBytesStreamOutput byteStreamOutput = new RecyclerBytesStreamOutput(recycler);
225234
try {
226235
message = serialize(
227-
requestAction,
236+
messageDirection,
237+
action,
228238
requestId,
229239
isHandshake,
230240
version,
231-
isError,
232241
compressionScheme,
233242
writeable,
234243
threadPool.getThreadContext(),
@@ -244,14 +253,23 @@ private void sendMessage(
244253
}
245254
}
246255
responseStatsConsumer.addResponseStats(message.length());
247-
final var responseType = writeable.getClass();
248-
final boolean compress = compressionScheme != null;
256+
final var messageType = writeable.getClass();
249257
internalSend(
250258
channel,
251259
message,
252-
requestAction == null
253-
? () -> "Response{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}{" + responseType + "}"
254-
: () -> "Request{" + requestAction + "}{" + requestId + "}{" + isError + "}{" + compress + "}{" + isHandshake + "}",
260+
() -> (messageDirection == MessageDirection.REQUEST ? "Request{" : "Response{")
261+
+ action
262+
+ "}{id="
263+
+ requestId
264+
+ "}{err="
265+
+ (messageDirection == MessageDirection.RESPONSE_ERROR)
266+
+ "}{cs="
267+
+ compressionScheme
268+
+ "}{hs="
269+
+ isHandshake
270+
+ "}{t="
271+
+ messageType
272+
+ "}",
255273
ActionListener.releasing(
256274
message instanceof ReleasableBytesReference r
257275
? Releasables.wrap(byteStreamOutput, onAfter, r)
@@ -262,38 +280,39 @@ private void sendMessage(
262280

263281
// public for tests
264282
public static BytesReference serialize(
265-
@Nullable String requestAction,
283+
MessageDirection messageDirection,
284+
String action,
266285
long requestId,
267286
boolean isHandshake,
268287
TransportVersion version,
269-
boolean isError,
270288
Compression.Scheme compressionScheme,
271289
Writeable writeable,
272290
ThreadContext threadContext,
273291
RecyclerBytesStreamOutput byteStreamOutput
274292
) throws IOException {
293+
assert action != null;
275294
assert byteStreamOutput.position() == 0;
276295
byteStreamOutput.setTransportVersion(version);
277296
byteStreamOutput.skip(TcpHeader.HEADER_SIZE);
278297
threadContext.writeTo(byteStreamOutput);
279-
if (requestAction != null) {
298+
if (messageDirection == MessageDirection.REQUEST) {
280299
if (version.before(TransportVersions.V_8_0_0)) {
281300
// empty features array
282301
byteStreamOutput.writeStringArray(Strings.EMPTY_ARRAY);
283302
}
284-
byteStreamOutput.writeString(requestAction);
303+
byteStreamOutput.writeString(action);
285304
}
286305

287306
final int variableHeaderLength = Math.toIntExact(byteStreamOutput.position() - TcpHeader.HEADER_SIZE);
288307
BytesReference message = serializeMessageBody(writeable, compressionScheme, version, byteStreamOutput);
289308
byte status = 0;
290-
if (requestAction == null) {
309+
if (messageDirection != MessageDirection.REQUEST) {
291310
status = TransportStatus.setResponse(status);
292311
}
293312
if (isHandshake) {
294313
status = TransportStatus.setHandshake(status);
295314
}
296-
if (isError) {
315+
if (messageDirection == MessageDirection.RESPONSE_ERROR) {
297316
status = TransportStatus.setError(status);
298317
}
299318
if (compressionScheme != null) {
@@ -316,6 +335,8 @@ private static BytesReference serializeMessageBody(
316335
try {
317336
stream.setTransportVersion(version);
318337
if (writeable instanceof BytesTransportRequest bRequest) {
338+
assert stream == byteStreamOutput;
339+
assert compressionScheme == null;
319340
bRequest.writeThin(stream);
320341
zeroCopyBuffer = bRequest.bytes;
321342
} else if (writeable instanceof RemoteTransportException remoteTransportException) {

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 30 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -56,32 +56,17 @@ public void testDecode() throws IOException {
5656
}
5757

5858
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
59-
final BytesReference totalBytes;
60-
if (isRequest) {
61-
totalBytes = OutboundHandler.serialize(
62-
action,
63-
requestId,
64-
false,
65-
TransportVersion.current(),
66-
false,
67-
null,
68-
new TestRequest(randomAlphaOfLength(100)),
69-
threadContext,
70-
os
71-
);
72-
} else {
73-
totalBytes = OutboundHandler.serialize(
74-
null,
75-
requestId,
76-
false,
77-
TransportVersion.current(),
78-
false,
79-
null,
80-
new TestResponse(randomAlphaOfLength(100)),
81-
threadContext,
82-
os
83-
);
84-
}
59+
final BytesReference totalBytes = OutboundHandler.serialize(
60+
isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE,
61+
action,
62+
requestId,
63+
false,
64+
TransportVersion.current(),
65+
null,
66+
isRequest ? new TestRequest(randomAlphaOfLength(100)) : new TestResponse(randomAlphaOfLength(100)),
67+
threadContext,
68+
os
69+
);
8570
int totalHeaderSize = TcpHeader.HEADER_SIZE + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
8671
final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize);
8772

@@ -144,11 +129,11 @@ private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Com
144129

145130
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
146131
final BytesReference bytes = OutboundHandler.serialize(
132+
OutboundHandler.MessageDirection.REQUEST,
147133
action,
148134
requestId,
149135
true,
150136
transportVersion,
151-
false,
152137
compressionScheme,
153138
new TestRequest(randomAlphaOfLength(100)),
154139
threadContext,
@@ -195,11 +180,11 @@ public void testClientChannelTypeFailsDecodingRequests() throws Exception {
195180

196181
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
197182
final BytesReference bytes = OutboundHandler.serialize(
183+
OutboundHandler.MessageDirection.REQUEST,
198184
action,
199185
requestId,
200186
isHandshake,
201187
version,
202-
false,
203188
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null),
204189
new TestRequest(randomAlphaOfLength(100)),
205190
threadContext,
@@ -243,11 +228,11 @@ public void testServerChannelTypeFailsDecodingResponses() throws Exception {
243228

244229
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
245230
final BytesReference bytes = OutboundHandler.serialize(
246-
null,
231+
OutboundHandler.MessageDirection.RESPONSE,
232+
"test:action",
247233
requestId,
248234
isHandshake,
249235
version,
250-
false,
251236
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null),
252237
new TestRequest(randomAlphaOfLength(100)),
253238
threadContext,
@@ -281,38 +266,23 @@ public void testCompressedDecode() throws IOException {
281266
} else {
282267
threadContext.addResponseHeader(headerKey, headerValue);
283268
}
284-
final BytesReference totalBytes;
285-
TransportMessage transportMessage;
286269
Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
287270

288271
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
289-
if (isRequest) {
290-
transportMessage = new TestRequest(randomAlphaOfLength(100));
291-
totalBytes = OutboundHandler.serialize(
292-
action,
293-
requestId,
294-
false,
295-
TransportVersion.current(),
296-
false,
297-
scheme,
298-
transportMessage,
299-
threadContext,
300-
os
301-
);
302-
} else {
303-
transportMessage = new TestResponse(randomAlphaOfLength(100));
304-
totalBytes = OutboundHandler.serialize(
305-
null,
306-
requestId,
307-
false,
308-
TransportVersion.current(),
309-
false,
310-
scheme,
311-
transportMessage,
312-
threadContext,
313-
os
314-
);
315-
}
272+
final TransportMessage transportMessage = isRequest
273+
? new TestRequest(randomAlphaOfLength(100))
274+
: new TestResponse(randomAlphaOfLength(100));
275+
final BytesReference totalBytes = OutboundHandler.serialize(
276+
isRequest ? OutboundHandler.MessageDirection.REQUEST : OutboundHandler.MessageDirection.RESPONSE,
277+
action,
278+
requestId,
279+
false,
280+
TransportVersion.current(),
281+
scheme,
282+
transportMessage,
283+
threadContext,
284+
os
285+
);
316286
final BytesStreamOutput out = new BytesStreamOutput();
317287
transportMessage.writeTo(out);
318288
final BytesReference uncompressedBytes = out.bytes();
@@ -373,11 +343,11 @@ public void testVersionIncompatibilityDecodeException() throws IOException {
373343
final ReleasableBytesReference releasable1;
374344
try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
375345
final BytesReference bytes = OutboundHandler.serialize(
346+
OutboundHandler.MessageDirection.REQUEST,
376347
action,
377348
requestId,
378349
false,
379350
incompatibleVersion,
380-
false,
381351
Compression.Scheme.DEFLATE,
382352
new TestRequest(randomAlphaOfLength(100)),
383353
threadContext,

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,11 @@ public TestResponse read(StreamInput in) throws IOException {
172172
String requestValue = randomAlphaOfLength(10);
173173
BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
174174
BytesReference fullRequestBytes = OutboundHandler.serialize(
175+
OutboundHandler.MessageDirection.REQUEST,
175176
action,
176177
requestId,
177178
false,
178179
TransportVersion.current(),
179-
false,
180180
null,
181181
new TestRequest(requestValue),
182182
threadPool.getThreadContext(),

0 commit comments

Comments
 (0)