Skip to content

Commit 9e7bc0b

Browse files
committed
IoStats changes
1 parent d092171 commit 9e7bc0b

File tree

12 files changed

+250
-50
lines changed

12 files changed

+250
-50
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,8 @@ public enum Statistic {
459459
"Gauge of active memory in use",
460460
TYPE_GAUGE),
461461

462-
/* Stream Write statistics */
463462

463+
/* Stream Write statistics */
464464
STREAM_WRITE_EXCEPTIONS(
465465
StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
466466
"Count of stream write failures reported",
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.impl.streams;
20+
21+
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
22+
import org.apache.hadoop.fs.statistics.DurationTracker;
23+
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
24+
25+
/**
26+
* Implementation of AAL's RequestCallback interface that tracks analytics operations.
27+
*/
28+
public class AnalyticsRequestCallback implements RequestCallback {
29+
private final S3AInputStreamStatistics statistics;
30+
31+
/**
32+
* Create a new callback instance.
33+
* @param statistics the statistics to update
34+
*/
35+
public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
36+
this.statistics = statistics;
37+
}
38+
39+
@Override
40+
public void onGetRequest() {
41+
statistics.initiateGetRequest();
42+
}
43+
44+
@Override
45+
public void onHeadRequest() {
46+
statistics.incrementAnalyticsHeadRequests();
47+
}
48+
}
49+

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
4141
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
4242
import software.amazon.s3.analyticsaccelerator.util.S3URI;
43+
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
4344

4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
7273
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
7374
super(InputStreamType.Analytics, parameters);
7475
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
76+
7577
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
7678
s3Attributes.getKey()), buildOpenStreamInformation(parameters));
7779
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -80,13 +82,21 @@ public AnalyticsStream(final ObjectReadParameters parameters,
8082
@Override
8183
public int read() throws IOException {
8284
throwIfClosed();
85+
86+
getS3AStreamStatistics().readOperationStarted(getPos(), 1);
87+
8388
int bytesRead;
8489
try {
8590
bytesRead = inputStream.read();
8691
} catch (IOException ioe) {
8792
onReadFailure(ioe);
8893
throw ioe;
8994
}
95+
96+
if (bytesRead != -1) {
97+
incrementBytesRead(1);
98+
}
99+
90100
return bytesRead;
91101
}
92102

@@ -122,26 +132,42 @@ public synchronized long getPos() {
122132
*/
123133
public int readTail(byte[] buf, int off, int len) throws IOException {
124134
throwIfClosed();
135+
getS3AStreamStatistics().readOperationStarted(getPos(), len);
136+
125137
int bytesRead;
126138
try {
127139
bytesRead = inputStream.readTail(buf, off, len);
128140
} catch (IOException ioe) {
129141
onReadFailure(ioe);
130142
throw ioe;
131143
}
144+
145+
if (bytesRead > 0) {
146+
incrementBytesRead(bytesRead);
147+
}
148+
132149
return bytesRead;
133150
}
134151

135152
@Override
136153
public int read(byte[] buf, int off, int len) throws IOException {
137154
throwIfClosed();
155+
long pos = getPos();
156+
157+
getS3AStreamStatistics().readOperationStarted(pos, len);
158+
138159
int bytesRead;
139160
try {
140161
bytesRead = inputStream.read(buf, off, len);
141162
} catch (IOException ioe) {
142163
onReadFailure(ioe);
143164
throw ioe;
144165
}
166+
167+
if (bytesRead > 0) {
168+
incrementBytesRead(bytesRead);
169+
}
170+
145171
return bytesRead;
146172
}
147173

@@ -247,10 +273,13 @@ private void onReadFailure(IOException ioe) throws IOException {
247273
}
248274

249275
private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
276+
277+
final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
278+
250279
OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
251280
OpenStreamInformation.builder()
252281
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
253-
.getInputPolicy()));
282+
.getInputPolicy())).requestCallback(requestCallback);
254283

255284
if (parameters.getObjectAttributes().getETag() != null) {
256285
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +329,16 @@ protected void throwIfClosed() throws IOException {
300329
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
301330
}
302331
}
332+
333+
/**
334+
* Increment the bytes read counter if there is a stats instance
335+
* and the number of bytes read is more than zero.
336+
* @param bytesRead number of bytes read
337+
*/
338+
private void incrementBytesRead(long bytesRead) {
339+
getS3AStreamStatistics().bytesRead(bytesRead);
340+
if (getContext().getStats() != null && bytesRead > 0) {
341+
getContext().getStats().incrementBytesRead(bytesRead);
342+
}
343+
}
303344
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,12 @@ public void seekForwards(final long skipped,
159159
final long bytesReadInSeek) {
160160

161161
}
162-
162+
@Override
163+
public void incrementAnalyticsGetRequests() {
164+
}
165+
@Override
166+
public void incrementAnalyticsHeadRequests() {
167+
}
163168
@Override
164169
public long streamOpened() {
165170
return 0;

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 125 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,19 @@
4343
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
4444
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
4545
import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
46+
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
47+
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
4648
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
4749
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
4850
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
4951
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
5052
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
53+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
54+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
5155
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
5256
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
57+
import static org.apache.hadoop.io.Sizes.S_1K;
58+
import static org.apache.hadoop.io.Sizes.S_1M;
5359
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
5460

5561
/**
@@ -105,6 +111,13 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
105111
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
106112
Assertions.assertThat(objectInputStream.getInputPolicy())
107113
.isEqualTo(S3AInputPolicy.Sequential);
114+
115+
verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
116+
verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
117+
118+
long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
119+
Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
120+
.isEqualTo(500);
108121
}
109122

110123
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
@@ -136,14 +149,24 @@ public void testMalformedParquetFooter() throws IOException {
136149

137150
byte[] buffer = new byte[500];
138151
IOStatistics ioStats;
152+
int bytesRead;
139153

140154
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
141155
ioStats = inputStream.getIOStatistics();
142156
inputStream.seek(5);
143-
inputStream.read(buffer, 0, 500);
157+
bytesRead = inputStream.read(buffer, 0, 500);
158+
159+
ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
160+
long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
161+
Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
162+
.isEqualTo(bytesRead);
163+
144164
}
145165

146166
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
167+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
168+
// S3A passes in the meta data on file open, we expect AAL to make no HEAD requests
169+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
147170
}
148171

149172
/**
@@ -173,17 +196,23 @@ public void testMultiRowGroupParquet() throws Throwable {
173196
}
174197

175198
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
176-
199+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
177200
try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
178201
.must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
179202
.build().get()) {
180203
ioStats = inputStream.getIOStatistics();
181204
inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
182-
}
183205

206+
verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
207+
verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
208+
}
184209
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
185210

186211
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
212+
213+
// S3A passes in the meta-data(content length) on file open,
214+
// we expect AAL to make no HEAD requests
215+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
187216
}
188217

189218
@Test
@@ -203,4 +232,97 @@ public void testInvalidConfigurationThrows() throws Exception {
203232
() -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
204233
}
205234

235+
/**
236+
* TXT files(SEQUENTIAL format) use SequentialPrefetcher(requests the entire 10MB file).
237+
* RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB)
238+
* The 10MB range gets split into: [0-8MB) and [8MB-10MB)
239+
* Each split range becomes a separate Block, resulting in 2 GET requests:
240+
*/
241+
@Test
242+
public void testLargeFileMultipleGets() throws Throwable {
243+
describe("Large file should trigger multiple GET requests");
244+
245+
Path dest = path("large-test-file.txt");
246+
byte[] data = dataset(10 * S_1M, 256, 255);
247+
writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true);
248+
249+
byte[] buffer = new byte[S_1M * 10];
250+
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
251+
IOStatistics ioStats = inputStream.getIOStatistics();
252+
inputStream.readFully(buffer);
253+
254+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2);
255+
// Because S3A passes in the meta-data(content length) on file open,
256+
// we expect AAL to make no HEAD requests
257+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
258+
}
259+
}
260+
261+
@Test
262+
public void testSmallFileSingleGet() throws Throwable {
263+
describe("Small file should trigger only one GET request");
264+
265+
Path dest = path("small-test-file.txt");
266+
byte[] data = dataset(S_1M, 256, 255);
267+
writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
268+
269+
byte[] buffer = new byte[S_1M];
270+
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
271+
IOStatistics ioStats = inputStream.getIOStatistics();
272+
inputStream.readFully(buffer);
273+
274+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
275+
// Because S3A passes in the meta-data(content length) on file open,
276+
// we expect AAL to make no HEAD requests
277+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
278+
}
279+
}
280+
281+
282+
@Test
283+
public void testRandomSeekPatternGets() throws Throwable {
284+
describe("Random seek pattern should optimize GET requests");
285+
286+
Path dest = path("seek-test.txt");
287+
byte[] data = dataset(5 * S_1M, 256, 255);
288+
writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
289+
290+
byte[] buffer = new byte[S_1M];
291+
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
292+
IOStatistics ioStats = inputStream.getIOStatistics();
293+
294+
inputStream.read(buffer);
295+
inputStream.seek(2 * S_1M);
296+
inputStream.read(new byte[512 * S_1K]);
297+
inputStream.seek(3 * S_1M);
298+
inputStream.read(new byte[512 * S_1K]);
299+
300+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
301+
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
302+
}
303+
}
304+
305+
306+
@Test
307+
public void testSequentialStreamsNoDuplicateGets() throws Throwable {
308+
describe("Sequential streams reading same object should not duplicate GETs");
309+
310+
Path dest = path("sequential-test.txt");
311+
byte[] data = dataset(S_1M, 256, 255);
312+
writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
313+
314+
byte[] buffer = new byte[1024];
315+
try (FSDataInputStream stream1 = getFileSystem().open(dest);
316+
FSDataInputStream stream2 = getFileSystem().open(dest)) {
317+
318+
stream1.read(buffer);
319+
stream2.read(buffer);
320+
321+
IOStatistics stats1 = stream1.getIOStatistics();
322+
IOStatistics stats2 = stream2.getIOStatistics();
323+
324+
verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
325+
verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0);
326+
}
327+
}
206328
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
4646
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
4747
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
48-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
4948
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
5049
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
5150
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -81,10 +80,7 @@ protected Configuration createConfiguration() {
8180
public void setup() throws Exception {
8281
super.setup();
8382
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
84-
// Analytics accelerator currently does not support IOStatisticsContext, this will be added as
85-
// part of https://issues.apache.org/jira/browse/HADOOP-19364
86-
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
87-
"Analytics Accelerator currently does not support IOStatisticsContext");
83+
8884

8985
}
9086

0 commit comments

Comments
 (0)