Skip to content

Commit 40158bb

Browse files
Dilli-Babu-Godari godaridillibabu
authored and committed
Java Scan operator raw input reported wrong
1 parent 4be75f0 commit 40158bb

File tree

12 files changed

+174
-16
lines changed

12 files changed

+174
-16
lines changed

presto-hive/src/main/java/com/facebook/presto/hive/orc/AggregatedOrcPageSource.java

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ public class AggregatedOrcPageSource
5353
private boolean completed;
5454
private long readTimeNanos;
5555
private long completedBytes;
56+
private long decompressedBytes;
5657

5758
public AggregatedOrcPageSource(List<HiveColumnHandle> columnHandles, Footer footer, TypeManager typeManager, StandardFunctionResolution functionResolution)
5859
{
@@ -74,6 +75,18 @@ public long getCompletedPositions()
7475
return 0;
7576
}
7677

78+
@Override
79+
public long getDecompressedBytes()
80+
{
81+
return decompressedBytes;
82+
}
83+
84+
@Override
85+
public long getDecompressedPositions()
86+
{
87+
return 0;
88+
}
89+
7790
@Override
7891
public long getReadTimeNanos()
7992
{
@@ -126,7 +139,10 @@ else if (functionResolution.isMinFunction(functionHandle)) {
126139

127140
completed = true;
128141
readTimeNanos += System.nanoTime() - start;
129-
return new Page(batchSize, blocks);
142+
Page page = new Page(batchSize, blocks);
143+
// Track decompressed bytes for aggregated page source
144+
decompressedBytes += page.getSizeInBytes();
145+
return page;
130146
}
131147

132148
private void writeMinMax(int columnIndex, Type type, HiveType hiveType, BlockBuilder blockBuilder, boolean isMin)

presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,7 @@ public class OrcBatchPageSource
6464

6565
private int batchId;
6666
private long completedPositions;
67+
private long decompressedBytes;
6768
private boolean closed;
6869

6970
private final OrcAggregatedMemoryContext systemMemoryContext;
@@ -157,6 +158,18 @@ public long getCompletedPositions()
157158
return completedPositions;
158159
}
159160

161+
@Override
162+
public long getDecompressedBytes()
163+
{
164+
return decompressedBytes;
165+
}
166+
167+
@Override
168+
public long getDecompressedPositions()
169+
{
170+
return completedPositions;
171+
}
172+
160173
@Override
161174
public long getReadTimeNanos()
162175
{
@@ -199,7 +212,10 @@ else if (constantBlocks[fieldId] != null) {
199212
blocks[fieldId] = new LazyBlock(batchSize, new OrcBlockLoader(hiveColumnIndexes[fieldId]));
200213
}
201214
}
202-
return new Page(batchSize, blocks);
215+
Page page = new Page(batchSize, blocks);
216+
// Track decompressed bytes - this represents data after decompression but before filtering
217+
decompressedBytes += page.getSizeInBytes();
218+
return page;
203219
}
204220
catch (PrestoException e) {
205221
closeWithSuppression(e);

presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSource.java

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ public class OrcSelectivePageSource
5151
private final RowIDCoercer coercer;
5252
private final boolean supplyRowIDs;
5353
private final OptionalInt rowIDColumnIndex;
54+
private long decompressedBytes;
5455
private boolean closed;
5556

5657
OrcSelectivePageSource(
@@ -95,6 +96,18 @@ public long getCompletedPositions()
9596
return recordReader.getReadPositions();
9697
}
9798

99+
@Override
100+
public long getDecompressedBytes()
101+
{
102+
return decompressedBytes;
103+
}
104+
105+
@Override
106+
public long getDecompressedPositions()
107+
{
108+
return recordReader.getReadPositions();
109+
}
110+
98111
@Override
99112
public long getReadTimeNanos()
100113
{
@@ -123,6 +136,10 @@ public Page getNextPage()
123136
if (page == null) {
124137
close();
125138
}
139+
else {
140+
// Track decompressed bytes - this represents data after decompression but before filtering
141+
decompressedBytes += page.getSizeInBytes();
142+
}
126143
return page;
127144
}
128145
catch (InvalidFunctionArgumentException e) {

presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFilePageSource.java

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,8 @@ public class PageFilePageSource
4242
private long completedBytes;
4343
private long readTimeNanos;
4444
private long memoryUsageBytes;
45+
private long decompressedBytes;
46+
private long decompressedPositions;
4547

4648
public PageFilePageSource(
4749
FSDataInputStream inputStream,
@@ -127,7 +129,10 @@ public Page getNextPage()
127129
long pageSizeInBytes = page.getSizeInBytes();
128130
completedBytes += pageSizeInBytes;
129131
memoryUsageBytes = Math.max(memoryUsageBytes, pageSizeInBytes);
130-
return new Page(page.getPositionCount(), blocks);
132+
Page resultPage = new Page(page.getPositionCount(), blocks);
133+
decompressedBytes += resultPage.getSizeInBytes();
134+
decompressedPositions += resultPage.getPositionCount();
135+
return resultPage;
131136
}
132137

133138
@Override
@@ -136,6 +141,18 @@ public long getSystemMemoryUsage()
136141
return memoryUsageBytes;
137142
}
138143

144+
@Override
145+
public long getDecompressedBytes()
146+
{
147+
return decompressedBytes;
148+
}
149+
150+
@Override
151+
public long getDecompressedPositions()
152+
{
153+
return decompressedPositions;
154+
}
155+
139156
@Override
140157
public void close()
141158
throws IOException

presto-hive/src/main/java/com/facebook/presto/hive/parquet/AggregatedParquetPageSource.java

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,8 @@ public class AggregatedParquetPageSource
6060
private boolean completed;
6161
private long readTimeNanos;
6262
private long completedBytes;
63+
private long decompressedBytes;
64+
private long decompressedPositions;
6365

6466
public AggregatedParquetPageSource(List<HiveColumnHandle> columnHandles, ParquetMetadata parquetMetadata, TypeManager typeManager, StandardFunctionResolution functionResolution)
6567
{
@@ -131,7 +133,10 @@ else if (functionResolution.isMinFunction(functionHandle)) {
131133

132134
completed = true;
133135
readTimeNanos += System.nanoTime() - start;
134-
return new Page(batchSize, blocks);
136+
Page page = new Page(batchSize, blocks);
137+
decompressedBytes += page.getSizeInBytes();
138+
decompressedPositions += page.getPositionCount();
139+
return page;
135140
}
136141

137142
private long getRowCountFromParquetMetadata(ParquetMetadata parquetMetadata)
@@ -245,6 +250,18 @@ public long getSystemMemoryUsage()
245250
return 0;
246251
}
247252

253+
@Override
254+
public long getDecompressedBytes()
255+
{
256+
return decompressedBytes;
257+
}
258+
259+
@Override
260+
public long getDecompressedPositions()
261+
{
262+
return decompressedPositions;
263+
}
264+
248265
@Override
249266
public void close()
250267
throws IOException

presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@ public class ParquetPageSource
5858

5959
private int batchId;
6060
private long completedPositions;
61+
private long decompressedBytes;
6162
private boolean closed;
6263

6364
private final RuntimeStats runtimeStats;
@@ -116,6 +117,18 @@ public long getCompletedPositions()
116117
return completedPositions;
117118
}
118119

120+
@Override
121+
public long getDecompressedBytes()
122+
{
123+
return decompressedBytes;
124+
}
125+
126+
@Override
127+
public long getDecompressedPositions()
128+
{
129+
return completedPositions;
130+
}
131+
119132
@Override
120133
public long getReadTimeNanos()
121134
{
@@ -163,7 +176,10 @@ public Page getNextPage()
163176
}
164177
}
165178
}
166-
return new Page(batchSize, blocks);
179+
Page page = new Page(batchSize, blocks);
180+
// Track decompressed bytes - represents data after decompression but before filtering
181+
decompressedBytes += page.getSizeInBytes();
182+
return page;
167183
}
168184
catch (PrestoException e) {
169185
closeWithSuppression(e);

presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSource.java

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,8 @@ public class RcFilePageSource
5656

5757
private int pageId;
5858
private long completedPositions;
59+
private long decompressedBytes;
60+
private long decompressedPositions;
5961

6062
private boolean closed;
6163

@@ -149,7 +151,10 @@ public Page getNextPage()
149151
}
150152
}
151153

152-
return new Page(currentPageSize, blocks);
154+
Page page = new Page(currentPageSize, blocks);
155+
decompressedBytes += page.getSizeInBytes();
156+
decompressedPositions += page.getPositionCount();
157+
return page;
153158
}
154159
catch (PrestoException e) {
155160
closeWithSuppression(e);
@@ -193,6 +198,18 @@ public long getSystemMemoryUsage()
193198
return GUESSED_MEMORY_USAGE;
194199
}
195200

201+
@Override
202+
public long getDecompressedBytes()
203+
{
204+
return decompressedBytes;
205+
}
206+
207+
@Override
208+
public long getDecompressedPositions()
209+
{
210+
return decompressedPositions;
211+
}
212+
196213
private void closeWithSuppression(Throwable throwable)
197214
{
198215
requireNonNull(throwable, "throwable is null");

presto-main-base/src/main/java/com/facebook/presto/operator/PageSourceOperator.java

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ public class PageSourceOperator
3131
private final ConnectorPageSource pageSource;
3232
private final OperatorContext operatorContext;
3333
private long completedBytes;
34+
private long decompressedBytes;
3435
private long readTimeNanos;
3536

3637
public PageSourceOperator(ConnectorPageSource pageSource, OperatorContext operatorContext)
@@ -89,12 +90,16 @@ public Page getOutput()
8990
return null;
9091
}
9192

92-
// update operator stats
93+
// update operator stats with three-tier metrics
9394
long endCompletedBytes = pageSource.getCompletedBytes();
95+
long endDecompressedBytes = pageSource.getDecompressedBytes();
9496
long endReadTimeNanos = pageSource.getReadTimeNanos();
97+
// Raw input: compressed bytes from storage
9598
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, page.getPositionCount(), endReadTimeNanos - readTimeNanos);
96-
operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount());
99+
// Processed input: decompressed bytes (middle tier - after decompression, before filtering)
100+
operatorContext.recordProcessedInput(endDecompressedBytes - decompressedBytes, page.getPositionCount());
97101
completedBytes = endCompletedBytes;
102+
decompressedBytes = endDecompressedBytes;
98103
readTimeNanos = endReadTimeNanos;
99104

100105
// assure the page is in memory before handing to another operator

presto-main-base/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -361,7 +361,6 @@ private void recordInputStats()
361361
long inputBytes = endCompletedBytes - completedBytes;
362362
long inputBytesReadTime = endReadTimeNanos - readTimeNanos;
363363
long positionCount = endCompletedPositions - completedPositions;
364-
operatorContext.recordProcessedInput(inputBytes, positionCount);
365364
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime);
366365
RuntimeStats runtimeStats = pageSource.getRuntimeStats();
367366
if (runtimeStats != null) {
@@ -392,6 +391,14 @@ private Page recordProcessedInput(Page page)
392391
}
393392
}
394393

394+
// Record processed input: actual block sizes after filtering/projection
395+
// If blocks is null, no lazy blocks were found, so use the original page size
396+
long processedBytes = (blocks == null) ? page.getSizeInBytes() : blockSizeSum;
397+
operatorContext.recordProcessedInput(processedBytes, page.getPositionCount());
398+
399+
// Record raw input: bytes read from storage before filtering (connector metrics)
400+
recordInputStats();
401+
395402
return (blocks == null) ? page : new Page(page.getPositionCount(), blocks);
396403
}
397404

presto-main-base/src/main/java/com/facebook/presto/operator/TableScanOperator.java

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,7 @@ public void noMoreOperators()
116116

117117
private long completedBytes;
118118
private long completedPositions;
119+
private long decompressedBytes;
119120
private long readTimeNanos;
120121

121122
public TableScanOperator(
@@ -269,7 +270,7 @@ public Page getOutput()
269270
page = page.getLoadedPage();
270271
}
271272

272-
// update operator stats
273+
// Record raw input (compressed bytes from storage) and processed input (decompressed bytes)
273274
recordInputStats();
274275

275276
// updating system memory usage should happen after page is loaded.
@@ -281,15 +282,19 @@ public Page getOutput()
281282
private void recordInputStats()
282283
{
283284
checkState(source != null, "source must not be null");
284-
// update operator stats
285+
// update operator stats with three-tier metrics
285286
long endCompletedBytes = source.getCompletedBytes();
286287
long endCompletedPositions = source.getCompletedPositions();
288+
long endDecompressedBytes = source.getDecompressedBytes();
287289
long endReadTimeNanos = source.getReadTimeNanos();
288290
long inputBytes = endCompletedBytes - completedBytes;
289291
long inputBytesReadTime = endReadTimeNanos - readTimeNanos;
290292
long positionCount = endCompletedPositions - completedPositions;
291-
operatorContext.recordProcessedInput(inputBytes, positionCount);
293+
long decompressedInputBytes = endDecompressedBytes - decompressedBytes;
294+
// Raw input: compressed bytes from storage
292295
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime);
296+
// Processed input: decompressed bytes (middle tier - after decompression, before filtering)
297+
operatorContext.recordProcessedInput(decompressedInputBytes, positionCount);
293298
RuntimeStats runtimeStats = source.getRuntimeStats();
294299
if (runtimeStats != null) {
295300
runtimeStats.addMetricValueIgnoreZero(STORAGE_READ_TIME_NANOS, NANO, inputBytesReadTime);
@@ -298,6 +303,7 @@ private void recordInputStats()
298303
}
299304
completedBytes = endCompletedBytes;
300305
completedPositions = endCompletedPositions;
306+
decompressedBytes = endDecompressedBytes;
301307
readTimeNanos = endReadTimeNanos;
302308
}
303309
}

0 commit comments

Comments
 (0)