Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ protobuf = {version=">=4.21"}
ni-measurement-plugin-sdk = {version=">=2.3"}
typing-extensions = ">=4.13.2"
streamlit = ">=1.24"
nitypes = {version=">=0.1.0dev1", allow-prereleases=true}
nitypes = {version=">=0.1.0dev2", allow-prereleases=true}
debugpy = ">=1.8.1"

[tool.poetry.group.dev.dependencies]
Expand All @@ -30,6 +30,7 @@ pyright = { version = ">=1.1.400", extras = ["nodejs"] }
pytest = ">=7.2"
pytest-cov = ">=4.0"
pytest-mock = ">=3.0"
hightime = { git = "https://github.com/ni/hightime.git" }

[tool.poetry.group.codegen.dependencies]
grpcio-tools = [
Expand Down
151 changes: 151 additions & 0 deletions src/nipanel/converters/protobuf_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
"""Classes to convert between measurement specific protobuf types and containers."""

import collections.abc
import datetime as dt
from typing import Type, Union

import hightime as ht
import nitypes.bintime as bt
import numpy as np
from ni.protobuf.types import scalar_pb2
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.precision_timestamp_pb2 import (
PrecisionTimestamp,
)
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import (
DoubleAnalogWaveform,
WaveformAttributeValue,
)
from nitypes.scalar import Scalar
from nitypes.time import convert_datetime
from nitypes.waveform import (
AnalogWaveform,
ExtendedPropertyDictionary,
ExtendedPropertyValue,
NoneScaleMode,
SampleIntervalMode,
Timing,
)
from typing_extensions import TypeAlias

from nipanel.converters import Converter
Expand All @@ -17,6 +38,136 @@
}


class DoubleAnalogWaveformConverter(Converter[AnalogWaveform[np.float64], DoubleAnalogWaveform]):
"""A converter for AnalogWaveform types with scaled data (double)."""

def __init__(self) -> None:
"""Initialize a DoubleAnalogWaveformConverter object."""
self._pt_converter = PrecisionTimestampConverter()

@property
def python_typename(self) -> str:
"""The Python type that this converter handles."""
return AnalogWaveform.__name__

@property
def protobuf_message(self) -> Type[DoubleAnalogWaveform]:
"""The type-specific protobuf message for the Python type."""
return DoubleAnalogWaveform

def to_protobuf_message(self, python_value: AnalogWaveform[np.float64]) -> DoubleAnalogWaveform:
"""Convert the Python AnalogWaveform to a protobuf DoubleAnalogWaveform."""
if python_value.timing.has_timestamp:
bin_datetime = convert_datetime(bt.DateTime, python_value.timing.start_time)
precision_timestamp = self._pt_converter.to_protobuf_message(bin_datetime)
else:
precision_timestamp = None

if python_value.timing.has_sample_interval:
time_interval = python_value.timing.sample_interval.total_seconds()
else:
time_interval = 0

attributes = self._extended_properties_to_attributes(python_value.extended_properties)

return self.protobuf_message(
t0=precision_timestamp,
dt=time_interval,
y_data=python_value.scaled_data,
attributes=attributes,
)

def _extended_properties_to_attributes(
self,
extended_properties: ExtendedPropertyDictionary,
) -> collections.abc.Mapping[str, WaveformAttributeValue]:
return {key: self._value_to_attribute(value) for key, value in extended_properties.items()}

def _value_to_attribute(self, value: ExtendedPropertyValue) -> WaveformAttributeValue:
attr_value = WaveformAttributeValue()
if isinstance(value, bool):
attr_value.bool_value = value
elif isinstance(value, int):
attr_value.integer_value = value
elif isinstance(value, float):
attr_value.double_value = value
elif isinstance(value, str):
attr_value.string_value = value
else:
raise TypeError(f"Unexpected type for extended property value {type(value)}")

return attr_value

def to_python_value(self, protobuf_message: DoubleAnalogWaveform) -> AnalogWaveform[np.float64]:
"""Convert the protobuf DoubleAnalogWaveform to a Python AnalogWaveform."""
if not protobuf_message.dt and not protobuf_message.HasField("t0"):
# If both dt and t0 are unset, use Timing.empty.
timing = Timing.empty
else:
# Timestamp
pt_converter = PrecisionTimestampConverter()
bin_datetime = pt_converter.to_python_value(protobuf_message.t0)
timestamp = convert_datetime(dt.datetime, bin_datetime)

# Sample Interval
if not protobuf_message.dt:
sample_interval_mode = SampleIntervalMode.NONE
sample_interval = None
else:
sample_interval_mode = SampleIntervalMode.REGULAR
sample_interval = ht.timedelta(seconds=protobuf_message.dt)

timing = Timing(
sample_interval_mode=sample_interval_mode,
timestamp=timestamp,
sample_interval=sample_interval,
)

extended_properties = {}
for key, value in protobuf_message.attributes.items():
attr_type = value.WhichOneof("attribute")
extended_properties[key] = getattr(value, str(attr_type))

data_array = np.array(protobuf_message.y_data)
return AnalogWaveform(
sample_count=data_array.size,
dtype=np.float64,
raw_data=data_array,
start_index=0,
capacity=data_array.size,
extended_properties=extended_properties,
copy_extended_properties=True,
timing=timing,
scale_mode=NoneScaleMode(),
)


class PrecisionTimestampConverter(Converter[bt.DateTime, PrecisionTimestamp]):
"""A converter for bintime.DateTime types."""

@property
def python_typename(self) -> str:
"""The Python type that this converter handles."""
return bt.DateTime.__name__

@property
def protobuf_message(self) -> Type[PrecisionTimestamp]:
"""The type-specific protobuf message for the Python type."""
return PrecisionTimestamp

def to_protobuf_message(self, python_value: bt.DateTime) -> PrecisionTimestamp:
"""Convert the Python DateTime to a protobuf PrecisionTimestamp."""
seconds, fractional_seconds = python_value.to_tuple()
return self.protobuf_message(seconds=seconds, fractional_seconds=fractional_seconds)

def to_python_value(self, protobuf_message: PrecisionTimestamp) -> bt.DateTime:
"""Convert the protobuf PrecisionTimestamp to a Python DateTime."""
time_value_tuple = bt.TimeValueTuple(
protobuf_message.seconds, protobuf_message.fractional_seconds
)
return bt.DateTime.from_tuple(time_value_tuple)


class ScalarConverter(Converter[Scalar[_AnyScalarType], scalar_pb2.ScalarData]):
"""A converter for Scalar objects."""

Expand Down
159 changes: 156 additions & 3 deletions tests/unit/test_protobuf_type_conversion.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,165 @@
import datetime as dt

import numpy
import pytest
from ni.protobuf.types.scalar_pb2 import ScalarData
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import (
DoubleAnalogWaveform,
WaveformAttributeValue,
)
from nitypes.bintime import DateTime
from nitypes.scalar import Scalar
from nitypes.waveform import AnalogWaveform, NoneScaleMode, SampleIntervalMode, Timing

from nipanel.converters.protobuf_types import (
DoubleAnalogWaveformConverter,
PrecisionTimestampConverter,
ScalarConverter,
)


# ========================================================
# AnalogWaveform to DoubleAnalogWaveform
# ========================================================
def test___default_analog_waveform___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform()

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert not dbl_analog_waveform.attributes
assert dbl_analog_waveform.dt == 0
assert not dbl_analog_waveform.HasField("t0")
assert list(dbl_analog_waveform.y_data) == []


def test___analog_waveform_samples_only___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform(5)

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert list(dbl_analog_waveform.y_data) == [0.0, 0.0, 0.0, 0.0, 0.0]


def test___analog_waveform_non_zero_samples___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform.from_array_1d(numpy.array([1.0, 2.0, 3.0]))

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert list(dbl_analog_waveform.y_data) == [1.0, 2.0, 3.0]


def test___analog_waveform_with_extended_properties___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform()
analog_waveform.channel_name = "Dev1/ai0"
analog_waveform.unit_description = "Volts"

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert dbl_analog_waveform.attributes["NI_ChannelName"].string_value == "Dev1/ai0"
assert dbl_analog_waveform.attributes["NI_UnitDescription"].string_value == "Volts"


def test___analog_waveform_with_standard_timing___convert___valid_protobuf() -> None:
analog_waveform = AnalogWaveform.from_array_1d(numpy.array([1.0, 2.0, 3.0]))
t0_dt = dt.datetime(2000, 12, 1, tzinfo=dt.timezone.utc)
analog_waveform.timing = Timing.create_with_regular_interval(
sample_interval=dt.timedelta(milliseconds=1000),
timestamp=t0_dt,
)

converter = DoubleAnalogWaveformConverter()
dbl_analog_waveform = converter.to_protobuf_message(analog_waveform)

assert dbl_analog_waveform.dt == 1.0
bin_dt = DateTime(t0_dt)
pt_converter = PrecisionTimestampConverter()
converted_t0 = pt_converter.to_protobuf_message(bin_dt)
assert dbl_analog_waveform.t0 == converted_t0


# ========================================================
# DoubleAnalogWaveform to AnalogWaveform
# ========================================================
def test___default_dbl_analog_wfm___convert___valid_python_object() -> None:
dbl_analog_wfm = DoubleAnalogWaveform()

converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert not analog_waveform.extended_properties
assert analog_waveform.timing == Timing.empty
assert analog_waveform.scaled_data.size == 0
assert analog_waveform.scale_mode == NoneScaleMode()


def test___dbl_analog_wfm_with_y_data___convert___valid_python_object() -> None:
dbl_analog_wfm = DoubleAnalogWaveform(y_data=[1.0, 2.0, 3.0])

converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert list(analog_waveform.scaled_data) == [1.0, 2.0, 3.0]


def test___dbl_analog_wfm_with_attributes___convert___valid_python_object() -> None:
attributes = {
"NI_ChannelName": WaveformAttributeValue(string_value="Dev1/ai0"),
"NI_UnitDescription": WaveformAttributeValue(string_value="Volts"),
}
dbl_analog_wfm = DoubleAnalogWaveform(attributes=attributes)

converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert analog_waveform.channel_name == "Dev1/ai0"
assert analog_waveform.unit_description == "Volts"


def test___dbl_analog_wfm_with_timing___convert___valid_python_object() -> None:
t0_dt = DateTime(2020, 5, 5, tzinfo=dt.timezone.utc)
pt_converter = PrecisionTimestampConverter()
t0_pt = pt_converter.to_protobuf_message(t0_dt)
dbl_analog_wfm = DoubleAnalogWaveform(t0=t0_pt, dt=0.1, y_data=[1.0, 2.0, 3.0])

converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert analog_waveform.timing.start_time == t0_dt._to_datetime_datetime()
assert analog_waveform.timing.sample_interval == dt.timedelta(seconds=0.1)
assert analog_waveform.timing.sample_interval_mode == SampleIntervalMode.REGULAR


def test___dbl_analog_wfm_with_timing_no_t0___convert___valid_python_object() -> None:
dbl_analog_wfm = DoubleAnalogWaveform(dt=0.1, y_data=[1.0, 2.0, 3.0])

converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

assert analog_waveform.timing.start_time == dt.datetime(1904, 1, 1, tzinfo=dt.timezone.utc)
assert analog_waveform.timing.sample_interval == dt.timedelta(seconds=0.1)
assert analog_waveform.timing.sample_interval_mode == SampleIntervalMode.REGULAR


def test___dbl_analog_wfm_with_timing_no_dt___convert___valid_python_object() -> None:
t0_dt = DateTime(2020, 5, 5, tzinfo=dt.timezone.utc)
pt_converter = PrecisionTimestampConverter()
t0_pt = pt_converter.to_protobuf_message(t0_dt)
dbl_analog_wfm = DoubleAnalogWaveform(t0=t0_pt, y_data=[1.0, 2.0, 3.0])

converter = DoubleAnalogWaveformConverter()
analog_waveform = converter.to_python_value(dbl_analog_wfm)

from nipanel.converters.protobuf_types import ScalarConverter
assert analog_waveform.timing.start_time == t0_dt._to_datetime_datetime()
assert not analog_waveform.timing.has_sample_interval
assert analog_waveform.timing.sample_interval_mode == SampleIntervalMode.NONE


# ========================================================
# Protobuf to Python
# Scalar: Protobuf to Python
# ========================================================
def test___bool_scalar_protobuf___convert___valid_bool_scalar() -> None:
protobuf_value = ScalarData()
Expand Down Expand Up @@ -84,7 +237,7 @@ def test___scalar_protobuf_units_unset___convert___python_units_blank() -> None:


# ========================================================
# Python to Protobuf
# Scalar: Python to Protobuf
# ========================================================
def test___bool_scalar___convert___valid_bool_scalar_protobuf() -> None:
python_value = Scalar(True, "volts")
Expand Down
Loading