diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 6389742167def..e992a1e1c5ae4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -459,8 +459,8 @@ public enum Statistic { "Gauge of active memory in use", TYPE_GAUGE), - /* Stream Write statistics */ + /* Stream Write statistics */ STREAM_WRITE_EXCEPTIONS( StreamStatisticNames.STREAM_WRITE_EXCEPTIONS, "Count of stream write failures reported", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java new file mode 100644 index 0000000000000..b4d99d8328426 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.statistics.DurationTracker; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; + +/** + * Implementation of AAL's RequestCallback interface that tracks analytics operations. + */ +public class AnalyticsRequestCallback implements RequestCallback { + private final S3AInputStreamStatistics statistics; + + /** + * Create a new callback instance. + * @param statistics the statistics to update + */ + public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { + this.statistics = statistics; + } + + /** + * Record a GET request: increment the analytics GET request counter + * and update the GET request statistic of the stream. + */ + @Override + public void onGetRequest() { + statistics.incrementAnalyticsGetRequests(); + // initiateGetRequest() returns a tracker; close it rather than dropping it. + DurationTracker tracker = statistics.initiateGetRequest(); + tracker.close(); + } + + /** + * Record a HEAD request: increment the analytics HEAD request counter. + */ + @Override + public void onHeadRequest() { + statistics.incrementAnalyticsHeadRequests(); + } +} + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 8920b5b2dfc7c..bca8f122dab09 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -40,6 +40,7 @@ import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException { super(InputStreamType.Analytics, parameters); S3ObjectAttributes s3Attributes = parameters.getObjectAttributes(); + this.inputStream = 
s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters)); getS3AStreamStatistics().streamOpened(InputStreamType.Analytics); @@ -80,6 +82,9 @@ public AnalyticsStream(final ObjectReadParameters parameters, @Override public int read() throws IOException { throwIfClosed(); + + getS3AStreamStatistics().readOperationStarted(getPos(), 1); + int bytesRead; try { bytesRead = inputStream.read(); @@ -87,6 +92,11 @@ public int read() throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead != -1) { + incrementBytesRead(1); + } + return bytesRead; } @@ -122,6 +132,8 @@ public synchronized long getPos() { */ public int readTail(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + getS3AStreamStatistics().readOperationStarted(getPos(), len); + int bytesRead; try { bytesRead = inputStream.readTail(buf, off, len); @@ -129,12 +141,21 @@ public int readTail(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + incrementBytesRead(bytesRead); + } + return bytesRead; } @Override public int read(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + long pos = getPos(); + + getS3AStreamStatistics().readOperationStarted(pos, len); + int bytesRead; try { bytesRead = inputStream.read(buf, off, len); @@ -142,6 +163,11 @@ public int read(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + incrementBytesRead(bytesRead); + } + return bytesRead; } @@ -247,10 +273,13 @@ private void onReadFailure(IOException ioe) throws IOException { } private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) { + + final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics()); + OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder = OpenStreamInformation.builder() 
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext() - .getInputPolicy())); + .getInputPolicy())).requestCallback(requestCallback); if (parameters.getObjectAttributes().getETag() != null) { openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder() @@ -300,4 +329,16 @@ protected void throwIfClosed() throws IOException { throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } } + + /** + * Increment the bytes read counter if there is a stats instance + * and the number of bytes read is more than zero. + * @param bytesRead number of bytes read + */ + private void incrementBytesRead(long bytesRead) { + getS3AStreamStatistics().bytesRead(bytesRead); + if (getContext().getStats() != null && bytesRead > 0) { + getContext().getStats().incrementBytesRead(bytesRead); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index 26b9f2b1568ca..48739181efee6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -159,7 +159,12 @@ public void seekForwards(final long skipped, final long bytesReadInSeek) { } - + @Override + public void incrementAnalyticsGetRequests() { + } + @Override + public void incrementAnalyticsHeadRequests() { + } @Override public long streamOpened() { return 0; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index 8f8f90f9b1e65..c921fda32a680 100644 --- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -43,13 +43,19 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED; +import static org.apache.hadoop.io.Sizes.S_1K; +import static org.apache.hadoop.io.Sizes.S_1M; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -105,6 +111,13 @@ public void testConnectorFrameWorkIntegration() throws Throwable { Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics); Assertions.assertThat(objectInputStream.getInputPolicy()) .isEqualTo(S3AInputPolicy.Sequential); + + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 
500); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1); + + long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead(); + Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read") + .isEqualTo(500); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); @@ -136,14 +149,24 @@ public void testMalformedParquetFooter() throws IOException { byte[] buffer = new byte[500]; IOStatistics ioStats; + int bytesRead; try (FSDataInputStream inputStream = getFileSystem().open(dest)) { ioStats = inputStream.getIOStatistics(); inputStream.seek(5); - inputStream.read(buffer, 0, 500); + bytesRead = inputStream.read(buffer, 0, 500); + + ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream(); + long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead(); + Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read") + .isEqualTo(bytesRead); + } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + // S3A passes in the meta data on file open, we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } /** @@ -173,17 +196,23 @@ public void testMultiRowGroupParquet() throws Throwable { } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); - + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); try (FSDataInputStream inputStream = getFileSystem().openFile(dest) .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET) .build().get()) { ioStats = inputStream.getIOStatistics(); inputStream.readFully(buffer, 0, (int) fileStatus.getLen()); - } + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen()); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1); + } 
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4); + + // S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } @Test @@ -203,4 +232,97 @@ public void testInvalidConfigurationThrows() throws Exception { () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); } + /** + * TXT files(SEQUENTIAL format) use SequentialPrefetcher(requests the entire 10MB file). + * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB) + * The 10MB range gets split into: [0-8MB) and [8MB-10MB) + * Each split range becomes a separate Block, resulting in 2 GET requests: + */ + @Test + public void testLargeFileMultipleGets() throws Throwable { + describe("Large file should trigger multiple GET requests"); + + Path dest = path("large-test-file.txt"); + byte[] data = dataset(10 * S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true); + + byte[] buffer = new byte[S_1M * 10]; + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + inputStream.readFully(buffer); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2); + // Because S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + } + } + + @Test + public void testSmallFileSingleGet() throws Throwable { + describe("Small file should trigger only one GET request"); + + Path dest = path("small-test-file.txt"); + byte[] data = dataset(S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, S_1M, 1024, true); + + byte[] buffer = new byte[S_1M]; + try (FSDataInputStream inputStream = 
getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + inputStream.readFully(buffer); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + // Because S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + } + } + + + @Test + public void testRandomSeekPatternGets() throws Throwable { + describe("Random seek pattern should optimize GET requests"); + + Path dest = path("seek-test.txt"); + byte[] data = dataset(5 * S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true); + + byte[] buffer = new byte[S_1M]; + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + + inputStream.read(buffer); + inputStream.seek(2 * S_1M); + inputStream.read(new byte[512 * S_1K]); + inputStream.seek(3 * S_1M); + inputStream.read(new byte[512 * S_1K]); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + } + } + + + @Test + public void testSequentialStreamsNoDuplicateGets() throws Throwable { + describe("Sequential streams reading same object should not duplicate GETs"); + + Path dest = path("sequential-test.txt"); + byte[] data = dataset(S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, S_1M, 1024, true); + + byte[] buffer = new byte[1024]; + try (FSDataInputStream stream1 = getFileSystem().open(dest); + FSDataInputStream stream2 = getFileSystem().open(dest)) { + + stream1.read(buffer); + stream2.read(buffer); + + IOStatistics stats1 = stream1.getIOStatistics(); + IOStatistics stats2 = stream2.getIOStatistics(); + + verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0); + } + } } diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index 12a1cd7d8f63e..2bc342717a16c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -45,7 +45,6 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; @@ -81,10 +80,7 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); - // Analytics accelerator currently does not support IOStatisticsContext, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support IOStatisticsContext"); + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 2ae28c74fe5b7..548b30a3b2dcb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -28,7 +28,6 @@ import java.io.IOException; import 
java.io.InputStream; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** @@ -52,10 +51,6 @@ public void testMetricsRegister() @Test public void testStreamStatistics() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path file = path("testStreamStatistics"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index 9ad2c0625a094..7a62f76dd0b7d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; @@ -174,10 +173,7 @@ private void abortActiveStream() throws IOException { public void testCostOfCreatingMagicFile() throws Throwable { describe("Files created under magic paths skip existence checks and marker deletes"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - 
skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); Path destFile = methodSubPath("file.txt"); fs.delete(destFile.getParent(), true); @@ -255,10 +251,6 @@ public void testCostOfCreatingMagicFile() throws Throwable { public void testCostOfSavingLoadingPendingFile() throws Throwable { describe("Verify costs of saving .pending file under a magic path"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path partDir = methodSubPath("file.pending"); Path destFile = new Path(partDir, "file.pending"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index 484d1cc2c3121..e958c98942095 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -48,10 +47,6 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest { @BeforeEach public void setUp() throws Exception { conf = new Configuration(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - 
skipIfAnalyticsAcceleratorEnabled(conf, - "Analytics Accelerator currently does not support stream statistics"); fc = S3ATestUtils.createTestFileContext(conf); testRootPath = fileContextTestHelper.getTestRootPath(fc, "test"); fc.mkdir(testRootPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index c1c03ca6e7212..34a7d5a54037b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -58,7 +58,6 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; @@ -113,10 +112,7 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); testFile = methodPath(); @@ -177,6 +173,11 @@ public void testStreamIsNotChecksummed() throws Throwable { // if prefetching is enabled, skip this test assumeNoPrefetching(); + // Skip for Analytics streams - checksum validation only exists in S3AInputStream. 
+ // AnalyticsStream handles data integrity through AWS Analytics Accelerator internally. + if (isAnalyticsStream()) { + skip("Analytics stream doesn't use checksums"); + } S3AFileSystem fs = getFileSystem(); // open the file @@ -261,10 +262,13 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { } }, always(), - // two GET calls were made, one for readFully, - // the second on the read() past the EOF - // the operation has got as far as S3 - probe(!prefetching(), STREAM_READ_OPENED, 1 + 1)); + // Analytics stream: 1 open (persistent connection) + // S3AInputStream: 2 opens (reopen on EOF) + // two GET calls were made, one for readFully, + // the second on the read() past the EOF + // the operation has got as far as S3 + probe(!prefetching() && !isAnalyticsStream(), STREAM_READ_OPENED, 2), + probe(!prefetching() && isAnalyticsStream(), STREAM_READ_OPENED, 1)); // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { @@ -348,7 +352,9 @@ public void testReadPastEOF() throws Throwable { } }, always(), - probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra)); + // Analytics streams don't make HTTP requests when reading past EOF + probe(!prefetching && !isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, extra), + probe(!prefetching && isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, 0)); } /** @@ -461,6 +467,14 @@ private boolean prefetching() { return InputStreamType.Prefetch == streamType(getFileSystem()); } + /** + * Is the current stream type Analytics? + * @return true if Analytics stream is enabled. + */ + private boolean isAnalyticsStream() { + return streamType(getFileSystem()) == InputStreamType.Analytics; + } + /** * Skip the test if prefetching is enabled. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java index 4165f7a6c9cb9..35f28d0eb08af 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.tags.IntegrationTest; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; /** @@ -86,10 +85,7 @@ public List outputStreamStatisticKeys() { @Test @Override public void testInputStreamStatisticRead() throws Throwable { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), - "Analytics Accelerator currently does not support stream statistics"); super.testInputStreamStatisticRead(); } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 2f54cab00b13d..81a719fbea2b2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.StreamStatisticNames; -import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { @@ -44,10 +43,6 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { */ @Test public void testBytesReadWithStream() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); byte[] oneKbBuf = new byte[ONE_KB];