
Commit b72b848

jose-torres authored and Robert Kruszewski committed
[SPARK-23099][SS] Migrate foreach sink to DataSourceV2
## What changes were proposed in this pull request?

Migrate foreach sink to DataSourceV2.

Since the previous attempt at this, apache#20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works.

## How was this patch tested?

Existing tests.

Author: Jose Torres <[email protected]>

Closes apache#20951 from jose-torres/foreach.
1 parent b15a6fd commit b72b848
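The "strictly defined lifecycle of writer components" the message refers to is, roughly, the DataSourceV2 per-epoch write contract: for each partition and epoch the engine asks a DataWriterFactory for a DataWriter, feeds it the partition's rows, then calls commit() on success or abort() on failure. The sketch below is a minimal, self-contained Scala illustration of that contract, not Spark source; the trait and method names are simplified stand-ins for the real interfaces in org.apache.spark.sql.sources.v2.writer.

object WriteLifecycleSketch {
  trait WriterCommitMessage
  case object Done extends WriterCommitMessage

  // Per-task writer: receives the rows of one partition for one epoch.
  trait DataWriter[T] {
    def write(record: T): Unit
    def commit(): WriterCommitMessage
    def abort(): Unit
  }

  // Created on the driver and shipped to executors; one writer per (partition, epoch).
  trait DataWriterFactory[T] {
    def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter[T]
  }

  // The per-task loop the engine runs: create, write all rows, then commit or abort.
  // ForeachDataWriter in this commit maps this onto ForeachWriter.open / process / close.
  def runTask[T](
      factory: DataWriterFactory[T],
      partitionId: Int,
      epochId: Long,
      rows: Iterator[T]): WriterCommitMessage = {
    val writer = factory.createDataWriter(partitionId, /* attemptNumber = */ 0, epochId)
    try {
      rows.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort()
        throw t
    }
  }

  def main(args: Array[String]): Unit = {
    // Tiny demo factory that just prints what it receives.
    val factory = new DataWriterFactory[String] {
      def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriter[String] =
        new DataWriter[String] {
          def write(record: String): Unit =
            println(s"partition=$partitionId epoch=$epochId value=$record")
          def commit(): WriterCommitMessage = Done
          def abort(): Unit = ()
        }
    }
    runTask(factory, 0, 1L, Iterator("a", "b", "c"))
  }
}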

5 files changed: +156 −111 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala

Lines changed: 0 additions & 68 deletions
This file was deleted.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.sql.{Encoder, ForeachWriter, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

/**
 * A [[org.apache.spark.sql.sources.v2.DataSourceV2]] for forwarding data into the specified
 * [[ForeachWriter]].
 *
 * @param writer The [[ForeachWriter]] to process all data.
 * @tparam T The expected type of the sink.
 */
case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
  override def createStreamWriter(
      queryId: String,
      schema: StructType,
      mode: OutputMode,
      options: DataSourceOptions): StreamWriter = {
    new StreamWriter with SupportsWriteInternalRow {
      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

      override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
        val encoder = encoderFor[T].resolveAndBind(
          schema.toAttributes,
          SparkSession.getActiveSession.get.sessionState.analyzer)
        ForeachWriterFactory(writer, encoder)
      }

      override def toString: String = "ForeachSink"
    }
  }
}

case class ForeachWriterFactory[T: Encoder](
    writer: ForeachWriter[T],
    encoder: ExpressionEncoder[T])
  extends DataWriterFactory[InternalRow] {
  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): ForeachDataWriter[T] = {
    new ForeachDataWriter(writer, encoder, partitionId, epochId)
  }
}

/**
 * A [[DataWriter]] which writes data in this partition to a [[ForeachWriter]].
 * @param writer The [[ForeachWriter]] to process all data.
 * @param encoder An encoder which can convert [[InternalRow]] to the required type [[T]]
 * @param partitionId
 * @param epochId
 * @tparam T The type expected by the writer.
 */
class ForeachDataWriter[T : Encoder](
    writer: ForeachWriter[T],
    encoder: ExpressionEncoder[T],
    partitionId: Int,
    epochId: Long)
  extends DataWriter[InternalRow] {

  // If open returns false, we should skip writing rows.
  private val opened = writer.open(partitionId, epochId)

  override def write(record: InternalRow): Unit = {
    if (!opened) return

    try {
      writer.process(encoder.fromRow(record))
    } catch {
      case t: Throwable =>
        writer.close(t)
        throw t
    }
  }

  override def commit(): WriterCommitMessage = {
    writer.close(null)
    ForeachWriterCommitMessage
  }

  override def abort(): Unit = {}
}

/**
 * An empty [[WriterCommitMessage]]. [[ForeachWriter]] implementations have no global coordination.
 */
case object ForeachWriterCommitMessage extends WriterCommitMessage
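For reference, the user-facing API is unchanged by this migration: a ForeachWriter is still attached with writeStream.foreach(...). The following is a minimal sketch of that usage, assuming a local SparkSession and the built-in rate source purely as example input; the writer's open/process/close calls are the ones ForeachDataWriter above invokes from the DataWriter lifecycle.

import org.apache.spark.sql.{ForeachWriter, SparkSession}

object ForeachSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("foreach-sink-example")
      .getOrCreate()
    import spark.implicits._

    // A trivial sink that prints each value. open() returning false would make
    // the partition's rows be skipped, mirroring the `opened` flag above.
    val writer = new ForeachWriter[String] {
      def open(partitionId: Long, version: Long): Boolean = true
      def process(value: String): Unit = println(s"got: $value")
      def close(errorOrNull: Throwable): Unit = ()
    }

    val query = spark.readStream
      .format("rate")                              // built-in test source, one row per second
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .as[String]
      .writeStream
      .foreach(writer)
      .start()

    query.awaitTermination(10000)                  // run briefly for the example
    spark.stop()
  }
}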

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
-import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2}
+import org.apache.spark.sql.execution.streaming.sources.{ForeachWriterProvider, MemoryPlanV2, MemorySinkV2}
 import org.apache.spark.sql.sources.v2.StreamWriteSupport

 /**
@@ -269,7 +269,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       query
     } else if (source == "foreach") {
       assertNotPartitioned("foreach")
-      val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
+      val sink = new ForeachWriterProvider[T](foreachWriter)(ds.exprEnc)
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         extraOptions.get("checkpointLocation"),
