Skip to content

Commit be77e1e

Browse files
ericm-db authored and HeartSaVioR committed
[SPARK-50770][SS] Removing package scope for transformWithState operator APIs
### What changes were proposed in this pull request?

Removing package scope for transformWithState operator APIs.

This enables us to release the transformWithState APIs from the next Spark release. While traits and methods expand visibility, the usage is blocked due to a flag. The flag will be flipped in separate PR and corresponding behavior change ticket will be filed with it.

### Why are the changes needed?

To allow use of the transformWithState operator.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

Existing unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49417 from ericm-db/enable-tws.

Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 7c316f7 commit be77e1e

File tree

15 files changed

+22
-42
lines changed

15 files changed

+22
-42
lines changed

sql/api/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Iterator;
2222

2323
import org.apache.spark.annotation.Evolving;
24-
import org.apache.spark.annotation.Experimental;
2524
import org.apache.spark.sql.streaming.GroupState;
2625

2726
/**
@@ -32,7 +31,6 @@
3231
* org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}
3332
* @since 2.1.1
3433
*/
35-
@Experimental
3634
@Evolving
3735
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
3836
Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;

sql/api/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Iterator;
2222

2323
import org.apache.spark.annotation.Evolving;
24-
import org.apache.spark.annotation.Experimental;
2524
import org.apache.spark.sql.streaming.GroupState;
2625

2726
/**
@@ -31,7 +30,6 @@
3130
* MapGroupsWithStateFunction, org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}
3231
* @since 2.1.1
3332
*/
34-
@Experimental
3533
@Evolving
3634
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
3735
R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;

sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.streaming;
1919

2020
import org.apache.spark.annotation.Evolving;
21-
import org.apache.spark.annotation.Experimental;
2221
import org.apache.spark.sql.catalyst.plans.logical.*;
2322

2423
/**
@@ -29,7 +28,6 @@
2928
*
3029
* @since 2.2.0
3130
*/
32-
@Experimental
3331
@Evolving
3432
public class GroupStateTimeout {
3533
// NOTE: if you're adding new type of timeout, you should also fix the places below:

sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark.sql.streaming;
1919

2020
import org.apache.spark.annotation.Evolving;
21-
import org.apache.spark.annotation.Experimental;
2221
import org.apache.spark.sql.catalyst.plans.logical.EventTime$;
2322
import org.apache.spark.sql.catalyst.plans.logical.NoTime$;
2423
import org.apache.spark.sql.catalyst.plans.logical.ProcessingTime$;
@@ -27,7 +26,6 @@
2726
* Represents the time modes (used for specifying timers and ttl) possible for
2827
* the Dataset operations {@code transformWithState}.
2928
*/
30-
@Experimental
3129
@Evolving
3230
public class TimeMode {
3331

sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming
1919

2020
import java.io.Serializable
2121

22-
import org.apache.spark.annotation.{Evolving, Experimental}
22+
import org.apache.spark.annotation.Evolving
2323

2424
/**
2525
* Class used to provide access to expired timer's expiry time.
2626
*/
27-
@Experimental
2827
@Evolving
29-
private[sql] trait ExpiredTimerInfo extends Serializable {
28+
trait ExpiredTimerInfo extends Serializable {
3029

3130
/**
3231
* Get the expired timer's expiry time as milliseconds in epoch time.

sql/api/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.streaming
1919

20-
import org.apache.spark.annotation.{Evolving, Experimental}
20+
import org.apache.spark.annotation.Evolving
2121
import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
2222

2323
/**
@@ -196,7 +196,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
196196
* types (see `Encoder` for more details).
197197
* @since 2.2.0
198198
*/
199-
@Experimental
200199
@Evolving
201200
trait GroupState[S] extends LogicalGroupState[S] {
202201

sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
*/
1717
package org.apache.spark.sql.streaming
1818

19-
import org.apache.spark.annotation.{Evolving, Experimental}
19+
import org.apache.spark.annotation.Evolving
2020

21-
@Experimental
2221
@Evolving
2322
/**
2423
* Interface used for arbitrary stateful operations with the v2 API to capture list value state.
2524
*/
26-
private[sql] trait ListState[S] extends Serializable {
25+
trait ListState[S] extends Serializable {
2726

2827
/** Whether state exists or not. */
2928
def exists(): Boolean

sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
*/
1717
package org.apache.spark.sql.streaming
1818

19-
import org.apache.spark.annotation.{Evolving, Experimental}
19+
import org.apache.spark.annotation.Evolving
2020

21-
@Experimental
2221
@Evolving
2322
/**
2423
* Interface used for arbitrary stateful operations with the v2 API to capture map value state.

sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ package org.apache.spark.sql.streaming
1919
import java.io.Serializable
2020
import java.util.UUID
2121

22-
import org.apache.spark.annotation.{Evolving, Experimental}
22+
import org.apache.spark.annotation.Evolving
2323

2424
/**
2525
* Represents the query info provided to the stateful processor used in the arbitrary state API v2
2626
* to easily identify task retries on the same partition.
2727
*/
28-
@Experimental
2928
@Evolving
30-
private[sql] trait QueryInfo extends Serializable {
29+
trait QueryInfo extends Serializable {
3130

3231
/** Returns the streaming query id associated with stateful operator */
3332
def getQueryId: UUID

sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
1919

2020
import java.io.Serializable
2121

22-
import org.apache.spark.annotation.{Evolving, Experimental}
22+
import org.apache.spark.annotation.Evolving
2323
import org.apache.spark.sql.api.EncoderImplicits
2424
import org.apache.spark.sql.errors.ExecutionErrors
2525

@@ -30,9 +30,8 @@ import org.apache.spark.sql.errors.ExecutionErrors
3030
* Users can also explicitly use `import implicits._` to access the EncoderImplicits and use the
3131
* state variable APIs relying on implicit encoders.
3232
*/
33-
@Experimental
3433
@Evolving
35-
private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
34+
abstract class StatefulProcessor[K, I, O] extends Serializable {
3635

3736
// scalastyle:off
3837
// Disable style checker so "implicits" object can start with lowercase i
@@ -123,10 +122,8 @@ private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
123122
* initial state to be initialized in the first batch. This can be used for starting a new
124123
* streaming query with existing state from a previous streaming query.
125124
*/
126-
@Experimental
127125
@Evolving
128-
private[sql] abstract class StatefulProcessorWithInitialState[K, I, O, S]
129-
extends StatefulProcessor[K, I, O] {
126+
abstract class StatefulProcessorWithInitialState[K, I, O, S] extends StatefulProcessor[K, I, O] {
130127

131128
/**
132129
* Function that will be invoked only in the first batch for users to process initial states.

0 commit comments

Comments
 (0)