
Commit a6051ea

merge branch Telemetry-SD-Card into main
2 parents: 344d223 + 911046b


85 files changed: +42720 additions, −0 deletions

.clang-format

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
IndentWidth: 4
BasedOnStyle: LLVM

Telemetry-SD-Card/CMakeLists.txt

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
cmake_minimum_required(VERSION 3.19)
cmake_policy(VERSION 3.19...3.22)

set(MBED_APP_JSON_PATH mbed_app.json5)
set(CMAKE_EXPORT_COMPILE_COMMANDS 1)


# Main project definition
include(../mbed-os/tools/cmake/mbed_toolchain_setup.cmake)
project(SDCard LANGUAGES C CXX ASM)
include(mbed_project_setup)

# Include MBED OS
add_subdirectory(../mbed-os ../mbed-os-build)

# Fetch Nanoarrow - a minimal implementation of Apache Arrow
include(FetchContent)
# This enables the nanoarrow IPC extension (required to use Arrow IPC functions)
# (note: the nanoarrow_ipc library is created conditionally based on this flag)
set(NANOARROW_IPC ON CACHE BOOL "Build IPC extension")
FetchContent_Declare(
    nanoarrow
    URL https://github.com/apache/arrow-nanoarrow/releases/download/apache-arrow-nanoarrow-0.6.0/apache-arrow-nanoarrow-0.6.0.tar.gz
    URL_HASH SHA512=25e81c119c16bfa29a9943d3904acd8f29f225ff4a1c99decde7c8235300232402106a4b5249921df773fb797bb0149a885fa3a4524c34c67c06199edd72250c
    DOWNLOAD_EXTRACT_TIMESTAMP ON
)
FetchContent_MakeAvailable(nanoarrow)


# Add our main executable, and link all the above libraries to it
set(CMAKE_CXX_STANDARD 20)
add_executable(${PROJECT_NAME} main.cpp)

target_link_libraries(${PROJECT_NAME} mbed-os mbed-storage-sd mbed-storage-fat)
# "SYSTEM" should remove warnings when compiling the generated flatbuffers code
# target_include_directories(${PROJECT_NAME} SYSTEM PUBLIC include)
# target_link_libraries(${PROJECT_NAME} mbed-os mbed-storage-sd mbed-storage-fat nanoarrow nanoarrow_ipc)
# target_link_libraries(${PROJECT_NAME} mbed-baremetal mbed-storage-sd mbed-storage-fat)
# target_link_libraries(${PROJECT_NAME} mbed-baremetal mbed-storage-sd mbed-storage-fat nanoarrow nanoarrow_ipc)

mbed_set_post_build(${PROJECT_NAME})

Telemetry-SD-Card/arrowStream.py

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
import argparse

import numpy as np
import polars as pl
import serial

parser = argparse.ArgumentParser()
parser.add_argument('--input_file', '-i', type=str, action='store', help='input file name', required=True)
parser.add_argument('--port', '-P', type=str, action='store', help='serial port', required=True)
parser.add_argument('--log', '-l', type=str, action='store', help='log file name', required=True)
parser.add_argument('--output_file', '-o', type=str, action='store', help='output file name', default='out.arrow')
args = parser.parse_args()

# Conversion from the ASCII type codes used in the custom binary format to numpy types.
# Each code is [kind, n], where the size is 2**n bits.
types_dict = {
    "i3": np.int8,
    "i4": np.int16,
    "i5": np.int32,
    "i6": np.int64,
    "u3": np.uint8,
    "u4": np.uint16,
    "u5": np.uint32,
    "u6": np.uint64,
    "f4": np.float16,
    "f5": np.float32,
    "f6": np.float64,
    "b0": bool
}

# Initialize dataframe
df = pl.DataFrame()

# Open serial port. Intentionally no timeout, so that we block until we get the expected header/schema.
s = serial.Serial(args.port, baudrate=115200)
s.set_buffer_size(rx_size=1000000)  # note: pyserial only supports this on Windows
s.reset_input_buffer()
s.reset_output_buffer()
s.write(b'QS')  # Query Schema

# Attempt to read the schema from the serial port, requesting it by sending 'QS'.
for i in range(1000):
    header = s.read(8).decode('ascii')
    m = np.frombuffer(s.read(4), dtype=np.uint32, count=1)[0]  # number of columns
    n = np.frombuffer(s.read(4), dtype=np.uint32, count=1)[0]  # rows per record batch

    # Record batches are expected in some whole multiple of bytes, so this will
    # not work if n is not a multiple of 8 (bool columns are bit-packed).
    if (n % 8 != 0) or (n <= 0) or (m <= 0) or (header[:5] != "FSDAQ"):
        s.write(b'QS')  # Query Schema again - indicates to the NUCLEO that the schema was not received
    else:
        s.write(b'RS')  # Received Schema - indicates to the NUCLEO to start sending data
        break
    if i == 999:
        s.write(b'SE')  # Schema Error - indicates to the NUCLEO that we failed to get the schema and are exiting
        s.close()
        raise Exception("Schema Error: Failed to receive valid schema after 1000 attempts.")

# print(f"Header: {header}, m: {m}, n: {n}")

# Read in the column titles. Each is a uint8 length followed by an ASCII string.
colTitles = []
for i in range(m):
    length = s.read(1)[0]                   # uint8 length of title
    title = s.read(length).decode('ascii')  # ASCII string of title
    colTitles.append(title)                 # Add title to list of titles

# Read in the column types. They are specified as 2-byte ASCII strings where
# the first character is the kind of type (int, uint, float, bool) and the
# second character is log2 of the size of the type in bits (3, 4, 5, 6).
# E.g. f5 = float32, b0 = bool, u4 = uint16.
colTypes = []
for i in range(m):
    colType = s.read(2).decode('ascii')  # 2-byte ASCII string of type
    if colType not in types_dict:
        s.write(b'SE')  # Schema Error - indicates to the NUCLEO that the schema was invalid and we are exiting
        s.close()
        raise Exception(f"Schema Error: Invalid column type {colType} received.")
    # Store (numpy dtype, byte length of one column of n values)
    colTypes.append((types_dict[colType], int((2 ** int(colType[1])) / 8 * n)))

# Read record batches forever, appending each one to the dataframe.
while True:
    frame_pieces = []
    for i in range(m):
        col_byte_len = colTypes[i][1]
        col_type = colTypes[i][0]
        raw = s.read(col_byte_len)
        if col_type is bool:
            # bool columns are bit-packed, 8 rows per byte (little-endian bit
            # order assumed, to match the generated encoder)
            col_data = np.unpackbits(np.frombuffer(raw, dtype=np.uint8), count=n, bitorder='little').astype(bool)
        else:
            col_data = np.frombuffer(raw, dtype=col_type, count=n)
        frame_pieces.append(pl.Series(colTitles[i], col_data))
    df = pl.concat([df, pl.DataFrame(frame_pieces)])
    df.to_arrow()  # Arrow conversion; the result is not yet written to args.output_file in this commit
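
Aside (not part of the commit): a quick offline illustration of the FSDAQ type-code scheme and the bit-packed bool layout that the script above decodes. The sample codes and the packed byte here are invented for demonstration.

import numpy as np

# Each 2-character code is (kind, n), where the value is 2**n bits wide,
# e.g. "f5" -> float, 2**5 = 32 bits -> float32.
for code, expected_bits in [("u4", 16), ("i6", 64), ("f5", 32)]:
    bits = 2 ** int(code[1])
    assert bits == expected_bits

# A bool column of n rows arrives bit-packed into n/8 bytes. Assuming
# little-endian bit order (to match the generated encoder's per-byte shifts),
# the byte 0b00000101 unpacks to rows [True, False, True, False, ...]:
packed = np.frombuffer(bytes([0b00000101]), dtype=np.uint8)
print(np.unpackbits(packed, bitorder="little").astype(bool))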

Telemetry-SD-Card/dbc_to_cpp.py

Lines changed: 245 additions & 0 deletions
@@ -0,0 +1,245 @@
import os
import cantools
import pyarrow as pa
import numpy as np

# See choose_float_type() for explanation
DEFAULT_FLOAT_TOLERANCE = 0.1

FSDAQ_TYPE_TO_C_TYPE = {
    "b0": "bool",
    "u3": "uint8_t",
    "i3": "int8_t",
    "u4": "uint16_t",
    "i4": "int16_t",
    "u5": "uint32_t",
    "i5": "int32_t",
    "u6": "uint64_t",
    "i6": "int64_t",
    "f5": "float",
    "f6": "double",
}

# Find the appropriate type to represent a given signal.
# - If length==1 (with scale==1 and offset==0), bool is used
# - If scale==1, the smallest integer type that fits is used
# - Otherwise, choose_float_type() is used
# See choose_float_type() for a description of float_tolerance
def get_fsdaq_type_for_signal(
    s: cantools.db.Signal, float_tolerance: float = DEFAULT_FLOAT_TOLERANCE
) -> str:
    if s.length == 1 and s.scale == 1 and s.offset == 0:
        return "b0"

    raw_min = -(1 << (s.length - 1)) if s.is_signed else 0
    raw_max = (1 << (s.length - 1)) - 1 if s.is_signed else (1 << s.length) - 1

    if s.scale == 1:
        decoded_min = raw_min
        decoded_max = raw_max
        candidates = [
            ("u3", 0, 255),
            ("i3", -128, 127),
            ("u4", 0, 65535),
            ("i4", -32768, 32767),
            ("u5", 0, 2**32 - 1),
            ("i5", -(2**31), 2**31 - 1),
        ]
        for fsdaq_type, min_val, max_val in candidates:
            if decoded_min >= min_val and decoded_max <= max_val:
                return fsdaq_type
        # Range exceeds 32 bits, so fall back to a 64-bit type
        return "i6" if s.is_signed else "u6"

    return choose_float_type(s, float_tolerance)


# Find the appropriate float type to represent the signal within the given
# tolerance of the signal's scale. Determined by testing whether every possible
# real (scaled) value is representable within tolerance. E.g. if tolerance is
# 0.5 (50%), then any float whose value is within 50% * (signal's scale) of the
# actual value is considered close enough.
def choose_float_type(s: cantools.db.Signal, tolerance: float) -> str:
    raw_min = -(1 << (s.length - 1)) if s.is_signed else 0
    raw_max = (1 << (s.length - 1)) - 1 if s.is_signed else (1 << s.length) - 1

    # Interpret min/max as integers, create a range of every integer in between
    possible_raw_values = np.arange(raw_min, raw_max + 1)
    # Scale that range of integers to get every possible decimal value
    possible_decoded_values = possible_raw_values * s.scale + s.offset

    # Make sure every possible decimal value is representable by some float
    # type, up to a precision of tolerance * (the signal's scale)
    atol = abs(s.scale) * tolerance
    for dtype, fsdaq_type in [
        # (np.float16, "f4"),
        (np.float32, "f5"),
        (np.float64, "f6"),
    ]:
        # If every possible real value is close enough using a certain float
        # type, choose that float type. rtol is 0 as we don't care about
        # relative tolerance, only tolerance based on the signal's scale
        if np.allclose(
            possible_decoded_values,
            possible_decoded_values.astype(dtype),
            rtol=0,
            atol=atol,
        ):
            return fsdaq_type

    # Fallback if not representable within tolerance of the scale even as a
    # 64-bit float (rare)
    print(
        f"Warning: Signal {s.name} not representable as 64 bit float within tolerance; using f6 anyway!"
    )
    return "f6"


def generate_cpp_code(signal_to_fsdaq_datatype: dict[str, str], rows: int = 8):
    # bool columns are bit-packed, so the row count must be a whole number of bytes
    assert rows % 8 == 0

    out_file = open("./fsdaq_encoder_generated_from_dbc.hpp", "w")
    template_file = open("./fsdaq_encoder_generated_from_dbc.hpp.in", "r")

    template = template_file.read()
    template = template.replace("@COLS@", str(len(signal_to_fsdaq_datatype)))
    template = template.replace("@ROWS@", str(rows))

    col_names = ", ".join(['"' + col + '"' for col in signal_to_fsdaq_datatype.keys()])
    col_name_sizes = ", ".join([str(len(col_name)) for col_name in signal_to_fsdaq_datatype.keys()])
    col_name_types = ", ".join(['"' + fsdaq_type + '"' for fsdaq_type in signal_to_fsdaq_datatype.values()])
    template = template.replace("@COL_NAMES@", col_names)
    template = template.replace("@COL_NAME_SIZES@", col_name_sizes)
    template = template.replace("@COL_NAME_TYPES@", col_name_types)

    values_struct_fields = []
    values_row_struct_fields = []
    update_fields_from_row = []
    for col_name, fsdaq_type in signal_to_fsdaq_datatype.items():
        if fsdaq_type == "b0":
            # bool columns are stored bit-packed, 8 rows per byte; shift within
            # the byte (idx % 8), not by the full row index
            values_struct_fields.append(" " * 4 + "uint8_t " + col_name + "[ROWS/8];")
            update_fields_from_row.append(" " * 8 + "this->" + col_name + "[idx/8] |= row." + col_name + " << (idx % 8);")
        else:
            values_struct_fields.append(" " * 4 + FSDAQ_TYPE_TO_C_TYPE[fsdaq_type] + " " + col_name + "[ROWS];")
            update_fields_from_row.append(" " * 8 + "this->" + col_name + "[idx] = row." + col_name + ";")
        values_row_struct_fields.append(" " * 4 + FSDAQ_TYPE_TO_C_TYPE[fsdaq_type] + " " + col_name + ";")
    template = template.replace("@VALUES_STRUCT_FIELDS@", "\n".join(values_struct_fields))
    template = template.replace("@VALUES_ROW_STRUCT_FIELDS@", "\n".join(values_row_struct_fields))

    template = template.replace("@UPDATE_FIELDS_FROM_ROW@", "\n".join(update_fields_from_row))

    out_file.write(template)
    out_file.close()
    template_file.close()


if __name__ == "__main__":
    # for debugging purposes
    np.set_printoptions(formatter={"float_kind": "{:.8f}".format})

    db = cantools.db.Database()

    db.add_dbc_file("../CANbus.dbc")

    signal_to_datatype: dict[str, str] = {}

    for msg in db.messages:
        for signal in msg.signals:
            signal_to_datatype[signal.name] = get_fsdaq_type_for_signal(signal)

    # for k, v in signal_to_datatype.items():
    #     v = str(v).removeprefix("(DataType(")
    #     v = v.removesuffix("),)")
    #     print("{:33s} {}".format(k, v))

    # n = 50
    # generate_nanoarrow_code({k: signal_to_datatype[k] for k in list(signal_to_datatype)[:n]}, 8)
    generate_cpp_code({k: signal_to_datatype[k] for k in list(signal_to_datatype)}, 80)




# OLD NANOARROW GENERATION CODE:
#
# def generate_nanoarrow_code(signal_to_datatype: dict[str, pa.DataType]):
#     with open("./nanoarrow_generated_from_dbc.hpp", "w") as f:
#         cols = len(signal_to_datatype)
#
#         f.writelines(
#             [
#                 f"#include <nanoarrow/nanoarrow.hpp>\n",
#                 f"#include <nanoarrow/nanoarrow.h>\n",
#                 f"#include <nanoarrow/nanoarrow_ipc.hpp>\n",
#                 f"#include <nanoarrow/nanoarrow_ipc.h>\n",
#                 f"\n"
#                 f"na::UniqueSchema make_nanoarrow_schema() {{\n"
#                 f"    na::UniqueSchema schema_root;\n"
#                 f"    ArrowSchemaInit(schema_root.get());\n"
#                 f"    ArrowSchemaSetTypeStruct(schema_root.get(), {cols});\n"
#                 f"\n",
#             ]
#         )
#         for i, (name, datatype) in enumerate(signal_to_datatype.items()):
#             nanoarrow_type_macro = PYARROW_TO_NANOARROW[datatype]
#             f.writelines(
#                 [
#                     f"    ArrowSchemaInitFromType(schema_root->children[{i}], {nanoarrow_type_macro});\n",
#                     f'    ArrowSchemaSetName(schema_root->children[{i}], "{name}");\n',
#                 ]
#             )
#         f.writelines(
#             [
#                 "    return schema_root;\n"
#                 "}\n",
#             ]
#         )
#
#         f.writelines(
#             [
#                 f"na::UniqueArray make_nanoarrow_array(ArrowSchema *schema_root, int batch_rows) {{\n",
#                 f"    ArrowError error;\n",
#                 f"\n",
#                 f"    na::UniqueArray array_root;\n",
#                 f"    ARROW_ERROR_PRINT(ArrowArrayInitFromSchema(array_root.get(), schema_root, &error));\n",
#                 f"    ArrowArrayAllocateChildren(array_root.get(), {cols});\n",
#                 f"    for (int i = 0; i < {cols}; i++) {{\n",
#                 f"        ARROW_ERROR_PRINT(ArrowArrayInitFromSchema(array_root->children[i], schema_root->children[i], &error));\n",
#                 # f"        ArrowArrayStartAppending(array_root->children[i]);",
#                 f"        ArrowArrayReserve(array_root->children[i], batch_rows);\n",
#                 # f"        for (int i = 0; i < ROWS; i++) {{",
#                 # f"            ArrowArrayAppendInt(array_root->children[i], 12340 + i);",
#                 # f"        }}",
#                 # f"        ARROW_ERROR_PRINT(ArrowArrayFinishBuildingDefault(array_root->children[i], &error));",
#             ]
#         )
#         f.writelines(
#             [
#                 "    }\n",
#                 "    return array_root;\n",
#                 "}\n",
#             ]
#         )

# # NOTE: This mapping is NOT exhaustive
# PYARROW_TO_NANOARROW: dict[pa.DataType, str] = {
#     pa.null(): "NANOARROW_TYPE_NA",
#     pa.bool_(): "NANOARROW_TYPE_BOOL",
#     pa.int8(): "NANOARROW_TYPE_INT8",
#     pa.uint8(): "NANOARROW_TYPE_UINT8",
#     pa.int16(): "NANOARROW_TYPE_INT16",
#     pa.uint16(): "NANOARROW_TYPE_UINT16",
#     pa.int32(): "NANOARROW_TYPE_INT32",
#     pa.uint32(): "NANOARROW_TYPE_UINT32",
#     pa.int64(): "NANOARROW_TYPE_INT64",
#     pa.uint64(): "NANOARROW_TYPE_UINT64",
#     pa.float16(): "NANOARROW_TYPE_HALF_FLOAT",
#     pa.float32(): "NANOARROW_TYPE_FLOAT",
#     pa.float64(): "NANOARROW_TYPE_DOUBLE",
#     pa.date32(): "NANOARROW_TYPE_DATE32",
#     pa.date64(): "NANOARROW_TYPE_DATE64",
#     pa.time32("ms"): "NANOARROW_TYPE_TIME32",
#     pa.time64("ns"): "NANOARROW_TYPE_TIME64",
#     pa.timestamp("ms"): "NANOARROW_TYPE_TIMESTAMP",
#     pa.duration("ms"): "NANOARROW_TYPE_DURATION",
# }
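
Aside (not part of the commit): a standalone sanity check of the tolerance logic in choose_float_type() above, using an invented signal (16-bit unsigned, scale 0.01, offset 0). It runs the same np.allclose test and picks the smallest sufficient float type.

import numpy as np

length, scale, offset = 16, 0.01, 0.0      # hypothetical signal parameters
raw = np.arange(0, 1 << length)            # every possible raw value
decoded = raw * scale + offset             # every possible decoded value

# Accept a float type if every decoded value round-trips within 10% of the scale.
atol = abs(scale) * 0.1
for dtype, code in [(np.float32, "f5"), (np.float64, "f6")]:
    if np.allclose(decoded, decoded.astype(dtype), rtol=0, atol=atol):
        print(f"Smallest sufficient type: {code}")  # prints f5 for this signal
        break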
