Commit 1fbebd7

dfreader_pandas: add param loading

1 parent 6e603c8


tools/dfreader_pandas.py

Lines changed: 198 additions & 58 deletions
@@ -2,6 +2,7 @@
 Module to parse ArduPilot logs into pandas DataFrames with optional caching.

 - Accepts a log file, list of messages/fields, and resample frequency.
+- Extracts both flight data and parameter changes as separate DataFrames.
 - Aligns output rows to clean time intervals relative to the start of the log.
 - Uses a lightweight caching mechanism keyed by file content and module hash.
 - Automatically invalidates cache on module updates or parameter changes.
@@ -19,15 +20,51 @@
 HASH_SIZE_BYTES = 1024 * 1024  # 1MB


-def parse_log_to_df(path, specs, frequency, cache_dir=None):
+def load_log(path, fields, frequency, cache_dir=None):
+    """Load data and parameters from a log file."""
     manager = LogCacheManager(cache_dir)
-    df = manager.try_load_dataframe(path, specs, frequency)
-    if df is not None:
-        return df
+    manager.try_load(path, fields, frequency)
+
+    if manager.data is not None and manager.params is not None:
+        return manager.data, manager.params
+
+    reader = DFReader_binary(path)
+    if manager.data is None:
+        manager.data = parse_log_data(reader, fields, frequency)
+    if manager.params is None:
+        manager.params = parse_log_params(reader)
+    manager.save()
+    return manager.data, manager.params
+
+def load_log_params(path, cache_dir=None):
+    """Load only parameters from a log file."""
+    manager = LogCacheManager(cache_dir)
+    manager.try_load(path)
+
+    if manager.params is not None:
+        return manager.params

     reader = DFReader_binary(path)
-    fields = expand_field_specs(specs, reader)
-    # Dump the messages dict, so we get NaNs until the first valid message of each type
+    manager.params = parse_log_params(reader)
+    manager.save()
+    return manager.params
+
+def load_log_data(path, fields, frequency, cache_dir=None):
+    """Load only data from a log file."""
+    manager = LogCacheManager(cache_dir)
+    manager.try_load(path, fields, frequency)
+
+    if manager.data is not None:
+        return manager.data
+
+    reader = DFReader_binary(path)
+    manager.data = parse_log_data(reader, fields, frequency)
+    manager.save()
+    return manager.data
+
+def parse_log_data(reader: DFReader_binary, fields, frequency):
+    fields = expand_field_specs(fields, reader)
+    # Ensures missing data is NaN until first message of each type is received
     reader.rewind()

     PERIOD = 1 / frequency
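
The hunk above splits the old `parse_log_to_df` into `load_log`, `load_log_params`, and `load_log_data`, all sharing one cache manager. A minimal usage sketch of the new entry points — the log path, field specs, and frequency below are hypothetical, and the spec format is an assumption based on `expand_field_specs` and the `--fields` help text later in the diff:

```python
# Hypothetical usage; "flight.bin", the field specs, and 10 Hz are examples,
# not values taken from the commit.
from dfreader_pandas import load_log, load_log_params

# Parse (or fetch from cache) flight data and parameter history together.
data, params = load_log(
    "flight.bin",
    ["ATT.Roll", "ATT.Pitch", "GPS"],  # assumed message/field spec format
    frequency=10,  # resample rate in Hz (PERIOD = 1 / frequency)
    cache_dir=".log_cache",
)

# Or load only the parameter history, skipping the resampling work entirely.
params = load_log_params("flight.bin", cache_dir=".log_cache")
```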
@@ -55,7 +92,51 @@ def parse_log_to_df(path, specs, frequency, cache_dir=None):
     df.set_index("timestamp", inplace=True)
     df = df[[f"{m}.{f}" for m in fields.keys() for f in fields[m]]]

-    manager.save_dataframe(df)
+    return df
+
+
+def parse_log_params(reader):
+    """Parse parameters from the log file."""
+    param_dict = {}
+    reader.rewind()
+
+    while True:
+        msg = reader.recv_match(type="PARM")
+        if msg is None:
+            break
+        name = msg.Name
+        ts = msg._timestamp
+        val = msg.Value
+        default = msg.Default
+
+        entry = param_dict.setdefault(name, [])
+
+        # A NaN default means "no change"
+        if pd.isna(default) and len(entry) > 0:
+            default = entry[-1][2]
+
+        # We force the first timestamp to be time-of-boot. This allows users
+        # to easily use asof() on the DataFrame later (since they don't have
+        # to worry about using too early a timestamp).
+        if not entry:
+            ts = reader.clock.timebase
+
+        # Log every change in value/default
+        if not entry or entry[-1][1] != val or entry[-1][2] != default:
+            entry.append((ts, val, default))
+
+
+    # Flatten the dictionary of per-name (timestamp, value, default) tuples
+    index = []
+    values = []
+    for name, entries in param_dict.items():
+        for entry in entries:
+            index.append((name, pd.to_datetime(entry[0], unit="s")))
+            values.append((entry[1], entry[2]))
+    # Create a DataFrame with a multi-index
+    df = pd.DataFrame(values, index=pd.MultiIndex.from_tuples(index))
+    df.columns = ["Value", "Default"]
+    df.index.names = ["Name", "Timestamp"]

     return df

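Pinning each parameter's first entry to time-of-boot, as the comment in the hunk explains, guarantees that any timestamp inside the log has an answer. A sketch of the `asof()` lookup this enables, on a toy frame with the same shape as `parse_log_params()` output — the parameter name, times, and values are invented:

```python
import pandas as pd

# Toy stand-in for parse_log_params() output: a (Name, Timestamp) MultiIndex
# with Value/Default columns. All values here are invented.
idx = pd.MultiIndex.from_tuples(
    [
        ("ATC_RAT_RLL_P", pd.Timestamp("2024-05-01 12:00:00")),  # time-of-boot entry
        ("ATC_RAT_RLL_P", pd.Timestamp("2024-05-01 12:10:00")),  # in-flight change
    ],
    names=["Name", "Timestamp"],
)
params = pd.DataFrame({"Value": [0.135, 0.150], "Default": [0.135, 0.135]}, index=idx)

# Selecting one name leaves a Timestamp-indexed frame, so DataFrame.asof()
# returns the last change at or before the queried time.
history = params.loc["ATC_RAT_RLL_P"].sort_index()
print(history.asof(pd.Timestamp("2024-05-01 12:05:00"))["Value"])  # 0.135
```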
@@ -98,13 +179,19 @@ class LogCacheManager:
     _module_hash = None

     def __init__(self, cache_dir):
+        self.init_module_hash()
         self.cache_dir = cache_dir
-        if cache_dir is None:
-            return
-        os.makedirs(cache_dir, exist_ok=True)
-        if self._module_hash is None:
+        self.data = None
+        self.params = None
+        if cache_dir is not None:
+            os.makedirs(cache_dir, exist_ok=True)
+
+    @classmethod
+    def init_module_hash(cls):
+        """Initialize the module hash for the current module."""
+        if cls._module_hash is None:
             with open(__file__, "rb") as f:
-                self._module_hash = hashlib.sha256(f.read()).hexdigest()[:8]
+                cls._module_hash = hashlib.sha256(f.read()).hexdigest()[:8]

     @staticmethod
     def _compute_key(path):
@@ -119,30 +206,44 @@ def _compute_key(path):
         h = h[:16]  # 16 characters is plenty to prevent collisions
         return f"{h}_{size}"

-    @staticmethod
-    def _specs_equal(a, b):
-        """Compare two lists of messages/fields for equality."""
-        return set(a) == set(b)
-
-    def _cache_path(self):
+    def _data_cache_path(self):
         """Compute the cache path for a given key."""
         if self.cache_dir is None:
             return None
-        return os.path.join(self.cache_dir, f"{self.key}.feather")
+        return os.path.join(self.cache_dir, f"{self._key}.feather")
+
+    def _param_cache_path(self):
+        """Compute the parameter cache path for a given key."""
+        if self.cache_dir is None:
+            return None
+        return os.path.join(self.cache_dir, f"{self._key}.params.feather")

-    def _meta_path(self):
+    def _data_meta_path(self):
         """Compute the metadata path for a given key."""
         if self.cache_dir is None:
             return None
-        return os.path.join(self.cache_dir, f"{self.key}.meta.json")
+        return os.path.join(self.cache_dir, f"{self._key}.meta.json")

-    @staticmethod
-    def _metadata(specs, frequency):
+    def _param_meta_path(self):
+        """Compute the parameter metadata path for a given key."""
+        if self.cache_dir is None:
+            return None
+        return os.path.join(self.cache_dir, f"{self._key}.params.meta.json")
+
+    @classmethod
+    def _data_meta(cls, fields, frequency):
         """Generate metadata for the cache file."""
         return {
-            "specs": specs,
+            "fields": fields,
             "frequency": frequency,
-            "module_hash": LogCacheManager._module_hash,
+            "module_hash": cls._module_hash,
+        }
+
+    @classmethod
+    def _param_meta(cls):
+        """Generate metadata for the parameter cache file."""
+        return {
+            "module_hash": cls._module_hash,
         }

     @staticmethod
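
All four path helpers key off `self._key`, which `_compute_key` (only partially visible in this hunk) derives from file content and size; `HASH_SIZE_BYTES = 1024 * 1024` at the top of the module suggests only a prefix of the file is hashed. A standalone sketch of that derivation — hashing just the first 1MB is an assumption, since the diff only shows the truncation and the key format:

```python
import hashlib
import os

HASH_SIZE_BYTES = 1024 * 1024  # 1MB, as defined at the top of the module

def compute_key(path):
    # Assumed behaviour: hash only the first HASH_SIZE_BYTES of the log, then
    # combine with the total size so appended-to logs still change keys.
    with open(path, "rb") as f:
        h = hashlib.sha256(f.read(HASH_SIZE_BYTES)).hexdigest()
    size = os.path.getsize(path)
    h = h[:16]  # 16 characters is plenty to prevent collisions
    return f"{h}_{size}"
```

Because the size is part of the key, a log that grows after being opened produces a different key — exactly the mismatch the `save()` warning in the next hunk reports.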
@@ -157,48 +258,72 @@ def _compare_metadata(meta1, meta2):
                 return False
         return True

-    def try_load_dataframe(self, path, specs, frequency):
-        """Try to load a cached DataFrame for this log file.
+    def try_load(self, path, fields=None, frequency=None):
+        """Try to load cached data and params for this log file.

-        If the cache file does not exist yet, we store the information needed
-        to save the cache file later.
+        We store the information needed to save the cache file later, in
+        case a cache file does not exist yet.
         """
         if self.cache_dir is None or not os.path.exists(self.cache_dir):
-            return None
-        self.path = path
-        self.key = self._compute_key(path)
-        self.meta = self._metadata(specs, frequency)
-        cache_path = self._cache_path()
-        meta_path = self._meta_path()
-
-        if os.path.exists(cache_path) and os.path.exists(meta_path):
-            with open(meta_path, "r") as f:
+            return None, None
+        self._log_path = path
+        self._key = self._compute_key(path)
+        self._data_meta = self._data_meta(fields, frequency)
+        self._param_meta = self._param_meta()
+        data_cache_path = self._data_cache_path()
+        data_meta_path = self._data_meta_path()
+
+        if os.path.exists(data_cache_path) and os.path.exists(data_meta_path):
+            with open(data_meta_path, "r") as f:
+                meta = json.load(f)
+            if self._compare_metadata(meta, self._data_meta):
+                self.data = pd.read_feather(data_cache_path)
+
+        param_cache_path = self._param_cache_path()
+        param_meta_path = self._param_meta_path()
+        if os.path.exists(param_cache_path) and os.path.exists(
+            param_meta_path
+        ):
+            with open(param_meta_path, "r") as f:
                 meta = json.load(f)
-            if self._compare_metadata(meta, self.meta):
-                return pd.read_feather(cache_path)
-        return None
+            if self._compare_metadata(meta, self._param_meta):
+                self.params = pd.read_feather(param_cache_path)
+
+        return self.data, self.params

-    def save_dataframe(self, df):
+    def save(self):
         """Save the DataFrame to a cache file."""
         if self.cache_dir is None or not os.path.exists(self.cache_dir):
             return
-        key = self._compute_key(self.path)
-        if self.key != key:
+        if self.data is None and self.params is None:
+            raise ValueError(
+                "Either data or params must be provided to save the cache"
+            )
+        key = self._compute_key(self._log_path)
+        if self._key != key:
             print(
-                f"Warning: cache key {self.key} does not match computed key {key}. "
-                "This suggests the log file has changed since it was opened. "
-                "This dataframe will not be saved to the cache."
+                f"Warning: cache key {self._key} does not match computed key "
+                f"{key}. This suggests the log file has changed since it was "
+                "opened. Your data is likely truncated; it will not be saved "
+                "to the cache."
             )
             return

-        df.to_feather(self._cache_path())
-        with open(self._meta_path(), "w") as f:
-            json.dump(self.meta, f)
+        if self.data is not None:
+            self.data.to_feather(self._data_cache_path())
+            with open(self._data_meta_path(), "w") as f:
+                json.dump(self._data_meta, f)

-        # Force the user to call try_load_dataframe() again before saving again
-        self.key = None
-        self.path = None
-        self.meta = None
+        if self.params is not None:
+            self.params.to_feather(self._param_cache_path())
+            with open(self._param_meta_path(), "w") as f:
+                json.dump(self._param_meta, f)
+
+        # Every call to save() should be preceded by a call to try_load()
+        self._key = None
+        self._log_path = None
+        self._data_meta = None
+        self._param_meta = None


 def main():
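
One consequence of embedding `module_hash` in both metadata dicts: editing this module changes the hash, `_compare_metadata` presumably rejects the stored metadata, and `try_load()` falls through to a fresh parse — the "automatically invalidates cache on module updates" behaviour promised in the docstring. Illustratively, with invented hash values:

```python
# Hypothetical cached vs. freshly computed metadata. Any differing key --
# fields, frequency, or the module's own source hash -- means a cache miss.
cached_meta = {"fields": ["ATT.Roll"], "frequency": 10, "module_hash": "a1b2c3d4"}
fresh_meta = {"fields": ["ATT.Roll"], "frequency": 10, "module_hash": "e5f6a7b8"}
assert cached_meta != fresh_meta  # module was edited, so the log is re-parsed
```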
@@ -240,7 +365,9 @@ def main():
         from line_profiler import LineProfiler

         profiler = LineProfiler()
-        profiler.add_function(parse_log_to_df)
+        profiler.add_function(load_log)
+        profiler.add_function(parse_log_data)
+        profiler.add_function(parse_log_params)
         profiler.add_function(new_row)
         profiler.enable_by_count()

@@ -249,12 +376,25 @@ def main():
             "No fields provided. Use --fields to specify message types and/or fields."
         )

-    df = parse_log_to_df(
+    data, params = load_log(
         args.path, args.fields, args.frequency, args.cache_dir
     )
-    print(df.head())
+    print(data.head())
+    print("...")
+    print(data.tail())
+    print(params.head())
+    print("...")
+    print(params.tail())
+
+    # Drop all params that don't change
+    # Params is a multi-index with Name, Timestamp. Drop everything with only
+    # one timestamp
+    params = params.groupby("Name").filter(lambda x: len(x) > 1)
+
+    # Print all the names remaining, and their counts
+    print("Remaining params:")
+    print(params.groupby("Name").size())
     print("...")
-    print(df.tail())

     if args.profile:
         profiler.print_stats()
