Skip to content

Commit 209caaf

Browse files
authored
Add audit logging for streamed HTTP content (elastic#130594)
1 parent e22b207 commit 209caaf

File tree

6 files changed

+48
-16
lines changed

6 files changed

+48
-16
lines changed

docs/changelog/130594.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130594
2+
summary: Add audit logging for stream content
3+
area: Network
4+
type: enhancement
5+
issues: []

x-pack/plugin/security/qa/audit/src/javaRestTest/java/org/elasticsearch/xpack/security/audit/AuditIT.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,18 +109,19 @@ public void testFilteringOfRequestBodies() throws Exception {
109109

110110
public void testAuditAuthenticationSuccessForStreamingRequest() throws Exception {
111111
final Request request = new Request("POST", "/testindex/_bulk");
112-
request.setEntity(new StringEntity("""
112+
final String content = """
113113
{"index":{}}
114114
{}
115-
""", ContentType.create("application/x-ndjson", StandardCharsets.UTF_8)));
115+
""";
116+
request.setEntity(new StringEntity(content, ContentType.create("application/x-ndjson", StandardCharsets.UTF_8)));
116117
executeAndVerifyAudit(
117118
request,
118119
AuditLevel.AUTHENTICATION_SUCCESS,
119120
event -> assertThat(
120121
event,
121122
allOf(
122123
hasEntry(LoggingAuditTrail.AUTHENTICATION_TYPE_FIELD_NAME, "REALM"),
123-
hasEntry(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, "Request body had not been received at the time of the audit event")
124+
hasEntry(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, content)
124125
)
125126
)
126127
);

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/audit/AuditTrailService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.xpack.core.security.authc.AuthenticationToken;
1919
import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo;
2020
import org.elasticsearch.xpack.security.Security;
21+
import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
2122
import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule;
2223

2324
import java.net.InetSocketAddress;
@@ -53,6 +54,14 @@ public AuditTrail get() {
5354
}
5455
}
5556

57+
public boolean includeRequestBody() {
58+
if (get() instanceof LoggingAuditTrail trail) {
59+
return trail.includeRequestBody();
60+
} else {
61+
return false;
62+
}
63+
}
64+
5665
// TODO: this method only exists for access to LoggingAuditTrail in a Node for testing.
5766
// DO NOT USE IT, IT WILL BE REMOVED IN THE FUTURE
5867
public AuditTrail getAuditTrail() {

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/audit/AuditUtil.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ public class AuditUtil {
2727

2828
public static String restRequestContent(RestRequest request) {
2929
if (request.hasContent()) {
30-
if (request.isStreamedContent()) {
31-
return "Request body had not been received at the time of the audit event";
32-
}
3330
var content = request.content();
3431
try {
3532
return XContentHelper.convertToJson(content, false, false, request.getXContentType());

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrail.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ public class LoggingAuditTrail implements AuditTrail, ClusterStateListener {
350350
final EventFilterPolicyRegistry eventFilterPolicyRegistry;
351351
// package for testing
352352
volatile EnumSet<AuditLevel> events;
353-
boolean includeRequestBody;
353+
volatile boolean includeRequestBody;
354354
// fields that all entries have in common
355355
EntryCommonFields entryCommonFields;
356356

@@ -1072,6 +1072,10 @@ public void coordinatingActionResponse(
10721072
// not implemented yet
10731073
}
10741074

1075+
public boolean includeRequestBody() {
1076+
return includeRequestBody;
1077+
}
1078+
10751079
private LogEntryBuilder securityChangeLogEntryBuilder(String requestId) {
10761080
return new LogEntryBuilder(false).with(EVENT_TYPE_FIELD_NAME, SECURITY_CHANGE_ORIGIN_FIELD_VALUE).withRequestId(requestId);
10771081
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import org.elasticsearch.xpack.security.authz.restriction.WorkflowService;
2323
import org.elasticsearch.xpack.security.operator.OperatorPrivileges;
2424

25+
import java.util.function.Consumer;
26+
2527
import static org.elasticsearch.core.Strings.format;
28+
import static org.elasticsearch.rest.RestContentAggregator.aggregate;
2629

2730
public class SecurityRestFilter implements RestInterceptor {
2831

@@ -70,16 +73,29 @@ public void intercept(RestRequest request, RestChannel channel, RestHandler targ
7073
return;
7174
}
7275

73-
final RestRequest wrappedRequest = maybeWrapRestRequest(request, targetHandler);
74-
auditTrailService.get().authenticationSuccess(wrappedRequest);
75-
secondaryAuthenticator.authenticateAndAttachToContext(wrappedRequest, ActionListener.wrap(secondaryAuthentication -> {
76-
if (secondaryAuthentication != null) {
77-
logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, request.uri());
78-
}
79-
WorkflowService.resolveWorkflowAndStoreInThreadContext(targetHandler, threadContext);
76+
// RestRequest might have stream content, in some cases we need to aggregate request content, for example audit logging.
77+
final Consumer<RestRequest> aggregationCallback = (aggregatedRestRequest) -> {
78+
final RestRequest wrappedRequest = maybeWrapRestRequest(aggregatedRestRequest, targetHandler);
79+
auditTrailService.get().authenticationSuccess(wrappedRequest);
80+
secondaryAuthenticator.authenticateAndAttachToContext(wrappedRequest, ActionListener.wrap(secondaryAuthentication -> {
81+
if (secondaryAuthentication != null) {
82+
logger.trace(
83+
"Found secondary authentication {} in REST request [{}]",
84+
secondaryAuthentication,
85+
aggregatedRestRequest.uri()
86+
);
87+
}
88+
WorkflowService.resolveWorkflowAndStoreInThreadContext(targetHandler, threadContext);
89+
90+
doHandleRequest(aggregatedRestRequest, channel, targetHandler, listener);
91+
}, e -> handleException(aggregatedRestRequest, e, listener)));
92+
};
93+
if (request.isStreamedContent() && auditTrailService.includeRequestBody()) {
94+
aggregate(request, aggregationCallback::accept);
95+
} else {
96+
aggregationCallback.accept(request);
97+
}
8098

81-
doHandleRequest(request, channel, targetHandler, listener);
82-
}, e -> handleException(request, e, listener)));
8399
}
84100

85101
private void doHandleRequest(RestRequest request, RestChannel channel, RestHandler targetHandler, ActionListener<Boolean> listener) {

0 commit comments

Comments
 (0)