@@ -16,9 +16,11 @@
 #
 
 import sys
+if sys.version >= '3':
+    long = int
 
 from pyspark import since, SparkContext
-from pyspark.sql.column import _to_seq, _to_java_column
+from pyspark.sql.column import Column, _to_seq, _to_java_column
 
 __all__ = ["Window", "WindowSpec"]
 
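
Side note on the two-line shim added above: Python 3 has no separate ``long`` type, so the patch aliases it to ``int`` to keep the ``isinstance(start, (int, long))`` checks further down working on both interpreters. A minimal standalone sketch of the same idea (purely illustrative, not part of the patch):

import sys

# Python 3 removed `long`; alias it so isinstance checks written for
# Python 2 keep working unchanged on Python 3.
if sys.version >= '3':
    long = int

print(isinstance(2 ** 70, (int, long)))  # True on both Python 2 and Python 3
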
@@ -124,20 +126,45 @@ def rangeBetween(start, end):
         and "5" means the five off after the current row.
 
         We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
-        and ``Window.currentRow`` to specify special boundary values, rather than using integral
-        values directly.
+        ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+        ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+        to specify special boundary values, rather than using integral values directly.
 
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
+                      The frame is unbounded if this is ``Window.unboundedPreceding``,
+                      a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
                       any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
+                    The frame is unbounded if this is ``Window.unboundedFollowing``,
+                    a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
                     any value greater than or equal to min(sys.maxsize, 9223372036854775807).
+
+        >>> from pyspark.sql import functions as F, SparkSession, Window
+        >>> spark = SparkSession.builder.getOrCreate()
+        >>> df = spark.createDataFrame(
+        ...     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
+        >>> window = Window.orderBy("id").partitionBy("category").rangeBetween(
+        ...     F.currentRow(), F.lit(1))
+        >>> df.withColumn("sum", F.sum("id").over(window)).show()
+        +---+--------+---+
+        | id|category|sum|
+        +---+--------+---+
+        |  1|       b|  3|
+        |  2|       b|  5|
+        |  3|       b|  3|
+        |  1|       a|  4|
+        |  1|       a|  4|
+        |  2|       a|  2|
+        +---+--------+---+
         """
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
+        if isinstance(start, (int, long)) and isinstance(end, (int, long)):
+            if start <= Window._PRECEDING_THRESHOLD:
+                start = Window.unboundedPreceding
+            if end >= Window._FOLLOWING_THRESHOLD:
+                end = Window.unboundedFollowing
+        elif isinstance(start, Column) and isinstance(end, Column):
+            start = start._jc
+            end = end._jc
         sc = SparkContext._active_spark_context
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
         return WindowSpec(jspec)
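
A hedged usage sketch of the two boundary styles this hunk makes ``Window.rangeBetween`` accept. It assumes a Spark build that contains this change, since ``pyspark.sql.functions.currentRow()`` and the other boundary helpers are introduced alongside it and are not available in older releases:

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])

# Integer sentinels: taken by the (int, long) branch and clamped to the
# unbounded constants when they fall beyond the thresholds.
w_int = Window.partitionBy("category").orderBy("id").rangeBetween(
    Window.unboundedPreceding, Window.currentRow)

# Column boundaries: taken by the Column branch, which unwraps each Column
# to its underlying Java column (_jc) before calling the JVM API.
w_col = Window.partitionBy("category").orderBy("id").rangeBetween(
    F.currentRow(), F.lit(1))

df.withColumn("running", F.sum("id").over(w_int)) \
  .withColumn("next_range", F.sum("id").over(w_col)) \
  .show()
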
@@ -212,27 +239,34 @@ def rangeBetween(self, start, end):
         and "5" means the five off after the current row.
 
         We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
-        and ``Window.currentRow`` to specify special boundary values, rather than using integral
-        values directly.
+        ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+        ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+        to specify special boundary values, rather than using integral values directly.
 
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
+                      The frame is unbounded if this is ``Window.unboundedPreceding``,
+                      a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
                       any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
+                    The frame is unbounded if this is ``Window.unboundedFollowing``,
+                    a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
                     any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
+        if isinstance(start, (int, long)) and isinstance(end, (int, long)):
+            if start <= Window._PRECEDING_THRESHOLD:
+                start = Window.unboundedPreceding
+            if end >= Window._FOLLOWING_THRESHOLD:
+                end = Window.unboundedFollowing
+        elif isinstance(start, Column) and isinstance(end, Column):
+            start = start._jc
+            end = end._jc
         return WindowSpec(self._jspec.rangeBetween(start, end))
 
 
 def _test():
     import doctest
     SparkContext('local[4]', 'PythonTest')
-    (failure_count, test_count) = doctest.testmod()
+    (failure_count, test_count) = doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
     if failure_count:
         sys.exit(-1)
 
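
Since both ``rangeBetween`` hunks share the same normalization step, here is a standalone sketch of what happens to ``start`` and ``end`` before they reach the JVM. The sentinel and threshold values below are assumptions reconstructed from the docstring (max(-sys.maxsize, -9223372036854775808) and min(sys.maxsize, 9223372036854775807)); the real ones live on the ``Window`` class:

import sys

# Assumed values, mirroring the docstring above (hypothetical names).
UNBOUNDED_PRECEDING = -9223372036854775808   # Window.unboundedPreceding
UNBOUNDED_FOLLOWING = 9223372036854775807    # Window.unboundedFollowing
PRECEDING_THRESHOLD = max(-sys.maxsize, UNBOUNDED_PRECEDING)
FOLLOWING_THRESHOLD = min(sys.maxsize, UNBOUNDED_FOLLOWING)


def normalize_bounds(start, end):
    """Clamp out-of-range integer bounds to the unbounded sentinels,
    the same way both rangeBetween implementations do in this patch."""
    if isinstance(start, int) and isinstance(end, int):
        if start <= PRECEDING_THRESHOLD:
            start = UNBOUNDED_PRECEDING
        if end >= FOLLOWING_THRESHOLD:
            end = UNBOUNDED_FOLLOWING
    # Column-typed bounds are instead unwrapped to their Java columns
    # (start._jc / end._jc) before being handed to the JVM.
    return start, end


print(normalize_bounds(-sys.maxsize, 3))  # start clamps to the unbounded sentinel
print(normalize_bounds(-2, 1))            # ordinary bounds pass through untouched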