Skip to content

Commit b50649d

Browse files
author
Robert Kruszewski
committed
Revert "[SPARK-25862][SQL] Remove rangeBetween APIs introduced in SPARK-21608"
This reverts commit 9cf9a83.
1 parent 2427d17 commit b50649d

File tree

6 files changed

+116
-3
lines changed

6 files changed

+116
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ case class SpecifiedWindowFrame(
206206
// Check combination (of expressions).
207207
(lower, upper) match {
208208
case (l: Expression, u: Expression) if !isValidFrameBoundary(l, u) =>
209-
TypeCheckFailure(s"Window frame upper bound '$upper' does not follow the lower bound " +
209+
TypeCheckFailure(s"Window frame upper bound '$upper' does not follow the lower bound " +
210210
s"'$lower'.")
211211
case (l: SpecialFrameBoundary, _) => TypeCheckSuccess
212212
case (_, u: SpecialFrameBoundary) => TypeCheckSuccess

sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,15 @@ object Window {
214214
spec.rangeBetween(start, end)
215215
}
216216

217+
/**
218+
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
219+
* @since 2.3.0
220+
*/
221+
@deprecated("Use the version with Long parameter types", "2.4.0")
222+
def rangeBetween(start: Column, end: Column): WindowSpec = {
223+
spec.rangeBetween(start, end)
224+
}
225+
217226
private[sql] def spec: WindowSpec = {
218227
new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
219228
}

sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,18 @@ class WindowSpec private[sql](
209209
SpecifiedWindowFrame(RangeFrame, boundaryStart, boundaryEnd))
210210
}
211211

212+
/**
213+
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
214+
* @since 2.3.0
215+
*/
216+
@deprecated("Use the version with Long parameter types", "2.4.0")
217+
def rangeBetween(start: Column, end: Column): WindowSpec = {
218+
new WindowSpec(
219+
partitionSpec,
220+
orderSpec,
221+
SpecifiedWindowFrame(RangeFrame, start.expr, end.expr))
222+
}
223+
212224
/**
213225
* Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
214226
*/

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,32 @@ object functions {
829829
//////////////////////////////////////////////////////////////////////////////////////////////
830830
// Window functions
831831
//////////////////////////////////////////////////////////////////////////////////////////////
832+
/**
833+
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
834+
*
835+
* @group window_funcs
836+
* @since 2.3.0
837+
*/
838+
@deprecated("Use Window.unboundedPreceding", "2.4.0")
839+
def unboundedPreceding(): Column = Column(UnboundedPreceding)
840+
841+
/**
842+
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
843+
*
844+
* @group window_funcs
845+
* @since 2.3.0
846+
*/
847+
@deprecated("Use Window.unboundedFollowing", "2.4.0")
848+
def unboundedFollowing(): Column = Column(UnboundedFollowing)
849+
850+
/**
851+
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
852+
*
853+
* @group window_funcs
854+
* @since 2.3.0
855+
*/
856+
@deprecated("Use Window.currentRow", "2.4.0")
857+
def currentRow(): Column = Column(CurrentRow)
832858

833859
/**
834860
* Window function: returns the cumulative distribution of values within a window partition,

sql/core/src/test/resources/sql-tests/results/window.sql.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, v
191191
struct<>
192192
-- !query 11 output
193193
org.apache.spark.sql.AnalysisException
194-
cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 33
194+
cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 33
195195

196196

197197
-- !query 12

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717

1818
package org.apache.spark.sql
1919

20-
import java.sql.Date
20+
import java.sql.{Date, Timestamp}
2121

2222
import org.apache.spark.sql.expressions.Window
2323
import org.apache.spark.sql.functions._
2424
import org.apache.spark.sql.test.SharedSQLContext
25+
import org.apache.spark.unsafe.types.CalendarInterval
2526

2627
/**
2728
* Window frame testing for DataFrame API.
@@ -218,6 +219,71 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSQLContext {
218219
Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))),
219220
Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))
220221
)
222+
223+
def dt(date: String): Date = Date.valueOf(date)
224+
225+
val df2 = Seq((dt("2017-08-01"), "1"), (dt("2017-08-01"), "1"), (dt("2020-12-31"), "1"),
226+
(dt("2017-08-03"), "2"), (dt("2017-08-02"), "1"), (dt("2020-12-31"), "2"))
227+
.toDF("key", "value")
228+
val window = Window.partitionBy($"value").orderBy($"key").rangeBetween(lit(0), lit(2))
229+
230+
checkAnswer(
231+
df2.select(
232+
$"key",
233+
count("key").over(window)),
234+
Seq(Row(dt("2017-08-01"), 3), Row(dt("2017-08-01"), 3), Row(dt("2020-12-31"), 1),
235+
Row(dt("2017-08-03"), 1), Row(dt("2017-08-02"), 1), Row(dt("2020-12-31"), 1))
236+
)
237+
}
238+
239+
test("range between should accept double values as boundary") {
240+
val df = Seq((1.0D, "1"), (1.0D, "1"), (100.001D, "1"), (3.3D, "2"), (2.02D, "1"),
241+
(100.001D, "2")).toDF("key", "value")
242+
val window = Window.partitionBy($"value").orderBy($"key").rangeBetween(currentRow, lit(2.5D))
243+
244+
checkAnswer(
245+
df.select(
246+
$"key",
247+
count("key").over(window)),
248+
Seq(Row(1.0, 3), Row(1.0, 3), Row(100.001, 1), Row(3.3, 1), Row(2.02, 1), Row(100.001, 1))
249+
)
250+
}
251+
252+
test("range between should accept interval values as boundary") {
253+
def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
254+
255+
val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"),
256+
(ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
257+
.toDF("key", "value")
258+
val window = Window.partitionBy($"value").orderBy($"key")
259+
.rangeBetween(currentRow, lit(CalendarInterval.fromString("interval 23 days 4 hours")))
260+
261+
checkAnswer(
262+
df.select(
263+
$"key",
264+
count("key").over(window)),
265+
Seq(Row(ts(1501545600), 3), Row(ts(1501545600), 3), Row(ts(1609372800), 1),
266+
Row(ts(1503000000), 1), Row(ts(1502000000), 1), Row(ts(1609372800), 1))
267+
)
268+
}
269+
270+
test("range between should accept interval values as both boundaries") {
271+
def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
272+
273+
val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"),
274+
(ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
275+
.toDF("key", "value")
276+
val window = Window.partitionBy($"value").orderBy($"key")
277+
.rangeBetween(lit(CalendarInterval.fromString("interval 3 hours")),
278+
lit(CalendarInterval.fromString("interval 23 days 4 hours")))
279+
280+
checkAnswer(
281+
df.select(
282+
$"key",
283+
count("key").over(window)),
284+
Seq(Row(ts(1501545600), 1), Row(ts(1501545600), 1), Row(ts(1609372800), 0),
285+
Row(ts(1503000000), 0), Row(ts(1502000000), 0), Row(ts(1609372800), 0))
286+
)
221287
}
222288

223289
test("unbounded rows/range between with aggregation") {

0 commit comments

Comments
 (0)