Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pyright = { version = ">=1.1.400", extras = ["nodejs"] }
pytest = ">=7.2"
pytest-cov = ">=4.0"
pytest-mock = ">=3.0"
hightime = { git = "https://github.com/ni/hightime.git" }

[tool.poetry.group.codegen.dependencies]
grpcio-tools = [
Expand Down
97 changes: 47 additions & 50 deletions src/nipanel/converters/protobuf_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from typing import Type, Union

import hightime as ht
import numpy
import nitypes.bintime as bt
import numpy as np
from ni.protobuf.types import scalar_pb2
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.precision_timestamp_pb2 import (
PrecisionTimestamp,
Expand All @@ -14,12 +15,12 @@
DoubleAnalogWaveform,
WaveformAttributeValue,
)
from nitypes.bintime import DateTime, TimeValueTuple
from nitypes.scalar import Scalar
from nitypes.time import convert_datetime
from nitypes.waveform import (
AnalogWaveform,
ExtendedPropertyDictionary,
ExtendedPropertyValue,
NoneScaleMode,
SampleIntervalMode,
Timing,
Expand All @@ -37,9 +38,13 @@
}


class DoubleAnalogWaveformConverter(Converter[AnalogWaveform[numpy.float64], DoubleAnalogWaveform]):
class DoubleAnalogWaveformConverter(Converter[AnalogWaveform[np.float64], DoubleAnalogWaveform]):
"""A converter for AnalogWaveform types with scaled data (double)."""

def __init__(self) -> None:
"""Initialize a DoubleAnalogWaveformConverter object."""
self._pt_converter = PrecisionTimestampConverter()

@property
def python_typename(self) -> str:
"""The Python type that this converter handles."""
Expand All @@ -50,16 +55,13 @@ def protobuf_message(self) -> Type[DoubleAnalogWaveform]:
"""The type-specific protobuf message for the Python type."""
return DoubleAnalogWaveform

def to_protobuf_message(
self, python_value: AnalogWaveform[numpy.float64]
) -> DoubleAnalogWaveform:
def to_protobuf_message(self, python_value: AnalogWaveform[np.float64]) -> DoubleAnalogWaveform:
"""Convert the Python AnalogWaveform to a protobuf DoubleAnalogWaveform."""
if python_value.timing.has_timestamp:
pt_converter = PrecisionTimestampConverter()
bin_datetime = DateTime(python_value.timing.start_time)
precision_timestamp = pt_converter.to_protobuf_message(bin_datetime)
bin_datetime = convert_datetime(bt.DateTime, python_value.timing.start_time)
precision_timestamp = self._pt_converter.to_protobuf_message(bin_datetime)
else:
precision_timestamp = PrecisionTimestamp(seconds=0, fractional_seconds=0)
precision_timestamp = None

if python_value.timing.has_sample_interval:
time_interval = python_value.timing.sample_interval.total_seconds()
Expand All @@ -79,48 +81,41 @@ def _extended_properties_to_attributes(
self,
extended_properties: ExtendedPropertyDictionary,
) -> collections.abc.Mapping[str, WaveformAttributeValue]:
attributes = {}
for key, value in extended_properties.items():
attr_value = WaveformAttributeValue()
if isinstance(value, bool):
attr_value.bool_value = value
elif isinstance(value, int):
attr_value.integer_value = value
elif isinstance(value, float):
attr_value.double_value = value
elif isinstance(value, str):
attr_value.string_value = value
else:
raise TypeError(f"Unexpected type for extended property value {type(value)}")

attributes[key] = attr_value
return {key: self._value_to_attribute(value) for key, value in extended_properties.items()}

def _value_to_attribute(self, value: ExtendedPropertyValue) -> WaveformAttributeValue:
attr_value = WaveformAttributeValue()
if isinstance(value, bool):
attr_value.bool_value = value
elif isinstance(value, int):
attr_value.integer_value = value
elif isinstance(value, float):
attr_value.double_value = value
elif isinstance(value, str):
attr_value.string_value = value
else:
raise TypeError(f"Unexpected type for extended property value {type(value)}")

return attributes
return attr_value

def to_python_value(
self, protobuf_value: DoubleAnalogWaveform
) -> AnalogWaveform[numpy.float64]:
def to_python_value(self, protobuf_message: DoubleAnalogWaveform) -> AnalogWaveform[np.float64]:
"""Convert the protobuf DoubleAnalogWaveform to a Python AnalogWaveform."""
if (
not protobuf_value.dt
and not protobuf_value.t0.seconds
and not protobuf_value.t0.fractional_seconds
):
# If both dt and t0 and unset, use Timing.empty.
if not protobuf_message.dt and not protobuf_message.HasField("t0"):
# If both dt and t0 are unset, use Timing.empty.
timing = Timing.empty
else:
# Timestamp
pt_converter = PrecisionTimestampConverter()
bin_datetime = pt_converter.to_python_value(protobuf_value.t0)
bin_datetime = pt_converter.to_python_value(protobuf_message.t0)
timestamp = convert_datetime(dt.datetime, bin_datetime)

# Sample Interval
if not protobuf_value.dt:
if not protobuf_message.dt:
sample_interval_mode = SampleIntervalMode.NONE
sample_interval = None
else:
sample_interval_mode = SampleIntervalMode.REGULAR
sample_interval = ht.timedelta(seconds=protobuf_value.dt)
sample_interval = ht.timedelta(seconds=protobuf_message.dt)

timing = Timing(
sample_interval_mode=sample_interval_mode,
Expand All @@ -129,46 +124,48 @@ def to_python_value(
)

extended_properties = {}
for key, value in protobuf_value.attributes.items():
for key, value in protobuf_message.attributes.items():
attr_type = value.WhichOneof("attribute")
extended_properties[key] = getattr(value, str(attr_type))

data_list = list(protobuf_value.y_data)
data_array = np.array(protobuf_message.y_data)
return AnalogWaveform(
sample_count=len(data_list),
dtype=numpy.float64,
raw_data=numpy.array(data_list),
sample_count=data_array.size,
dtype=np.float64,
raw_data=data_array,
start_index=0,
capacity=len(data_list),
capacity=data_array.size,
extended_properties=extended_properties,
copy_extended_properties=True,
timing=timing,
scale_mode=NoneScaleMode(),
)


class PrecisionTimestampConverter(Converter[DateTime, PrecisionTimestamp]):
class PrecisionTimestampConverter(Converter[bt.DateTime, PrecisionTimestamp]):
"""A converter for bintime.DateTime types."""

@property
def python_typename(self) -> str:
"""The Python type that this converter handles."""
return DateTime.__name__
return bt.DateTime.__name__

@property
def protobuf_message(self) -> Type[PrecisionTimestamp]:
"""The type-specific protobuf message for the Python type."""
return PrecisionTimestamp

def to_protobuf_message(self, python_value: DateTime) -> PrecisionTimestamp:
def to_protobuf_message(self, python_value: bt.DateTime) -> PrecisionTimestamp:
"""Convert the Python DateTime to a protobuf PrecisionTimestamp."""
seconds, fractional_seconds = python_value.to_tuple()
return self.protobuf_message(seconds=seconds, fractional_seconds=fractional_seconds)

def to_python_value(self, protobuf_value: PrecisionTimestamp) -> DateTime:
def to_python_value(self, protobuf_message: PrecisionTimestamp) -> bt.DateTime:
"""Convert the protobuf PrecisionTimestamp to a Python DateTime."""
time_value_tuple = TimeValueTuple(protobuf_value.seconds, protobuf_value.fractional_seconds)
return DateTime.from_tuple(time_value_tuple)
time_value_tuple = bt.TimeValueTuple(
protobuf_message.seconds, protobuf_message.fractional_seconds
)
return bt.DateTime.from_tuple(time_value_tuple)


class ScalarConverter(Converter[Scalar[_AnyScalarType], scalar_pb2.ScalarData]):
Expand Down
9 changes: 3 additions & 6 deletions tests/unit/test_protobuf_type_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import numpy
import pytest
from ni.protobuf.types.scalar_pb2 import ScalarData
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.precision_timestamp_pb2 import (
PrecisionTimestamp,
)
from ni_measurement_plugin_sdk_service._internal.stubs.ni.protobuf.types.waveform_pb2 import (
DoubleAnalogWaveform,
WaveformAttributeValue,
Expand All @@ -32,7 +29,7 @@ def test___default_analog_waveform___convert___valid_protobuf() -> None:

assert not dbl_analog_waveform.attributes
assert dbl_analog_waveform.dt == 0
assert dbl_analog_waveform.t0 == PrecisionTimestamp(seconds=0, fractional_seconds=0)
assert not dbl_analog_waveform.HasField("t0")
assert list(dbl_analog_waveform.y_data) == []


Expand Down Expand Up @@ -162,7 +159,7 @@ def test___dbl_analog_wfm_with_timing_no_dt___convert___valid_python_object() ->


# ========================================================
# ScalarData to Scalar
# Scalar: Protobuf to Python
# ========================================================
def test___bool_scalar_protobuf___convert___valid_bool_scalar() -> None:
protobuf_value = ScalarData()
Expand Down Expand Up @@ -240,7 +237,7 @@ def test___scalar_protobuf_units_unset___convert___python_units_blank() -> None:


# ========================================================
# Scalar to ScalarData
# Scalar: Python to Protobuf
# ========================================================
def test___bool_scalar___convert___valid_bool_scalar_protobuf() -> None:
python_value = Scalar(True, "volts")
Expand Down
Loading