
Support from_protobuf expression#14354

Draft
thirtiseven wants to merge 54 commits into NVIDIA:main from thirtiseven:from_protobuf_nested

Conversation


@thirtiseven thirtiseven commented Mar 3, 2026

Fixes #14069.
Depends on NVIDIA/spark-rapids-jni#4107

The code is ready, but it is quite long for a single review. Feel free to review it directly, or start with the first part in #14419.

Performance tests

| Scenario | Description | Rows | Msg size | Data size | CPU | GPU | Speedup |
|---|---|---|---|---|---|---|---|
| flat_scalars | 10 scalar fields (int/long/string/double/bool) | 2,000,000 | 70 B | 133 MB | 1.69 s | 0.31 s | 5.40x |
| wide_scalars | 100 scalar fields (wide output schema) | 500,000 | 689 B | 328 MB | 4.82 s | 0.72 s | 6.69x |
| nested_struct | 3-level nested structs, 7 top-level fields | 1,000,000 | 174 B | 165 MB | 1.76 s | 0.29 s | 6.05x |
| repeated_fields | repeated scalars + repeated message arrays | 500,000 | 1733 B | 826 MB | 3.34 s | 0.58 s | 5.75x |
| large_message | 20 scalars + large repeated + deep nesting (~25KB/msg) | 50,000 | 34030 B | 1622 MB | 5.30 s | 1.05 s | 5.03x |

| Field selection | CPU | GPU | Speedup |
|---|---|---|---|
| All fields (full decode) | 5.36 s | 1.01 s | 5.31x |
| Scalars only (20 fields) | 5.07 s | 1.00 s | 5.05x |
| 3 scalar fields | 4.96 s | 0.92 s | 5.39x |
| 1 repeated scalar array | 4.87 s | 0.94 s | 5.17x |
| Repeated + nested struct | 4.99 s | 0.96 s | 5.22x |

Description (⚠️ from AI)

This PR adds GPU acceleration for Spark's from_protobuf() expression by replacing ProtobufDataToCatalyst with GpuFromProtobuf at query planning time. It bridges the Spark SQL catalyst layer to the CUDA protobuf decode kernels (provided by the companion JNI PR in spark-rapids-jni) through a clean three-layer architecture: reflection-based compatibility, typed metadata extraction/validation, and GPU expression execution.

The implementation spans ~7,800 lines of new Scala/Python/Shell code across 27 files, including ~2,900 lines of Scala plugin code, ~4,400 lines of Python integration tests and data generators, and ~450 lines of shell/proto infrastructure.

Key capabilities

  • Full from_protobuf() replacement: Transparent GPU override of ProtobufDataToCatalyst — no user-facing API changes
  • All scalar protobuf types: int32, int64, uint32, uint64, sint32/sint64 (zigzag), fixed32/sfixed32/fixed64/sfixed64, float, double, bool, string, bytes
  • Nested messages: Up to 10 levels deep, with recursive schema flattening
  • Repeated fields: Both packed and unpacked encoding, repeated scalars and repeated messages (ArrayType(StructType))
  • Enum-as-string: Configurable via enums.as.ints option — integer mode or validated string name mode
  • Default values: Per-field defaults for all scalar types, strings, bytes, and enum (preserving both numeric and display name)
  • Required field validation: Proto2-style required field checks
  • PERMISSIVE / FAILFAST modes: Configurable error handling via mode option
  • Schema projection: Two-level pruning — top-level field pruning + nested child pruning — reduces GPU decode work to only fields referenced by downstream operators
  • Post-project batch coalesce: Optional post-decode coalesce to avoid small-batch overhead from schema projection
  • Spark 3.4+ / 3.5+ compatibility: Reflection-based compat layer handles path-based and bytes-based descriptor APIs across Spark versions
  • Proto2 only: Proto3 and editions syntax are explicitly rejected with CPU fallback
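The sint32/sint64 zigzag encoding mentioned above maps signed integers onto the unsigned varint space so small negative values stay short on the wire. A minimal Python sketch of the mapping (illustrative only; the plugin performs this decode on the GPU):

```python
def zigzag_encode(n: int) -> int:
    # 0, -1, 1, -2, 2, ... map to 0, 1, 2, 3, 4, ...
    # Python's right shift sign-extends, so this works for 64-bit values.
    return (n << 1) ^ (n >> 63)

def zigzag_decode(z: int) -> int:
    # Invert the mapping: even codes are non-negative, odd codes negative.
    return (z >> 1) ^ -(z & 1)
```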

Architecture

Spark from_protobuf (ProtobufDataToCatalyst)
    │
    │ GpuOverrides rule registration (Spark340PlusNonDBShims)
    ▼
ProtobufExprShims.fromProtobufRule (tagExprForGpu → convertToGpu)
    │
    │ reflection
    ▼
SparkProtobufCompat (extractExprInfo, resolveMessageDescriptor, parsePlannerOptions)
    │
    │ typed metadata
    ▼
ProtobufSchemaExtractor.analyzeAllFields → Map[String, ProtobufFieldInfo]
    │
    │ schema projection analysis
    ▼
analyzeRequiredFields → nestedFieldRequirements → prunedFieldsMap
    │
    │ flatten + validate
    ▼
ProtobufSchemaValidator (toFlattenedFieldDescriptor → validateFlattenedSchema → toFlattenedSchemaArrays)
    │
    │ parallel arrays
    ▼
GpuFromProtobuf(decodedSchema, flatArrays..., child)
    │
    │ JNI call
    ▼
Protobuf.decodeToStruct → CUDA decode → cuDF STRUCT column

File structure

New files (core)

| File | Lines | Description |
|---|---|---|
| GpuFromProtobuf.scala | 203 | GPU expression: JNI call, null propagation, type mapping, value equality |
| ProtobufExprShims.scala | 845 | GpuOverrides rule, schema projection analysis, flatten orchestration, convertToGpu |
| SparkProtobufCompat.scala | 371 | Reflection layer: extract expr info, resolve descriptors, parse options, Spark 3.4/3.5 compat |
| ProtobufSchemaModel.scala | 182 | Typed metadata: ProtobufExprInfo, ProtobufFieldInfo, ProtobufDefaultValue, FlattenedFieldDescriptor, etc. |
| ProtobufSchemaExtractor.scala | 236 | Field analysis: type/encoding mapping, support checks, wire type resolution |
| ProtobufSchemaValidator.scala | 183 | Flatten-time validation: enum metadata, defaults, parent-child consistency, JNI array construction |

New files (tests)

| File | Lines | Description |
|---|---|---|
| protobuf_test.py | 3,989 | 57 Python integration tests: GPU vs CPU correctness for all protobuf features |
| ProtobufExprShimsSuite.scala | 616 | 22 Scala unit tests: compat layer, extractor, validator, ordinal remapping, semantic equality |
| ProtobufBatchMergeSuite.scala | 115 | 4 Scala unit tests: post-project coalesce detection and config |
| data_gen.py (additions) | ~385 | Protobuf wire-format encoder: ProtobufMessageGen, PbScalar/PbNested/PbRepeated/PbRepeatedMessage |
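The data_gen.py additions implement a hand-rolled protobuf wire-format encoder. The core of any such encoder is the varint plus field-tag scheme; a minimal sketch under that assumption (illustrative, not the PR's actual ProtobufMessageGen code):

```python
def encode_varint(n: int) -> bytes:
    # Little-endian base-128: 7 payload bits per byte, MSB = continuation.
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def encode_field(field_number: int, wire_type: int, payload: bytes) -> bytes:
    # Each field starts with a tag varint: (field_number << 3) | wire_type.
    return encode_varint((field_number << 3) | wire_type) + payload
```

Field 1 carrying varint value 150 produces the classic `08 96 01` byte sequence from the protobuf encoding documentation.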

New files (infrastructure)

| File | Lines | Description |
|---|---|---|
| main_log.proto | 103 | Main test proto: enums, required fields, multi-level nesting, cross-file imports |
| module_a_res.proto | 92 | External proto: repeated messages, defaults, nested repeated |
| module_b_res.proto | 29 | External proto: repeated scalars, block structures |
| predictor_schema.proto | 82 | External proto: deep multi-level nesting, empty messages |
| device_req.proto | 11 | External proto: bytes field |
| gen_nested_proto_data.sh | 34 | Proto compilation script |
| main_log.desc | (binary) | Compiled FileDescriptorSet |

Modified files

| File | Change summary |
|---|---|
| GpuOverrides.scala | GetStructField → GpuGetStructFieldMeta, GetArrayStructFields → GpuGetArrayStructFieldsMeta |
| complexTypeExtractors.scala | New GpuStructFieldOrdinalTag, GpuGetStructFieldMeta, GpuGetArrayStructFieldsMeta with PRUNED_ORDINAL_TAG support |
| basicPhysicalOperators.scala | GpuProjectExecMeta detects protobuf extraction, new forcePostProjectCoalesce parameter on GpuProjectExec |
| RapidsConf.scala | New config spark.rapids.sql.protobuf.batchMergeAfterProject.enabled |
| Spark340PlusNonDBShims.scala | Merges ProtobufExprShims.exprs into expression rules |
| GpuBoundAttribute.scala | Minor: pruned struct types propagate correctly through binding |
| DeltaProviderBase.scala | Pattern match updates for new GpuProjectExec signature (forcePostProjectCoalesce param) |
| run_pyspark_from_build.sh | Auto-download spark-protobuf + protobuf-java JARs, driver classpath setup |
| spark_init_internal.py | Driver classpath support for optional protobuf module |
| supported_ops.md | Remove BINARY from unsupported child types for struct extractors |

Design decisions

1. Reflection-isolated compatibility layer

All Spark expression and protobuf-java descriptor reflection is confined to SparkProtobufCompat.scala. This isolates version-specific API differences (Spark 3.4 path-based vs 3.5+ bytes-based buildDescriptor) from planning logic. Reflection failures produce explicit CPU fallback reasons rather than silent degradation.

2. Typed metadata over raw reflection

Instead of passing raw Option[Any] protobuf defaults or untyped descriptor fields through the planning pipeline, the code uses a typed metadata model (ProtobufExprInfo, ProtobufFieldInfo, ProtobufDefaultValue, ProtobufEnumMetadata). Enum defaults preserve both numeric value (defaultInt) and display name (defaultString), avoiding the ClassCastException: EnumValueDescriptor cannot be cast to String pitfall.
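The idea of carrying both enum representations through planning can be sketched in Python (hypothetical names and shape; the PR's actual model is the Scala ProtobufDefaultValue):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class EnumDefault:
    # Keep both the numeric value and the display name so that neither
    # enums-as-ints mode nor enums-as-strings mode needs an unsafe cast.
    default_int: int
    default_string: str

    def render(self, enums_as_ints: bool):
        return self.default_int if enums_as_ints else self.default_string
```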

3. Two-level schema projection

Schema projection reduces GPU decode work by only processing fields referenced downstream:

  • Top-level pruning: Only decode top-level fields referenced in downstream ProjectExec/FilterExec/AggregateExec/SortExec/WindowExec
  • Nested pruning: Only decode children of nested messages that are actually accessed — applies uniformly to both StructType (non-repeated) and ArrayType(StructType) (repeated) nested fields

The analysis walks GetStructField / GetArrayStructFields chains upward through the plan to determine required fields. Expression identity uses semantic equality (not just reference equality) to handle Catalyst optimizer creating duplicate instances.
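The effect of two-level pruning can be illustrated with a small sketch over dict-based schemas (the real code operates on Spark StructType; names here are illustrative):

```python
def prune_schema(schema, required_paths, prefix=()):
    """Keep a field if its path lies on the way to, or is covered by,
    some required path. Nested dicts stand in for nested struct types."""
    pruned = {}
    for name, child in schema.items():
        path = prefix + (name,)
        # Keep when 'path' is a prefix of a required path (an ancestor of a
        # needed leaf) or a required path is a prefix of 'path' (a whole
        # struct was requested, so all its children survive).
        keep = any(rp[:len(path)] == path or path[:len(rp)] == rp
                   for rp in required_paths)
        if keep:
            pruned[name] = (prune_schema(child, required_paths, path)
                            if isinstance(child, dict) else child)
    return pruned
```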

4. Ordinal remapping via TreeNodeTag

When schema projection prunes nested struct children, downstream GetStructField / GetArrayStructFields expressions must use remapped ordinals pointing into the pruned struct. This is done via PRUNED_ORDINAL_TAG (TreeNodeTag[Int]), set during convertToGpu and read by GpuGetStructFieldMeta.convertToGpu / GpuGetArrayStructFieldsMeta.convertToGpu.

Design principle: Operator-specific logic (ordinal remapping) stays in the Meta layer. The runtime classes (GpuGetStructField, GpuGetArrayStructFields, GpuCanonicalize) remain generic and untouched.
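The remapping itself is simple once the pruned field list is known; a sketch of the position translation (illustrative, not the PR's code):

```python
def remap_ordinal(original_fields, kept_fields, original_ordinal):
    # Translate an ordinal into the original struct to the position the
    # same field occupies after pruning; the field must have survived.
    name = original_fields[original_ordinal]
    return kept_fields.index(name)
```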

5. Post-project batch coalesce

Schema projection can produce narrower batches. When enabled via spark.rapids.sql.protobuf.batchMergeAfterProject.enabled, GpuProjectExecMeta detects projects that extract from protobuf decode and sets forcePostProjectCoalesce=true, which inserts a post-project coalesce and prevents the optimizer from removing it (via outputBatching = null).

6. Value equality for array-carrying types

Any type storing raw arrays that participates in expression equality (GpuFromProtobuf, FlattenedFieldDescriptor, ProtobufDescriptorSource.DescriptorBytes, ProtobufDefaultValue.BinaryValue) overrides equals/hashCode with java.util.Arrays content-based semantics. This prevents semantically identical metadata from comparing unequal by JVM identity.
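A Python analogue of the content-based equality contract (hypothetical class; the PR implements this in Scala via java.util.Arrays):

```python
class FlattenedField:
    def __init__(self, name, enum_names):
        self.name = name
        self.enum_names = list(enum_names)

    def __eq__(self, other):
        # Compare array contents, not object identity.
        return (isinstance(other, FlattenedField)
                and self.name == other.name
                and self.enum_names == other.enum_names)

    def __hash__(self):
        # Lists are unhashable; hash an immutable snapshot of the contents
        # so equal-content instances land in the same bucket.
        return hash((self.name, tuple(self.enum_names)))
```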

7. Optional integration with graceful degradation

ProtobufExprShims.exprs loads ProtobufDataToCatalyst by reflection. If the class is not on the classpath (no spark-protobuf JAR), it returns an empty map — no error, no GPU override. Class-loading failures (ExceptionInInitializerError, LinkageError) are caught at the Error level to prevent crashing query planning.
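The optional-dependency pattern can be sketched in Python (an illustrative analogue of the Scala reflection guard, not the actual plugin code):

```python
import importlib

def load_optional_rules(module_name, rules):
    # If the optional dependency is absent or fails to initialize,
    # return an empty rule map instead of letting planning crash.
    try:
        importlib.import_module(module_name)
    except Exception:
        return {}
    return rules
```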

Test coverage

Python integration tests (57 tests)

| Category | Tests | What is covered |
|---|---|---|
| Scalar types | 5 | All scalar types, random scalars, bytes, duplicate fields, all null input |
| Integer encodings | 2 | Signed integers (zigzag), fixed integers |
| Bool encoding | 2 | Non-canonical varint for bool (scalar + repeated) |
| Nested messages | 6 | 1-level, 3-level deep, 5-level deep, nested field access, batch merge, nested-with-repeated |
| Repeated fields | 6 | int32, string, all types, large array, packed, repeated-with-nested |
| Repeated messages | 1 | Repeated message decode |
| Enum | 10 | Enum cases, nested enum PERMISSIVE, sibling null propagation, defaults, repeated enum, enum-in-repeated-message, nested repeated enum-as-string |
| Required fields | 3 | Present, missing FAILFAST, nested missing PERMISSIVE |
| Default values | 2 | Scalar defaults, nested child defaults |
| Schema projection | 7 | Simple field pruning, parametrized cases, alias boundary, withColumn boundary, deep pruning (3/5 level, mixed, sibling, whole-struct) |
| Error handling | 3 | FAILFAST malformed, PERMISSIVE malformed null, all-null input |
| Complex / customer | 2 | Heavy nested proto (customer-realistic), Parquet round-trip |
| Bug regressions | 4 | Name collision, filter jump, unrelated struct collision, max depth |
| Cross-expression | 1 | Different messages on same binary column |
| API / options | 2 | Legacy signature preservation, packed repeated fixed encoding |
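The "non-canonical varint for bool" rows above refer to bool fields whose wire value is any nonzero varint, not just the single byte 0x01 a canonical writer emits. A minimal decode sketch of that rule (illustrative):

```python
def decode_varint(data, pos=0):
    # Reassemble 7-bit groups, least-significant first.
    result, shift = 0, 0
    while True:
        b = data[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            return result, pos
        shift += 7

def decode_bool(data):
    # Conformant decoders treat any nonzero varint as true, including
    # multi-byte encodings that a canonical writer never produces.
    value, _ = decode_varint(data)
    return value != 0
```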

Scala unit tests (26 tests)

| Suite | Tests | What is covered |
|---|---|---|
| ProtobufExprShimsSuite | 22 | SparkProtobufCompat reflection (path/bytes/Spark 3.4/3.5), planner options, unsupported options, proto3 rejection, ProtobufSchemaExtractor (typed enum defaults, reflection failures, type mismatch, FLOAT/DOUBLE widening), ProtobufSchemaValidator (enum-string encoding, missing metadata rejection, incompatible defaults, non-STRUCT parent), ordinal remapping, GpuFromProtobuf semantic equality, binary default equality, FlattenedFieldDescriptor equality |
| ProtobufBatchMergeSuite | 4 | ProjectExec protobuf extraction detection (child project, same project), config default/enable, output batching drop for post-project merge |

Total: 83 tests

Configurations

| Config key | Default | Description |
|---|---|---|
| spark.rapids.sql.protobuf.batchMergeAfterProject.enabled | false | Enable post-project coalesce for projects extracting from schema-pruned from_protobuf decode |

Review Guide

This PR is large (~7,800 lines) but has a well-defined layered architecture. New Scala production code is ~2,020 lines across 6 files; the rest is tests, proto schemas, and shell infrastructure. This guide provides a recommended reading order and key areas to focus on.

Recommended reading order

Read bottom-up from the metadata model to the planning orchestration:

| Order | File | Focus | Time |
|---|---|---|---|
| 1 | ProtobufSchemaModel.scala | Data types: ProtobufExprInfo, ProtobufFieldInfo, ProtobufDefaultValue, FlattenedFieldDescriptor, FlattenedSchemaArrays. Value equality for array fields. | 10 min |
| 2 | ProtobufSchemaExtractor.scala | Type/encoding mapping: checkScalarEncoding maps Spark types to protobuf encodings. Cross-precision FLOAT/DOUBLE rejection. Wire type resolution. | 15 min |
| 3 | ProtobufSchemaValidator.scala | Flatten-time validation: toFlattenedFieldDescriptor converts typed metadata to JNI format. validateFlattenedSchema checks parent-child consistency, enum metadata. toFlattenedSchemaArrays builds parallel arrays. | 15 min |
| 4 | SparkProtobufCompat.scala | Reflection layer: PbReflect cached method lookups, extractExprInfo reads expression fields, resolveMessageDescriptor with Spark 3.4/3.5 retry logic, ReflectiveMessageDescriptor/ReflectiveFieldDescriptor wrappers. | 20 min |
| 5 | GpuFromProtobuf.scala | GPU expression: doColumnar JNI call + null propagation, sparkTypeToCudfIdOpt type mapping, equals/hashCode with deep array equality. | 10 min |
| 6 | complexTypeExtractors.scala (changes) | GpuStructFieldOrdinalTag, GpuGetStructFieldMeta, GpuGetArrayStructFieldsMeta — PRUNED_ORDINAL_TAG read + effective ordinal computation. | 10 min |
| 7 | ProtobufExprShims.scala §1: rule + tag | Lines 1–200: exprs, fromProtobufRule, tagExprForGpu top half — extract info, validate options, resolve descriptor, analyze fields. | 20 min |
| 8 | ProtobufExprShims.scala §2: schema projection | Lines 200–500: analyzeRequiredFields, collectStructFieldReferences, resolveFieldAccessChain, isProtobufStructReference — plan traversal and field reference collection. | 25 min |
| 9 | ProtobufExprShims.scala §3: flatten + convert | Lines 500–845: addFieldWithChildren, addChildFieldsFromStruct, buildPrunedFieldsMap, buildDecodedSchema, registerPrunedOrdinals, convertToGpu. | 20 min |
| 10 | basicPhysicalOperators.scala (changes) | GpuProjectExecMeta: protobuf detection, shouldCoalesceAfterProject, forcePostProjectCoalesce wiring. GpuProjectExecLike trait: coalesceAfter, outputBatching. | 15 min |
| 11 | GpuOverrides.scala + RapidsConf.scala + Spark340PlusNonDBShims.scala (changes) | Expression registration, config, rule merging — small diffs. | 5 min |
| 12 | ProtobufExprShimsSuite.scala | Unit tests — skim by section: compat, extractor, validator, equality. | 15 min |
| 13 | ProtobufBatchMergeSuite.scala | Batch merge tests — straightforward. | 5 min |
| 14 | protobuf_test.py | Integration tests — skim by category. Focus on enum, schema projection, deep pruning, error handling tests. | 20 min |
| 15 | data_gen.py (additions) | Wire-format encoder: ProtobufMessageGen, encoding functions. | 10 min |
| 16 | Proto files + shell scripts | Test schema structure, JAR download automation. | 10 min |

Total estimated review time: ~3.5–4 hours for a thorough review.

Key review areas by priority

P0: Correctness-critical
  1. ProtobufExprShims.tagExprForGpu (ProtobufExprShims.scala): This is the most complex function in the PR. It orchestrates the full planning pipeline: extract expression info → validate options → resolve descriptor → analyze fields → compute required fields → flatten schema → validate → set ordinal tags → override data type. Key things to verify:

    • All willNotWorkOnGpu paths produce meaningful fallback reasons
    • If step 5 (flatten) encounters an error, later steps (validate, registerPrunedOrdinals, overrideDataType) are NOT executed on partial state
    • analyzeAllFields failure triggers CPU fallback, not silent empty schema
  2. analyzeRequiredFields + collectStructFieldReferences (ProtobufExprShims.scala): Plan traversal for schema projection. Verify:

    • Upward walk handles ProjectExec, FilterExec, AggregateExec, SortExec, WindowExec; unknown nodes disable pruning
    • isProtobufStructReference uses semantic equality as fallback (class + semanticEquals on children), not just eq
    • resolveFieldAccessChain correctly walks GetStructField chains to the protobuf root expression
    • GetArrayStructFields with empty parent path disables pruning rather than registering a fake top-level requirement
  3. registerPrunedOrdinals (ProtobufExprShims.scala): Sets PRUNED_ORDINAL_TAG on Spark expressions. Verify:

    • Tags are set only during convertToGpu (not during analysis/tagging when CPU fallback is still possible)
    • The remapped ordinal is correctly computed from the pruned schema's child position
    • Both GetStructField and GetArrayStructFields are handled
  4. GpuGetStructFieldMeta.convertToGpu / GpuGetArrayStructFieldsMeta.convertToGpu (complexTypeExtractors.scala): Read PRUNED_ORDINAL_TAG and pass effective ordinal. Verify:

    • Falls back to expr.ordinal when tag is not present (non-protobuf usage)
    • effectiveNumFields for GetArrayStructFields uses the child struct's actual field count after pruning
  5. GpuFromProtobuf.doColumnar (GpuFromProtobuf.scala): JNI call + null propagation. Verify:

    • Input null mask is merged via mergeAndSetValidity(BinaryOp.BITWISE_AND, input.getBase) — only when input has nulls
    • FAILFAST wraps CudfException in SparkException; PERMISSIVE logs and rethrows
    • ProtobufSchemaDescriptor is lazily initialized (transient) for serialization safety

P1: Robustness

  1. SparkProtobufCompat (SparkProtobufCompat.scala): Reflection layer. Verify:

    • extractExprInfo extracts messageName, descriptorSource, options with proper error handling
    • resolveMessageDescriptor handles both path-based (Spark 3.4) and bytes-based (Spark 3.5+) descriptor construction, with retry-on-ClassCastException for Spark 3.5 path→bytes conversion
    • isGpuSupportedProtoSyntax rejects proto3, editions, null, and empty strings
    • Reflection failures always return Left/None, never throw through to the optimizer
  2. ProtobufSchemaValidator (ProtobufSchemaValidator.scala): Flatten-time validation. Verify:

    • ENC_ENUM_STRING fields must have non-empty enumValidValues and enumNames with matching lengths
    • Repeated fields with defaults are rejected
    • Parent index validity: parent must be a STRUCT field with depth = child.depth - 1
    • encodeDefaultValue handles all ProtobufDefaultValue variants including EnumValue (both numeric and string)
  3. Value equality (multiple files): Verify equals/hashCode overrides use Arrays.equals/Arrays.deepEquals for:

    • GpuFromProtobuf (all 16 schema arrays)
    • FlattenedFieldDescriptor (defaultString, enumValidValues, enumNames)
    • ProtobufDescriptorSource.DescriptorBytes
    • ProtobufDefaultValue.BinaryValue

P2: Performance & Integration

  1. Post-project coalesce (basicPhysicalOperators.scala): Verify:

    • GpuProjectExecMeta.shouldCoalesceAfterProject correctly detects ProtobufDataToCatalyst by class name via isProtobufDecodeExpr
    • forcePostProjectCoalesce=true causes outputBatching to return null (not TargetSize), preventing removal by transition rules
    • Config isProtobufBatchMergeAfterProjectEnabled defaults to false
  2. DeltaProviderBase.scala changes: Pattern match updates for the new GpuProjectExec 4-parameter signature. Verify that mergeIdenticalProjects correctly ORs the forcePostProjectCoalesce flags when merging.

  3. Integration test infrastructure (run_pyspark_from_build.sh): Verify:

    • spark-protobuf and protobuf-java JAR download uses correct Maven coordinates
    • Version detection from $SPARK_HOME/jars/protobuf-java-*.jar with fallback mapping
    • PROTOBUF_JARS_AVAILABLE is exported so protobuf_test.py can skip when deps are missing
    • Driver classpath is correctly set via PYSP_TEST_spark_driver_extraClassPath

Things to watch for

  • Expression identity: isProtobufStructReference must handle both reference equality (eq) and semantic equality for duplicate Catalyst instances created by SimplifyExtractValueOps or PySpark-JVM serialization. Without this, schema projection splits one decode into N separate single-field decodes.
  • Optional integration: ProtobufExprShims.exprs catches Error (not just Exception) to handle ExceptionInInitializerError from missing protobuf JAR. Returning empty map means no GPU override attempt.
  • Cross-precision rejection: ProtobufSchemaExtractor.checkScalarEncoding explicitly rejects DoubleType mapped to protobuf FLOAT (and vice versa) rather than silently coercing.
  • Proto2 only: isGpuSupportedProtoSyntax returns true only for "proto2" — proto3 and editions are not yet supported on GPU, triggering CPU fallback with an explicit reason.
  • Ordinal tag timing: PRUNED_ORDINAL_TAG is set only in convertToGpu, never during tagExprForGpu. If it were set during tagging, a subsequent CPU fallback would leave stale tags on Spark expressions.

Mapping review to test coverage

| If you're reviewing… | Verify these tests pass |
|---|---|
| tagExprForGpu planning pipeline | All 57 integration tests (they all flow through planning) |
| Schema projection analysis | `test_from_protobuf_schema_projection_*`, `test_deep_pruning_*`, `test_from_protobuf_projection_across_*` |
| Ordinal remapping | `test_from_protobuf_nested_message_field_access*`, `test_deep_pruning_*`, array struct field meta uses pruned child field count |
| SparkProtobufCompat reflection | `compat extracts *`, `compat invokes *`, `compat retries *`, `compat distinguishes *` |
| ProtobufSchemaExtractor | `extractor preserves *`, `extractor records *`, `extractor gives *` |
| ProtobufSchemaValidator | `validator encodes *`, `validator rejects *`, `validator returns *` |
| Enum-as-string | `test_from_protobuf_enum_cases`, `test_from_protobuf_*_enum_*` (10 tests) |
| Default values | `test_from_protobuf_default_values_cases`, `test_from_protobuf_nested_child_default_values` |
| Error handling | `test_from_protobuf_failfast_malformed_data`, `test_from_protobuf_permissive_malformed_returns_null` |
| Post-project coalesce | ProtobufBatchMergeSuite (4 tests), `test_from_protobuf_nested_message_field_access_with_batch_merge` |
| Value equality | `GpuFromProtobuf semantic equality *`, `protobuf binary defaults *`, `flattened field descriptor *` |
| JNI integration | All integration tests (they all call Protobuf.decodeToStruct through JNI) |

Checklists

  • This PR has added documentation for new or modified features or behaviors.
  • This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  • Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven thirtiseven self-assigned this Mar 3, 2026
@thirtiseven

@greptileai full review


greptile-apps bot commented Mar 4, 2026

Greptile Summary

This PR adds GPU acceleration for Spark's from_protobuf() expression, replacing ProtobufDataToCatalyst with GpuFromProtobuf at query-planning time. It introduces a layered architecture: a reflection-based compatibility shim (SparkProtobufCompat), a typed metadata extraction layer (ProtobufSchemaExtractor/ProtobufSchemaValidator), and a GPU expression (GpuFromProtobuf) that delegates to JNI CUDA kernels. Schema projection prunes both top-level and nested fields to reduce GPU decode work, with ordinal remapping via PRUNED_ORDINAL_TAG. Performance benchmarks show 5–6x speedup across a range of realistic protobuf schemas.

What was addressed from prior review rounds:

  • All willNotWorkOnGpu guard / early-exit issues in addFieldWithChildren and addChildFieldsFromStruct
  • Content-based equals/hashCode for all array-carrying types (GpuFromProtobuf, FlattenedFieldDescriptor, FlattenedSchemaArrays, DescriptorBytes, BinaryValue)
  • encodeDefaultValue and toDefaultValue propagate errors as Left instead of throwing
  • Proto3/editions/null syntax correctly rejected; typeName(null) returns "" conservatively
  • getWireType returns Left for unknown types instead of throwing
  • analyzeRequiredFields traverses through multiple ProjectExec nodes (previously stopped at first)
  • collectedExprs.isEmpty redundant guard removed
  • PROTOBUF_JARS_AVAILABLE correctly requires both JARs before reporting availability
  • curl downloads use -fsL (fail-fast on HTTP errors)
  • Driver classpath merging fixed: deduplication, re.sub(count=1), lambda replacement

Remaining open items (not blocking merge):

  • Nested struct fields in buildDecodedSchema are not forced nullable = true; only top-level fields get this treatment. In PERMISSIVE mode this could mislead downstream operators that trust nullable = false on proto2 required fields. A TODO comment would help track this.
  • PRUNED_ORDINAL_TAG is set at the end of tagExprForGpu (not in convertToGpu as stated in a prior review reply). The code is safe as documented, but the implementation diverges from what was communicated in the review thread.
  • Schema pruning is conservatively disabled for plan shapes involving join operators, exchange nodes, or top-level GetArrayStructFields access — documented limitations, no CPU fallback risk.
  • The Alias.typeMeta override in GpuOverrides.scala now propagates any child overrideDataType through all aliases globally. This is correct today (only GpuFromProtobuf sets overrides) but is a broad behavioral change worth a clarifying comment.

Confidence Score: 4/5

  • PR is safe to merge; all critical correctness issues from prior rounds are resolved, remaining items are documented limitations or minor documentation gaps.
  • The prior review thread was extensive (30+ comments) and the developer addressed every P0 correctness issue: early-exit after willNotWorkOnGpu, throws-vs-Left in all validators, array equality, proto3/editions rejection, curl failure modes, classpath merging, and ordinal remapping. The two remaining P2 items (nested field nullability in buildDecodedSchema and the tagExprForGpu vs convertToGpu placement discrepancy) are non-blocking and the code is well-guarded. The Alias.typeMeta global change is intentional and safe for the current codebase. 83 tests (57 integration + 26 unit) cover the full feature surface including correctness, error handling, schema projection, and enum encoding. Score stops at 4 rather than 5 because the nested nullable gap could theoretically cause wrong results in PERMISSIVE mode with proto2 required nested fields, and a clarifying TODO is worth adding before merge.
  • ProtobufExprShims.scala lines 712-748 (buildDecodedSchema nested nullable) and GpuOverrides.scala line 956 (Alias.typeMeta global override) warrant a second look before merge.

Important Files Changed

| Filename | Overview |
|---|---|
| sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala | Core planning orchestrator: tagExprForGpu/analyzeRequiredFields/convertToGpu. All major issues from prior thread addressed — early-exit after willNotWorkOnGpu, traversal through multiple ProjectExec, collectedExprs.isEmpty removed, registerPrunedOrdinals moved after step 5 validation. Nested nullable propagation in buildDecodedSchema (top-level only) is a documented limitation; GetArrayStructFields with empty parentPath still conservatively disables all pruning. |
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFromProtobuf.scala | GPU expression with correct deep-equals/hashCode for all 16 array fields, null propagation via mergeAndSetValidity, PERMISSIVE-mode safety net CudfException logging, transient lazy ProtobufSchemaDescriptor with comment confirming it owns no native resources. Clean implementation. |
| sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/SparkProtobufCompat.scala | Reflection compatibility layer: isGpuSupportedProtoSyntax now rejects PROTO3, EDITIONS, empty, and "null"; typeName(null) returns ""; InvocationTargetException.getCause null-checked; toDefaultValue returns Left for unknown types; descriptor retry wrapped in Try. One residual: HDFS/remote paths fall back to CPU on Spark 3.5+ since Files.readAllBytes is local-only — acknowledged and defended in thread. |
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/protobuf/ProtobufSchemaValidator.scala | encodeDefaultValue returns Left for incompatible type/default combinations; validateFlattenedSchema now checks the parent STRUCT type invariant. Clean validation with proper Either propagation throughout. |
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/protobuf/ProtobufSchemaExtractor.scala | analyzeAllFields no longer fails fast on field-local errors; extractFieldInfo preserves the primary unsupported reason from checkFieldSupport; cross-precision DoubleType/FloatType mismatches emit explicit actionable messages; getWireType returns Left instead of throwing. Well-designed field analysis. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala | shouldCoalesceAfterProject correctly detects direct ProtobufDataToCatalyst via isRootedAtProtobufDecode. forcePostProjectCoalesce parameter wired through GpuProjectExec and GpuProjectAstExec. Config defaults to false as an explicit rollout valve. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | Two key changes: (1) Alias.typeMeta now delegates to childExprs.head.typeMeta, enabling pruned-schema propagation through aliases — affects ALL aliases globally, not just protobuf ones; (2) AttributeReference.convertToGpuImpl resolves type from child plan output with a comment noting GpuBoundAttribute handles the actual runtime type. Minor: GetStructField now uses GpuGetStructFieldMeta. |
| integration_tests/run_pyspark_from_build.sh | JAR download uses -fsL (fail on HTTP errors). PROTOBUF_JARS_AVAILABLE=true only when BOTH spark-protobuf AND protobuf-java are confirmed present. protobuf-java version detected from $SPARK_HOME/jars/ first, falls back to a version map. Highest-versioned bundled JAR selected via sort -V. |

Sequence Diagram

sequenceDiagram
    participant Spark as Spark Optimizer
    participant Rule as ProtobufExprShims.fromProtobufRule
    participant Tag as tagExprForGpu
    participant Compat as SparkProtobufCompat
    participant Extractor as ProtobufSchemaExtractor
    participant Validator as ProtobufSchemaValidator
    participant Convert as convertToGpu
    participant JNI as Protobuf.decodeToStruct (JNI)

    Spark->>Rule: plan with ProtobufDataToCatalyst
    Rule->>Tag: tagExprForGpu()
    Tag->>Compat: extractExprInfo (reflect messageName/options/descriptor)
    Compat-->>Tag: ProtobufExprInfo
    Tag->>Compat: resolveMessageDescriptor
    Compat-->>Tag: ProtobufMessageDescriptor (reflective)
    Tag->>Extractor: analyzeAllFields(schema, msgDesc, enumsAsInts)
    Extractor-->>Tag: Map[String, ProtobufFieldInfo]
    Tag->>Tag: analyzeRequiredFields (traverse parent ProjectExec/FilterExec/…)
    Tag->>Tag: step 5: addFieldWithChildren (build flatFields)
    Tag->>Validator: toFlattenedFieldDescriptor (per field)
    Validator-->>Tag: FlattenedFieldDescriptor
    Tag->>Validator: validateFlattenedSchema
    Validator-->>Tag: Right(())
    Tag->>Tag: registerPrunedOrdinals (set PRUNED_ORDINAL_TAG)
    Tag->>Tag: overrideDataType (pruned StructType)

    Spark->>Convert: convertToGpu(child)
    Convert-->>Spark: GpuFromProtobuf(decodedSchema, flatArrays…, child)

    Note over Spark,JNI: At query execution time
    Spark->>JNI: Protobuf.decodeToStruct(binaryCol, ProtobufSchemaDescriptor, failOnErrors)
    JNI-->>Spark: cuDF STRUCT ColumnVector
    Spark->>Spark: mergeAndSetValidity (propagate input nulls)
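The final mergeAndSetValidity step in the diagram can be sketched row-wise: a row that was null in the binary input column stays null in the decoded struct output, regardless of what the decode kernel produced for that slot. A conceptual Python sketch (the real code operates on cuDF validity masks, not Python lists):

```python
def merge_validity(input_rows, decoded_rows):
    """Propagate input nulls into the decoded output: if the binary input
    row was null, the decoded struct row must be null too."""
    return [None if inp is None else dec
            for inp, dec in zip(input_rows, decoded_rows)]
```

This keeps null semantics identical to the CPU path, where a null binary value never reaches the decoder at all.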

Comments Outside Diff (1)

  1. sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/ProtobufExprShims.scala, line 402-408 (link)

    PRUNED_ORDINAL_TAG set in tagExprForGpu, not convertToGpu as stated in prior review reply

    The developer's response to the previous thread item ("Addressed. I removed the registerPrunedOrdinals call from analyzeRequiredFields, so ordinal tags are now written only once during convertToGpu") described moving the call to convertToGpu. However, in the current code the tags are still written here in tagExprForGpu (just later in the method, after all willNotWorkOnGpu guards pass).

    The code comment at lines 403–405 correctly explains why this is safe (CPU path never reads the tag, and a parent forcing CPU fallback would take the entire subtree with it). The implementation is sound, but it contradicts the stated fix. Please update the comment or reply to clarify that the final location is the success block of tagExprForGpu rather than convertToGpu, so future readers aren't confused when comparing the review history to the code.

Reviews (33): Last reviewed commit: "address comments"

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven
Copy link
Collaborator Author

@greptileai full review

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>

(Nine further commits with the same sign-off follow, each answered by @thirtiseven re-requesting review: "@greptileai full review again" three times, then "@greptile review" six times.)
@thirtiseven thirtiseven marked this pull request as ready for review March 16, 2026 07:20
@thirtiseven thirtiseven marked this pull request as draft March 16, 2026 07:20
Signed-off-by: Haoyang Li <haoyangl@nvidia.com> (three commits)
@thirtiseven

@greptile review

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven

performance test code:
from_protobuf_bench.zip

@thirtiseven

@greptile review



Development

Successfully merging this pull request may close these issues.

[FEA] Support from_protobuf
