33 changes: 19 additions & 14 deletions python/pyspark/pandas/frame.py
@@ -9966,33 +9966,38 @@ def describe(self, percentiles: Optional[List[float]] = None) -> "DataFrame":
has_numeric_type = len(psser_numeric) > 0

if is_all_string_type:
# Handling string type columns
# We will retrieve the `count`, `unique`, `top` and `freq`.
internal = self._internal.resolved_copy
exprs_string = [
internal.spark_column_for(psser._column_label) for psser in psser_string
]
sdf = internal.spark_frame.select(*exprs_string)

# Get `count` & `unique` for each column
counts, uniques = map(lambda x: x[1:], sdf.summary("count", "count_distinct").take(2))
# Handling Empty DataFrame
if len(counts) == 0 or counts[0] == "0":
data = dict()
for psser in psser_string:
data[psser.name] = [0, 0, np.nan, np.nan]
return DataFrame(data, index=["count", "unique", "top", "freq"])

# Get `top` & `freq` for each column
tops = []
freqs = []
# TODO(SPARK-37711): We should do it in a single pass since invoking a Spark job
# for every column is too expensive.
for column in exprs_string:
top, freq = sdf.groupby(column).count().sort("count", ascending=False).first()
tops.append(str(top))
freqs.append(str(freq))

n_cols = len(column_names)
stack_args = ", ".join([f"'{col_name}', `{col_name}`" for col_name in column_names])
stack_expr = f"stack({n_cols}, {stack_args}) as (column_name, value)"
# Unpivot, group by (column_name, value), and count occurrences
unpivoted = sdf.selectExpr(stack_expr)
value_counts = unpivoted.groupBy("column_name", "value").count()
# Use window function to rank values by count within each column
# When counts tie, pick the first value alphabetically like pandas
window = Window.partitionBy("column_name").orderBy(F.desc("count"), F.asc("value"))
# Unfortunately, there's no straightforward way to get the top value and its frequency
# for each column without collecting the data to the driver side.
Comment on lines +9991 to +9992 (Contributor):
Note for the future: This seems like a good follow up issue, I think we could do something smarter here long term. I've been thinking about some kind of bounded collection types for aggregations and this might fit. (although tbf describe isn't used all that often, but would love to put these together if we can). They do still end up being large but on the executors and the final driver part is a bit smaller.

top_values = (
value_counts.withColumn("rank", F.row_number().over(window))
.filter(F.col("rank") == 1)
.select("column_name", "value", F.col("count").alias("freq"))
.collect()
)
top_freq_dict = {row.column_name: (str(row.value), str(row.freq)) for row in top_values}
tops, freqs = map(list, zip(*(top_freq_dict[col_name] for col_name in column_names)))
stats = [counts, uniques, tops, freqs]
stats_names = ["count", "unique", "top", "freq"]

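A minimal standalone sketch of the single-pass top/freq computation introduced here, assuming a local SparkSession and illustrative columns "a" and "b" with toy data; the actual change runs against the resolved internal Spark frame of the pandas-on-Spark DataFrame rather than a hand-built DataFrame.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("x", "p"), ("x", "q"), ("y", "q")], schema=["a", "b"])

column_names = ["a", "b"]
n_cols = len(column_names)
stack_args = ", ".join([f"'{col_name}', `{col_name}`" for col_name in column_names])
stack_expr = f"stack({n_cols}, {stack_args}) as (column_name, value)"

# Unpivot to (column_name, value) pairs and count occurrences of each pair.
unpivoted = sdf.selectExpr(stack_expr)
value_counts = unpivoted.groupBy("column_name", "value").count()

# Rank values by count within each column (ties broken alphabetically),
# then keep only the top-ranked value per column.
window = Window.partitionBy("column_name").orderBy(F.desc("count"), F.asc("value"))
top_values = (
    value_counts.withColumn("rank", F.row_number().over(window))
    .filter(F.col("rank") == 1)
    .select("column_name", "value", F.col("count").alias("freq"))
    .collect()
)
# e.g. [Row(column_name='a', value='x', freq=2), Row(column_name='b', value='q', freq=2)]

This single collect() returns one row per column, in contrast to the removed per-column groupby loop, which launched a separate Spark job for every string column.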