README.md (2 additions, 2 deletions)
@@ -80,8 +80,8 @@ Wayang’s Spark platform can now execute end-to-end pipelines on Spark `Dataset
 To build a Dataset-backed pipeline:
 
 1. **Use the Dataset-aware plan builder APIs.**
-   - `PlanBuilder.readParquetAsDataset(...)` (or the Java equivalent) reads Parquet files directly into a Dataset channel.
-   - `DataQuanta.writeParquetAsDataset(...)` writes a Dataset channel without converting it back to an RDD.
+   - `PlanBuilder.readParquet(..., preferDataset = true)` (or `JavaPlanBuilder.readParquet(..., ..., true)`) reads Parquet files directly into a Dataset channel.
+   - `DataQuanta.writeParquet(..., preferDataset = true)` writes a Dataset channel without converting it back to an RDD.
 2. **Keep operators dataset-compatible.** Most operators continue to work unchanged; if an operator explicitly prefers RDDs, Wayang will insert the necessary conversions automatically (at an additional cost). Custom operators can expose `DatasetChannel` descriptors to stay in the dataframe world.
 3. **Let the optimizer do the rest.** The optimizer now assigns a higher cost to Dataset↔RDD conversions, so once you opt into Dataset sources/sinks the plan will stay in Dataset form by default.

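For orientation, a minimal end-to-end sketch of the opt-in flow described above. The context setup, job name, and paths are placeholders, and the entry points (`WayangContext`, `Spark.basicPlugin()`, the `PlanBuilder` constructor) follow the usual Wayang Scala API rather than anything added in this change:

```scala
import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.spark.Spark

// Placeholder setup: register the Spark platform and create a plan builder.
val wayangContext = new WayangContext().withPlugin(Spark.basicPlugin())
val planBuilder   = new PlanBuilder(wayangContext, "dataset-pipeline-demo")

planBuilder
  .readParquet("hdfs://example/input/orders.parquet", preferDataset = true)   // Dataset-backed source channel
  .filter(r => r.getField(0) != null)                                         // ordinary operators keep the Dataset channel
  .writeParquet("hdfs://example/output/orders.parquet", preferDataset = true) // Dataset-backed sink, no RDD round-trip
```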
guides/spark-datasets.md (2 additions, 2 deletions)
@@ -33,8 +33,8 @@ Wayang’s Spark backend can now run entire pipelines on Spark `Dataset[Row]` (a
 ## Enable Dataset sources and sinks
 
 1. **Plan builder APIs:**
-   - `PlanBuilder.readParquetAsDataset(...)` (Scala/Java) loads Parquet files into a `DatasetChannel` instead of an `RddChannel`.
-   - `DataQuanta.writeParquetAsDataset(...)` writes a dataset back to Parquet without converting to RDD first.
+   - `PlanBuilder.readParquet(..., preferDataset = true)` (Scala) or `JavaPlanBuilder.readParquet(..., ..., true)` loads Parquet files into a `DatasetChannel` instead of an `RddChannel`.
+   - `DataQuanta.writeParquet(..., preferDataset = true)` writes a dataset back to Parquet without converting to RDD first.
 2. **Prefer dataset-friendly operators:** Most unary/binary operators accept either channel type, but custom operators can advertise dataset descriptors explicitly. See `DatasetChannel` in `wayang-platforms/wayang-spark` for details.
 3. **Let the optimizer keep it:** The optimizer now assigns costs to Dataset↔RDD conversions, so once your plan starts with a dataset channel it will stay in dataset form unless an operator demands an RDD.

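As a hedged illustration of these points (paths are placeholders, and `planBuilder` is assumed to be a `PlanBuilder` already configured for the Spark platform), opting in at the source only should still work: if the default sink ends up executing on an RDD channel, the optimizer plans the Dataset-to-RDD conversion itself and charges it the extra cost mentioned above.

```scala
// The source opts into a Dataset channel; the sink keeps the defaults
// (overwrite = false, preferDataset = false), so any Dataset-to-RDD
// conversion that is needed is inserted and costed by the optimizer.
planBuilder
  .readParquet("file:///data/orders.parquet", preferDataset = true)
  .filter(r => r.getField(0) != null)
  .writeParquet("file:///out/orders_nonnull.parquet")
```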
@@ -1027,11 +1027,10 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
     writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad)
   }
 
-  def writeParquet(url: String, overwrite: Boolean = false)(implicit ev: Out =:= Record): Unit =
-    writeParquetJava(url, overwrite, preferDataset = false)
-
-  def writeParquetAsDataset(url: String, overwrite: Boolean = true)(implicit ev: Out =:= Record): Unit =
-    writeParquetJava(url, overwrite, preferDataset = true)
+  def writeParquet(url: String,
+                   overwrite: Boolean = false,
+                   preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit =
+    writeParquetJava(url, overwrite, preferDataset)
 
   /**
    * Write the data quanta in this instance to a text file. Triggers execution.
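For reference, a hypothetical call against the consolidated signature; `records` stands for any `DataQuanta[Record]` produced earlier in the plan. Note that the merged method defaults `overwrite` to `false`, whereas the removed `writeParquetAsDataset` defaulted it to `true`, so migrating callers that relied on the old default must now pass it explicitly.

```scala
// Sketch only: `records` is an assumed DataQuanta[Record] from earlier in the plan.
records.writeParquet(
  url = "file:///out/orders.parquet",
  overwrite = true,     // replace existing output; no longer the default
  preferDataset = true  // keep the sink on the Dataset channel instead of converting back to an RDD
)
```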
@@ -69,18 +69,22 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
    * @param projection the projection, if any
    * @return [[DataQuantaBuilder]] for the file
    */
-  def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
-    createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record]))
+  def readParquet(url: String,
+                  projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
+    readParquet(url, projection, preferDataset = false)
 
   /**
-   * Read a parquet file and provide it as a dataset of [[Record]]s backed by Spark Datasets.
+   * Read a parquet file and optionally keep it backed by Spark Datasets.
    *
    * @param url the URL of the Parquet file
    * @param projection the projection, if any
+   * @param preferDataset when {@code true}, emit a Dataset-backed channel
    * @return [[DataQuantaBuilder]] for the file
    */
-  def readParquetAsDataset(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
-    createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(true))(ClassTag(classOf[Record]))
+  def readParquet(url: String,
+                  projection: Array[String],
+                  preferDataset: Boolean): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
+    createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))(ClassTag(classOf[Record]))
 
   /**
    * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
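A hypothetical sketch of the two `JavaPlanBuilder` entry points above (the context and paths are placeholders). The two-argument form delegates to the three-argument overload with `preferDataset = false`, so existing callers keep their behaviour; keeping an explicit overload rather than a Scala default argument also keeps the call convenient from Java.

```scala
// Placeholder context; JavaPlanBuilder takes a WayangContext and a job name.
val javaPlanBuilder = new JavaPlanBuilder(wayangContext, "java-api-demo")

// Old behaviour: RDD-backed source (projection may be null).
val rddBacked = javaPlanBuilder.readParquet("file:///data/orders.parquet", null)

// Opt-in: the third argument requests a Dataset-backed channel.
val datasetBacked = javaPlanBuilder.readParquet("file:///data/orders.parquet", Array("id", "total"), true)
```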
@@ -136,19 +136,13 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
    *
    * @param url the URL of the Parquet file
    * @param projection the projection, if any
+   * @param preferDataset when {@code true}, keep the resulting channel backed by Spark Datasets
    * @return [[DataQuanta]] of [[Record]]s representing the file
    */
-  def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection))
-
-  /**
-   * Read a parquet file and keep it backed by a Spark Dataset throughout execution.
-   *
-   * @param url the URL of the Parquet file
-   * @param projection the projection, if any
-   * @return [[DataQuanta]] of [[Record]]s backed by a Spark Dataset when executed on Spark
-   */
-  def readParquetAsDataset(url: String, projection: Array[String] = null): DataQuanta[Record] =
-    load(ParquetSource.create(url, projection).preferDatasetOutput(true))
+  def readParquet(url: String,
+                  projection: Array[String] = null,
+                  preferDataset: Boolean = false): DataQuanta[Record] =
+    load(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))
 
   /**
    * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
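And the corresponding hypothetical call forms for the merged Scala `readParquet` (paths are placeholders; `planBuilder` as assumed earlier):

```scala
// Default call: unchanged, RDD-backed behaviour.
val defaultRead = planBuilder.readParquet("file:///data/orders.parquet")

// Named arguments opt into the Dataset-backed channel, optionally with a projection.
val datasetRead = planBuilder.readParquet(
  "file:///data/orders.parquet",
  projection = Array("id", "total"),
  preferDataset = true
)
```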