diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/AggregatedOrcPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/AggregatedOrcPageSource.java index 64665bbbb0926..d8da43fe5d8af 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/AggregatedOrcPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/AggregatedOrcPageSource.java @@ -74,6 +74,19 @@ public long getCompletedPositions() return 0; } + @Override + public long getDecompressedBytes() + { + // For aggregated page source, there's no decompression - just return completed bytes + return completedBytes; + } + + @Override + public long getDecompressedPositions() + { + return 0; + } + @Override public long getReadTimeNanos() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java index 7fd753773077d..1dbea52eaa853 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcBatchPageSource.java @@ -157,6 +157,18 @@ public long getCompletedPositions() return completedPositions; } + @Override + public long getDecompressedBytes() + { + return orcDataSource.getReadBytes(); + } + + @Override + public long getDecompressedPositions() + { + return completedPositions; + } + @Override public long getReadTimeNanos() { @@ -199,7 +211,8 @@ else if (constantBlocks[fieldId] != null) { blocks[fieldId] = new LazyBlock(batchSize, new OrcBlockLoader(hiveColumnIndexes[fieldId])); } } - return new Page(batchSize, blocks); + Page page = new Page(batchSize, blocks); + return page; } catch (PrestoException e) { closeWithSuppression(e); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSource.java index 30d7ba6602d91..faf9d3a0de070 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcSelectivePageSource.java @@ -95,6 +95,18 @@ public long getCompletedPositions() return recordReader.getReadPositions(); } + @Override + public long getDecompressedBytes() + { + return orcDataSource.getReadBytes(); + } + + @Override + public long getDecompressedPositions() + { + return recordReader.getReadPositions(); + } + @Override public long getReadTimeNanos() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFilePageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFilePageSource.java index 58eafe4dde18c..e81d1addf8bff 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFilePageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/pagefile/PageFilePageSource.java @@ -136,6 +136,21 @@ public long getSystemMemoryUsage() return memoryUsageBytes; } + @Override + public long getDecompressedBytes() + { + // PageFile format stores pages in a serialized format. + // The completedBytes already represents the decompressed data size + // since we're reading from the deserialized pages. + return completedBytes; + } + + @Override + public long getDecompressedPositions() + { + return completedPositions; + } + @Override public void close() throws IOException diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/AggregatedParquetPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/AggregatedParquetPageSource.java index 70a3192190d05..b3dc176832ded 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/AggregatedParquetPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/AggregatedParquetPageSource.java @@ -60,6 +60,7 @@ public class AggregatedParquetPageSource private boolean completed; private long readTimeNanos; private long completedBytes; + private long decompressedPositions; public AggregatedParquetPageSource(List columnHandles, ParquetMetadata parquetMetadata, TypeManager typeManager, StandardFunctionResolution functionResolution) { @@ -245,6 +246,19 @@ public long getSystemMemoryUsage() return 0; } + @Override + public long getDecompressedBytes() + { + // For aggregated page source, there's no decompression - just return completed bytes + return completedBytes; + } + + @Override + public long getDecompressedPositions() + { + return decompressedPositions; + } + @Override public void close() throws IOException diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java index b5389b7990aff..2973edeee5acb 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java @@ -116,6 +116,18 @@ public long getCompletedPositions() return completedPositions; } + @Override + public long getDecompressedBytes() + { + return parquetReader.getDataSource().getReadBytes(); + } + + @Override + public long getDecompressedPositions() + { + return completedPositions; + } + @Override public long getReadTimeNanos() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSource.java index ae0899dea1f61..e7dd562bc7e3e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/rcfile/RcFilePageSource.java @@ -56,6 +56,7 @@ public class RcFilePageSource private int pageId; private long completedPositions; + private long decompressedPositions; private boolean closed; @@ -193,6 +194,18 @@ public long getSystemMemoryUsage() return GUESSED_MEMORY_USAGE; } + @Override + public long getDecompressedBytes() + { + return rcFileReader.getBytesRead(); + } + + @Override + public long getDecompressedPositions() + { + return decompressedPositions; + } + private void closeWithSuppression(Throwable throwable) { requireNonNull(throwable, "throwable is null"); diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/PageSourceOperator.java b/presto-main-base/src/main/java/com/facebook/presto/operator/PageSourceOperator.java index c07bad8fcf196..d16b3f4a5fadb 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/operator/PageSourceOperator.java +++ b/presto-main-base/src/main/java/com/facebook/presto/operator/PageSourceOperator.java @@ -31,6 +31,7 @@ public class PageSourceOperator private final ConnectorPageSource pageSource; private final OperatorContext operatorContext; private long completedBytes; + private long decompressedBytes; private long readTimeNanos; public PageSourceOperator(ConnectorPageSource pageSource, OperatorContext operatorContext) @@ -89,12 +90,16 @@ public Page getOutput() return null; } - // update operator stats + // update operator stats with three-tier metrics long endCompletedBytes = pageSource.getCompletedBytes(); + long endDecompressedBytes = pageSource.getDecompressedBytes(); long endReadTimeNanos = pageSource.getReadTimeNanos(); + // Raw input: compressed bytes from storage operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, page.getPositionCount(), endReadTimeNanos - readTimeNanos); - operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount()); + // Processed input: decompressed bytes (middle tier - after decompression, before filtering) + operatorContext.recordProcessedInput(endDecompressedBytes - decompressedBytes, page.getPositionCount()); completedBytes = endCompletedBytes; + decompressedBytes = endDecompressedBytes; readTimeNanos = endReadTimeNanos; // assure the page is in memory before handing to another operator diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java b/presto-main-base/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java index f45d8faccafd4..3b360e514cd9e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java +++ b/presto-main-base/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java @@ -361,7 +361,6 @@ private void recordInputStats() long inputBytes = endCompletedBytes - completedBytes; long inputBytesReadTime = endReadTimeNanos - readTimeNanos; long positionCount = endCompletedPositions - completedPositions; - operatorContext.recordProcessedInput(inputBytes, positionCount); operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime); RuntimeStats runtimeStats = pageSource.getRuntimeStats(); if (runtimeStats != null) { @@ -392,6 +391,14 @@ private Page recordProcessedInput(Page page) } } + // Record processed input: actual block sizes after filtering/projection + // If blocks is null, no lazy blocks were found, so use the original page size + long processedBytes = (blocks == null) ? page.getSizeInBytes() : blockSizeSum; + operatorContext.recordProcessedInput(processedBytes, page.getPositionCount()); + + // Record raw input: bytes read from storage before filtering (connector metrics) + recordInputStats(); + return (blocks == null) ? page : new Page(page.getPositionCount(), blocks); } diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/TableScanOperator.java b/presto-main-base/src/main/java/com/facebook/presto/operator/TableScanOperator.java index c97e9dc2fff60..dd5760fa85492 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/operator/TableScanOperator.java +++ b/presto-main-base/src/main/java/com/facebook/presto/operator/TableScanOperator.java @@ -116,6 +116,7 @@ public void noMoreOperators() private long completedBytes; private long completedPositions; + private long decompressedBytes; private long readTimeNanos; public TableScanOperator( @@ -269,7 +270,7 @@ public Page getOutput() page = page.getLoadedPage(); } - // update operator stats + // Record raw input (compressed bytes from storage) and processed input (decompressed bytes) recordInputStats(); // updating system memory usage should happen after page is loaded. @@ -281,15 +282,19 @@ public Page getOutput() private void recordInputStats() { checkState(source != null, "source must not be null"); - // update operator stats + // update operator stats with three-tier metrics long endCompletedBytes = source.getCompletedBytes(); long endCompletedPositions = source.getCompletedPositions(); + long endDecompressedBytes = source.getDecompressedBytes(); long endReadTimeNanos = source.getReadTimeNanos(); long inputBytes = endCompletedBytes - completedBytes; long inputBytesReadTime = endReadTimeNanos - readTimeNanos; long positionCount = endCompletedPositions - completedPositions; - operatorContext.recordProcessedInput(inputBytes, positionCount); + long decompressedInputBytes = endDecompressedBytes - decompressedBytes; + // Raw input: compressed bytes from storage operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime); + // Processed input: decompressed bytes (middle tier - after decompression, before filtering) + operatorContext.recordProcessedInput(decompressedInputBytes, positionCount); RuntimeStats runtimeStats = source.getRuntimeStats(); if (runtimeStats != null) { runtimeStats.addMetricValueIgnoreZero(STORAGE_READ_TIME_NANOS, NANO, inputBytesReadTime); @@ -298,6 +303,7 @@ private void recordInputStats() } completedBytes = endCompletedBytes; completedPositions = endCompletedPositions; + decompressedBytes = endDecompressedBytes; readTimeNanos = endReadTimeNanos; } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorPageSource.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorPageSource.java index 74bc756a554e9..0f1c7fda8f91d 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorPageSource.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorPageSource.java @@ -26,17 +26,43 @@ public interface ConnectorPageSource CompletableFuture NOT_BLOCKED = CompletableFuture.completedFuture(null); /** - * Gets the number of input bytes processed by this page source so far. + * Gets the number of raw input bytes read from storage by this page source so far. + * This represents bytes read from physical storage (e.g., compressed ORC/Parquet files). * If size is not available, this method should return zero. */ long getCompletedBytes(); /** - * Gets the number of input rows processed by this page source so far. + * Gets the number of raw input rows read from storage by this page source so far. + * This represents rows before any filtering or processing. * If number is not available, this method should return zero. */ long getCompletedPositions(); + /** + * Gets the number of decompressed bytes processed by this page source so far. + * This represents bytes after decompression but before filtering/projection. + * For connectors without compression or that don't track this separately, + * the default implementation returns the same as getCompletedBytes(). + * If size is not available, this method should return zero. + */ + default long getDecompressedBytes() + { + return getCompletedBytes(); + } + + /** + * Gets the number of decompressed rows processed by this page source so far. + * This represents rows after decompression but before filtering/projection. + * For connectors that don't track this separately, the default implementation + * returns the same as getCompletedPositions(). + * If number is not available, this method should return zero. + */ + default long getDecompressedPositions() + { + return getCompletedPositions(); + } + /** * Gets the wall time this page source spent reading data from the input. * If read time is not available, this method should return zero.