Description
Looking at the Auron source code, there are a number of things that look more convenient and easier to understand.
Check for conversion
Checking whether an operator is supported usually happens in each matching operator, and only in rare cases in the global conversion; the rare cases are when support depends on other operators (for example, a final aggregation requires a native partial aggregation). This makes the check very clear and keeps it close to the actual operator. (A sketch of the rare case follows the snippet below.)
abstract class NativeSortBase(
sortOrder: Seq[SortOrder],
global: Boolean,
override val child: SparkPlan)
extends UnaryExecNode
with NativeSupports {
// ...
private def nativeSortExprs = sortOrder.map { sortOrder =>
PhysicalExprNode
.newBuilder()
.setSort(
PhysicalSortExprNode
.newBuilder()
.setExpr(NativeConverters.convertExpr(sortOrder.child))
.setAsc(sortOrder.direction == Ascending)
.setNullsFirst(sortOrder.nullOrdering == NullsFirst)
.build())
.build()
}
// check whether native converting is supported
nativeSortExprs
// ....
}
From apache/auron#NativeSortBase.scala
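For the rare case mentioned above, here is a minimal sketch of what a check in the global conversion could look like when an operator's support depends on its child. This is not Auron's actual code; FinalAggConversion, NativeAggBase and convertFinalAgg are hypothetical names used only for illustration.
import org.apache.spark.sql.catalyst.expressions.aggregate.Final
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

// Hypothetical marker trait standing in for the native aggregation base class.
trait NativeAggBase extends SparkPlan

object FinalAggConversion {
  // Convert a final aggregation only when its child was already converted
  // to a native partial aggregation; otherwise keep the JVM operator.
  def tryConvert(plan: SparkPlan): SparkPlan = plan match {
    case agg: HashAggregateExec
        if agg.aggregateExpressions.forall(_.mode == Final) &&
          agg.child.isInstanceOf[NativeAggBase] =>
      convertFinalAgg(agg) // safe: the partial stage is already native
    case other =>
      other
  }

  // Placeholder for the project's real conversion entry point.
  private def convertFinalAgg(agg: HashAggregateExec): SparkPlan = ???
}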
Building protobuf
It looks like the protobuf building is also done in each operator, keeping the conversion and the actual operator close together; again, this makes it cleaner.
val nativeFilterExec = FilterExecNode
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
.addAllExpr(nativeFilterExprs.asJava)
.build()
PhysicalPlanNode.newBuilder().setFilter(nativeFilterExec).build()
From apache/auron#NativeFilterBase.scala
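The snippet references nativeFilterExprs without showing it. Following the same pattern as the sort expressions above, a hedged guess at its shape; the conjunct splitting via PredicateHelper is my assumption, not something taken from Auron, and the imports for Auron's generated protobuf classes and NativeConverters are omitted.
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// PhysicalExprNode and NativeConverters come from Auron (imports omitted).
abstract class NativeFilterBaseSketch(condition: Expression, override val child: SparkPlan)
  extends UnaryExecNode
  with PredicateHelper {

  // Split the Catalyst condition into conjuncts and convert each one to a
  // native expression node, mirroring how the sort expressions are built.
  protected def nativeFilterExprs: Seq[PhysicalExprNode] =
    splitConjunctivePredicates(condition).map(NativeConverters.convertExpr)
}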
Different Spark versions code
It seems like they have an annotation for conditional code based on the Spark version, which looks much cleaner and easier to understand and update. Currently we have shim files that need to be wired into Maven per version; the annotation approach looks cleaner, and adding a new version doesn't seem like a lot of work (a sketch of the shim-based alternative follows the snippet below):
case class NativeFilterExec(condition: Expression, override val child: SparkPlan)
extends NativeFilterBase(condition, child) {
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
@sparkver("3.0 / 3.1")
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
From apache/auron#NativeFilterExec.scala
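For contrast, a rough sketch of what the same operator looks like with the shim-file approach we have today: one copy of the file per Spark version source tree, selected by a Maven profile. The paths here are illustrative, not our actual layout.
// spark-3.5/.../NativeFilterExec.scala (compiled only under the 3.5 profile)
case class NativeFilterExec(condition: Expression, override val child: SparkPlan)
  extends NativeFilterBase(condition, child) {
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}

// spark-3.1/.../NativeFilterExec.scala (compiled only under the 3.1 profile)
case class NativeFilterExec(condition: Expression, override val child: SparkPlan)
  extends NativeFilterBase(condition, child) {
  override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
    copy(child = newChildren.head)
}
With the @sparkver annotation both branches live in a single file, so adding a new Spark version mostly means extending the version list on the annotation instead of adding another source tree to the Maven profiles.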
Execution
I'm not sure whether Auron does operator fusion to save the cost of going back and forth between the JVM and Rust for each operator, but if it does, the current implementation looks cleaner.
Our operators are not really doing anything; they are just placeholders, and the actual execution happens somewhere else, which makes it harder for newcomers to understand.
From Auron's code it looks like they build and execute the native plan directly inside the operator:
abstract class NativeSortBase(
sortOrder: Seq[SortOrder],
global: Boolean,
override val child: SparkPlan)
extends UnaryExecNode
with NativeSupports {
// ...
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeSortExprs = this.nativeSortExprs
new NativeRDD(
sparkContext,
nativeMetrics,
rddPartitions = inputRDD.partitions,
rddPartitioner = inputRDD.partitioner,
rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
inputRDD.isShuffleReadFull,
(partition, taskContext) => {
val inputPartition = inputRDD.partitions(partition.index)
val nativeSortExec = SortExecNode
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
.addAllExpr(nativeSortExprs.asJava)
.build()
PhysicalPlanNode.newBuilder().setSort(nativeSortExec).build()
},
friendlyName = "NativeRDD.Sort")
}
}
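If I read this correctly (my interpretation, not something stated by Auron), the fusion falls out of this pattern: inside the per-partition closure each operator embeds its child's nativePlan into its own protobuf node, so a chain of native operators becomes a single PhysicalPlanNode tree that is handed to the native engine once per partition instead of crossing the JVM boundary per operator. Conceptually, a Filter on top of a Sort would produce something like the sketch below, where scanPlan stands in for whatever the leaf operator builds and the expression lists are built as in the snippets above.
import scala.collection.JavaConverters._

// Hedged sketch: fuse a Filter on top of a Sort into one native plan tree.
def fusedFilterOverSort(
    scanPlan: PhysicalPlanNode,
    nativeSortExprs: Seq[PhysicalExprNode],
    nativeFilterExprs: Seq[PhysicalExprNode]): PhysicalPlanNode = {

  val sortPlan = PhysicalPlanNode
    .newBuilder()
    .setSort(
      SortExecNode
        .newBuilder()
        .setInput(scanPlan) // child subtree embedded directly
        .addAllExpr(nativeSortExprs.asJava)
        .build())
    .build()

  PhysicalPlanNode
    .newBuilder()
    .setFilter(
      FilterExecNode
        .newBuilder()
        .setInput(sortPlan) // whole Sort subtree nested inside the Filter node
        .addAllExpr(nativeFilterExprs.asJava)
        .build())
    .build()
}
The native engine would then execute the whole fused subtree and only hand batches back at the boundaries (shuffles, non-native operators), which would explain why there is no per-operator JVM/Rust round trip.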