Merged
Changes from 2 commits
44 changes: 32 additions & 12 deletions src/nipanel/_convert.py
@@ -21,7 +21,11 @@
IntCollectionConverter,
StrCollectionConverter,
)
from nipanel.converters.protobuf_types import DoubleAnalogWaveformConverter, ScalarConverter
from nipanel.converters.protobuf_types import (
Double2DArrayConverter,
DoubleAnalogWaveformConverter,
ScalarConverter,
)

_logger = logging.getLogger(__name__)

@@ -40,6 +44,7 @@
IntCollectionConverter(),
StrCollectionConverter(),
# Protobuf Types
Double2DArrayConverter(),
DoubleAnalogWaveformConverter(),
ScalarConverter(),
]
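Registering Double2DArrayConverter in this list is what lets nested float collections flow through the public to_any()/from_any() helpers. A minimal round-trip sketch, mirroring the new tests further down in this PR (module path nipanel._convert taken from this diff):

# Sketch only: round-trip a nested list through the Any-based helpers.
import nipanel._convert as convert

packed = convert.to_any([[1.0, 2.0], [3.0, 4.0]])  # packs a Double2DArray into an Any
restored = convert.from_any(packed)                 # unpacks back to a list of lists
assert restored == [[1.0, 2.0], [3.0, 4.0]]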
@@ -66,25 +71,39 @@ def to_any(python_value: object) -> any_pb2.Any:
def _get_best_matching_type(python_value: object) -> str:
underlying_parents = type(python_value).mro() # This covers enum.IntEnum and similar

container_type = None
value_is_collection = _CONVERTIBLE_COLLECTION_TYPES.intersection(underlying_parents)
if value_is_collection:
container_types = []
value_is_collection = any(_CONVERTIBLE_COLLECTION_TYPES.intersection(underlying_parents))
# Variable to use when traversing down through collection types.
working_python_value = python_value
while value_is_collection:
# Assume Sized -- Generators not supported, callers must use list(), set(), ... as desired
if not isinstance(python_value, Collection):
if not isinstance(working_python_value, Collection):
raise TypeError()
if len(python_value) == 0:
if len(working_python_value) == 0:
underlying_parents = type(None).mro()
value_is_collection = False
else:
# Assume homogenous -- collections of mixed-types not supported
visitor = iter(python_value)
first_value = next(visitor)
underlying_parents = type(first_value).mro()
container_type = Collection
visitor = iter(working_python_value)

# Store off the first element. If it's a container, we'll need it in the next while
# loop iteration.
working_python_value = next(visitor)
underlying_parents = type(working_python_value).mro()

# If this element is a collection, we want to continue traversing. Once we find a
# non-collection, underlying_parents will refer to the candidates for the non-
# collection type.
value_is_collection = any(
_CONVERTIBLE_COLLECTION_TYPES.intersection(underlying_parents)
)
container_types.append(Collection.__name__)

best_matching_type = None
candidates = [parent.__name__ for parent in underlying_parents]
for candidate in candidates:
python_typename = f"{container_type.__name__}.{candidate}" if container_type else candidate
containers_str = ".".join(container_types)
python_typename = f"{containers_str}.{candidate}" if containers_str else candidate
if python_typename not in _SUPPORTED_PYTHON_TYPES:
continue
best_matching_type = python_typename
@@ -93,7 +112,8 @@ def _get_best_matching_type(python_value: object) -> str:
if not best_matching_type:
payload_type = underlying_parents[0]
raise TypeError(
f"Unsupported type: ({container_type}, {payload_type}) with parents {underlying_parents}. Supported types are: {_SUPPORTED_PYTHON_TYPES}"
f"Unsupported type: ({container_types}, {payload_type}) with parents "
f"{underlying_parents}. Supported types are: {_SUPPORTED_PYTHON_TYPES}"
)
_logger.debug(f"Best matching type for '{repr(python_value)}' resolved to {best_matching_type}")
return best_matching_type
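For reference, with this change _get_best_matching_type resolves nested collections by walking inward until it reaches a non-collection element, appending one "Collection" segment per level. A few expected resolutions, taken from the parametrized tests added below (accessing the private function directly is for illustration only):

# Illustrative expectations only, not part of the change.
from nipanel._convert import _get_best_matching_type

assert _get_best_matching_type([456.2, 1.0]) == "Collection.float"
assert _get_best_matching_type([[1.0, 2.0], [3.0, 4.0]]) == "Collection.Collection.float"
assert _get_best_matching_type(((1.0, 2.0), (3.0, 4.0))) == "Collection.Collection.float"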
52 changes: 50 additions & 2 deletions src/nipanel/converters/protobuf_types.py
@@ -1,13 +1,16 @@
"""Classes to convert between measurement specific protobuf types and containers."""

import collections.abc
import datetime as dt
from collections.abc import Collection, Mapping
from typing import Type, Union

import hightime as ht
import nitypes.bintime as bt
import numpy as np
from ni.protobuf.types import scalar_pb2
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.array_pb2 import (
Double2DArray,
)
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.precision_timestamp_pb2 import (
PrecisionTimestamp,
)
@@ -37,6 +40,51 @@
}


class Double2DArrayConverter(Converter[Collection[Collection[float]], Double2DArray]):
"""A converter between Collection[Collection[float]] and Double2DArray."""

@property
def python_typename(self) -> str:
"""The Python type that this converter handles."""
return f"{Collection.__name__}.{Collection.__name__}.{float.__name__}"

@property
def protobuf_message(self) -> Type[Double2DArray]:
"""The type-specific protobuf message for the Python type."""
return Double2DArray

def to_protobuf_message(self, python_value: Collection[Collection[float]]) -> Double2DArray:
"""Convert the Python Collection[Collection[float]] to a protobuf Double2DArray."""
rows = len(python_value)
if rows:
visitor = iter(python_value)
first_subcollection = next(visitor)
columns = len(first_subcollection)
else:
columns = 0
if not all(len(subcollection) == columns for subcollection in python_value):
raise ValueError("All subcollections must have the same length.")

# Create a flat list in row major order.
flat_list = [item for subcollection in python_value for item in subcollection]
return Double2DArray(rows=rows, columns=columns, data=flat_list)

def to_python_value(self, protobuf_message: Double2DArray) -> Collection[Collection[float]]:
"""Convert the protobuf Double2DArray to a Python Collection[Collection[float]]."""
if not protobuf_message.data:
return []
if len(protobuf_message.data) % protobuf_message.columns != 0:
raise ValueError("The length of the data list must be divisible by num columns.")

# Convert from a flat list in row major order into a list of lists.
list_of_lists = []
for i in range(0, len(protobuf_message.data), protobuf_message.columns):
row = protobuf_message.data[i : i + protobuf_message.columns]
list_of_lists.append(row)

return list_of_lists
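
As a quick illustration of the new converter in isolation, a sketch mirroring the unit tests added in test_protobuf_type_conversion.py:

# Sketch: explicit round trip through the converter, bypassing to_any()/from_any().
converter = Double2DArrayConverter()
message = converter.to_protobuf_message([[1.0, 2.0], [3.0, 4.0]])
assert (message.rows, message.columns) == (2, 2)
assert list(message.data) == [1.0, 2.0, 3.0, 4.0]
assert converter.to_python_value(message) == [[1.0, 2.0], [3.0, 4.0]]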


class DoubleAnalogWaveformConverter(Converter[AnalogWaveform[np.float64], DoubleAnalogWaveform]):
"""A converter for AnalogWaveform types with scaled data (double)."""

@@ -79,7 +127,7 @@ def to_protobuf_message(self, python_value: AnalogWaveform[np.float64]) -> Doubl
def _extended_properties_to_attributes(
self,
extended_properties: ExtendedPropertyDictionary,
) -> collections.abc.Mapping[str, WaveformAttributeValue]:
) -> Mapping[str, WaveformAttributeValue]:
return {key: self._value_to_attribute(value) for key, value in extended_properties.items()}

def _value_to_attribute(self, value: ExtendedPropertyValue) -> WaveformAttributeValue:
63 changes: 62 additions & 1 deletion tests/unit/test_convert.py
@@ -1,11 +1,14 @@
from typing import Any, Union
from typing import Any, Collection, Union

import numpy as np
import pytest
from google.protobuf import any_pb2, wrappers_pb2
from google.protobuf.message import Message
from ni.protobuf.types.scalar_pb2 import ScalarData
from ni.pythonpanel.v1 import python_panel_types_pb2
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.array_pb2 import (
Double2DArray,
)
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import (
DoubleAnalogWaveform,
)
@@ -14,6 +17,7 @@
from typing_extensions import TypeAlias

import nipanel._convert
import tests.types


_AnyWrappersPb2: TypeAlias = Union[
@@ -44,6 +48,11 @@
(456.2, "float"),
(123, "int"),
("mystr", "str"),
(tests.types.MyIntFlags.VALUE1, "int"),
(tests.types.MyIntEnum.VALUE10, "int"),
(tests.types.MixinIntEnum.VALUE11, "int"),
(tests.types.MyStrEnum.VALUE1, "str"),
(tests.types.MixinStrEnum.VALUE11, "str"),
([False, False], "Collection.bool"),
([b"mystr", b"mystr"], "Collection.bytes"),
([456.2, 1.0], "Collection.float"),
@@ -69,6 +78,14 @@
(frozenset([456.2, 1.0]), "Collection.float"),
(frozenset([123, 456]), "Collection.int"),
(frozenset(["mystr", "mystr2"]), "Collection.str"),
([[1.0, 2.0], [1.0, 2.0]], "Collection.Collection.float"),
([(1.0, 2.0), (3.0, 4.0)], "Collection.Collection.float"),
([set([1.0, 2.0]), set([3.0, 4.0])], "Collection.Collection.float"),
([frozenset([1.0, 2.0]), frozenset([3.0, 4.0])], "Collection.Collection.float"),
(([1.0, 2.0], [3.0, 4.0]), "Collection.Collection.float"),
(((1.0, 2.0), (3.0, 4.0)), "Collection.Collection.float"),
((set([1.0, 2.0]), set([3.0, 4.0])), "Collection.Collection.float"),
((frozenset([1.0, 2.0]), frozenset([3.0, 4.0])), "Collection.Collection.float"),
],
)
def test___various_python_objects___get_best_matching_type___returns_correct_type_string(
@@ -194,6 +211,39 @@ def test___python_analog_waveform___to_any___valid_double_analog_waveform() -> N
assert list(unpack_dest.y_data) == [0.0, 0.0, 0.0]


@pytest.mark.parametrize(
"python_value",
[
# lists of collections
([[1.0, 2.0], [3.0, 4.0]]),
([(1.0, 2.0), (3.0, 4.0)]),
([set([1.0, 2.0]), set([3.0, 4.0])]),
([frozenset([1.0, 2.0]), frozenset([3.0, 4.0])]),
# tuples of collections
(([1.0, 2.0], [3.0, 4.0])),
(((1.0, 2.0), (3.0, 4.0))),
((set([1.0, 2.0]), set([3.0, 4.0]))),
((frozenset([1.0, 2.0]), frozenset([3.0, 4.0]))),
# sets and frozensets of collections don't preserve order,
# so they need to be tested separately.
],
)
def test___python_2dcollection_of_float___to_any___valid_double2darray(
python_value: Collection[Collection[float]],
) -> None:
expected_data = [1.0, 2.0, 3.0, 4.0]
expected_rows = 2
expected_columns = 2
result = nipanel._convert.to_any(python_value)
unpack_dest = Double2DArray()
_assert_any_and_unpack(result, unpack_dest)

assert isinstance(unpack_dest, Double2DArray)
assert unpack_dest.rows == expected_rows
assert unpack_dest.columns == expected_columns
assert unpack_dest.data == expected_data


# ========================================================
# Protobuf Types: Protobuf to Python
# ========================================================
@@ -219,6 +269,17 @@ def test___double_analog_waveform___from_any___valid_python_analog_waveform() ->
assert result.dtype == np.float64


def test___double2darray___from_any___valid_python_2dcollection() -> None:
pb_value = Double2DArray(data=[1.0, 2.0, 3.0, 4.0], rows=2, columns=2)
packed_any = _pack_into_any(pb_value)

result = nipanel._convert.from_any(packed_any)

expected_value = [[1.0, 2.0], [3.0, 4.0]]
assert isinstance(result, type(expected_value))
assert result == expected_value


# ========================================================
# Pack/Unpack Helpers
# ========================================================
79 changes: 79 additions & 0 deletions tests/unit/test_protobuf_type_conversion.py
@@ -3,6 +3,9 @@
import numpy
import pytest
from ni.protobuf.types.scalar_pb2 import ScalarData
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.array_pb2 import (
Double2DArray,
)
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import (
DoubleAnalogWaveform,
WaveformAttributeValue,
@@ -12,12 +15,88 @@
from nitypes.waveform import AnalogWaveform, NoneScaleMode, SampleIntervalMode, Timing

from nipanel.converters.protobuf_types import (
Double2DArrayConverter,
DoubleAnalogWaveformConverter,
PrecisionTimestampConverter,
ScalarConverter,
)


# ========================================================
# list[list[float]] to Double2DArray
# Other collection types are tested in test_convert.py
# ========================================================
@pytest.mark.parametrize(
"list_of_lists, expected_data, expected_rows, expected_columns",
[
([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 3, 2),
([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 2, 3),
],
)
def test___list_of_lists___convert___valid_double2darray(
list_of_lists: list[list[float]],
expected_data: list[float],
expected_rows: int,
expected_columns: int,
) -> None:
converter = Double2DArrayConverter()
result = converter.to_protobuf_message(list_of_lists)

assert result.data == expected_data
assert result.rows == expected_rows
assert result.columns == expected_columns


def test___list_of_lists_inconsistent_column_length___convert___throws_value_error() -> None:
converter = Double2DArrayConverter()

with pytest.raises(ValueError):
_ = converter.to_protobuf_message([[1.0, 2.0], [3.0, 4.0, 5.0]])


# ========================================================
# Double2DArray to list[list[float]]
# Other collection types are tested in test_convert.py
# ========================================================
@pytest.mark.parametrize(
"double2darray, expected_data",
[
(
Double2DArray(rows=3, columns=2, data=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]),
[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
),
(
Double2DArray(rows=2, columns=3, data=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]),
[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],
),
],
)
def test___double2darray___convert___valid_list_of_lists(
double2darray: Double2DArray, expected_data: list[list[float]]
) -> None:
converter = Double2DArrayConverter()
list_of_lists = converter.to_python_value(double2darray)

assert list_of_lists == expected_data


def test___double2darray_invalid_num_columns___convert___throws_value_error() -> None:
double2darray = Double2DArray(rows=1, columns=2, data=[1.0, 2.0, 3.0])
converter = Double2DArrayConverter()

with pytest.raises(ValueError):
_ = converter.to_python_value(double2darray)


def test___double2darray_empty_data___convert___returns_empty_list() -> None:
double2darray = Double2DArray(rows=0, columns=0, data=[])
converter = Double2DArrayConverter()

list_of_lists = converter.to_python_value(double2darray)

assert not list_of_lists


# ========================================================
# AnalogWaveform to DoubleAnalogWaveform
# ========================================================