Support from_protobuf expression #14354
Conversation
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptileai full review
Greptile Summary

This PR adds GPU acceleration for Spark's `from_protobuf()` expression.

Previous-thread fixes confirmed in this diff:
Remaining concerns (flagged inline):
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

sequenceDiagram
participant Spark as Spark Optimizer
participant Shims as ProtobufExprShims
participant Compat as SparkProtobufCompat
participant Extractor as ProtobufSchemaExtractor
participant Validator as ProtobufSchemaValidator
participant GPU as GpuFromProtobuf
participant JNI as Protobuf.decodeToStruct (JNI)
Spark->>Shims: tagExprForGpu(ProtobufDataToCatalyst)
Shims->>Compat: extractExprInfo(expr)
Compat-->>Shims: ProtobufExprInfo (messageName, descriptorSource, options)
Shims->>Compat: parsePlannerOptions(options)
Compat-->>Shims: ProtobufPlannerOptions (enumsAsInts, failOnErrors)
Shims->>Compat: resolveMessageDescriptor(exprInfo)
Compat-->>Shims: ProtobufMessageDescriptor (proto2 syntax check)
Shims->>Extractor: analyzeAllFields(schema, msgDesc, enumsAsInts)
Extractor-->>Shims: Map[String, ProtobufFieldInfo]
Shims->>Shims: analyzeRequiredFields(allFieldNames)<br/>(walk ProjectExec/FilterExec/Agg/Sort/Window upward)
Shims->>Shims: addFieldWithChildren + addChildFieldsFromStruct<br/>(flatten schema to FlattenedFieldDescriptor list)
Shims->>Validator: validateFlattenedSchema(flatFields)
Validator-->>Shims: Right(()) or Left(reason)
Shims->>Validator: toFlattenedSchemaArrays(flatFields)
Validator-->>Shims: FlattenedSchemaArrays (16 parallel arrays)
Shims->>Shims: registerPrunedOrdinals → PRUNED_ORDINAL_TAG on GetStructField/GetArrayStructFields
Shims->>Shims: overrideDataType(buildDecodedSchema)
Spark->>Shims: convertToGpu(child)
Shims-->>Spark: GpuFromProtobuf(decodedSchema, flatArrays..., failOnErrors, child)
Note over Spark,JNI: Runtime (per batch)
Spark->>GPU: doColumnar(GpuColumnVector[BinaryType])
GPU->>JNI: Protobuf.decodeToStruct(input, ProtobufSchemaDescriptor, failOnErrors)
JNI-->>GPU: cudf STRUCT ColumnVector (pruned schema)
GPU->>GPU: mergeAndSetValidity(BITWISE_AND, input) if hasNulls
GPU-->>Spark: GpuColumnVector[StructType(pruned)]
Last reviewed commit: "apply jni refactor"
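The final null-propagation step in the diagram (`mergeAndSetValidity(BITWISE_AND, input)`) can be illustrated with a small Python sketch; lists of booleans stand in for cudf validity masks, and the function name is invented for illustration:

```python
# Sketch of the diagram's last step: the decoded struct column's validity is
# AND-ed with the input column's validity, so null input rows stay null
# (mirrors cudf mergeAndSetValidity with BinaryOp.BITWISE_AND).
def merge_validity(decoded_valid, input_valid):
    return [d and i for d, i in zip(decoded_valid, input_valid)]

decoded = [True, True, True, False]   # validity produced by the decode kernel
inputs  = [True, False, True, True]   # validity of the input binary column
merged = merge_validity(decoded, inputs)   # row 1 is nulled by the input mask
```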
@greptileai full review again
@greptile review
Fixes #14069.
Depends on NVIDIA/spark-rapids-jni#4107
The code is ready, but it's quite long to review. Feel free to review it directly, or start with the first part of it in #14419.
Description (⚠️ from AI)
This PR adds GPU acceleration for Spark's `from_protobuf()` expression by replacing `ProtobufDataToCatalyst` with `GpuFromProtobuf` at query planning time. It bridges the Spark SQL Catalyst layer to the CUDA protobuf decode kernels (provided by the companion JNI PR in spark-rapids-jni) through a clean three-layer architecture: reflection-based compatibility, typed metadata extraction/validation, and GPU expression execution. The implementation spans ~7,800 lines of new Scala/Python/Shell code across 27 files, including ~2,900 lines of Scala plugin code, ~4,400 lines of Python integration tests and data generators, and ~450 lines of shell/proto infrastructure.
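For context, a hedged sketch of the user-facing call this PR accelerates. The column, message, and descriptor names are hypothetical, and the import guard mirrors environments without the spark-protobuf JAR:

```python
# The GPU override is transparent, so this code is identical on CPU and GPU.
options = {
    "enums.as.ints": "false",  # decode enums as validated string names
    "mode": "FAILFAST",        # fail on malformed records (vs PERMISSIVE)
}
try:
    from pyspark.sql.protobuf.functions import from_protobuf
    have_spark_protobuf = True
    # df has a BinaryType column "payload" of serialized messages:
    # decoded = df.select(
    #     from_protobuf("payload", "MainLog", "main_log.desc", options).alias("log"))
except ImportError:
    # spark-protobuf absent; the plugin degrades gracefully in the same situation
    have_spark_protobuf = False
```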
Key capabilities
- `from_protobuf()` replacement: transparent GPU override of `ProtobufDataToCatalyst` — no user-facing API changes
- Repeated nested messages (`ArrayType(StructType)`)
- `enums.as.ints` option — integer mode or validated string name mode
- `mode` option

Architecture
File structure
New files (core)
- `GpuFromProtobuf.scala`
- `ProtobufExprShims.scala` (`convertToGpu`)
- `SparkProtobufCompat.scala`
- `ProtobufSchemaModel.scala`: `ProtobufExprInfo`, `ProtobufFieldInfo`, `ProtobufDefaultValue`, `FlattenedFieldDescriptor`, etc.
- `ProtobufSchemaExtractor.scala`
- `ProtobufSchemaValidator.scala`

New files (tests)
- `protobuf_test.py`
- `ProtobufExprShimsSuite.scala`
- `ProtobufBatchMergeSuite.scala`
- `data_gen.py` (additions): `ProtobufMessageGen`, `PbScalar`/`PbNested`/`PbRepeated`/`PbRepeatedMessage`

New files (infrastructure)
- `main_log.proto`
- `module_a_res.proto`
- `module_b_res.proto`
- `predictor_schema.proto`
- `device_req.proto`
- `gen_nested_proto_data.sh`
- `main_log.desc` (`FileDescriptorSet`)

Modified files
- `GpuOverrides.scala`: `GetStructField` → `GpuGetStructFieldMeta`, `GetArrayStructFields` → `GpuGetArrayStructFieldsMeta`
- `complexTypeExtractors.scala`: `GpuStructFieldOrdinalTag`, `GpuGetStructFieldMeta`, `GpuGetArrayStructFieldsMeta` with `PRUNED_ORDINAL_TAG` support
- `basicPhysicalOperators.scala`: `GpuProjectExecMeta` detects protobuf extraction, new `forcePostProjectCoalesce` parameter on `GpuProjectExec`
- `RapidsConf.scala`: `spark.rapids.sql.protobuf.batchMergeAfterProject.enabled`
- `Spark340PlusNonDBShims.scala`: `ProtobufExprShims.exprs` into expression rules
- `GpuBoundAttribute.scala`
- `DeltaProviderBase.scala`: `GpuProjectExec` signature (`forcePostProjectCoalesce` param)
- `run_pyspark_from_build.sh`: `spark-protobuf` + `protobuf-java` JARs, driver classpath setup
- `spark_init_internal.py`
- `supported_ops.md`

Design decisions
1. Reflection-isolated compatibility layer
All Spark expression and protobuf-java descriptor reflection is confined to `SparkProtobufCompat.scala`. This isolates version-specific API differences (Spark 3.4 path-based vs 3.5+ bytes-based `buildDescriptor`) from planning logic. Reflection failures produce explicit CPU fallback reasons rather than silent degradation.

2. Typed metadata over raw reflection
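An illustrative rendering of the typed enum default described below: both the numeric value and the display name are preserved, so string-name mode never needs to cast an enum descriptor to a string. The field names follow the PR's metadata model, but the class itself is a Python sketch, not the real Scala code:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class EnumDefault:
    default_int: int      # numeric enum value, used when enums.as.ints=true
    default_string: str   # display name, used in validated string-name mode

d = EnumDefault(default_int=0, default_string="STATUS_UNKNOWN")
enums_as_ints = False
# planning code picks the representation matching the requested enum mode:
picked = d.default_int if enums_as_ints else d.default_string
```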
Instead of passing raw `Option[Any]` protobuf defaults or untyped descriptor fields through the planning pipeline, the code uses a typed metadata model (`ProtobufExprInfo`, `ProtobufFieldInfo`, `ProtobufDefaultValue`, `ProtobufEnumMetadata`). Enum defaults preserve both the numeric value (`defaultInt`) and the display name (`defaultString`), avoiding the `ClassCastException: EnumValueDescriptor cannot be cast to String` pitfall.

3. Two-level schema projection
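The required-field analysis described below walks extraction chains down to the decode root. A toy Python sketch of that walk; the tuple encoding is invented for illustration, while the real code walks Catalyst `GetStructField`/`GetArrayStructFields` nodes with semantic equality checks:

```python
# expr: ('field', parent_expr, name) chain ending at ('root', label)
def field_path(expr):
    """Walk a chain of struct-field extractions to its root, returning the
    root label and the dotted path of the required field."""
    parts = []
    while expr[0] == "field":
        _, expr, name = expr
        parts.append(name)
    return expr[1], ".".join(reversed(parts))

root = ("root", "from_protobuf(payload)")
chain = ("field", ("field", root, "device"), "model")
required = field_path(chain)  # the decode only needs field "device.model"
```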
Schema projection reduces GPU decode work by only processing fields referenced downstream:
- `ProjectExec`/`FilterExec`/`AggregateExec`/`SortExec`/`WindowExec`
- `StructType` (non-repeated) and `ArrayType(StructType)` (repeated) nested fields

The analysis walks `GetStructField`/`GetArrayStructFields` chains upward through the plan to determine required fields. Expression identity uses semantic equality (not just reference equality) to handle the Catalyst optimizer creating duplicate instances.

4. Ordinal remapping via TreeNodeTag
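A minimal sketch of the remapping described below: pruning keeps a subset of a struct's child ordinals, and downstream extractors must be handed the ordinal within the pruned layout (the role played by `PRUNED_ORDINAL_TAG`). Python illustration only; the function name is hypothetical:

```python
def pruned_ordinals(kept):
    """Map original child ordinal -> ordinal inside the pruned struct."""
    return {orig: new for new, orig in enumerate(sorted(kept))}

# fields 1 and 4 of a five-field struct survive pruning:
remap = pruned_ordinals({4, 1})
# a GetStructField that read ordinal 4 must now read pruned ordinal remap[4]
```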
When schema projection prunes nested struct children, downstream `GetStructField`/`GetArrayStructFields` expressions must use remapped ordinals pointing into the pruned struct. This is done via `PRUNED_ORDINAL_TAG` (`TreeNodeTag[Int]`), set during `convertToGpu` and read by `GpuGetStructFieldMeta.convertToGpu`/`GpuGetArrayStructFieldsMeta.convertToGpu`.

Design principle: operator-specific logic (ordinal remapping) stays in the Meta layer. The runtime classes (`GpuGetStructField`, `GpuGetArrayStructFields`, `GpuCanonicalize`) remain generic and untouched.

5. Post-project batch coalesce
Schema projection can produce narrower batches. When enabled via `spark.rapids.sql.protobuf.batchMergeAfterProject.enabled`, `GpuProjectExecMeta` detects projects that extract from a protobuf decode and sets `forcePostProjectCoalesce=true`, which inserts a post-project coalesce and prevents the optimizer from removing it (via `outputBatching = null`).

6. Value equality for array-carrying types
Any type storing raw arrays that participates in expression equality (`GpuFromProtobuf`, `FlattenedFieldDescriptor`, `ProtobufDescriptorSource.DescriptorBytes`, `ProtobufDefaultValue.BinaryValue`) overrides `equals`/`hashCode` with `java.util.Arrays` content-based semantics. This prevents semantically identical metadata from comparing unequal by JVM identity.

7. Optional integration with graceful degradation
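A sketch of the optional-integration pattern described below: if loading the optional dependency fails for any reason, including initializer errors, the rule map is simply empty and planning proceeds without the GPU override. The module name is hypothetical:

```python
def load_rules():
    try:
        import hypothetical_spark_protobuf_support as support  # optional dep
        return {"ProtobufDataToCatalyst": support.rule}
    except BaseException:  # deliberately broad, mirroring catching java.lang.Error
        return {}

rules = load_rules()  # empty when the dependency is absent; no error surfaces
```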
`ProtobufExprShims.exprs` loads `ProtobufDataToCatalyst` by reflection. If the class is not on the classpath (no spark-protobuf JAR), it returns an empty map — no error, no GPU override. Class-loading failures (`ExceptionInInitializerError`, `LinkageError`) are caught at the `Error` level to prevent crashing query planning.

Test coverage
Python integration tests (57 tests)
Scala unit tests (26 tests)
- `ProtobufExprShimsSuite`
- `ProtobufBatchMergeSuite`

Total: 83 tests
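For orientation, the byte format these tests exercise end-to-end is standard protobuf wire framing. A minimal encoder for the tag+varint case; this is pure illustration of the wire format, not code from the PR:

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint: 7 payload bits per byte, MSB set on non-final bytes."""
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def encode_tag(field_number: int, wire_type: int) -> bytes:
    """A field's key is (field_number << 3) | wire_type, itself a varint."""
    return encode_varint((field_number << 3) | wire_type)

# canonical protobuf example: field 1, varint value 150 encodes to 08 96 01
payload = encode_tag(1, 0) + encode_varint(150)
```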
Configurations
- `spark.rapids.sql.protobuf.batchMergeAfterProject.enabled` (default: `false`) — `from_protobuf` decode

Review Guide
This PR is large (~7,800 lines) but has a well-defined layered architecture. New Scala production code is ~2,020 lines across 6 files; the rest is tests, proto schemas, and shell infrastructure. This guide provides a recommended reading order and key areas to focus on.
Recommended reading order
Read bottom-up from the metadata model to the planning orchestration:
1. `ProtobufSchemaModel.scala`: `ProtobufExprInfo`, `ProtobufFieldInfo`, `ProtobufDefaultValue`, `FlattenedFieldDescriptor`, `FlattenedSchemaArrays`. Value equality for array fields.
2. `ProtobufSchemaExtractor.scala`: `checkScalarEncoding` maps Spark types to protobuf encodings. Cross-precision FLOAT/DOUBLE rejection. Wire type resolution.
3. `ProtobufSchemaValidator.scala`: `toFlattenedFieldDescriptor` converts typed metadata to JNI format. `validateFlattenedSchema` checks parent-child consistency, enum metadata. `toFlattenedSchemaArrays` builds parallel arrays.
4. `SparkProtobufCompat.scala`: `PbReflect` cached method lookups, `extractExprInfo` reads expression fields, `resolveMessageDescriptor` with Spark 3.4/3.5 retry logic, `ReflectiveMessageDescriptor`/`ReflectiveFieldDescriptor` wrappers.
5. `GpuFromProtobuf.scala`: `doColumnar` JNI call + null propagation, `sparkTypeToCudfIdOpt` type mapping, `equals`/`hashCode` with deep array equality.
6. `complexTypeExtractors.scala` (changes): `GpuStructFieldOrdinalTag`, `GpuGetStructFieldMeta`, `GpuGetArrayStructFieldsMeta` — PRUNED_ORDINAL_TAG read + effective ordinal computation.
7. `ProtobufExprShims.scala` §1 (rule + tag): `exprs`, `fromProtobufRule`, `tagExprForGpu` top half — extract info, validate options, resolve descriptor, analyze fields.
8. `ProtobufExprShims.scala` §2 (schema projection): `analyzeRequiredFields`, `collectStructFieldReferences`, `resolveFieldAccessChain`, `isProtobufStructReference` — plan traversal and field reference collection.
9. `ProtobufExprShims.scala` §3 (flatten + convert): `addFieldWithChildren`, `addChildFieldsFromStruct`, `buildPrunedFieldsMap`, `buildDecodedSchema`, `registerPrunedOrdinals`, `convertToGpu`.
10. `basicPhysicalOperators.scala` (changes): `GpuProjectExecMeta` — protobuf detection, `shouldCoalesceAfterProject`, `forcePostProjectCoalesce` wiring. `GpuProjectExecLike` trait: `coalesceAfter`, `outputBatching`.
11. `GpuOverrides.scala` + `RapidsConf.scala` + `Spark340PlusNonDBShims.scala` (changes)
12. `ProtobufExprShimsSuite.scala`
13. `ProtobufBatchMergeSuite.scala`
14. `protobuf_test.py`
15. `data_gen.py` (additions): `ProtobufMessageGen`, encoding functions.

Total estimated review time: ~3.5–4 hours for a thorough review.
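The `toFlattenedSchemaArrays` hand-off in item 3 can be pictured with a small Python sketch: each flattened field contributes one entry per attribute array, and the arrays stay index-aligned for the JNI boundary. The attribute names here are invented; the real code builds 16 typed arrays in Scala:

```python
def to_parallel_arrays(flat_fields):
    """Turn a list of flattened field dicts into index-aligned parallel arrays."""
    names, parents, field_numbers, encodings = [], [], [], []
    for f in flat_fields:
        names.append(f["name"])
        parents.append(f["parent"])        # index of parent struct, -1 for root
        field_numbers.append(f["number"])  # protobuf field number
        encodings.append(f["encoding"])
    return names, parents, field_numbers, encodings

flat = [
    {"name": "device", "parent": -1, "number": 2, "encoding": "MESSAGE"},
    {"name": "model",  "parent": 0,  "number": 1, "encoding": "STRING"},
]
names, parents, nums, encs = to_parallel_arrays(flat)
```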
Key review areas by priority
P0: Correctness-critical
- `ProtobufExprShims.tagExprForGpu` (`ProtobufExprShims.scala`): the most complex function in the PR. It orchestrates the full planning pipeline: extract expression info → validate options → resolve descriptor → analyze fields → compute required fields → flatten schema → validate → set ordinal tags → override data type. Key things to verify:
  - `willNotWorkOnGpu` paths produce meaningful fallback reasons
  - `analyzeAllFields` failure triggers CPU fallback, not a silent empty schema
- `analyzeRequiredFields` + `collectStructFieldReferences` (`ProtobufExprShims.scala`): plan traversal for schema projection. Verify:
  - `ProjectExec`, `FilterExec`, `AggregateExec`, `SortExec`, `WindowExec` are handled; unknown nodes disable pruning
  - `isProtobufStructReference` uses semantic equality as a fallback (class + `semanticEquals` on children), not just `eq`
  - `resolveFieldAccessChain` correctly walks `GetStructField` chains to the protobuf root expression
  - `GetArrayStructFields` with an empty parent path disables pruning rather than registering a fake top-level requirement
- `registerPrunedOrdinals` (`ProtobufExprShims.scala`): sets `PRUNED_ORDINAL_TAG` on Spark expressions. Verify:
  - tags are set only in `convertToGpu` (not during analysis/tagging when CPU fallback is still possible)
  - `GetStructField` and `GetArrayStructFields` are both handled
- `GpuGetStructFieldMeta.convertToGpu` / `GpuGetArrayStructFieldsMeta.convertToGpu` (`complexTypeExtractors.scala`): read `PRUNED_ORDINAL_TAG` and pass the effective ordinal. Verify:
  - `expr.ordinal` is used when the tag is not present (non-protobuf usage)
  - `effectiveNumFields` for `GetArrayStructFields` uses the child struct's actual field count after pruning
- `GpuFromProtobuf.doColumnar` (`GpuFromProtobuf.scala`): JNI call + null propagation. Verify:
  - `mergeAndSetValidity(BinaryOp.BITWISE_AND, input.getBase)` — only when input has nulls
  - `CudfException` in `SparkException`; PERMISSIVE logs and rethrows
  - `ProtobufSchemaDescriptor` is lazily initialized (transient) for serialization safety

P1: Robustness
- `SparkProtobufCompat` (`SparkProtobufCompat.scala`): reflection layer. Verify:
  - `extractExprInfo` extracts `messageName`, `descriptorSource`, `options` with proper error handling
  - `resolveMessageDescriptor` handles both path-based (Spark 3.4) and bytes-based (Spark 3.5+) descriptor construction, with retry-on-`ClassCastException` for Spark 3.5 path→bytes conversion
  - `isGpuSupportedProtoSyntax` rejects proto3, editions, null, and empty strings
  - failures surface as `Left`/`None`, never thrown through to the optimizer
- `ProtobufSchemaValidator` (`ProtobufSchemaValidator.scala`): flatten-time validation. Verify:
  - `ENC_ENUM_STRING` fields must have non-empty `enumValidValues` and `enumNames` with matching lengths
  - `encodeDefaultValue` handles all `ProtobufDefaultValue` variants including `EnumValue` (both numeric and string)
- Value equality (multiple files): verify `equals`/`hashCode` overrides use `Arrays.equals`/`Arrays.deepEquals` for:
  - `GpuFromProtobuf` (all 16 schema arrays)
  - `FlattenedFieldDescriptor` (`defaultString`, `enumValidValues`, `enumNames`)
  - `ProtobufDescriptorSource.DescriptorBytes`
  - `ProtobufDefaultValue.BinaryValue`

P2: Performance & Integration
- Post-project coalesce (`basicPhysicalOperators.scala`): verify:
  - `GpuProjectExecMeta.shouldCoalesceAfterProject` correctly detects `ProtobufDataToCatalyst` by class name via `isProtobufDecodeExpr`
  - `forcePostProjectCoalesce=true` causes `outputBatching` to return `null` (not `TargetSize`), preventing removal by transition rules
  - `isProtobufBatchMergeAfterProjectEnabled` defaults to `false`
- `DeltaProviderBase.scala` changes: pattern match updates for the new `GpuProjectExec` 4-parameter signature. Verify `mergeIdenticalProjects` correctly ORs `forcePostProjectCoalesce` flags when merging.
- Integration test infrastructure (`run_pyspark_from_build.sh`): verify:
  - `spark-protobuf` and `protobuf-java` JAR download uses correct Maven coordinates
  - `$SPARK_HOME/jars/protobuf-java-*.jar` with fallback mapping
  - `PROTOBUF_JARS_AVAILABLE` is exported so `protobuf_test.py` can skip when deps are missing
  - `PYSP_TEST_spark_driver_extraClassPath`

Things to watch for
- `isProtobufStructReference` must handle both reference equality (`eq`) and semantic equality for duplicate Catalyst instances created by `SimplifyExtractValueOps` or PySpark-JVM serialization. Without this, schema projection splits one decode into N separate single-field decodes.
- `ProtobufExprShims.exprs` catches `Error` (not just `Exception`) to handle `ExceptionInInitializerError` from a missing protobuf JAR. Returning an empty map means no GPU override attempt.
- `ProtobufSchemaExtractor.checkScalarEncoding` explicitly rejects `DoubleType` mapped to protobuf `FLOAT` (and vice versa) rather than silently coercing.
- `isGpuSupportedProtoSyntax` returns `true` only for `"proto2"` — proto3 and editions are not yet supported on GPU, triggering CPU fallback with an explicit reason.
- `PRUNED_ORDINAL_TAG` is set only in `convertToGpu`, never during `tagExprForGpu`. If it were set during tagging, a subsequent CPU fallback would leave stale tags on Spark expressions.

Mapping review to test coverage
- `tagExprForGpu` planning pipeline: `test_from_protobuf_schema_projection_*`, `test_deep_pruning_*`, `test_from_protobuf_projection_across_*`
- `test_from_protobuf_nested_message_field_access*`, `test_deep_pruning_*`, array struct field meta uses pruned child field count
- `compat extracts *`, `compat invokes *`, `compat retries *`, `compat distinguishes *`
- `extractor preserves *`, `extractor records *`, `extractor gives *`
- `validator encodes *`, `validator rejects *`, `validator returns *`
- `test_from_protobuf_enum_cases`, `test_from_protobuf_*_enum_*` (10 tests)
- `test_from_protobuf_default_values_cases`, `test_from_protobuf_nested_child_default_values`
- `test_from_protobuf_failfast_malformed_data`, `test_from_protobuf_permissive_malformed_returns_null`
- `ProtobufBatchMergeSuite` (4 tests), `test_from_protobuf_nested_message_field_access_with_batch_merge`
- `GpuFromProtobuf semantic equality *`, `protobuf binary defaults *`, `flattened field descriptor *`
- (`Protobuf.decodeToStruct` through JNI)

Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)