Commit e37e432 ("edit")

1 parent 5d231f5

1 file changed: +27 −14 lines


docs/source/contributor-guide/adding_a_new_operator.md

Lines changed: 27 additions & 14 deletions
@@ -23,17 +23,20 @@ This guide explains how to add support for a new Spark physical operator in Apac

## Overview

`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to implementing Comet operators depending on where they execute and how they integrate with the native execution engine.

### Types of Comet Operators

`CometExecRule` maintains two distinct maps of operators:

#### 1. Native Operators (`nativeExecs` map)

These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. Native operators are registered in the `nativeExecs` map in `CometExecRule.scala`.

Key characteristics of native operators:

- They are converted to their corresponding native protobuf representation
- They execute as DataFusion operators in the native engine
- The `CometOperatorSerde` implementation handles enable/disable checks, support validation, and protobuf serialization

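To make the shape of this machinery concrete, here is a minimal, self-contained sketch of the idea behind the `nativeExecs` map: each supported operator class maps to a serde object that can convert that operator into a native plan node. All types here (`SparkOp`, `NativeOp`, `OpSerde`) are invented stand-ins, not the real Spark or Comet API.

```scala
// Hypothetical stand-ins for SparkPlan nodes and the protobuf Operator message.
sealed trait SparkOp
final case class ProjectOp(columns: Seq[String]) extends SparkOp
final case class FilterOp(predicate: String) extends SparkOp

final case class NativeOp(kind: String, children: Seq[NativeOp] = Nil)

// Simplified analogue of CometOperatorSerde: an enable check plus a conversion
// that may fail (None => fall back to Spark for this operator).
trait OpSerde[T <: SparkOp] {
  def enabled: Boolean = true
  def convert(op: T, children: Seq[NativeOp]): Option[NativeOp]
}

object ProjectSerde extends OpSerde[ProjectOp] {
  def convert(op: ProjectOp, children: Seq[NativeOp]): Option[NativeOp] =
    Some(NativeOp(s"projection(${op.columns.mkString(",")})", children))
}

object FilterSerde extends OpSerde[FilterOp] {
  def convert(op: FilterOp, children: Seq[NativeOp]): Option[NativeOp] =
    Some(NativeOp(s"filter(${op.predicate})", children))
}

// Analogue of the nativeExecs map: operator class -> serde.
val nativeExecs: Map[Class[_ <: SparkOp], OpSerde[_]] = Map(
  classOf[ProjectOp] -> ProjectSerde,
  classOf[FilterOp] -> FilterSerde)
```

The map lookup is what lets `CometExecRule` decide, per operator class, whether a native replacement exists at all before any per-instance validation runs.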
@@ -42,24 +45,28 @@ Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`, `SortMer

#### 2. Sink Operators (`sinks` map)

Sink operators serve as entry points (data sources) for native execution blocks. They are registered in the `sinks` map in `CometExecRule.scala`.

Key characteristics of sinks:

- They become `ScanExec` operators in the native plan (see `operator2Proto` in `CometExecRule.scala`)
- They can be leaf nodes that feed data into native execution blocks
- They are wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` during plan transformation
- Examples include operators that bring data from various sources into native execution

Examples: `UnionExec`, `CoalesceExec`, `CollectLimitExec`, `TakeOrderedAndProjectExec`

Special sinks (not in the `sinks` map but also treated as sinks):

- `CometScanExec` - File scans
- `CometSparkToColumnarExec` - Conversion from Spark row format
- `ShuffleExchangeExec` / `BroadcastExchangeExec` - Exchange operators

#### 3. Comet JVM Operators

These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen in `CometExecRule` rather than using `CometOperatorSerde`, because they don't need protobuf serialization.

Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`

@@ -68,20 +75,25 @@ Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`
When adding a new operator, choose based on these criteria:

**Use Native Operators when:**

- The operator transforms data (e.g., project, filter, sort, aggregate, join)
- The operator has a direct DataFusion equivalent or custom implementation
- The operator consumes native child operators and produces native output
- The operator is in the middle of an execution pipeline

**Use Sink Operators when:**

- The operator serves as a data source for native execution (becomes a `ScanExec`)
- The operator brings data from non-native sources (e.g., `UnionExec` combining multiple inputs)
- The operator is typically a leaf or near-leaf node in the execution tree
- The operator needs special handling to interface with the native engine
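As a rough illustration of this decision, the classifier below sorts the example operators named in this guide into the three categories. It is a hypothetical sketch keyed on operator names as strings; the real rule dispatches on operator classes via the `nativeExecs` and `sinks` maps.

```scala
// Hypothetical placement categories for this sketch.
sealed trait Placement
case object Native extends Placement      // runs in native Rust code
case object Sink extends Placement        // becomes a ScanExec entry point
case object StayOnSpark extends Placement // no Comet replacement: fall back

def placement(operator: String): Placement = operator match {
  // Transform-style operators in the middle of a pipeline go native.
  case "ProjectExec" | "FilterExec" | "SortExec" | "HashAggregateExec" =>
    Native
  // Source-style operators feeding data into native blocks become sinks.
  case "UnionExec" | "CoalesceExec" | "CollectLimitExec" |
      "TakeOrderedAndProjectExec" =>
    Sink
  // Anything unknown stays on Spark.
  case _ =>
    StayOnSpark
}
```
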

**Implementation Note for Sinks:**

Sink operators are handled specially in `CometExecRule.operator2Proto`. Instead of converting to their own operator type, they are converted to `ScanExec` in the native plan. This allows them to serve as entry points for native execution blocks. The original Spark operator is wrapped with `CometScanWrapper` or `CometSinkPlaceHolder` which manages the boundary between JVM and native execution.
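The note above can be sketched as a toy model. Everything here is an invented stand-in (the real `operator2Proto` builds protobuf messages), but it shows the asymmetry: native operators keep their own identity in the native plan, while sinks are replaced by a scan leaf.

```scala
// Toy native plan node.
final case class PlanNode(kind: String, children: Seq[PlanNode] = Nil)

// A sink is not serialized as itself: it becomes a ScanExec leaf that will be
// fed columnar batches from the JVM side at execution time.
def sinkToProto(sinkName: String): PlanNode =
  PlanNode(s"ScanExec(source = $sinkName)")

// A native operator keeps its own identity and its converted children.
def nativeToProto(name: String, children: Seq[PlanNode]): PlanNode =
  PlanNode(name, children)

// Example: a FilterExec running on top of a UnionExec sink. The union is the
// entry point of the native block; the filter executes natively above it.
val plan = nativeToProto("FilterExec", Seq(sinkToProto("UnionExec")))
```
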

## Implementing a Native Operator

@@ -135,7 +147,8 @@ The `CometOperatorSerde` trait provides several key methods:
- `convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator]` - Converts to protobuf
- `createExec(nativeOp: Operator, op: T): CometNativeExec` - Creates the Comet execution operator wrapper
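A simplified picture of the trait's shape is sketched below. All types are self-contained stand-ins (the real trait works with protobuf builders and Spark's `SparkPlan`); the sketch only mirrors the two methods listed above plus the `enabledConfig` and `getSupportLevel()` hooks used by `CometExecRule.isOperatorEnabled`.

```scala
// Stand-in types so the sketch compiles on its own.
final case class Operator(kind: String)
trait SparkPlanLike
final case class CometNativeExecStub(native: Operator, original: SparkPlanLike)

sealed trait SupportLevel
case object Compatible extends SupportLevel
final case class Incompatible(reason: String) extends SupportLevel

// Simplified analogue of CometOperatorSerde[T].
trait OperatorSerdeSketch[T <: SparkPlanLike] {
  // Config key gating this operator (None => always considered).
  def enabledConfig: Option[String] = None
  // Per-instance compatibility check.
  def getSupportLevel(op: T): SupportLevel = Compatible
  // Conversion to the native protobuf representation; None => fall back.
  def convert(op: T, childOps: Operator*): Option[Operator]
  // Wrap the native plan in a Comet execution operator.
  def createExec(nativeOp: Operator, op: T): CometNativeExecStub =
    CometNativeExecStub(nativeOp, op)
}

// A tiny concrete serde for a hypothetical limit operator.
final case class LimitPlan(n: Int) extends SparkPlanLike
object LimitSerde extends OperatorSerdeSketch[LimitPlan] {
  def convert(op: LimitPlan, childOps: Operator*): Option[Operator] =
    Some(Operator(s"limit(${op.n})"))
}
```
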

The validation workflow in `CometExecRule.isOperatorEnabled`:

1. Checks if the operator is enabled via `enabledConfig`
2. Calls `getSupportLevel()` to determine compatibility
3. Handles Compatible/Incompatible/Unsupported cases with appropriate fallback messages
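The three steps above might be sketched like this. The types and the `allowIncompatible` override flag are simplified assumptions; the real method also assembles fallback messages shown in the Spark UI.

```scala
sealed trait Support
case object Ok extends Support
final case class NotCompatible(reason: String) extends Support
final case class NotSupported(reason: String) extends Support

// Simplified analogue of isOperatorEnabled: a config gate, then a dispatch
// on the operator's support level. Left carries the fallback reason.
def isOperatorEnabled(
    enabled: Boolean,          // step 1: value of the operator's enabledConfig
    supportLevel: Support,     // step 2: result of getSupportLevel()
    allowIncompatible: Boolean // hypothetical "run incompatible ops anyway" flag
): Either[String, Unit] =
  if (!enabled) Left("operator disabled by config")
  else
    supportLevel match { // step 3: per-level handling
      case Ok => Right(())
      case NotCompatible(reason) =>
        if (allowIncompatible) Right(()) else Left(s"incompatible: $reason")
      case NotSupported(reason) => Left(s"unsupported: $reason")
    }
```
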
@@ -269,7 +282,7 @@ Add your operator to the appropriate map in `CometExecRule.scala`:

#### For Native Operators

Add to the `nativeExecs` map (`CometExecRule.scala`):

```scala
val nativeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
  ...
```
@@ -283,7 +296,7 @@ val nativeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =

#### For Sink Operators

If your operator is a sink (becomes a `ScanExec` in the native plan), add to the `sinks` map (`CometExecRule.scala`):

```scala
val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
  ...
```
@@ -469,7 +482,7 @@ case class CometYourSinkExec(

**Key Points:**

- Extend `CometSink[T]` which provides the `convert()` method that transforms the operator to `ScanExec`
- The `CometSink.convert()` method (in `CometSink.scala`) automatically handles:
  - Data type validation
  - Conversion to `ScanExec` in the native plan
  - Setting FFI safety flags
@@ -478,7 +491,7 @@ case class CometYourSinkExec(

### Step 2: Register the Sink

Add your sink to the `sinks` map in `CometExecRule.scala`:

```scala
val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
  ...
```
@@ -491,7 +504,7 @@ val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =

### Step 3: Add Configuration

Add a configuration entry in `CometConf.scala`:

```scala
val COMET_EXEC_YOUR_SINK_ENABLED: ConfigEntry[Boolean] =
  ...
```
