Skip to content

Conversation

@andygrove
Copy link
Member

@andygrove andygrove commented Oct 3, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

Debugging.

[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok

From this, we can make pretty charts to help with comprehension:

image

What changes are included in this PR?

  • Add new config spark.comet.debug.memory
  • Add new LoggingPool that is enabled when the new config is set

How are these changes tested?

@andygrove andygrove changed the title chore: Add memory pool trace logging [WIP] chore: Add memory pool trace logging [WIP] [skip-ci] Oct 3, 2025
@andygrove andygrove changed the title chore: Add memory pool trace logging [WIP] [skip-ci] chore: Add memory pool trace logging [WIP] [skip ci] Oct 3, 2025
@andygrove andygrove changed the title chore: Add memory pool trace logging [WIP] [skip ci] chore: Add memory pool trace logging [WIP] Oct 3, 2025
@andygrove andygrove marked this pull request as ready for review October 3, 2025 17:36
@andygrove andygrove changed the title chore: Add memory pool trace logging [WIP] chore: Add memory pool trace logging Oct 3, 2025
Comment on lines 170 to 173
debug_native: jboolean,
explain_native: jboolean,
tracing_enabled: jboolean,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than adding yet another flag to this API call, I am now using the already available spark config map in native code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. The config map should be the preferred method

@codecov-commenter
Copy link

codecov-commenter commented Oct 3, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 58.93%. Comparing base (f09f8af) to head (2884ed3).
⚠️ Report is 585 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2521      +/-   ##
============================================
+ Coverage     56.12%   58.93%   +2.80%     
- Complexity      976     1449     +473     
============================================
  Files           119      147      +28     
  Lines         11743    13649    +1906     
  Branches       2251     2369     +118     
============================================
+ Hits           6591     8044    +1453     
- Misses         4012     4382     +370     
- Partials       1140     1223      +83     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.


impl MemoryPool for LoggingPool {
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
println!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be println as info! or trace! ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess info! would be ok. I pushed that change. If we use trace! then we would have to set spark.comet.debug.memory=true and also configure trace logging for this one file, which seem like overkill for a debug feature

@andygrove andygrove marked this pull request as draft October 3, 2025 20:53
@andygrove
Copy link
Member Author

moving to draft while I work on the Python scripts

@andygrove
Copy link
Member Author

Still experimenting...

mem_chart

@andygrove
Copy link
Member Author

Chart now shows when try_grow failed:

mem_chart

@andygrove andygrove changed the title chore: Add memory pool trace logging chore: Add memory reservation debug logging and visualization Oct 4, 2025
@andygrove andygrove marked this pull request as ready for review October 10, 2025 15:42
Next, generate a chart from the CSV file for a specific Spark task:

```shell
python3 dev/scripts/plot_memory_usage.py /tmp/mem.csv --task 1234
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
python3 dev/scripts/plot_memory_usage.py /tmp/mem.csv --task 1234
python3 dev/scripts/plot_memory_usage.py /tmp/mem.csv

plot_memory_usage.py does not accept --task argument

if __name__ == "__main__":
ap = argparse.ArgumentParser(description="Generate CSV From memory debug output")
ap.add_argument("--task", default=None, help="Task ID.")
ap.add_argument("--file", default=None, help="Spark log containing memory debug output")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Guide (https://datafusion.apache.org/comet/user-guide/tracing.html)"

private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
"Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html"
"Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html)"

| spark.comet.convert.json.enabled | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.convert.parquet.enabled | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
| spark.comet.debug.memory | When enabled, log all native memory pool interactions. For more information, refer to the Comet Debugging Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html. | false |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| spark.comet.debug.memory | When enabled, log all native memory pool interactions. For more information, refer to the Comet Debugging Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html. | false |
| spark.comet.debug.memory | When enabled, log all native memory pool interactions. For more information, refer to the Comet Debugging Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html). | false |

ap.add_argument("--task", default=None, help="Task ID.")
ap.add_argument("--file", default=None, help="Spark log containing memory debug output")
args = ap.parse_args()
main(args.file, int(args.task))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task is optional parameter. Calling int(None) will fail with TypeError

size = int(re_match.group(4))

if alloc.get(consumer) is None:
alloc[consumer] = size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible the method to be shrink for the first occurrence ?

elif method == "shrink":
alloc[consumer] = alloc[consumer] - size

print(consumer, ",", alloc[consumer])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
print(consumer, ",", alloc[consumer])
print(f"{consumer},{alloc[consumer]}")

nit: to avoid the extra spaces around each item


# Pivot the data to have consumers as columns
pivot_df = df.pivot(index='time', columns='name', values='size')
pivot_df = pivot_df.fillna(method='ffill').fillna(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pivot_df = pivot_df.fillna(method='ffill').fillna(0)
pivot_df = pivot_df.ffill().fillna(0)

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.fillna.html - Deprecated since version 2.1.0: Use ffill or bfill instead.

let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
let max_temp_directory_size =
spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);
let debug_memory_enabled = spark_config.get_bool(COMET_DEBUG_MEMORY);

let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);

let memory_pool = if logging_memory_pool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let memory_pool = if logging_memory_pool {
let memory_pool = if debug_memory_enabled {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants