Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ public long getCompletedPositions()
return 0;
}

/**
 * Returns the decompressed byte count reported by this page source.
 * This implementation reports the {@code completedBytes} counter unchanged.
 */
@Override
public long getDecompressedBytes()
{
// For aggregated page source, there's no decompression - just return completed bytes
return completedBytes;
}

/**
 * Returns the decompressed position count reported by this page source.
 * This implementation always reports zero.
 */
@Override
public long getDecompressedPositions()
{
return 0;
}

@Override
public long getReadTimeNanos()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ public long getCompletedPositions()
return completedPositions;
}

/**
 * Returns the decompressed byte count reported by this page source,
 * taken from {@code orcDataSource.getReadBytes()}.
 */
@Override
public long getDecompressedBytes()
{
// NOTE(review): confirm that getReadBytes() counts bytes after decompression and
// not bytes read from storage; if it is the latter, this equals the raw input size.
return orcDataSource.getReadBytes();
}

/**
 * Returns the decompressed position count reported by this page source.
 * This implementation reports the {@code completedPositions} counter.
 */
@Override
public long getDecompressedPositions()
{
return completedPositions;
}

@Override
public long getReadTimeNanos()
{
Expand Down Expand Up @@ -199,7 +211,8 @@ else if (constantBlocks[fieldId] != null) {
blocks[fieldId] = new LazyBlock(batchSize, new OrcBlockLoader(hiveColumnIndexes[fieldId]));
}
}
return new Page(batchSize, blocks);
Page page = new Page(batchSize, blocks);
return page;
}
catch (PrestoException e) {
closeWithSuppression(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ public long getCompletedPositions()
return recordReader.getReadPositions();
}

/**
 * Returns the decompressed byte count reported by this page source,
 * taken from {@code orcDataSource.getReadBytes()}.
 */
@Override
public long getDecompressedBytes()
{
// NOTE(review): confirm that getReadBytes() counts bytes after decompression and
// not bytes read from storage; if it is the latter, this equals the raw input size.
return orcDataSource.getReadBytes();
}

/**
 * Returns the decompressed position count reported by this page source,
 * taken from {@code recordReader.getReadPositions()}.
 */
@Override
public long getDecompressedPositions()
{
return recordReader.getReadPositions();
}

@Override
public long getReadTimeNanos()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ public long getSystemMemoryUsage()
return memoryUsageBytes;
}

/**
 * Returns the decompressed byte count reported by this page source.
 * This implementation reports the {@code completedBytes} counter unchanged.
 */
@Override
public long getDecompressedBytes()
{
// PageFile format stores pages in a serialized format.
// The completedBytes already represents the decompressed data size
// since we're reading from the deserialized pages.
return completedBytes;
}

/**
 * Returns the decompressed position count reported by this page source.
 * This implementation reports the {@code completedPositions} counter.
 */
@Override
public long getDecompressedPositions()
{
return completedPositions;
}

@Override
public void close()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class AggregatedParquetPageSource
private boolean completed;
private long readTimeNanos;
private long completedBytes;
private long decompressedPositions;

public AggregatedParquetPageSource(List<HiveColumnHandle> columnHandles, ParquetMetadata parquetMetadata, TypeManager typeManager, StandardFunctionResolution functionResolution)
{
Expand Down Expand Up @@ -245,6 +246,19 @@ public long getSystemMemoryUsage()
return 0;
}

/**
 * Returns the decompressed byte count reported by this page source.
 * This implementation reports the {@code completedBytes} counter unchanged.
 */
@Override
public long getDecompressedBytes()
{
// For aggregated page source, there's no decompression - just return completed bytes
return completedBytes;
}

/**
 * Returns the decompressed position count reported by this page source.
 *
 * Delegates to {@link #getCompletedPositions()} instead of reading a dedicated
 * counter, so the value can never drift from the completed position count.
 *
 * @return the same value as {@code getCompletedPositions()}
 */
@Override
public long getDecompressedPositions()
{
// A separate decompressedPositions field would need to be incremented wherever
// positions are produced; delegating avoids reporting a counter that is never updated.
return getCompletedPositions();
}

@Override
public void close()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ public long getCompletedPositions()
return completedPositions;
}

/**
 * Returns the decompressed byte count reported by this page source,
 * taken from {@code parquetReader.getDataSource().getReadBytes()}.
 */
@Override
public long getDecompressedBytes()
{
// NOTE(review): confirm that getReadBytes() counts bytes after decompression and
// not bytes read from storage; if it is the latter, this equals the raw input size.
return parquetReader.getDataSource().getReadBytes();
}

/**
 * Returns the decompressed position count reported by this page source.
 * This implementation reports the {@code completedPositions} counter.
 */
@Override
public long getDecompressedPositions()
{
return completedPositions;
}

@Override
public long getReadTimeNanos()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class RcFilePageSource

private int pageId;
private long completedPositions;
private long decompressedPositions;

private boolean closed;

Expand Down Expand Up @@ -193,6 +194,18 @@ public long getSystemMemoryUsage()
return GUESSED_MEMORY_USAGE;
}

/**
 * Returns the decompressed byte count reported by this page source,
 * taken from {@code rcFileReader.getBytesRead()}.
 */
@Override
public long getDecompressedBytes()
{
// NOTE(review): confirm that getBytesRead() counts bytes after decompression and
// not bytes read from storage; if it is the latter, this equals the raw input size.
return rcFileReader.getBytesRead();
}

/**
 * Returns the decompressed position count reported by this page source.
 *
 * Reports the {@code completedPositions} counter, so the value tracks the
 * positions this source has already accounted for.
 *
 * @return the current value of {@code completedPositions}
 */
@Override
public long getDecompressedPositions()
{
// Use the maintained completedPositions counter; a dedicated decompressedPositions
// field that is never incremented would always report zero.
return completedPositions;
}

private void closeWithSuppression(Throwable throwable)
{
requireNonNull(throwable, "throwable is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class PageSourceOperator
private final ConnectorPageSource pageSource;
private final OperatorContext operatorContext;
private long completedBytes;
private long decompressedBytes;
private long readTimeNanos;

public PageSourceOperator(ConnectorPageSource pageSource, OperatorContext operatorContext)
Expand Down Expand Up @@ -89,12 +90,16 @@ public Page getOutput()
return null;
}

// update operator stats
// update operator stats with three-tier metrics
long endCompletedBytes = pageSource.getCompletedBytes();
long endDecompressedBytes = pageSource.getDecompressedBytes();
long endReadTimeNanos = pageSource.getReadTimeNanos();
// Raw input: compressed bytes from storage
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, page.getPositionCount(), endReadTimeNanos - readTimeNanos);
operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount());
// Processed input: decompressed bytes (middle tier - after decompression, before filtering)
operatorContext.recordProcessedInput(endDecompressedBytes - decompressedBytes, page.getPositionCount());
completedBytes = endCompletedBytes;
decompressedBytes = endDecompressedBytes;
readTimeNanos = endReadTimeNanos;

// assure the page is in memory before handing to another operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ private void recordInputStats()
long inputBytes = endCompletedBytes - completedBytes;
long inputBytesReadTime = endReadTimeNanos - readTimeNanos;
long positionCount = endCompletedPositions - completedPositions;
operatorContext.recordProcessedInput(inputBytes, positionCount);
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime);
RuntimeStats runtimeStats = pageSource.getRuntimeStats();
if (runtimeStats != null) {
Expand Down Expand Up @@ -392,6 +391,14 @@ private Page recordProcessedInput(Page page)
}
}

// Record processed input: actual block sizes after filtering/projection
// If blocks is null, no lazy blocks were found, so use the original page size
long processedBytes = (blocks == null) ? page.getSizeInBytes() : blockSizeSum;
operatorContext.recordProcessedInput(processedBytes, page.getPositionCount());

// Record raw input: bytes read from storage before filtering (connector metrics)
recordInputStats();

return (blocks == null) ? page : new Page(page.getPositionCount(), blocks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void noMoreOperators()

private long completedBytes;
private long completedPositions;
private long decompressedBytes;
private long readTimeNanos;

public TableScanOperator(
Expand Down Expand Up @@ -269,7 +270,7 @@ public Page getOutput()
page = page.getLoadedPage();
}

// update operator stats
// Record raw input (compressed bytes from storage) and processed input (decompressed bytes)
recordInputStats();

// updating system memory usage should happen after page is loaded.
Expand All @@ -281,15 +282,19 @@ public Page getOutput()
private void recordInputStats()
{
checkState(source != null, "source must not be null");
// update operator stats
// update operator stats with three-tier metrics
long endCompletedBytes = source.getCompletedBytes();
long endCompletedPositions = source.getCompletedPositions();
long endDecompressedBytes = source.getDecompressedBytes();
long endReadTimeNanos = source.getReadTimeNanos();
long inputBytes = endCompletedBytes - completedBytes;
long inputBytesReadTime = endReadTimeNanos - readTimeNanos;
long positionCount = endCompletedPositions - completedPositions;
operatorContext.recordProcessedInput(inputBytes, positionCount);
long decompressedInputBytes = endDecompressedBytes - decompressedBytes;
// Raw input: compressed bytes from storage
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime);
// Processed input: decompressed bytes (middle tier - after decompression, before filtering)
operatorContext.recordProcessedInput(decompressedInputBytes, positionCount);
RuntimeStats runtimeStats = source.getRuntimeStats();
if (runtimeStats != null) {
runtimeStats.addMetricValueIgnoreZero(STORAGE_READ_TIME_NANOS, NANO, inputBytesReadTime);
Expand All @@ -298,6 +303,7 @@ private void recordInputStats()
}
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
decompressedBytes = endDecompressedBytes;
readTimeNanos = endReadTimeNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,43 @@ public interface ConnectorPageSource
CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

/**
* Gets the number of input bytes processed by this page source so far.
* Gets the number of raw input bytes read from storage by this page source so far.
* This represents bytes read from physical storage (e.g., compressed ORC/Parquet files).
* If size is not available, this method should return zero.
*/
long getCompletedBytes();

/**
* Gets the number of input rows processed by this page source so far.
* Gets the number of raw input rows read from storage by this page source so far.
* This represents rows before any filtering or processing.
* If number is not available, this method should return zero.
*/
long getCompletedPositions();

/**
 * Gets the number of decompressed bytes processed by this page source so far.
 * This represents bytes after decompression but before filtering/projection.
 * <p>
 * The default implementation returns {@link #getCompletedBytes()}, which suits
 * connectors without compression or that do not track this value separately.
 * Implementations that override this method should return zero if the size
 * is not available.
 *
 * @return the decompressed byte count; by default the same as {@code getCompletedBytes()}
 */
default long getDecompressedBytes()
{
return getCompletedBytes();
}

/**
 * Gets the number of decompressed rows processed by this page source so far.
 * This represents rows after decompression but before filtering/projection.
 * <p>
 * The default implementation returns {@link #getCompletedPositions()}, which suits
 * connectors that do not track this value separately.
 * Implementations that override this method should return zero if the number
 * is not available.
 *
 * @return the decompressed row count; by default the same as {@code getCompletedPositions()}
 */
default long getDecompressedPositions()
{
return getCompletedPositions();
}

/**
* Gets the wall time this page source spent reading data from the input.
* If read time is not available, this method should return zero.
Expand Down
Loading