Concurrent get_heatmap_df #798

@tylerjereddy

As a quick experiment: using more than one process in get_heatmap_df() saves ~8 seconds when generating the HTML report for e3sm_io_heatmap_and_dxt.darshan (from 42 down to 34 seconds). This is in branch treddy_dxt_html_speedup, which itself branches off of gh-784. It has no effect on the processing time of snyder_acme.exe_id1253318_9-27-24239-1515303144625770178_2.darshan though, and an 8-second drop isn't enough to justify the complexity that would be required to handle the concurrency properly (a heuristic for the input size at which to use > 1 core, how much work to give each process, testing the concurrent vs. serial code paths, keeping concurrency off by default/opt-in, etc.).

Nonetheless, I'll note this here for now since the interpolation takes up 27 seconds of the 42 total for e3sm...

--- a/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py
+++ b/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py
@@ -3,6 +3,7 @@ Module of data pre-processing functions for constructing the heatmap figure.
 """
 
 from typing import Dict, Any, Tuple, Sequence, TYPE_CHECKING
+import concurrent.futures
 
 import sys
 
@@ -369,9 +370,26 @@ def get_heatmap_df(agg_df: pd.DataFrame, xbins: int, nprocs: int) -> pd.DataFram
     else:
         null_mask = cats.notna().sum(axis=1) > 1
         null_mask = null_mask.loc[null_mask == True].index
-        cats_vals_to_interp = pd.DataFrame(cats.iloc[null_mask].values)
-        cats_vals_to_interp.interpolate(method="nearest", axis=1, inplace=True)
-        cats.iloc[null_mask] = cats_vals_to_interp
+        list_futures = []
+        num_workers = 4
+        if len(null_mask) > 10000:
+            work_inds = np.array_split(null_mask, num_workers)
+
+            with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
+                for ind, work_ind in enumerate(work_inds):
+                    list_futures.append(executor.submit(interpolator,
+                                                        cats,
+                                                        work_ind,
+                                                        ind))
+
+            for future in concurrent.futures.as_completed(list_futures):
+                cats_vals_to_interp, ind = future.result()
+                cats.iloc[work_inds[ind]] = cats_vals_to_interp
+        else:
+            cats_vals_to_interp = pd.DataFrame(cats.iloc[null_mask].values)
+            cats_vals_to_interp.interpolate(method="nearest", axis=1, inplace=True)
+            cats.iloc[null_mask] = cats_vals_to_interp
+
     # each time bin containing an event has a 1 in it, otherwise NaN
     # store mask for restoring fully occupied bins
     mask_occ = cats == 2
@@ -396,3 +414,9 @@ def get_heatmap_df(agg_df: pd.DataFrame, xbins: int, nprocs: int) -> pd.DataFram
     hmap_df = cats.groupby("rank").sum()
     hmap_df = hmap_df.reindex(index=range(nprocs), fill_value=0.0)
     return hmap_df
+
+
+def interpolator(cats, work_ind, ind):
+    cats_vals_to_interp = pd.DataFrame(cats.iloc[work_ind].values)
+    cats_vals_to_interp.interpolate(method="nearest", axis=1, inplace=True)
+    return cats_vals_to_interp, ind
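
For reference, here is a minimal sketch of how the opt-in and size-heuristic concerns listed above might look if this were ever pursued. It is not what the branch does: `fill_nearest`, `apply_rows`, `workers`, and `min_rows` are hypothetical names, and the 10,000-row threshold is just the value from the experiment. Unlike the diff, it sends each worker only its own block of rows rather than the whole `cats` frame, and it relies on `executor.map` returning results in input order instead of tracking a chunk index.

```python
import concurrent.futures

import numpy as np
import pandas as pd


def fill_nearest(values: np.ndarray) -> np.ndarray:
    """Nearest-neighbor fill along each row, mirroring the serial code path.

    pandas delegates method="nearest" to SciPy, so SciPy must be installed.
    """
    df = pd.DataFrame(values)
    return df.interpolate(method="nearest", axis=1).to_numpy()


def apply_rows(values, row_func=fill_nearest, workers=1, min_rows=10_000):
    """Apply row_func to a 2D array, optionally split across processes.

    Hypothetical helper: workers=1 (the default) keeps the serial path,
    so concurrency is opt-in, and inputs below min_rows also stay serial.
    """
    if workers <= 1 or len(values) < min_rows:
        return row_func(values)
    # Give each worker only its own block of rows, not the full array.
    chunks = np.array_split(values, workers)
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        # executor.map yields results in input order, so the blocks can be
        # stacked back together without any index bookkeeping.
        results = list(executor.map(row_func, chunks))
    return np.vstack(results)
```

With something like this, the call site would become roughly `cats.iloc[null_mask] = apply_rows(cats.iloc[null_mask].values, workers=4)`. On platforms that start worker processes with spawn, the parallel path has to be reached from code guarded by `if __name__ == "__main__":`, and `row_func` must be a picklable module-level function. The serial-vs-concurrent testing burden mentioned above still applies.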
