Add file abstraction types, core utilities, and inline I/O for Hudi connector #28518
voonhous wants to merge 1 commit into trinodb:master from
Conversation
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla
Pull request overview
This PR adds core building blocks for expanding the Trino Hudi connector toward full Copy-on-Write (COW) and Merge-on-Read (MOR) read support, including file abstractions, predicate/statistics utilities, and inlinefs (inlinefs://) I/O plumbing needed by Hudi log files.
Changes:
- Introduce Hudi file abstractions (HudiFile, HudiBaseFile, HudiLogFile) suitable for split serialization.
- Add utility components (TupleDomainUtils, HudiTableTypeUtils, HudiAvroSerializer) to support predicate handling, table type detection, and Avro <-> Trino conversions.
- Add inlinefs storage and bounded seekable input stream support (TrinoHudiInlineStorage, InlineSeekableDataInputStream) and wire inlinefs dispatch in TrinoHudiStorage.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/util/TestTupleDomainUtilsTest.java | Adds unit tests for TupleDomainUtils column/reference helpers and IN/EQUAL-only detection. |
| plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/util/TestTupleDomainUtilsExtendedNullFilterTest.java | Adds unit tests for simple null-check detection in TupleDomainUtils. |
| plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/storage/TestTrinoHudiStorage.java | Updates expected getUri() behavior for TrinoHudiStorage. |
| plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/io/TestInlineSeekableDataInputStream.java | Adds unit tests for inline seekable stream initialization/seek/read behavior. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/TupleDomainUtils.java | New tuple-domain helpers for referenced columns, null checks, stats evaluation, and record-key construction. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/HudiTableTypeUtils.java | Adds centralized mapping of Hive input format class name -> HoodieTableType. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/HudiAvroSerializer.java | Adds Avro GenericRecord <-> Trino Block/Page serialization utilities. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/storage/TrinoHudiStorage.java | Routes inlinefs schemes to inline storage; changes getUri and setModificationTime behavior. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/storage/TrinoHudiInlineStorage.java | Implements HoodieStorage wrapper for inlinefs reads by sub-reading outer files. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java | Adds a seekable view into a byte range of an underlying TrinoInputStream. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiLogFile.java | Adds Jackson-serializable log file abstraction with split offsets. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiFile.java | Introduces shared file abstraction interface. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiBaseFile.java | Adds Jackson-serializable base file abstraction with split offsets. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java | Updates meta client creation to include table name context. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java | Adds schema helpers + cached schema field lookup; improves meta client error reporting surface. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTransactionManager.java | Tightens visibility of memoized metadata getter. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java | Updates meta client creation to include table name context. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java | Ensures close() IOException is suppressed on the original exception. |
| plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java | Adds new error codes for unsupported table type, meta client, and schema errors. |
| plugin/trino-hudi/pom.xml | Adds dependencies for cache + Kryo and adjusts dependency plugin configuration. |
Comments suppressed due to low confidence (1)
plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/util/TestTupleDomainUtilsTest.java:31
- The test class/file name TestTupleDomainUtilsTest is likely unintended duplication (the "Test" suffix appears twice). This can make test discovery/reporting harder to read. Consider renaming to TestTupleDomainUtils (or another name matching the specific behavior under test).

```java
class TestTupleDomainUtilsTest
{
```
```java
public InlineSeekableDataInputStream(TrinoInputStream stream, long startOffset, long length)
        throws IOException
{
    super(stream);
    this.startOffset = startOffset;
    this.length = length;
    stream.seek(startOffset);
}

@Override
public long getPos()
        throws IOException
{
    return super.getPos() - startOffset;
}

@Override
public void seek(long pos)
        throws IOException
{
    if (pos > length) {
        throw new IOException(String.format(
                "Attempting to seek past inline content: position to seek to is %s but the length is %s",
                pos, length));
    }
    super.seek(startOffset + pos);
}
```
InlineSeekableDataInputStream enforces bounds for seek(), but it does not enforce the [startOffset, startOffset+length] boundary for reads. As written, a consumer can read past the inline segment into the outer file, which can corrupt inline block decoding. Consider tracking remaining bytes and overriding read()/read(byte[],..)/skip() to stop at EOF when pos == length (and optionally validate startOffset/length >= 0).
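A minimal, self-contained sketch of the bounded-read idea this comment suggests, using only java.io over an in-memory buffer; the class and field names are illustrative, not the connector's actual types:

```java
import java.io.IOException;
import java.io.InputStream;

// Illustrative bounded view over an outer stream: reads stop at the end of
// the inline segment instead of spilling into the rest of the outer file.
class BoundedInputStream
        extends InputStream
{
    private final InputStream delegate;
    private long remaining;

    BoundedInputStream(InputStream delegate, long length)
    {
        this.delegate = delegate;
        this.remaining = length;
    }

    @Override
    public int read()
            throws IOException
    {
        if (remaining <= 0) {
            return -1; // EOF at the segment boundary
        }
        int b = delegate.read();
        if (b >= 0) {
            remaining--;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len)
            throws IOException
    {
        if (remaining <= 0) {
            return -1;
        }
        int n = delegate.read(buf, off, (int) Math.min(len, remaining));
        if (n > 0) {
            remaining -= n;
        }
        return n;
    }
}
```

The same counter could drive skip() and available() overrides so every read path observes the segment boundary.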
```java
Location sameLocation = convertToLocation(path);
fileSystem.renameFile(sameLocation, sameLocation);
```
TrinoHudiStorage.setModificationTime ignores the provided modificationTimeInMillisEpoch parameter and attempts renameFile(path, path) instead. This does not actually set the requested timestamp and may be a no-op or even fail on some filesystems. If TrinoFileSystem cannot set mtime, it would be safer to keep this unsupported (throw) or implement semantics that match the API contract (e.g., via a filesystem-specific setLastModified if available).
Suggested change:
```java
throw new UnsupportedOperationException("TrinoHudiStorage does not support setting modification time");
```
```java
public static boolean areAllFieldsReferenced(TupleDomain<String> tupleDomain, List<String> sourceFields)
{
    Set<String> referenceColSet = new HashSet<>(TupleDomainUtils.getReferencedColumns(tupleDomain));
    Set<String> sourceFieldSet = new HashSet<>(sourceFields);

    return referenceColSet.containsAll(sourceFieldSet);
}

/**
 * Check if at least one of the provided source fields is referenced in the tupleDomain predicates.
 */
public static boolean areSomeFieldsReferenced(TupleDomain<String> tupleDomain, List<String> sourceFields)
{
    Set<String> referenceColSet = new HashSet<>(TupleDomainUtils.getReferencedColumns(tupleDomain));
    for (String sourceField : sourceFields) {
        if (referenceColSet.contains(sourceField)) {
            return true;
        }
    }
    return false;
}
```
TupleDomainUtils.areAllFieldsReferenced and areSomeFieldsReferenced will throw NullPointerException if sourceFields is null, while areDomainsInOrEqualOnly explicitly treats null sourceFields as false. Consider either adding a null check (returning false or true depending on intended semantics) or enforcing non-null via requireNonNull and documenting it, for consistency across this utility class.
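A small sketch of the consistent null-guard this comment asks for, with plain Java collections; a Set of referenced column names stands in for the TupleDomain, and the names are illustrative:

```java
import java.util.List;
import java.util.Set;

// Illustrative helpers: both treat a null/empty sourceFields list uniformly
// instead of one path throwing NullPointerException and another returning false.
class ReferencedFields
{
    static boolean allReferenced(Set<String> referencedColumns, List<String> sourceFields)
    {
        if (sourceFields == null || sourceFields.isEmpty()) {
            return false; // consistent, documented semantics for missing input
        }
        return referencedColumns.containsAll(sourceFields);
    }

    static boolean someReferenced(Set<String> referencedColumns, List<String> sourceFields)
    {
        if (sourceFields == null || sourceFields.isEmpty()) {
            return false;
        }
        return sourceFields.stream().anyMatch(referencedColumns::contains);
    }
}
```

Whether null maps to false or to an exception is a policy choice; the point is that both helpers make the same choice and document it.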
```java
@Override
public URI getUri()
{
    return URI.create(getScheme());
}
```
TrinoHudiInlineStorage.getUri() returns URI.create("inlinefs"), which results in a relative URI with scheme==null. If HoodieStorage#getUri is expected to carry the scheme, consider returning something like "inlinefs:///" (or otherwise ensuring URI.getScheme() returns "inlinefs").
Suggested change:
```java
// Ensure this returns an absolute URI whose scheme matches getScheme()
return URI.create(getScheme() + ":///");
```
```java
// All recordKeys must have a domain else, return empty list (applicable to complexKeys)
// If one of the recordKeys in the set of complexKeys does not have a domain, we are unable to construct
// a complete complexKey
if (!recordKeyDomains.getDomains().get().keySet().containsAll(recordKeyFields)) {
    return Collections.emptyList();
}

// Extract the domain mappings from the tuple domain
Map<String, Domain> domains = recordKeyDomains.getDomains().get();
```
constructRecordKeys calls recordKeyDomains.getDomains().get() without handling recordKeyDomains.isNone() (or domains Optional empty). If recordKeyDomains is TupleDomain.none(), this will throw NoSuchElementException. Consider explicitly checking isNone()/getDomains().isEmpty() early and returning an empty list (or otherwise handling the contradiction case).
Suggested change:
```java
// If the tuple domain is contradictory or has no domains, we cannot construct any record keys
if (recordKeyDomains.isNone()) {
    return Collections.emptyList();
}
Optional<Map<String, Domain>> domainsOptional = recordKeyDomains.getDomains();
if (domainsOptional.isEmpty() || domainsOptional.get().isEmpty()) {
    return Collections.emptyList();
}
Map<String, Domain> domains = domainsOptional.get();

// All recordKeys must have a domain else, return empty list (applicable to complexKeys)
// If one of the recordKeys in the set of complexKeys does not have a domain, we are unable to construct
// a complete complexKey
if (!domains.keySet().containsAll(recordKeyFields)) {
    return Collections.emptyList();
}
```
```java
if (regularColumnPredicates.isNone() || !regularColumnPredicates.getDomains().isPresent()) {
    return true;
}
for (String regularColumn : regularColumns) {
    Domain columnPredicate = regularColumnPredicates.getDomains().get().get(regularColumn);
    Optional<Domain> currentColumnStats = Optional.ofNullable(domainsWithStats.get(regularColumn));
    if (currentColumnStats.isEmpty()) {
        // No stats for column
    }
    else {
        Domain domain = currentColumnStats.get();
        if (columnPredicate.intersect(domain).isNone()) {
            return false;
        }
    }
}
```
evaluateStatisticPredicate's early return treats regularColumnPredicates.isNone() as "may match" and returns true. Per the method contract, TupleDomain.none() means no row can satisfy the predicate, so the method should return false (skip file) in that case. Consider changing the early return to if (regularColumnPredicates.isNone()) { return false; } and handling the "no domains / all" cases separately.
Suggested change:
```java
// A 'None' TupleDomain means no row can satisfy the predicate; skip the file.
if (regularColumnPredicates.isNone()) {
    return false;
}
Optional<Map<String, Domain>> predicateDomains = regularColumnPredicates.getDomains();
// An 'All' TupleDomain has no constraints; the file may match.
if (predicateDomains.isEmpty()) {
    return true;
}
Map<String, Domain> predicateDomainsMap = predicateDomains.get();
for (String regularColumn : regularColumns) {
    Domain columnPredicate = predicateDomainsMap.get(regularColumn);
    if (columnPredicate == null) {
        // No predicate for this column
        continue;
    }
    Optional<Domain> currentColumnStats = Optional.ofNullable(domainsWithStats.get(regularColumn));
    if (currentColumnStats.isEmpty()) {
        // No stats for column
        continue;
    }
    Domain domain = currentColumnStats.get();
    if (columnPredicate.intersect(domain).isNone()) {
        return false;
    }
}
```
```java
public HudiAvroSerializer(List<HiveColumnHandle> columnHandles)
{
    this.columnHandles = columnHandles;
    this.columnTypes = columnHandles.stream().map(HiveColumnHandle::getType).toList();
    // Fetches projected schema for #serialize() (source page required for Avro record conversion).
    // buildRecordInPage() uses record#getSchema() directly, so schema is not needed there.
    // Guard against empty column list (e.g. when only partition/synthesized columns are selected)
    // since constructSchema fails on an empty column list.
    var nonHiddenNames = columnHandles.stream().filter(ch -> !ch.isHidden()).map(HiveColumnHandle::getName).toList();
    var nonHiddenTypes = columnHandles.stream().filter(ch -> !ch.isHidden()).map(HiveColumnHandle::getHiveType).toList();
    this.schema = nonHiddenNames.isEmpty() ? null : constructSchema(nonHiddenNames, nonHiddenTypes);
}

public IndexedRecord serialize(SourcePage sourcePage, int position)
{
    IndexedRecord record = new GenericData.Record(schema);
    for (int i = 0; i < columnTypes.size(); i++) {
        Object value = getValue(sourcePage, i, position);
        record.put(i, value);
    }
    return record;
}
```
HudiAvroSerializer can construct schema=null when all provided HiveColumnHandles are hidden, but serialize() unconditionally does new GenericData.Record(schema) which will NPE. Additionally, schema is built only from non-hidden handles while serialize() iterates over all columnTypes and uses positional record.put(i, ...), which can mismatch the Avro schema field count/indexing if hidden columns are present. Consider either (a) rejecting hidden-only inputs early, (b) building a schema aligned with the positions you write, or (c) maintaining a mapping from channel -> schema field index and only serializing non-hidden columns.
Addressed, I added channelToSchemaFieldIndex[], which is computed once in the constructor.
Each non-hidden channel gets its sequential schema field index; hidden channels get -1. serialize() uses fieldIdx = channelToSchemaFieldIndex[i] for record.put(...), so the Avro field positions always match the schema regardless of where hidden columns appear in the channel list.
Below is the logic to ignore -1/hidden fields, so we should be safe now:

```java
if (fieldIdx < 0) {
    continue; // hidden column — not part of the schema
}
```
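The mapping described in the reply can be sketched in isolation (plain Java; a boolean[] of hidden flags stands in for HiveColumnHandle#isHidden, and the names are illustrative):

```java
// Illustrative: compute, once, the Avro schema field index for each channel.
// Hidden channels get -1 and are skipped during serialization, so schema
// field positions stay dense and aligned with the non-hidden columns.
class ChannelMapping
{
    static int[] channelToSchemaFieldIndex(boolean[] hidden)
    {
        int[] mapping = new int[hidden.length];
        int nextFieldIndex = 0;
        for (int channel = 0; channel < hidden.length; channel++) {
            mapping[channel] = hidden[channel] ? -1 : nextFieldIndex++;
        }
        return mapping;
    }
}
```

Precomputing the array keeps serialize() a straight per-row loop with no per-position filtering work.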
```diff
 public URI getUri()
 {
-    return URI.create("");
+    return URI.create(getScheme());
 }
```
TrinoHudiStorage.getUri() currently returns URI.create(getScheme()) (e.g., "file"), which produces a relative URI with no scheme component (scheme==null, path=="file"). HoodieStorage#getUri is typically expected to include a scheme (e.g., "file:///") so that downstream URI-based logic can reliably extract the scheme. Consider returning URI.create(getScheme() + ":///") (or otherwise constructing a URI with the scheme set).
Suggested change:
```java
return URI.create(getScheme() + ":///");
```
```java
            "Location of table %s does not contain Hudi table metadata: %s".formatted(tableName, basePath));
}
catch (Throwable e) {
    throw new TrinoException(HUDI_META_CLIENT_ERROR,
            "Unable to load Hudi meta client for table %s (%s)".formatted(tableName, basePath));
```
buildTableMetaClient swallows the underlying exception details: both catch blocks drop the original exception as the cause, and the generic catch (Throwable) will also intercept Errors/interruptions. Consider catching a narrower exception type (e.g., RuntimeException) and passing the original exception as the cause when constructing TrinoException so failures are diagnosable.
Suggested change:
```java
            "Location of table %s does not contain Hudi table metadata: %s".formatted(tableName, basePath),
            e);
}
catch (RuntimeException e) {
    throw new TrinoException(HUDI_META_CLIENT_ERROR,
            "Unable to load Hudi meta client for table %s (%s)".formatted(tableName, basePath),
            e);
}
```
- HudiFile/HudiBaseFile/HudiLogFile: Trino-native wrappers for Hudi base files and log files
- TupleDomainUtils: predicate helpers for domain-based index lookups
- HudiAvroSerializer: bidirectional Avro <-> Trino type conversion
- HudiTableTypeUtils: COW/MOR input format detection
- InlineSeekableDataInputStream / TrinoHudiInlineStorage: support for reading log files embedded via InLineFS URI scheme
Description

This PR adds foundational building blocks to the Hudi connector needed for full COW and MOR table read support.

File abstraction layer (io.trino.plugin.hudi.file)
Introduces HudiFile as a common interface for Hudi files, with two concrete implementations:
- HudiBaseFile - a Trino-native wrapper around HoodieBaseFile, holding path, file name, size, modification time, and a byte range [start, length] for split-aligned reads.
- HudiLogFile - the same structure wrapping HoodieLogFile, used for MOR delta log files.
Both are Jackson-serialisable so they can be round-tripped through Trino's split serialisation mechanism.

Utilities (io.trino.plugin.hudi.util)
- HudiTableTypeUtils - maps Hive input format class names (both the legacy com.uber.hoodie and current org.apache.hudi namespaces) to HoodieTableType (COPY_ON_WRITE/MERGE_ON_READ), replacing ad-hoc string comparisons scattered across the plugin.
- HudiAvroSerializer - bidirectional conversion between Avro GenericRecord values and Trino Block/Page values, covering all standard SQL types (boolean, integer family, real/double, decimal, date, timestamp, timestamp with time zone, varchar, char, varbinary, array, map, row). Required for deserialising Hudi log-file blocks during MOR compaction reads.
- TupleDomainUtils - translates Trino TupleDomain predicates into column-level value lists for use against Hudi index metadata (Bloom filter, column stats, record-level index). Handles range decomposition, null handling, and statistics overflow.

Inline filesystem I/O (io.trino.plugin.hudi.io / storage)
Hudi log files can embed smaller files using the inlinefs:// URI scheme (InLineFSUtils). Added:
- InlineSeekableDataInputStream - wraps a TrinoInputStream with offset/length bounds so that the SeekableDataInputStream contract is satisfied for inline payloads.
- TrinoHudiInlineStorage - a HoodieStorage implementation that delegates inline-scheme paths to a bounded sub-read of the outer TrinoHudiStorage, and routes all other operations back to the base storage. Required for reading Hudi log files that contain embedded column-stats or Bloom-filter metadata blocks.

HudiUtil & minor cleanup
Extends HudiUtil with additional helpers, and adjusts HudiSplitSource and TimelineTable to use the new abstractions.

NOTE: This PR is part [1/4] of efforts to upstream Hudi optimizations to opensource.
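As a rough illustration of how an inline path carries its byte range, a parser for an InLineFS-style URI can be sketched with java.net.URI. The exact path layout and the query parameter names (start_offset, length) are assumptions about Hudi's InLineFSUtils format, not verified against the library:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Illustrative: pull the byte range of an embedded file out of an
// inlinefs-style URI such as
//   inlinefs://outer/file.log/parquet/?start_offset=128&length=64
class InlinePath
{
    record Range(long startOffset, long length) {}

    static Range parse(URI uri)
    {
        if (!"inlinefs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("not an inlinefs URI: " + uri);
        }
        Map<String, String> params = new HashMap<>();
        for (String pair : uri.getQuery().split("&")) {
            String[] kv = pair.split("=", 2);
            params.put(kv[0], kv[1]);
        }
        return new Range(
                Long.parseLong(params.get("start_offset")),
                Long.parseLong(params.get("length")));
    }
}
```

A range recovered this way is exactly what a bounded stream like InlineSeekableDataInputStream needs to restrict reads to the embedded payload.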
Additional context and related issues

The Hudi connector currently supports only Copy-on-Write tables and relies on the Hadoop FileSystem for I/O. This work is part of a broader effort to remove the Hadoop FileSystem dependency in favour of Trino's own TrinoFileSystem abstraction.
The inline filesystem support (InLineFSUtils, the inlinefs:// scheme) is a Hudi-internal mechanism used when storing small metadata (Bloom filters, column statistics) as byte ranges inside a log file rather than as separate files. Without TrinoHudiInlineStorage, the Hudi library will fall back to a Hadoop FileSystem call and fail when Hadoop is not present.
TupleDomainUtils mirrors logic from TupleDomainParquetPredicate but operates on string-keyed column-stats maps returned by the Hudi metadata table, rather than Parquet row-group statistics.

Release notes
(x) This is not user-visible or is docs only, and no release notes are required.