Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ protobuf = {version=">=4.21"}
ni-measurement-plugin-sdk = {version=">=2.3"}
typing-extensions = ">=4.13.2"
streamlit = ">=1.24"
nitypes = {version=">=0.1.0dev1", allow-prereleases=true}
nitypes = {version=">=0.1.0dev2", allow-prereleases=true}
debugpy = ">=1.8.1"

[tool.poetry.group.dev.dependencies]
Expand Down
128 changes: 128 additions & 0 deletions src/nipanel/converters/protobuf_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
"""Classes to convert between measurement specific protobuf types and containers."""

import collections.abc
import datetime as dt
from typing import Type, Union

import numpy
from ni.protobuf.types import scalar_pb2
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.precision_timestamp_pb2 import (
PrecisionTimestamp,
)
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import (
DoubleAnalogWaveform,
WaveformAttributeValue,
)
from nitypes.bintime import DateTime, TimeValueTuple
from nitypes.scalar import Scalar
from nitypes.waveform import AnalogWaveform, ExtendedPropertyDictionary, NoneScaleMode, Timing
from typing_extensions import TypeAlias

from nipanel.converters import Converter
Expand All @@ -17,6 +29,122 @@
}


class DoubleAnalogWaveformConverter(Converter[AnalogWaveform[numpy.float64], DoubleAnalogWaveform]):
"""A converter for AnalogWaveform types with scaled data (double)."""

@property
def python_typename(self) -> str:
"""The Python type that this converter handles."""
return AnalogWaveform.__name__

@property
def protobuf_message(self) -> Type[DoubleAnalogWaveform]:
"""The type-specific protobuf message for the Python type."""
return DoubleAnalogWaveform

def to_protobuf_message(
self, python_value: AnalogWaveform[numpy.float64]
) -> DoubleAnalogWaveform:
"""Convert the Python AnalogWaveform to a protobuf DoubleAnalogWaveform."""
if python_value.timing.has_timestamp:
pt_converter = PrecisionTimestampConverter()
bin_datetime = DateTime(python_value.timing.start_time)
precision_timestamp = pt_converter.to_protobuf_message(bin_datetime)
else:
precision_timestamp = PrecisionTimestamp(seconds=0, fractional_seconds=0)

if python_value.timing.has_sample_interval:
time_interval = python_value.timing.sample_interval.total_seconds()
else:
time_interval = 0

attributes = self._extended_properties_to_attributes(python_value.extended_properties)

return self.protobuf_message(
t0=precision_timestamp,
dt=time_interval,
y_data=python_value.scaled_data,
attributes=attributes,
)

def _extended_properties_to_attributes(
self,
extended_properties: ExtendedPropertyDictionary,
) -> collections.abc.Mapping[str, WaveformAttributeValue]:
attributes = {}
for key, value in extended_properties.items():
attr_value = WaveformAttributeValue()
if isinstance(value, bool):
attr_value.bool_value = value
elif isinstance(value, int):
attr_value.integer_value = value
elif isinstance(value, float):
attr_value.double_value = value
elif isinstance(value, str):
attr_value.string_value = value
else:
raise TypeError(f"Unexpected type for extended property value {type(value)}")

attributes[key] = attr_value

return attributes

def to_python_value(
self, protobuf_value: DoubleAnalogWaveform
) -> AnalogWaveform[numpy.float64]:
"""Convert the protobuf DoubleAnalogWaveform to a Python AnalogWaveform."""
pt_converter = PrecisionTimestampConverter()
bin_datetime = pt_converter.to_python_value(protobuf_value.t0)
timestamp = bin_datetime._to_datetime_datetime()
sample_interval = dt.timedelta(seconds=protobuf_value.dt)
timing = Timing.create_with_regular_interval(
sample_interval,
timestamp,
)

extended_properties = {}
for key, value in protobuf_value.attributes.items():
attr_type = value.WhichOneof("attribute")
extended_properties[key] = getattr(value, str(attr_type))

data_list = list(protobuf_value.y_data)
return AnalogWaveform(
sample_count=len(data_list),
dtype=numpy.float64,
raw_data=numpy.array(data_list),
start_index=0,
capacity=len(data_list),
extended_properties=extended_properties,
copy_extended_properties=True,
timing=timing,
scale_mode=NoneScaleMode(),
)


class PrecisionTimestampConverter(Converter[DateTime, PrecisionTimestamp]):
"""A converter for bintime.DateTime types."""

@property
def python_typename(self) -> str:
"""The Python type that this converter handles."""
return DateTime.__name__

@property
def protobuf_message(self) -> Type[PrecisionTimestamp]:
"""The type-specific protobuf message for the Python type."""
return PrecisionTimestamp

def to_protobuf_message(self, python_value: DateTime) -> PrecisionTimestamp:
"""Convert the Python DateTime to a protobuf PrecisionTimestamp."""
seconds, fractional_seconds = python_value.to_tuple()
return self.protobuf_message(seconds=seconds, fractional_seconds=fractional_seconds)

def to_python_value(self, protobuf_value: PrecisionTimestamp) -> DateTime:
"""Convert the protobuf PrecisionTimestamp to a Python DateTime."""
time_value_tuple = TimeValueTuple(protobuf_value.seconds, protobuf_value.fractional_seconds)
return DateTime.from_tuple(time_value_tuple)


class ScalarConverter(Converter[Scalar[_AnyScalarType], scalar_pb2.ScalarData]):
"""A converter for Scalar objects."""

Expand Down
133 changes: 130 additions & 3 deletions tests/unit/test_protobuf_type_conversion.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,139 @@
import datetime as dt

import numpy
import pytest
from ni.protobuf.types.scalar_pb2 import ScalarData
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.precision_timestamp_pb2 import (
PrecisionTimestamp,
)
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import (
DoubleAnalogWaveform,
WaveformAttributeValue,
)
from nitypes.bintime import DateTime
from nitypes.scalar import Scalar
from nitypes.waveform import AnalogWaveform, NoneScaleMode, Timing

from nipanel.converters.protobuf_types import (
DoubleAnalogWaveformConverter,
PrecisionTimestampConverter,
ScalarConverter,
)


# ========================================================
# AnalogWaveform to DoubleAnalogWaveform
# ========================================================
def test___default_analog_waveform___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform()
converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert not dbl_analog_waveform.attributes
assert dbl_analog_waveform.dt == 0
assert dbl_analog_waveform.t0 == PrecisionTimestamp(seconds=0, fractional_seconds=0)
assert list(dbl_analog_waveform.y_data) == []


def test___analog_waveform_samples_only___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform(5)

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert list(dbl_analog_waveform.y_data) == [0.0, 0.0, 0.0, 0.0, 0.0]


def test___analog_waveform_non_zero_samples___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform.from_array_1d(numpy.array([1.0, 2.0, 3.0]))

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert list(dbl_analog_waveform.y_data) == [1.0, 2.0, 3.0]


def test___analog_waveform_with_extended_properties___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform()
analog_waveform.channel_name = "Dev1/ai0"
analog_waveform.unit_description = "Volts"

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert dbl_analog_waveform.attributes["NI_ChannelName"].string_value == "Dev1/ai0"
assert dbl_analog_waveform.attributes["NI_UnitDescription"].string_value == "Volts"


def test___analog_waveform_with_standard_timing___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform.from_array_1d(numpy.array([1.0, 2.0, 3.0]))
t0_dt = dt.datetime(2000, 12, 1, tzinfo=dt.timezone.utc)
analog_waveform.timing = Timing.create_with_regular_interval(
sample_interval=dt.timedelta(milliseconds=1000),
timestamp=t0_dt,
)

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert dbl_analog_waveform.dt == 1.0

bin_dt = DateTime(t0_dt)
pt_converter = PrecisionTimestampConverter()
converted_t0 = pt_converter.to_protobuf_message(bin_dt)
assert dbl_analog_waveform.t0 == converted_t0


# ========================================================
# DoubleAnalogWaveform to AnalogWaveform
# ========================================================
def test___default_dbl_analog_wfm___convert___valid_python_object() -> None:
dbl_analog_wfm = DoubleAnalogWaveform()
converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert not analog_waveform.extended_properties
assert analog_waveform.timing.sample_interval == dt.timedelta()
assert analog_waveform.timing.start_time == dt.datetime(1904, 1, 1, tzinfo=dt.timezone.utc)
assert analog_waveform.scaled_data.size == 0
assert analog_waveform.scale_mode == NoneScaleMode()


def test___dbl_analog_wfm_with_y_data___convert___valid_python_object() -> None:
dbl_analog_wfm = DoubleAnalogWaveform(y_data=[1.0, 2.0, 3.0])
converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert list(analog_waveform.scaled_data) == [1.0, 2.0, 3.0]


def test___dbl_analog_wfm_with_attributes___convert___valid_python_object() -> None:
attributes = {
"NI_ChannelName": WaveformAttributeValue(string_value="Dev1/ai0"),
"NI_UnitDescription": WaveformAttributeValue(string_value="Volts"),
}
dbl_analog_wfm = DoubleAnalogWaveform(attributes=attributes)
converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert analog_waveform.channel_name == "Dev1/ai0"
assert analog_waveform.unit_description == "Volts"


def test___dbl_analog_wfm_with_timing___convert___valid_python_object() -> None:
t0_dt = DateTime(2020, 5, 5, tzinfo=dt.timezone.utc)
pt_converter = PrecisionTimestampConverter()
t0_pt = pt_converter.to_protobuf_message(t0_dt)
dbl_analog_wfm = DoubleAnalogWaveform(t0=t0_pt, dt=0.1, y_data=[1.0, 2.0, 3.0])
converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

from nipanel.converters.protobuf_types import ScalarConverter
assert analog_waveform.timing.start_time == t0_dt._to_datetime_datetime()
assert analog_waveform.timing.sample_interval == dt.timedelta(seconds=0.1)


# ========================================================
# Protobuf to Python
# ScalarData to Scalar
# ========================================================
def test___bool_scalar_protobuf___convert___valid_bool_scalar() -> None:
protobuf_value = ScalarData()
Expand Down Expand Up @@ -84,7 +211,7 @@ def test___scalar_protobuf_units_unset___convert___python_units_blank() -> None:


# ========================================================
# Python to Protobuf
# Scalar to ScalarData
# ========================================================
def test___bool_scalar___convert___valid_bool_scalar_protobuf() -> None:
python_value = Scalar(True, "volts")
Expand Down