Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130594.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130594
summary: Add audit logging for stream content
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,19 @@ public void testFilteringOfRequestBodies() throws Exception {

public void testAuditAuthenticationSuccessForStreamingRequest() throws Exception {
final Request request = new Request("POST", "/testindex/_bulk");
request.setEntity(new StringEntity("""
final String content = """
{"index":{}}
{}
""", ContentType.create("application/x-ndjson", StandardCharsets.UTF_8)));
""";
request.setEntity(new StringEntity(content, ContentType.create("application/x-ndjson", StandardCharsets.UTF_8)));
executeAndVerifyAudit(
request,
AuditLevel.AUTHENTICATION_SUCCESS,
event -> assertThat(
event,
allOf(
hasEntry(LoggingAuditTrail.AUTHENTICATION_TYPE_FIELD_NAME, "REALM"),
hasEntry(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, "Request body had not been received at the time of the audit event")
hasEntry(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, content)
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.xpack.core.security.authc.AuthenticationToken;
import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine.AuthorizationInfo;
import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -53,6 +54,14 @@ public AuditTrail get() {
}
}

public boolean includeRequestBody() {
if (get() instanceof LoggingAuditTrail trail) {
return trail.includeRequestBody();
} else {
return false;
}
Comment on lines +58 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one-liner?

Suggested change
if (get() instanceof LoggingAuditTrail trail) {
return trail.includeRequestBody();
} else {
return false;
}
return get() instanceof LoggingAuditTrail trail && trail.includeRequestBody();

}

// TODO: this method only exists for access to LoggingAuditTrail in a Node for testing.
// DO NOT USE IT, IT WILL BE REMOVED IN THE FUTURE
public AuditTrail getAuditTrail() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ public class AuditUtil {

public static String restRequestContent(RestRequest request) {
if (request.hasContent()) {
if (request.isStreamedContent()) {
return "Request body had not been received at the time of the audit event";
}
var content = request.content();
try {
return XContentHelper.convertToJson(content, false, false, request.getXContentType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public class LoggingAuditTrail implements AuditTrail, ClusterStateListener {
final EventFilterPolicyRegistry eventFilterPolicyRegistry;
// package for testing
volatile EnumSet<AuditLevel> events;
boolean includeRequestBody;
volatile boolean includeRequestBody;
// fields that all entries have in common
EntryCommonFields entryCommonFields;

Expand Down Expand Up @@ -1072,6 +1072,10 @@ public void coordinatingActionResponse(
// not implemented yet
}

public boolean includeRequestBody() {
return includeRequestBody;
}
Comment on lines +1075 to +1077
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems problematic. According to this comment, the non-volatile field includeRequestBody is guaranteed to see the latest change because it is always read after the volatile field events. It is true within LoggingAuditTrail but no longer the case if we just return it here. I think we can either also read events field here before return or having SecurityRestFilter register its own settings update consumer for the INCLUDE_REQUEST_BODY setting so that it does not need to ask LoggingAuditTrail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SecurityRestFilter needs to know both features : LoggingAuditTrail and includeRequestBody are enabled. Shouldnt be much of a difference. Kind of hacky, cutting through two layers of abstraction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So a read for events before returnning should suffice in this case. Otherwise we may see stale value for includeRequestBody

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making includeRequestBody volatile should be cleaner then, if I still need to read a volatile field. 9351f93


private LogEntryBuilder securityChangeLogEntryBuilder(String requestId) {
return new LogEntryBuilder(false).with(EVENT_TYPE_FIELD_NAME, SECURITY_CHANGE_ORIGIN_FIELD_VALUE).withRequestId(requestId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.elasticsearch.xpack.security.authz.restriction.WorkflowService;
import org.elasticsearch.xpack.security.operator.OperatorPrivileges;

import java.util.function.Consumer;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.rest.RestContentAggregator.aggregate;

public class SecurityRestFilter implements RestInterceptor {

Expand Down Expand Up @@ -70,16 +73,29 @@ public void intercept(RestRequest request, RestChannel channel, RestHandler targ
return;
}

final RestRequest wrappedRequest = maybeWrapRestRequest(request, targetHandler);
auditTrailService.get().authenticationSuccess(wrappedRequest);
secondaryAuthenticator.authenticateAndAttachToContext(wrappedRequest, ActionListener.wrap(secondaryAuthentication -> {
if (secondaryAuthentication != null) {
logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, request.uri());
}
WorkflowService.resolveWorkflowAndStoreInThreadContext(targetHandler, threadContext);
// RestRequest might have stream content, in some cases we need to aggregate request content, for example audit logging.
final Consumer<RestRequest> aggregationCallback = (aggregatedRestRequest) -> {
final RestRequest wrappedRequest = maybeWrapRestRequest(aggregatedRestRequest, targetHandler);
auditTrailService.get().authenticationSuccess(wrappedRequest);
secondaryAuthenticator.authenticateAndAttachToContext(wrappedRequest, ActionListener.wrap(secondaryAuthentication -> {
if (secondaryAuthentication != null) {
logger.trace(
"Found secondary authentication {} in REST request [{}]",
secondaryAuthentication,
aggregatedRestRequest.uri()
);
}
WorkflowService.resolveWorkflowAndStoreInThreadContext(targetHandler, threadContext);

doHandleRequest(aggregatedRestRequest, channel, targetHandler, listener);
}, e -> handleException(aggregatedRestRequest, e, listener)));
};
if (request.isStreamedContent() && auditTrailService.includeRequestBody()) {
aggregate(request, aggregationCallback::accept);
} else {
aggregationCallback.accept(request);
}

doHandleRequest(request, channel, targetHandler, listener);
}, e -> handleException(request, e, listener)));
}

private void doHandleRequest(RestRequest request, RestChannel channel, RestHandler targetHandler, ActionListener<Boolean> listener) {
Expand Down
Loading