Skip to content

Commit d092171

Browse files
authored
HADOOP-19365. S3A: Adds in support for auditing for AAL. (#7723)
1 parent d115595 commit d092171

File tree

8 files changed

+103
-15
lines changed

8 files changed

+103
-15
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1920,7 +1920,9 @@ private FSDataInputStream executeOpen(
19201920
.withContext(readContext.build())
19211921
.withObjectAttributes(createObjectAttributes(path, fileStatus))
19221922
.withStreamStatistics(inputStreamStats)
1923-
.withEncryptionSecrets(getEncryptionSecrets());
1923+
.withEncryptionSecrets(getEncryptionSecrets())
1924+
.withAuditSpan(auditSpan);
1925+
19241926
return new FSDataInputStream(getStore().readObject(parameters));
19251927
}
19261928

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222

2323
import software.amazon.awssdk.core.SdkRequest;
24+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
2425
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
2526
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
2627
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -50,6 +51,8 @@
5051
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
5152
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
5253
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
54+
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
55+
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
5356

5457
/**
5558
* Extract information from a request.
@@ -193,6 +196,18 @@ private RequestInfo writing(final String verb,
193196
|| request instanceof CreateSessionRequest;
194197
}
195198

199+
/**
200+
* If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing
201+
* of requests which are made outside S3A's requestFactory.
202+
*
203+
* @param executionAttributes request execution attributes
204+
* @return true if request is audited outside of current span
205+
*/
206+
public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
207+
return executionAttributes.getAttribute(SPAN_ID) != null
208+
&& executionAttributes.getAttribute(OPERATION_NAME) != null;
209+
}
210+
196211
/**
197212
* Predicate which returns true if the request is part of the
198213
* multipart upload API -and which therefore must be rejected

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
6262
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
6363
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
64+
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
6465
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
6566
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
6667
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
@@ -69,6 +70,8 @@
6970
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
7071
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
7172
import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
73+
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
74+
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
7275

7376
/**
7477
* The LoggingAuditor logs operations at DEBUG (in SDK Request) and
@@ -85,7 +88,6 @@ public class LoggingAuditor
8588
private static final Logger LOG =
8689
LoggerFactory.getLogger(LoggingAuditor.class);
8790

88-
8991
/**
9092
* Some basic analysis for the logs.
9193
*/
@@ -267,7 +269,14 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
267269
*/
268270
private class LoggingAuditSpan extends AbstractAuditSpanImpl {
269271

270-
private final HttpReferrerAuditHeader referrer;
272+
private HttpReferrerAuditHeader referrer;
273+
274+
/**
275+
* Builder for the referrer header. Requests that execute outside S3A, such as in AAL, will initially have SpanId
276+
* of the outside-span operation. For such requests, the spanId and operation name in this builder is overwritten
277+
* in the modifyHttpRequest execution interceptor.
278+
*/
279+
private final HttpReferrerAuditHeader.Builder headerBuilder;
271280

272281
/**
273282
* Attach Range of data for GetObject Request.
@@ -300,7 +309,7 @@ private LoggingAuditSpan(
300309
final String path2) {
301310
super(spanId, operationName);
302311

303-
this.referrer = HttpReferrerAuditHeader.builder()
312+
this.headerBuilder = HttpReferrerAuditHeader.builder()
304313
.withContextId(getAuditorId())
305314
.withSpanId(spanId)
306315
.withOperationName(operationName)
@@ -312,8 +321,9 @@ private LoggingAuditSpan(
312321
currentThreadID())
313322
.withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
314323
.withEvaluated(context.getEvaluatedEntries())
315-
.withFilter(filters)
316-
.build();
324+
.withFilter(filters);
325+
326+
this.referrer = this.headerBuilder.build();
317327

318328
this.description = referrer.buildHttpReferrer();
319329
}
@@ -384,6 +394,26 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
384394
SdkHttpRequest httpRequest = context.httpRequest();
385395
SdkRequest sdkRequest = context.request();
386396

397+
// If spanId and operationName are set in execution attributes, then use these values,
398+
// instead of the ones in the current span. This is useful when requests are happening in dependencies such as
399+
// the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL
400+
// will attach the current spanId and operationName via execution attributes during it's request creation. These
401+
// can then used to update the values in the logger and referrer header. Without this overwriting, the operation
402+
// name and corresponding span will be whichever is active on the thread the request is getting executed on.
403+
boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);
404+
405+
String spanId = isRequestAuditedOutsideCurrentSpan ?
406+
executionAttributes.getAttribute(SPAN_ID) : getSpanId();
407+
408+
String operationName = isRequestAuditedOutsideCurrentSpan ?
409+
executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();
410+
411+
if (isRequestAuditedOutsideCurrentSpan) {
412+
this.headerBuilder.withSpanId(spanId);
413+
this.headerBuilder.withOperationName(operationName);
414+
this.referrer = this.headerBuilder.build();
415+
}
416+
387417
// attach range for GetObject requests
388418
attachRangeFromRequest(httpRequest, executionAttributes);
389419

@@ -400,11 +430,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
400430
.appendHeader(HEADER_REFERRER, header)
401431
.build();
402432
}
433+
403434
if (LOG.isDebugEnabled()) {
404435
LOG.debug("[{}] {} Executing {} with {}; {}",
405436
currentThreadID(),
406-
getSpanId(),
407-
getOperationName(),
437+
spanId,
438+
operationName,
408439
analyzer.analyze(context.request()),
409440
header);
410441
}
@@ -533,10 +564,12 @@ public void beforeExecution(Context.BeforeExecution context,
533564
+ analyzer.analyze(context.request());
534565
final String unaudited = getSpanId() + " "
535566
+ UNAUDITED_OPERATION + " " + error;
567+
// If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it
568+
// as an audited request.
536569
if (isRequestNotAlwaysInSpan(context.request())) {
537-
// can get by auditing during a copy, so don't overreact
570+
// can get by auditing during a copy, so don't overreact.
538571
LOG.debug(unaudited);
539-
} else {
572+
} else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
540573
final RuntimeException ex = new AuditFailureException(unaudited);
541574
LOG.debug(unaudited, ex);
542575
if (isRejectOutOfSpan()) {
@@ -547,5 +580,4 @@ public void beforeExecution(Context.BeforeExecution context,
547580
super.beforeExecution(context, executionAttributes);
548581
}
549582
}
550-
551583
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -994,8 +994,7 @@ private class FactoryCallbacks implements StreamFactoryCallbacks {
994994

995995
@Override
996996
public S3Client getOrCreateSyncClient() throws IOException {
997-
// Needs support of the CRT before the requireCRT can be used
998-
LOG.debug("Stream factory requested async client");
997+
LOG.debug("Stream factory requested sync client");
999998
return clientManager().getOrCreateS3Client();
1000999
}
10011000

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
3737
import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
3838
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
39+
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;
3940
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
4041
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
4142
import software.amazon.s3.analyticsaccelerator.util.S3URI;
@@ -257,12 +258,18 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
257258
.etag(parameters.getObjectAttributes().getETag()).build());
258259
}
259260

261+
260262
if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
261263
EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
262264
.ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
263265
EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
264266
}
265267

268+
openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
269+
.operationName(parameters.getAuditSpan().getOperationName())
270+
.spanId(parameters.getAuditSpan().getSpanId())
271+
.build());
272+
266273
return openStreamInformationBuilder.build();
267274
}
268275

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ public StreamFactoryRequirements factoryRequirements() {
9797
vectorContext.setMinSeekForVectoredReads(0);
9898

9999
return new StreamFactoryRequirements(0,
100-
0, vectorContext,
101-
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
100+
0, vectorContext);
102101
}
103102

104103
@Override

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
2626
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
2727
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
28+
import org.apache.hadoop.fs.store.audit.AuditSpan;
2829

2930
import static java.util.Objects.requireNonNull;
3031

@@ -75,6 +76,11 @@ public final class ObjectReadParameters {
7576
*/
7677
private EncryptionSecrets encryptionSecrets;
7778

79+
/**
80+
* Span for which this stream is being created.
81+
*/
82+
private AuditSpan auditSpan;
83+
7884
/**
7985
* Getter.
8086
* @return Encryption secrets.
@@ -196,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value
196202
return this;
197203
}
198204

205+
/**
206+
* Getter.
207+
* @return Audit span.
208+
*/
209+
public AuditSpan getAuditSpan() {
210+
return auditSpan;
211+
}
212+
213+
/**
214+
* Set audit span.
215+
* @param value new value
216+
* @return the builder
217+
*/
218+
public ObjectReadParameters withAuditSpan(final AuditSpan value) {
219+
auditSpan = value;
220+
return this;
221+
}
222+
199223
/**
200224
* Validate that all attributes are as expected.
201225
* Mock tests can skip this if required.
@@ -210,6 +234,7 @@ public ObjectReadParameters validate() {
210234
requireNonNull(objectAttributes, "objectAttributes");
211235
requireNonNull(streamStatistics, "streamStatistics");
212236
requireNonNull(encryptionSecrets, "encryptionSecrets");
237+
requireNonNull(auditSpan, "auditSpan");
213238
return this;
214239
}
215240
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
4343
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
4444
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
45+
import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
4546
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
4647
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
4748
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
109110
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
110111
fs.close();
111112
verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
113+
114+
// Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
115+
// in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
116+
// s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
117+
// [0-8388607, 8388608-16777215, 16777216-21511173].
118+
verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
112119
}
113120

114121
@Test
@@ -175,6 +182,8 @@ public void testMultiRowGroupParquet() throws Throwable {
175182
}
176183

177184
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
185+
186+
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
178187
}
179188

180189
@Test

0 commit comments

Comments
 (0)