Skip to content

Commit 7798c9e

Browse files
jose-torreszsxwing
authored andcommitted
[SPARK-22824] Restore old offset for binary compatibility
## What changes were proposed in this pull request? Some users depend on source compatibility with the org.apache.spark.sql.execution.streaming.Offset class. Although this is not a stable interface, we can keep it in place for now to simplify upgrades to 2.3. Author: Jose Torres <[email protected]> Closes #20012 from joseph-torres/binary-compat.
1 parent 7570eab commit 7798c9e

33 files changed

+82
-43
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.InternalRow
3232
import org.apache.spark.sql.catalyst.util.DateTimeUtils
3333
import org.apache.spark.sql.execution.streaming._
3434
import org.apache.spark.sql.kafka010.KafkaSource._
35-
import org.apache.spark.sql.sources.v2.reader.Offset
3635
import org.apache.spark.sql.types._
3736
import org.apache.spark.unsafe.types.UTF8String
3837

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package org.apache.spark.sql.kafka010
1919

2020
import org.apache.kafka.common.TopicPartition
2121

22-
import org.apache.spark.sql.execution.streaming.SerializedOffset
23-
import org.apache.spark.sql.sources.v2.reader.Offset
22+
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
2423

2524
/**
2625
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import org.apache.spark.sql.ForeachWriter
3838
import org.apache.spark.sql.execution.streaming._
3939
import org.apache.spark.sql.functions.{count, window}
4040
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
41-
import org.apache.spark.sql.sources.v2.reader.Offset
4241
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
4342
import org.apache.spark.sql.streaming.util.StreamManualClock
4443
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* restart checkpoints. Sources should provide an Offset implementation which they can use to
2424
* reconstruct the stream position where the offset was taken.
2525
*/
26-
public abstract class Offset {
26+
public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
2727
/**
2828
* A JSON-serialized representation of an Offset that is
2929
* used for saving offsets to the offset log.
@@ -41,8 +41,8 @@ public abstract class Offset {
4141
*/
4242
@Override
4343
public boolean equals(Object obj) {
44-
if (obj instanceof Offset) {
45-
return this.json().equals(((Offset) obj).json());
44+
if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
45+
return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
4646
} else {
4747
return false;
4848
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
2727
import org.apache.spark.internal.Logging
2828
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
2929
import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
30-
import org.apache.spark.sql.sources.v2.reader.Offset
3130
import org.apache.spark.sql.types.StructType
3231

3332
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import scala.util.control.Exception._
2222
import org.json4s.NoTypeHints
2323
import org.json4s.jackson.Serialization
2424

25-
import org.apache.spark.sql.sources.v2.reader.Offset
26-
2725
/**
2826
* Offset for the [[FileStreamSource]].
2927
*

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.execution.streaming
1919

20-
import org.apache.spark.sql.sources.v2.reader.Offset
21-
2220
/**
2321
* A simple offset for sources that produce a single linear stream of data.
2422
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
2424
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
2525
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
2626
import org.apache.spark.sql.execution.SQLExecution
27-
import org.apache.spark.sql.sources.v2.reader.Offset
2827
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
2928
import org.apache.spark.util.{Clock, Utils}
3029

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming;
19+
20+
/**
21+
* This is an internal, deprecated interface. New source implementations should use the
22+
* org.apache.spark.sql.sources.v2.reader.Offset class, which is the one that will be supported
23+
* in the long term.
24+
*
25+
* This class will be removed in a future release.
26+
*/
27+
public abstract class Offset {
28+
/**
29+
* A JSON-serialized representation of an Offset that is
30+
* used for saving offsets to the offset log.
31+
* Note: We assume that equivalent/equal offsets serialize to
32+
* identical JSON strings.
33+
*
34+
* @return JSON string encoding
35+
*/
36+
public abstract String json();
37+
38+
/**
39+
* Equality based on JSON string representation. We leverage the
40+
* JSON representation for normalization between the Offset's
41+
* in memory and on disk representations.
42+
*/
43+
@Override
44+
public boolean equals(Object obj) {
45+
if (obj instanceof Offset) {
46+
return this.json().equals(((Offset) obj).json());
47+
} else {
48+
return false;
49+
}
50+
}
51+
52+
@Override
53+
public int hashCode() {
54+
return this.json().hashCode();
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return this.json();
60+
}
61+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import org.json4s.jackson.Serialization
2323
import org.apache.spark.internal.Logging
2424
import org.apache.spark.sql.RuntimeConfig
2525
import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS}
26-
import org.apache.spark.sql.sources.v2.reader.Offset
2726

2827
/**
2928
* An ordered collection of offsets, used to track the progress of processing data from one or more

0 commit comments

Comments
 (0)