diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 4f868f4963..496284ce65 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -79,6 +79,8 @@ object CometConf extends ShimCometConf { val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"; + val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator"; + val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled") .category(CATEGORY_EXEC) .doc( @@ -747,6 +749,18 @@ object CometConf extends ShimCometConf { s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible" } + def isOperatorAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean = { + getBooleanConf(getOperatorAllowIncompatConfigKey(name), defaultValue = false, conf) + } + + def getOperatorAllowIncompatConfigKey(name: String): String = { + s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.$name.allowIncompatible" + } + + def getOperatorAllowIncompatConfigKey(exprClass: Class[_]): String = { + s"${CometConf.COMET_OPERATOR_CONFIG_PREFIX}.${exprClass.getSimpleName}.allowIncompatible" + } + def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = { conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true" } diff --git a/spark/src/main/scala/org/apache/comet/serde/CometOperatorSerde.scala b/spark/src/main/scala/org/apache/comet/serde/CometOperatorSerde.scala index c6a95ec88a..25ea2c1233 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometOperatorSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/CometOperatorSerde.scala @@ -29,6 +29,22 @@ import org.apache.comet.serde.OperatorOuterClass.Operator */ trait CometOperatorSerde[T <: SparkPlan] { + /** + * Get the optional Comet configuration entry that is used to enable or disable native support + * for this operator. + */ + def enabledConfig: Option[ConfigEntry[Boolean]] + + /** + * Determine the support level of the operator based on its attributes. + * + * @param operator + * The Spark operator. + * @return + * Support level (Compatible, Incompatible, or Unsupported). + */ + def getSupportLevel(operator: T): SupportLevel = Compatible(None) + /** * Convert a Spark operator into a protocol buffer representation that can be passed into native * code. @@ -49,9 +65,4 @@ trait CometOperatorSerde[T <: SparkPlan] { builder: Operator.Builder, childOp: Operator*): Option[OperatorOuterClass.Operator] - /** - * Get the optional Comet configuration entry that is used to enable or disable native support - * for this operator. - */ - def enabledConfig: Option[ConfigEntry[Boolean]] } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 909f6c175b..edae4453a7 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -47,7 +47,7 @@ import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.Types.{DataType => ProtoDataType} import org.apache.comet.serde.Types.DataType._ import org.apache.comet.serde.literals.CometLiteral -import org.apache.comet.serde.operator.{CometLocalTableScan, CometProject, CometSort, CometSortOrder} +import org.apache.comet.serde.operator._ import org.apache.comet.shims.CometExprShim /** @@ -61,6 +61,15 @@ object QueryPlanSerde extends Logging with CometExprShim { private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = Map( classOf[ProjectExec] -> CometProject, + classOf[FilterExec] -> CometFilter, + classOf[LocalLimitExec] -> CometLocalLimit, + classOf[GlobalLimitExec] -> CometGlobalLimit, + classOf[ExpandExec] -> CometExpand, + classOf[HashAggregateExec] -> CometHashAggregate, + classOf[ObjectHashAggregateExec] -> CometObjectHashAggregate, + classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoin, + classOf[ShuffledHashJoinExec] -> CometShuffleHashJoin, + classOf[SortMergeJoinExec] -> CometSortMergeJoin, classOf[SortExec] -> CometSort, classOf[LocalTableScanExec] -> CometLocalTableScan) @@ -924,51 +933,30 @@ object QueryPlanSerde extends Logging with CometExprShim { val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id) childOp.foreach(builder.addChildren) + // look for registered handler first + val serde = opSerdeMap.get(op.getClass) + serde match { + case Some(handler) if isOperatorEnabled(handler, op) => + val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]] + val maybeConverted = opSerde.convert(op, builder, childOp: _*) + if (maybeConverted.isDefined) { + return maybeConverted + } + case _ => + } + + // now handle special cases that cannot be handled as a simple mapping from class name + // and see if operator can be used as a sink op match { // Fully native scan for V1 case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION => CometNativeScan.convert(scan, builder, childOp: _*) - case filter: FilterExec if CometConf.COMET_EXEC_FILTER_ENABLED.get(conf) => - CometFilter.convert(filter, builder, childOp: _*) - - case limit: LocalLimitExec if CometConf.COMET_EXEC_LOCAL_LIMIT_ENABLED.get(conf) => - CometLocalLimit.convert(limit, builder, childOp: _*) - - case globalLimitExec: GlobalLimitExec - if CometConf.COMET_EXEC_GLOBAL_LIMIT_ENABLED.get(conf) => - CometGlobalLimit.convert(globalLimitExec, builder, childOp: _*) - - case expand: ExpandExec if CometConf.COMET_EXEC_EXPAND_ENABLED.get(conf) => - CometExpand.convert(expand, builder, childOp: _*) - case _: WindowExec if CometConf.COMET_EXEC_WINDOW_ENABLED.get(conf) => withInfo(op, "Window expressions are not supported") None - case aggregate: HashAggregateExec if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) => - CometHashAggregate.convert(aggregate, builder, childOp: _*) - - case aggregate: ObjectHashAggregateExec - if CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) => - CometObjectHashAggregate.convert(aggregate, builder, childOp: _*) - - case join: BroadcastHashJoinExec - if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) => - CometBroadcastHashJoin.convert(join, builder, childOp: _*) - - case join: ShuffledHashJoinExec if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) => - CometShuffleHashJoin.convert(join, builder, childOp: _*) - - case join: SortMergeJoinExec => - if (CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf)) { - CometSortMergeJoin.convert(join, builder, childOp: _*) - } else { - withInfo(join, "SortMergeJoin is not enabled") - None - } - case op if isCometSink(op) => val supportedTypes = op.output.forall(a => supportedDataType(a.dataType, allowComplex = true)) @@ -1023,28 +1011,58 @@ object QueryPlanSerde extends Logging with CometExprShim { None } - case op => - opSerdeMap.get(op.getClass) match { - case Some(handler) => - handler.enabledConfig.foreach { enabledConfig => - if (!enabledConfig.get(op.conf)) { - withInfo( - op, - s"Native support for operator ${op.getClass.getSimpleName} is disabled. " + - s"Set ${enabledConfig.key}=true to enable it.") - return None - } - } - handler.asInstanceOf[CometOperatorSerde[SparkPlan]].convert(op, builder, childOp: _*) - case _ => - // Emit warning if: - // 1. it is not Spark shuffle operator, which is handled separately - // 2. it is not a Comet operator - if (!op.nodeName.contains("Comet") && !op.isInstanceOf[ShuffleExchangeExec]) { - withInfo(op, s"unsupported Spark operator: ${op.nodeName}") - } - None + case _ => + // Emit warning if: + // 1. it is not Spark shuffle operator, which is handled separately + // 2. it is not a Comet operator + if (serde.isEmpty && !op.nodeName.contains("Comet") && + !op.isInstanceOf[ShuffleExchangeExec]) { + withInfo(op, s"unsupported Spark operator: ${op.nodeName}") } + None + } + } + + private def isOperatorEnabled(handler: CometOperatorSerde[_], op: SparkPlan): Boolean = { + val enabled = handler.enabledConfig.forall(_.get(op.conf)) + val opName = op.getClass.getSimpleName + if (enabled) { + val opSerde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]] + opSerde.getSupportLevel(op) match { + case Unsupported(notes) => + withInfo(op, notes.getOrElse("")) + false + case Incompatible(notes) => + val allowIncompat = CometConf.isOperatorAllowIncompat(opName) + val incompatConf = CometConf.getOperatorAllowIncompatConfigKey(opName) + if (allowIncompat) { + if (notes.isDefined) { + logWarning( + s"Comet supports $opName when $incompatConf=true " + + s"but has notes: ${notes.get}") + } + true + } else { + val optionalNotes = notes.map(str => s" ($str)").getOrElse("") + withInfo( + op, + s"$opName is not fully compatible with Spark$optionalNotes. " + + s"To enable it anyway, set $incompatConf=true. " + + s"${CometConf.COMPAT_GUIDE}.") + false + } + case Compatible(notes) => + if (notes.isDefined) { + logWarning(s"Comet supports $opName but has notes: ${notes.get}") + } + true + } + } else { + withInfo( + op, + s"Native support for operator $opName is disabled. " + + s"Set ${handler.enabledConfig.get.key}=true to enable it.") + false } } @@ -1144,25 +1162,3 @@ object QueryPlanSerde extends Logging with CometExprShim { } } - -sealed trait SupportLevel - -/** - * Comet either supports this feature with full compatibility with Spark, or may have known - * differences in some specific edge cases that are unlikely to be an issue for most users. - * - * Any compatibility differences are noted in the - * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]]. - */ -case class Compatible(notes: Option[String] = None) extends SupportLevel - -/** - * Comet supports this feature but results can be different from Spark. - * - * Any compatibility differences are noted in the - * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]]. - */ -case class Incompatible(notes: Option[String] = None) extends SupportLevel - -/** Comet does not support this feature */ -case class Unsupported(notes: Option[String] = None) extends SupportLevel diff --git a/spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala b/spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala new file mode 100644 index 0000000000..d5a524077d --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +sealed trait SupportLevel + +/** + * Comet either supports this feature with full compatibility with Spark, or may have known + * differences in some specific edge cases that are unlikely to be an issue for most users. + * + * Any compatibility differences are noted in the + * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]]. + */ +case class Compatible(notes: Option[String] = None) extends SupportLevel + +/** + * Comet supports this feature but results can be different from Spark. + * + * Any compatibility differences are noted in the + * [[https://datafusion.apache.org/comet/user-guide/compatibility.html Comet Compatibility Guide]]. + */ +case class Incompatible(notes: Option[String] = None) extends SupportLevel + +/** Comet does not support this feature */ +case class Unsupported(notes: Option[String] = None) extends SupportLevel diff --git a/spark/src/main/scala/org/apache/comet/serde/CometAggregate.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometAggregate.scala similarity index 98% rename from spark/src/main/scala/org/apache/comet/serde/CometAggregate.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometAggregate.scala index f0cf244f1e..93e5d52c8d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometAggregate.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometAggregate.scala @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ @@ -28,6 +28,7 @@ import org.apache.spark.sql.types.MapType import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator} import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto} diff --git a/spark/src/main/scala/org/apache/comet/serde/CometExpand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometExpand.scala similarity index 94% rename from spark/src/main/scala/org/apache/comet/serde/CometExpand.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometExpand.scala index 5979eed4dc..ab5a58b064 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometExpand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometExpand.scala @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ @@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.ExpandExec import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.exprToProto diff --git a/spark/src/main/scala/org/apache/comet/serde/CometFilter.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometFilter.scala similarity index 94% rename from spark/src/main/scala/org/apache/comet/serde/CometFilter.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometFilter.scala index 1638750b5f..96771b902f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometFilter.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometFilter.scala @@ -17,12 +17,13 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import org.apache.spark.sql.execution.FilterExec import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.exprToProto diff --git a/spark/src/main/scala/org/apache/comet/serde/CometGlobalLimit.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometGlobalLimit.scala similarity index 93% rename from spark/src/main/scala/org/apache/comet/serde/CometGlobalLimit.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometGlobalLimit.scala index 774e1ad77e..b2df3cf72c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometGlobalLimit.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometGlobalLimit.scala @@ -17,12 +17,13 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import org.apache.spark.sql.execution.GlobalLimitExec import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.Operator object CometGlobalLimit extends CometOperatorSerde[GlobalLimitExec] { diff --git a/spark/src/main/scala/org/apache/comet/serde/CometHashJoin.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometHashJoin.scala similarity index 96% rename from spark/src/main/scala/org/apache/comet/serde/CometHashJoin.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometHashJoin.scala index 67fb67a2e7..c58384e3a9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometHashJoin.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometHashJoin.scala @@ -17,17 +17,18 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} -import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter} +import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec} import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.{BuildSide, JoinType, Operator} import org.apache.comet.serde.QueryPlanSerde.exprToProto diff --git a/spark/src/main/scala/org/apache/comet/serde/CometLocalLimit.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalLimit.scala similarity index 94% rename from spark/src/main/scala/org/apache/comet/serde/CometLocalLimit.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometLocalLimit.scala index 1347b12907..3e5fbdebb3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometLocalLimit.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometLocalLimit.scala @@ -17,12 +17,13 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import org.apache.spark.sql.execution.LocalLimitExec import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.Operator object CometLocalLimit extends CometOperatorSerde[LocalLimitExec] { diff --git a/spark/src/main/scala/org/apache/comet/serde/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala similarity index 98% rename from spark/src/main/scala/org/apache/comet/serde/CometNativeScan.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 476313a9d1..2bc8b5526a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ @@ -35,6 +35,7 @@ import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.objectstore.NativeConfig import org.apache.comet.parquet.CometParquetUtils +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} diff --git a/spark/src/main/scala/org/apache/comet/serde/CometSortMergeJoin.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSortMergeJoin.scala similarity index 94% rename from spark/src/main/scala/org/apache/comet/serde/CometSortMergeJoin.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometSortMergeJoin.scala index 5f926f06e8..a22230d7aa 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometSortMergeJoin.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSortMergeJoin.scala @@ -17,17 +17,18 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, ExpressionSet, SortOrder} -import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter} +import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.joins.SortMergeJoinExec -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampNTZType} +import org.apache.spark.sql.types._ import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.{JoinType, Operator} import org.apache.comet.serde.QueryPlanSerde.exprToProto diff --git a/spark/src/main/scala/org/apache/comet/serde/CometWindow.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometWindow.scala similarity index 97% rename from spark/src/main/scala/org/apache/comet/serde/CometWindow.scala rename to spark/src/main/scala/org/apache/comet/serde/operator/CometWindow.scala index 7e963d6326..8f560bed5b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/CometWindow.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometWindow.scala @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.comet.serde +package org.apache.comet.serde.operator import scala.jdk.CollectionConverters._ @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo +import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde.{exprToProto, windowExprToProto}