-
Notifications
You must be signed in to change notification settings - Fork 5.5k
fix(connector): Java Scan operator raw input reported wrong #27052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
fix(connector): Java Scan operator raw input reported wrong #27052
Conversation
|
|
Reviewer's GuideIntroduces three-tier scan metrics (raw/compressed, decompressed/processed input, and output) for Java scan operators by extending ConnectorPageSource with decompressed counters, wiring those through Hive page sources and table/scan operators, and adding tests to validate correct metric relationships for uncompressed and lazy/compressed sources. Sequence diagram for three-tier scan metrics recording in TableScanOperatorsequenceDiagram
participant T as TableScanOperator
participant S as ConnectorPageSource
participant C as OperatorContext
T->>S: getNextPage()
S-->>T: Page
T->>T: recordInputStats()
T->>S: getCompletedBytes()
S-->>T: endCompletedBytes
T->>S: getCompletedPositions()
S-->>T: endCompletedPositions
T->>S: getDecompressedBytes()
S-->>T: endDecompressedBytes
T->>S: getReadTimeNanos()
S-->>T: endReadTimeNanos
T->>T: compute inputBytes, decompressedInputBytes, positionCount, inputBytesReadTime
T->>C: recordRawInputWithTiming(inputBytes, positionCount, inputBytesReadTime)
T->>C: recordProcessedInput(decompressedInputBytes, positionCount)
T->>T: update completedBytes, completedPositions, decompressedBytes, readTimeNanos
Class diagram for updated ConnectorPageSource metrics and Java scan operatorsclassDiagram
class ConnectorPageSource {
<<interface>>
+long getCompletedBytes()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
+long getReadTimeNanos()
+RuntimeStats getRuntimeStats()
}
class PageFilePageSource {
-long completedBytes
-long decompressedBytes
-long decompressedPositions
-long readTimeNanos
-long memoryUsageBytes
+Page getNextPage()
+long getCompletedBytes()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
}
class AggregatedParquetPageSource {
-boolean completed
-long completedBytes
-long decompressedBytes
-long decompressedPositions
-long readTimeNanos
+Page getNextPage()
+long getCompletedBytes()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
}
class RcFilePageSource {
-int pageId
-long completedPositions
-long decompressedBytes
-long decompressedPositions
+Page getNextPage()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
}
class AggregatedOrcPageSource {
-boolean completed
-long completedBytes
-long decompressedBytes
-long readTimeNanos
+Page getNextPage()
+long getCompletedBytes()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
}
class OrcBatchPageSource {
-int batchId
-long completedPositions
-long decompressedBytes
-boolean closed
+Page getNextPage()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
}
class ParquetPageSource {
-int batchId
-long completedPositions
-long decompressedBytes
-boolean closed
+Page getNextPage()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
}
class OrcSelectivePageSource {
-RowIDCoercer coercer
-boolean supplyRowIDs
-OptionalInt rowIDColumnIndex
-long decompressedBytes
-boolean closed
+Page getNextPage()
+long getCompletedPositions()
+long getDecompressedBytes()
+long getDecompressedPositions()
}
class TableScanOperator {
-ConnectorPageSource source
-OperatorContext operatorContext
-long completedBytes
-long completedPositions
-long decompressedBytes
-long readTimeNanos
+Page getOutput()
-void recordInputStats()
}
class PageSourceOperator {
-ConnectorPageSource pageSource
-OperatorContext operatorContext
-long completedBytes
-long decompressedBytes
-long readTimeNanos
+Page getOutput()
}
class ScanFilterAndProjectOperator {
-ConnectorPageSource pageSource
-OperatorContext operatorContext
-long completedBytes
-long completedPositions
-long readTimeNanos
-Page recordProcessedInput(Page page)
-void recordInputStats()
}
class OperatorContext {
+void recordRawInputWithTiming(long bytes, long positions, long readTimeNanos)
+void recordProcessedInput(long bytes, long positions)
}
ConnectorPageSource <|.. PageFilePageSource
ConnectorPageSource <|.. AggregatedParquetPageSource
ConnectorPageSource <|.. RcFilePageSource
ConnectorPageSource <|.. AggregatedOrcPageSource
ConnectorPageSource <|.. OrcBatchPageSource
ConnectorPageSource <|.. ParquetPageSource
ConnectorPageSource <|.. OrcSelectivePageSource
TableScanOperator o-- ConnectorPageSource
TableScanOperator o-- OperatorContext
PageSourceOperator o-- ConnectorPageSource
PageSourceOperator o-- OperatorContext
ScanFilterAndProjectOperator o-- ConnectorPageSource
ScanFilterAndProjectOperator o-- OperatorContext
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
540e15e to
ad4418c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey - I've found 1 issue, and left some high level feedback:
- In ScanFilterAndProjectOperator, recordProcessedInput(Page) now calls recordInputStats(), so please double-check that recordInputStats() is not also invoked elsewhere on the same operator lifecycle to avoid double-counting raw input bytes/positions.
- For the new decompressed metrics, consider tracking decompressedPositions consistently wherever you track decompressedBytes (e.g., AggregatedOrcPageSource currently always returns 0 for decompressed positions), so consumers of the API can rely on both values being meaningful when decompression is accounted for.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In ScanFilterAndProjectOperator, recordProcessedInput(Page) now calls recordInputStats(), so please double-check that recordInputStats() is not also invoked elsewhere on the same operator lifecycle to avoid double-counting raw input bytes/positions.
- For the new decompressed metrics, consider tracking decompressedPositions consistently wherever you track decompressedBytes (e.g., AggregatedOrcPageSource currently always returns 0 for decompressed positions), so consumers of the API can rely on both values being meaningful when decompression is accounted for.
## Individual Comments
### Comment 1
<location> `presto-main-base/src/test/java/com/facebook/presto/operator/TestScanFilterAndProjectOperator.java:599-608` </location>
<code_context>
+ // Verify three-tier metrics:
</code_context>
<issue_to_address>
**suggestion (testing):** Also assert positions-related metrics to cover edge cases around decompressed vs completed positions
In `testThreeTierMetricsWithLazyBlocks`, you currently only validate byte-level metrics. Since positions are also tracked (on the page source and via operator stats), please assert those as well to ensure they stay consistent, e.g.: (1) `assertEquals(pageSource.getCompletedPositions(), inputPage.getPositionCount())`, (2) `assertEquals(pageSource.getDecompressedPositions(), inputPage.getPositionCount())`, and (3) `assertEquals(stats.getRawInputPositions(), stats.getInputPositions())` (or the appropriate operator-level accessors). This will help catch regressions where byte accounting is correct but position accounting is not, particularly with lazy loading.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
...o-main-base/src/test/java/com/facebook/presto/operator/TestScanFilterAndProjectOperator.java
Outdated
Show resolved
Hide resolved
40158bb to
c75648b
Compare
|
Please edit the release note to follow the general level of detail used in previous releases - see the 0.296 release notes for examples. What would a reader need to know about the change in this PR? |
Updated, Thanks! |
c75648b to
f8821d4
Compare
Description
Java Scan operator raw input reported wrong
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.