Commit 3397d4f

IoStats changes
1 parent 6eae158 commit 3397d4f

File tree

13 files changed: +252, -56 lines

hadoop-project/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -207,7 +207,7 @@
     <aws-java-sdk.version>1.12.720</aws-java-sdk.version>
     <aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
     <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
-    <amazon-s3-analyticsaccelerator-s3.version>1.0.0</amazon-s3-analyticsaccelerator-s3.version>
+    <amazon-s3-analyticsaccelerator-s3.version>1.2.1</amazon-s3-analyticsaccelerator-s3.version>
     <aws.eventstream.version>1.0.1</aws.eventstream.version>
     <hsqldb.version>2.7.1</hsqldb.version>
     <frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 1 addition & 1 deletion
@@ -459,8 +459,8 @@ public enum Statistic {
       "Gauge of active memory in use",
       TYPE_GAUGE),

-  /* Stream Write statistics */

+  /* Stream Write statistics */
   STREAM_WRITE_EXCEPTIONS(
       StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
       "Count of stream write failures reported",
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
+
+/**
+ * Implementation of AAL's RequestCallback interface that tracks analytics operations.
+ */
+public class AnalyticsRequestCallback implements RequestCallback {
+  private final S3AInputStreamStatistics statistics;
+
+  /**
+   * Create a new callback instance.
+   * @param statistics the statistics to update
+   */
+  public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
+    this.statistics = statistics;
+  }
+
+  @Override
+  public void onGetRequest() {
+    statistics.initiateGetRequest();
+  }
+
+  @Override
+  public void onHeadRequest() {
+    statistics.incrementAnalyticsHeadRequests();
+  }
+}
+
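The callback above only takes effect once it is registered with AAL at stream-open time; the AnalyticsStream.java change below does that via OpenStreamInformation.builder().requestCallback(...). A minimal sketch of the wiring, assuming the builder exposes a standard build() terminator (not shown in this diff) and that the S3AInputStreamStatistics instance comes from the owning stream:

import org.apache.hadoop.fs.s3a.impl.streams.AnalyticsRequestCallback;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

final class CallbackWiringSketch {
  static OpenStreamInformation openInfoFor(S3AInputStreamStatistics streamStatistics) {
    // AAL invokes onGetRequest()/onHeadRequest() on this callback for each S3 request it issues.
    RequestCallback callback = new AnalyticsRequestCallback(streamStatistics);
    return OpenStreamInformation.builder()
        .requestCallback(callback)
        .build();  // assumed builder terminator; the real wiring is buildOpenStreamInformation() below
  }
}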

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 42 additions & 1 deletion
@@ -28,6 +28,7 @@
 import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
 import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
       final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
     super(InputStreamType.Analytics, parameters);
     S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+
     this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
         s3Attributes.getKey()), buildOpenStreamInformation(parameters));
     getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -63,13 +65,21 @@ public AnalyticsStream(final ObjectReadParameters parameters,
   @Override
   public int read() throws IOException {
     throwIfClosed();
+
+    getS3AStreamStatistics().readOperationStarted(getPos(), 1);
+
     int bytesRead;
     try {
       bytesRead = inputStream.read();
     } catch (IOException ioe) {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead != -1) {
+      incrementBytesRead(1);
+    }
+
     return bytesRead;
   }

@@ -105,26 +115,42 @@ public synchronized long getPos() {
    */
   public int readTail(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
+    getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
     int bytesRead;
     try {
       bytesRead = inputStream.readTail(buf, off, len);
     } catch (IOException ioe) {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead > 0) {
+      incrementBytesRead(bytesRead);
+    }
+
     return bytesRead;
   }

   @Override
   public int read(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
+    long pos = getPos();
+
+    getS3AStreamStatistics().readOperationStarted(pos, len);
+
     int bytesRead;
     try {
       bytesRead = inputStream.read(buf, off, len);
     } catch (IOException ioe) {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead > 0) {
+      incrementBytesRead(bytesRead);
+    }
+
     return bytesRead;
   }

@@ -194,10 +220,13 @@ private void onReadFailure(IOException ioe) throws IOException {
   }

   private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+    final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
+
     OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
         OpenStreamInformation.builder()
             .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
-                .getInputPolicy()));
+                .getInputPolicy())).requestCallback(requestCallback);

     if (parameters.getObjectAttributes().getETag() != null) {
       openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -235,4 +264,16 @@ protected void throwIfClosed() throws IOException {
       throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
+
+  /**
+   * Increment the bytes read counter if there is a stats instance
+   * and the number of bytes read is more than zero.
+   * @param bytesRead number of bytes read
+   */
+  private void incrementBytesRead(long bytesRead) {
+    getS3AStreamStatistics().bytesRead(bytesRead);
+    if (getContext().getStats() != null && bytesRead > 0) {
+      getContext().getStats().incrementBytesRead(bytesRead);
+    }
+  }
 }
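With the read paths above calling readOperationStarted() before delegating and incrementBytesRead() afterwards, the new accounting is visible to any caller through the stream's IOStatistics. A small sketch of how a client would observe it (the bucket and object names are illustrative; the counter keys are the StreamStatisticNames constants the tests below assert on):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;

public final class ReadStatsSketch {
  public static void main(String[] args) throws Exception {
    Path file = new Path("s3a://example-bucket/data.txt");
    FileSystem fs = file.getFileSystem(new Configuration());
    byte[] buffer = new byte[500];
    try (FSDataInputStream in = fs.open(file)) {
      in.read(buffer, 0, buffer.length);
      IOStatistics stats = in.getIOStatistics();
      // Counters updated by AnalyticsStream.read()/readTail() via incrementBytesRead().
      System.out.println(STREAM_READ_BYTES + " = " + stats.counters().get(STREAM_READ_BYTES));
      System.out.println(STREAM_READ_OPERATIONS + " = " + stats.counters().get(STREAM_READ_OPERATIONS));
    }
  }
}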

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

Lines changed: 6 additions & 1 deletion
@@ -159,7 +159,12 @@ public void seekForwards(final long skipped,
         final long bytesReadInSeek) {

     }
-
+    @Override
+    public void incrementAnalyticsGetRequests() {
+    }
+    @Override
+    public void incrementAnalyticsHeadRequests() {
+    }
     @Override
     public long streamOpened() {
       return 0;
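EmptyS3AStatisticsContext is the no-op statistics implementation, so the two new counter methods need empty overrides here as well: callers such as AnalyticsRequestCallback can then update statistics unconditionally, with no null checks, whether or not collection is enabled. A generic illustration of that null-object pattern (the names below are illustrative, not from the commit):

// Illustrative null-object pattern, mirroring the empty overrides above.
interface AnalyticsCounters {
  void incrementAnalyticsHeadRequests();
}

final class NoopAnalyticsCounters implements AnalyticsCounters {
  @Override
  public void incrementAnalyticsHeadRequests() {
    // deliberately empty: statistics collection is disabled
  }
}

final class HeadRequestSource {
  private final AnalyticsCounters counters;

  HeadRequestSource(AnalyticsCounters counters) {
    this.counters = counters;  // never null, so call sites need no guard
  }

  void onHeadRequest() {
    counters.incrementAnalyticsHeadRequests();
  }
}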

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 127 additions & 3 deletions
@@ -42,13 +42,22 @@
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_1M;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;

 /**
@@ -104,6 +113,13 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
       Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
       Assertions.assertThat(objectInputStream.getInputPolicy())
           .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+          .isEqualTo(500);
     }

     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
@@ -129,14 +145,24 @@ public void testMalformedParquetFooter() throws IOException {

     byte[] buffer = new byte[500];
     IOStatistics ioStats;
+    int bytesRead;

     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
-      inputStream.read(buffer, 0, 500);
+      bytesRead = inputStream.read(buffer, 0, 500);
+
+      ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+          .isEqualTo(bytesRead);
+
     }

     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+    // S3A passes in the metadata on file open, so we expect AAL to make no HEAD requests
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
   }

   /**
@@ -166,15 +192,20 @@ public void testMultiRowGroupParquet() throws Throwable {
     }

     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
         .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
         .build().get()) {
       ioStats = inputStream.getIOStatistics();
       inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
-    }

+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+    }
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    // S3A passes in the metadata (content length) on file open,
+    // so we expect AAL to make no HEAD requests
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
   }

   @Test
@@ -194,4 +225,97 @@ public void testInvalidConfigurationThrows() throws Exception {
         () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }

+  /**
+   * TXT files (SEQUENTIAL format) use the SequentialPrefetcher, which requests the entire 10MB file.
+   * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB),
+   * so the 10MB range gets split into [0-8MB) and [8MB-10MB).
+   * Each split range becomes a separate Block, resulting in 2 GET requests.
+   */
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = path("large-test-file.txt");
+    byte[] data = dataset(10 * S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true);
+
+    byte[] buffer = new byte[S_1M * 10];
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(buffer);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2);
+      // Because S3A passes in the metadata (content length) on file open,
+      // we expect AAL to make no HEAD requests
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+    }
+  }
+
+  @Test
+  public void testSmallFileSingleGet() throws Throwable {
+    describe("Small file should trigger only one GET request");
+
+    Path dest = path("small-test-file.txt");
+    byte[] data = dataset(S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
+
+    byte[] buffer = new byte[S_1M];
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(buffer);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+      // Because S3A passes in the metadata (content length) on file open,
+      // we expect AAL to make no HEAD requests
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+    }
+  }
+
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = path("seek-test.txt");
+    byte[] data = dataset(5 * S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
+
+    byte[] buffer = new byte[S_1M];
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();

+      inputStream.read(buffer);
+      inputStream.seek(2 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+      inputStream.seek(3 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+    }
+  }
+
+
+  @Test
+  public void testSequentialStreamsNoDuplicateGets() throws Throwable {
+    describe("Sequential streams reading same object should not duplicate GETs");
+
+    Path dest = path("sequential-test.txt");
+    byte[] data = dataset(S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
+
+    byte[] buffer = new byte[1024];
+    try (FSDataInputStream stream1 = getFileSystem().open(dest);
+         FSDataInputStream stream2 = getFileSystem().open(dest)) {
+
+      stream1.read(buffer);
+      stream2.read(buffer);
+
+      IOStatistics stats1 = stream1.getIOStatistics();
+      IOStatistics stats2 = stream2.getIOStatistics();
+
+      verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+      verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0);
+    }
+  }
 }
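The assertions above check individual counters with verifyStatisticCounterValue; when a test fails it can help to dump the whole statistics map instead. A small debugging sketch, not part of the commit, using the existing IOStatisticsLogging helper:

import org.apache.hadoop.fs.FSDataInputStream;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

final class StatsDumpSketch {
  // Returns every counter/gauge on the stream, including the analytics GET/HEAD
  // counters asserted in the tests above.
  static String dump(FSDataInputStream in) {
    return ioStatisticsToPrettyString(in.getIOStatistics());
  }
}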

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java

Lines changed: 1 addition & 5 deletions
@@ -45,7 +45,6 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -81,10 +80,7 @@ protected Configuration createConfiguration() {
   public void setup() throws Exception {
     super.setup();
     executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
-    // Analytics accelerator currently does not support IOStatisticsContext, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support IOStatisticsContext");
+

   }
