From 9fa66b89ee9d82942de47d113db496b8720ec51e Mon Sep 17 00:00:00 2001
From: Tyler Reddy
Date: Tue, 26 Jul 2022 17:38:32 -0600
Subject: [PATCH 1/2] WIP, ENH: memory efficient DXT segs

Fixes #779

* at the moment on `main`, DXT record data is effectively stored as a list
  of dictionaries of lists of dictionaries that look like this:

```
DXT_list -> [rec0, rec1, ..., recN]
recN -> {"id": ..., "rank": ..., "write_segments": ..., ...}
recN["write_segments"] -> [seg0, seg1, ..., segN]
segN -> {"offset": int, "length": int, "start_time": float, "end_time": float}
```

- the list of segments is extremely memory inefficient, with the smallest
  file in the matching issue exceeding 20 GB of physical memory in
  `mod_read_all_dxt_records`:

```
Line #    Mem usage      Increment  Occurrences   Line Contents
   852                                            # fetch records
   853     92.484 MiB    18.820 MiB            1  rec = backend.log_get_dxt_record(self.log, mod, dtype=dtype)
   854  20295.188 MiB     0.773 MiB         1025  while rec != None:
   855  20295.188 MiB     0.000 MiB         1024      self.records[mod].append(rec)
   856  20295.188 MiB     0.000 MiB         1024      self.data['modules'][mod]['num_records'] += 1
   857
   858                                            # fetch next
   859  20295.188 MiB 20201.930 MiB         1024      rec = backend.log_get_dxt_record(self.log, mod, reads=reads, writes=writes, dtype=dtype)
```

- if we switch to NumPy arrays the memory footprint drops a lot (see below),
  and the performance informally seems similar (36 seconds vs. 33 seconds on
  `main` to produce a `report` object with the smallest file in the matching
  issue):

```
Line #    Mem usage      Increment  Occurrences   Line Contents
   859   3222.547 MiB  3146.344 MiB         1024  rec = backend.log_get_dxt_record(self.log, mod, reads=reads, writes=writes, dtype=dtype)
```

- this branch currently uses NumPy record arrays, because I thought they'd
  be a better fit for a data structure with 2 int columns and 2 float
  columns; however, there is a big performance hit over regular NumPy arrays
  (almost 6 minutes vs. 33 seconds for the smallest file in the matching
  issue), so if we could live without the extra dtype structuring of a
  recarray, maybe that would be best (we could also try a pandas DataFrame,
  which is another natural fit for dtype columns...)
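- as a side note for reviewers, here is a minimal standalone sketch (not
  PyDarshan code; the segment values and the count `n` are made up) of why
  the list-of-dicts layout is so much heavier than a single
  structured/record array holding the same four fields per segment:

```python
import sys

import numpy as np

# hypothetical segment count, purely for illustration
n = 100_000

# list-of-dicts layout: every segment pays for a dict plus four boxed scalars
segs_dicts = [{"offset": 0, "length": 40,
               "start_time": 0.103, "end_time": 0.104}
              for _ in range(n)]

# structured-array layout: one contiguous 32-byte record per segment
seg_dtype = np.dtype([("offset", np.int64), ("length", np.int64),
                      ("start_time", np.float64), ("end_time", np.float64)])
segs_arr = np.zeros(n, dtype=seg_dtype)

print(segs_arr.nbytes)               # exactly n * 32 bytes of payload
print(sys.getsizeof(segs_dicts[0]))  # a single dict alone already exceeds a
                                     # 32-byte record, before counting the
                                     # boxed values it holds
```

  (this is the same effect, at DXT scale, behind the ~20 GiB vs. ~3 GiB
  numbers in the profiles above)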
---
 .../pydarshan/darshan/backend/cffi_backend.py | 40 ++++++++-------
 .../experimental/plots/heatmap_handling.py    |  4 ++
 .../pydarshan/darshan/tests/test_moddxt.py    | 50 ++++++++++++++++++-
 3 files changed, 74 insertions(+), 20 deletions(-)

diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py
index 22faa3407..d4ffbcdbd 100644
--- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py
+++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py
@@ -565,29 +565,31 @@ def log_get_dxt_record(log, mod_name, reads=True, writes=True, dtype='dict'):
     size_of = ffi.sizeof("struct dxt_file_record")
     segments = ffi.cast("struct segment_info *", buf[0] + size_of )
-
+    arr_write = np.recarray(wcnt, dtype=[("offset", int),
+                                         ("length", int),
+                                         ("start_time", float),
+                                         ("end_time", float)])
+    arr_read = np.recarray(rcnt, dtype=[("offset", int),
+                                        ("length", int),
+                                        ("start_time", float),
+                                        ("end_time", float)])
 
     for i in range(wcnt):
-        seg = {
-            "offset": segments[i].offset,
-            "length": segments[i].length,
-            "start_time": segments[i].start_time,
-            "end_time": segments[i].end_time
-        }
-        rec['write_segments'].append(seg)
-
-
-    for i in range(rcnt):
-        i = i + wcnt
-        seg = {
-            "offset": segments[i].offset,
-            "length": segments[i].length,
-            "start_time": segments[i].start_time,
-            "end_time": segments[i].end_time
-        }
-        rec['read_segments'].append(seg)
+        arr_write[i, ...] = (segments[i].offset,
+                             segments[i].length,
+                             segments[i].start_time,
+                             segments[i].end_time)
+
+    for k in range(rcnt):
+        i = k + wcnt
+        arr_read[k, ...] = (segments[i].offset,
+                            segments[i].length,
+                            segments[i].start_time,
+                            segments[i].end_time)
+
+    rec['write_segments'] = arr_write
+    rec['read_segments'] = arr_read
 
     if dtype == "pandas":
         rec['read_segments'] = pd.DataFrame(rec['read_segments'])
         rec['write_segments'] = pd.DataFrame(rec['write_segments'])

diff --git a/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py b/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py
index 08c49c599..09674a108 100644
--- a/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py
+++ b/darshan-util/pydarshan/darshan/experimental/plots/heatmap_handling.py
@@ -132,6 +132,10 @@ def get_rd_wr_dfs(
         # ignore for the same reason as above
         seg_df = _dict[seg_key]  # type: ignore
         if seg_df.size:
+            seg_df.columns = ["offset",
+                              "length",
+                              "start_time",
+                              "end_time"]
             # drop unused columns from the dataframe
             seg_df = seg_df.drop(columns=drop_columns)
             # create new column for the ranks

diff --git a/darshan-util/pydarshan/darshan/tests/test_moddxt.py b/darshan-util/pydarshan/darshan/tests/test_moddxt.py
index 5352aca4d..53532a7be 100644
--- a/darshan-util/pydarshan/darshan/tests/test_moddxt.py
+++ b/darshan-util/pydarshan/darshan/tests/test_moddxt.py
@@ -1,6 +1,8 @@
 import os
 
 import pytest
+import numpy as np
+from numpy.testing import assert_allclose
 
 import darshan.backend.cffi_backend as backend
 from darshan.log_utils import get_log_path
@@ -33,7 +35,53 @@
                                          'read_segments': []})])
 def test_dxt_records(logfile, mod, expected_dict):
     # regression guard for DXT records values
+    # write_segments and read_segments are now NumPy
+    # recarrays, to save considerable memory
+    # per gh-779
+    # TODO: refactor for simplicity--we can probably
+    # just initialize the expected values via
+    # np.array() with the appropriate structured dtypes
+    expected_write_segs = np.recarray(1, dtype=[("offset", int),
+                                                ("length", int),
+                                                ("start_time", float),
+                                                ("end_time", float)])
+    expected_read_segs = np.recarray(1, dtype=[("offset", int),
+                                               ("length", int),
+                                               ("start_time", float),
+                                               ("end_time", float)])
+    if expected_dict["write_segments"]:
+        expected_write_segs.offset = expected_dict["write_segments"][0]["offset"]
+        expected_write_segs.length = expected_dict["write_segments"][0]["length"]
+        expected_write_segs.start_time = expected_dict["write_segments"][0]["start_time"]
+        expected_write_segs.end_time = expected_dict["write_segments"][0]["end_time"]
+    else:
+        expected_write_segs = np.recarray(0, dtype=[("offset", int),
+                                                    ("length", int),
+                                                    ("start_time", float),
+                                                    ("end_time", float)])
+    if expected_dict["read_segments"]:
+        expected_read_segs.offset = expected_dict["read_segments"][0]["offset"]
+        expected_read_segs.length = expected_dict["read_segments"][0]["length"]
+        expected_read_segs.start_time = expected_dict["read_segments"][0]["start_time"]
+        expected_read_segs.end_time = expected_dict["read_segments"][0]["end_time"]
+    else:
+        expected_read_segs = np.recarray(0, dtype=[("offset", int),
+                                                   ("length", int),
+                                                   ("start_time", float),
+                                                   ("end_time", float)])
+    expected_dict["write_segments"] = expected_write_segs
+    expected_dict["read_segments"] = expected_read_segs
+
     logfile = get_log_path(logfile)
     log = backend.log_open(logfile)
     rec = backend.log_get_record(log, mod)
-    assert rec == expected_dict
+    for key in expected_dict.keys():
+        if "segments" in key:
+            # careful, can't use assert_allclose directly
+            # on recarrays
+            assert_allclose(rec[key].offset, expected_dict[key].offset)
+            assert_allclose(rec[key].length, expected_dict[key].length)
+            assert_allclose(rec[key].start_time, expected_dict[key].start_time)
+            assert_allclose(rec[key].end_time, expected_dict[key].end_time)
+        else:
+            assert rec[key] == expected_dict[key]

From 3d3af72e4e4db0da1b953261fc91eb10e2103823 Mon Sep 17 00:00:00 2001
From: Tyler Reddy
Date: Wed, 3 Aug 2022 15:51:04 -0600
Subject: [PATCH 2/2] MAINT: PR 784 revisions

* more efficient `log_get_dxt_record()` by reading directly from the
  C-contiguous segment buffer into NumPy recarrays
* simplify the changes to `test_dxt_records()`

---
 .../pydarshan/darshan/backend/cffi_backend.py | 37 ++------
 .../pydarshan/darshan/tests/test_moddxt.py    | 80 +++++++------------
 2 files changed, 39 insertions(+), 78 deletions(-)

diff --git a/darshan-util/pydarshan/darshan/backend/cffi_backend.py b/darshan-util/pydarshan/darshan/backend/cffi_backend.py
index d4ffbcdbd..310000b39 100644
--- a/darshan-util/pydarshan/darshan/backend/cffi_backend.py
+++ b/darshan-util/pydarshan/darshan/backend/cffi_backend.py
@@ -559,37 +559,16 @@ def log_get_dxt_record(log, mod_name, reads=True, writes=True, dtype='dict'):
     rec['write_count'] = wcnt
     rec['read_count'] = rcnt
 
-    rec['write_segments'] = []
-    rec['read_segments'] = []
-
-
     size_of = ffi.sizeof("struct dxt_file_record")
     segments = ffi.cast("struct segment_info *", buf[0] + size_of )
-    arr_write = np.recarray(wcnt, dtype=[("offset", int),
-                                         ("length", int),
-                                         ("start_time", float),
-                                         ("end_time", float)])
-    arr_read = np.recarray(rcnt, dtype=[("offset", int),
-                                        ("length", int),
-                                        ("start_time", float),
-                                        ("end_time", float)])
-
-    for i in range(wcnt):
-        arr_write[i, ...] = (segments[i].offset,
-                             segments[i].length,
-                             segments[i].start_time,
-                             segments[i].end_time)
-
-    for k in range(rcnt):
-        i = k + wcnt
-        arr_read[k, ...] = (segments[i].offset,
-                            segments[i].length,
-                            segments[i].start_time,
-                            segments[i].end_time)
-
-    rec['write_segments'] = arr_write
-    rec['read_segments'] = arr_read
+    segments_buf = ffi.buffer(segments, (rcnt + wcnt) * 64 * 4)
+    segment_arr = np.frombuffer(buffer=segments_buf,
+                                dtype=[("offset", int),
+                                       ("length", int),
+                                       ("start_time", float),
+                                       ("end_time", float)])
+    rec['write_segments'] = segment_arr[:wcnt]
+    rec['read_segments'] = segment_arr[wcnt: rcnt + wcnt]
 
     if dtype == "pandas":
         rec['read_segments'] = pd.DataFrame(rec['read_segments'])
         rec['write_segments'] = pd.DataFrame(rec['write_segments'])

diff --git a/darshan-util/pydarshan/darshan/tests/test_moddxt.py b/darshan-util/pydarshan/darshan/tests/test_moddxt.py
index 53532a7be..1cdd7bd80 100644
--- a/darshan-util/pydarshan/darshan/tests/test_moddxt.py
+++ b/darshan-util/pydarshan/darshan/tests/test_moddxt.py
@@ -18,60 +18,42 @@
                            'hostname': 'sn176.localdomain',
                            'write_count': 1,
                            'read_count': 0,
-                           'write_segments': [{'offset': 0,
-                                               'length': 40,
-                                               'start_time': 0.10337884305045009,
-                                               'end_time': 0.10338771319948137}],
-                           'read_segments': []}),
+                           'write_segments': np.array([(0,
+                                                        40,
+                                                        0.10337884305045009,
+                                                        0.10338771319948137)],
+                                                      dtype=[("offset", int),
+                                                             ("length", int),
+                                                             ("start_time", float),
+                                                             ("end_time", float)]),
+                           'read_segments': np.array([],
+                                                     dtype=[("offset", int),
+                                                            ("length", int),
+                                                            ("start_time", float),
+                                                            ("end_time", float)])}),
                           ('DXT_MPIIO', {'id': 9457796068806373448,
                                          'rank': 0,
                                          'hostname': 'sn176.localdomain',
                                          'write_count': 1,
                                          'read_count': 0,
-                                         'write_segments': [{'offset': 0,
-                                                             'length': 4000,
-                                                             'start_time': 0.10368914622813463,
-                                                             'end_time': 0.1053433942142874}],
-                                         'read_segments': []})])
+                                         'write_segments': np.array([(0,
+                                                                      4000,
+                                                                      0.10368914622813463,
+                                                                      0.1053433942142874)],
+                                                                    dtype=[("offset", int),
+                                                                           ("length", int),
+                                                                           ("start_time", float),
+                                                                           ("end_time", float)]),
+                                         'read_segments': np.array([],
+                                                                   dtype=[("offset", int),
+                                                                          ("length", int),
+                                                                          ("start_time", float),
+                                                                          ("end_time", float)])})])
 def test_dxt_records(logfile, mod, expected_dict):
-    # regression guard for DXT records values
+    # regression guard for DXT records values;
     # write_segments and read_segments are now NumPy
     # recarrays, to save considerable memory
     # per gh-779
-    # TODO: refactor for simplicity--we can probably
-    # just initialize the expected values via
-    # np.array() with the appropriate structured dtypes
-    expected_write_segs = np.recarray(1, dtype=[("offset", int),
-                                                ("length", int),
-                                                ("start_time", float),
-                                                ("end_time", float)])
-    expected_read_segs = np.recarray(1, dtype=[("offset", int),
-                                               ("length", int),
-                                               ("start_time", float),
-                                               ("end_time", float)])
-    if expected_dict["write_segments"]:
-        expected_write_segs.offset = expected_dict["write_segments"][0]["offset"]
-        expected_write_segs.length = expected_dict["write_segments"][0]["length"]
-        expected_write_segs.start_time = expected_dict["write_segments"][0]["start_time"]
-        expected_write_segs.end_time = expected_dict["write_segments"][0]["end_time"]
-    else:
-        expected_write_segs = np.recarray(0, dtype=[("offset", int),
-                                                    ("length", int),
-                                                    ("start_time", float),
-                                                    ("end_time", float)])
-    if expected_dict["read_segments"]:
-        expected_read_segs.offset = expected_dict["read_segments"][0]["offset"]
-        expected_read_segs.length = expected_dict["read_segments"][0]["length"]
-        expected_read_segs.start_time = expected_dict["read_segments"][0]["start_time"]
-        expected_read_segs.end_time = expected_dict["read_segments"][0]["end_time"]
-    else:
-        expected_read_segs = np.recarray(0, dtype=[("offset", int),
-                                                   ("length", int),
-                                                   ("start_time", float),
-                                                   ("end_time", float)])
-    expected_dict["write_segments"] = expected_write_segs
-    expected_dict["read_segments"] = expected_read_segs
-
     logfile = get_log_path(logfile)
     log = backend.log_open(logfile)
     rec = backend.log_get_record(log, mod)
@@ -79,9 +61,9 @@ def test_dxt_records(logfile, mod, expected_dict):
         if "segments" in key:
             # careful, can't use assert_allclose directly
             # on recarrays
-            assert_allclose(rec[key].offset, expected_dict[key].offset)
-            assert_allclose(rec[key].length, expected_dict[key].length)
-            assert_allclose(rec[key].start_time, expected_dict[key].start_time)
-            assert_allclose(rec[key].end_time, expected_dict[key].end_time)
+            assert_allclose(rec[key]["offset"], expected_dict[key]["offset"])
+            assert_allclose(rec[key]["length"], expected_dict[key]["length"])
+            assert_allclose(rec[key]["start_time"], expected_dict[key]["start_time"])
+            assert_allclose(rec[key]["end_time"], expected_dict[key]["end_time"])
         else:
             assert rec[key] == expected_dict[key]
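A closing note on the second patch: its core trick is reinterpreting the
C-contiguous segment buffer with a structured dtype in a single
`np.frombuffer()` call, rather than looping per segment in Python. Below is
a minimal standalone sketch of that pattern (not PyDarshan code: the
32-byte record layout of two int64 and two float64 fields is assumed from
the fields used above, and a plain `bytes` object stands in for the real
cffi buffer):

```python
import numpy as np

# assumed segment layout: int64 offset, int64 length, double start_time,
# double end_time -- 32 bytes per record
seg_dtype = np.dtype([("offset", np.int64), ("length", np.int64),
                      ("start_time", np.float64), ("end_time", np.float64)])

wcnt, rcnt = 2, 1
# fabricate a C-contiguous buffer with write segments first, then reads,
# mirroring the layout the backend walks via ffi.cast()
raw = np.array([(0, 40, 0.10, 0.11),
                (40, 40, 0.12, 0.13),
                (0, 80, 0.20, 0.21)], dtype=seg_dtype).tobytes()

# one bulk reinterpretation replaces the per-segment Python loops entirely
segment_arr = np.frombuffer(raw, dtype=seg_dtype)
write_segments = segment_arr[:wcnt]            # zero-copy view of the writes
read_segments = segment_arr[wcnt:wcnt + rcnt]  # zero-copy view of the reads

print(write_segments["length"])   # -> [40 40]
print(read_segments["offset"])    # -> [0]
```

Because the slices are views into the same buffer, no per-segment Python
objects are created at all, which is where both the memory and runtime
savings over the first patch's element-by-element recarray fills come from.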