44
55import datetime as dt
66from collections .abc import Mapping
7+ from typing import Any
78
89import hightime as ht
910import nitypes .bintime as bt
1011import numpy as np
12+ from nitypes .complex import ComplexInt32Base , ComplexInt32DType
1113from nitypes .time import convert_datetime
1214from nitypes .waveform import (
1315 AnalogWaveform ,
16+ ComplexWaveform ,
1417 ExtendedPropertyDictionary ,
1518 ExtendedPropertyValue ,
19+ LinearScaleMode ,
1620 NoneScaleMode ,
1721 Spectrum ,
1822 Timing ,
2226 bintime_datetime_from_protobuf ,
2327 bintime_datetime_to_protobuf ,
2428)
29+ from ni .protobuf .types .precision_timestamp_pb2 import PrecisionTimestamp
2530from ni .protobuf .types .waveform_pb2 import (
2631 DoubleAnalogWaveform ,
32+ DoubleComplexWaveform ,
2733 DoubleSpectrum ,
34+ I16ComplexWaveform ,
35+ LinearScale ,
36+ Scale ,
2837 WaveformAttributeValue ,
2938)
3039
@@ -33,21 +42,12 @@ def float64_analog_waveform_to_protobuf(
3342 value : AnalogWaveform [np .float64 ], /
3443) -> DoubleAnalogWaveform :
3544 """Convert the Python AnalogWaveform to a protobuf DoubleAnalogWaveform."""
36- if value .timing .has_start_time :
37- bin_datetime = convert_datetime (bt .DateTime , value .timing .start_time )
38- precision_timestamp = bintime_datetime_to_protobuf (bin_datetime )
39- else :
40- precision_timestamp = None
41-
42- if value .timing .has_sample_interval :
43- time_interval = value .timing .sample_interval .total_seconds ()
44- else :
45- time_interval = 0
46-
45+ t0 = _t0_from_waveform (value )
46+ time_interval = _time_interval_from_waveform (value )
4747 attributes = _extended_properties_to_attributes (value .extended_properties )
4848
4949 return DoubleAnalogWaveform (
50- t0 = precision_timestamp ,
50+ t0 = t0 ,
5151 dt = time_interval ,
5252 y_data = value .scaled_data ,
5353 attributes = attributes ,
@@ -58,31 +58,8 @@ def float64_analog_waveform_from_protobuf(
5858 message : DoubleAnalogWaveform , /
5959) -> AnalogWaveform [np .float64 ]:
6060 """Convert the protobuf DoubleAnalogWaveform to a Python AnalogWaveform."""
61- # Declare timing to accept both bintime and dt.datetime to satisfy mypy.
62- timing : Timing [bt .DateTime | dt .datetime ]
63- if not message .dt and not message .HasField ("t0" ):
64- # If both dt and t0 are unset, use Timing.empty.
65- timing = Timing .empty
66- else :
67- # Timestamp
68- bin_datetime = bintime_datetime_from_protobuf (message .t0 )
69-
70- # Sample Interval
71- if not message .dt :
72- timing = Timing .create_with_no_interval (timestamp = bin_datetime )
73- else :
74- sample_interval = ht .timedelta (seconds = message .dt )
75- timing = Timing .create_with_regular_interval (
76- sample_interval = sample_interval ,
77- timestamp = bin_datetime ,
78- )
79-
80- extended_properties = {}
81- for key , value in message .attributes .items ():
82- attr_type = value .WhichOneof ("attribute" )
83- if attr_type is None :
84- raise ValueError ("Could not determine the datatype of 'attribute'." )
85- extended_properties [key ] = getattr (value , attr_type )
61+ timing = _timing_from_waveform_message (message )
62+ extended_properties = _attributes_to_extended_properties (message .attributes )
8663
8764 return AnalogWaveform .from_array_1d (
8865 message .y_data ,
@@ -93,10 +70,86 @@ def float64_analog_waveform_from_protobuf(
9370 )
9471
9572
73+ def float64_complex_waveform_to_protobuf (
74+ value : ComplexWaveform [np .complex128 ], /
75+ ) -> DoubleComplexWaveform :
76+ """Convert the Python ComplexWaveform to a protobuf DoubleComplexWaveform."""
77+ t0 = _t0_from_waveform (value )
78+ time_interval = _time_interval_from_waveform (value )
79+ attributes = _extended_properties_to_attributes (value .extended_properties )
80+
81+ interleaved_array = value .scaled_data .view (np .float64 )
82+
83+ return DoubleComplexWaveform (
84+ t0 = t0 ,
85+ dt = time_interval ,
86+ y_data = interleaved_array ,
87+ attributes = attributes ,
88+ )
89+
90+
91+ def float64_complex_waveform_from_protobuf (
92+ message : DoubleComplexWaveform , /
93+ ) -> ComplexWaveform [np .complex128 ]:
94+ """Convert the protobuf DoubleComplexWaveform to a Python ComplexWaveform."""
95+ timing = _timing_from_waveform_message (message )
96+ extended_properties = _attributes_to_extended_properties (message .attributes )
97+
98+ y_array = np .array (message .y_data , np .float64 )
99+ data_array = y_array .view (np .complex128 )
100+
101+ return ComplexWaveform .from_array_1d (
102+ data_array ,
103+ copy = False ,
104+ extended_properties = extended_properties ,
105+ timing = timing ,
106+ scale_mode = NoneScaleMode (),
107+ )
108+
109+
110+ def int16_complex_waveform_to_protobuf (
111+ value : ComplexWaveform [ComplexInt32Base ], /
112+ ) -> I16ComplexWaveform :
113+ """Convert the Python ComplexWaveform to a protobuf DoubleComplexWaveform."""
114+ t0 = _t0_from_waveform (value )
115+ time_interval = _time_interval_from_waveform (value )
116+ attributes = _extended_properties_to_attributes (value .extended_properties )
117+ scale = _scale_from_waveform (value )
118+
119+ interleaved_array = value .raw_data .view (np .int16 )
120+
121+ return I16ComplexWaveform (
122+ t0 = t0 ,
123+ dt = time_interval ,
124+ y_data = interleaved_array ,
125+ attributes = attributes ,
126+ scale = scale ,
127+ )
128+
129+
130+ def int16_complex_waveform_from_protobuf (
131+ message : I16ComplexWaveform , /
132+ ) -> ComplexWaveform [ComplexInt32Base ]:
133+ """Convert the protobuf DoubleComplexWaveform to a Python ComplexWaveform."""
134+ timing = _timing_from_waveform_message (message )
135+ extended_properties = _attributes_to_extended_properties (message .attributes )
136+ scale_mode = _scale_mode_from_waveform_message (message )
137+
138+ y_array = np .array (message .y_data , np .int16 )
139+ data_array = y_array .view (ComplexInt32DType )
140+
141+ return ComplexWaveform .from_array_1d (
142+ data_array ,
143+ copy = False ,
144+ extended_properties = extended_properties ,
145+ timing = timing ,
146+ scale_mode = scale_mode ,
147+ )
148+
149+
96150def float64_spectrum_to_protobuf (value : Spectrum [np .float64 ], / ) -> DoubleSpectrum :
97151 """Convert the Python Spectrum to a protobuf DoubleSpectrum."""
98152 attributes = _extended_properties_to_attributes (value .extended_properties )
99-
100153 return DoubleSpectrum (
101154 start_frequency = value .start_frequency ,
102155 frequency_increment = value .frequency_increment ,
@@ -107,13 +160,7 @@ def float64_spectrum_to_protobuf(value: Spectrum[np.float64], /) -> DoubleSpectr
107160
108161def float64_spectrum_from_protobuf (message : DoubleSpectrum , / ) -> Spectrum [np .float64 ]:
109162 """Convert the protobuf DoubleSpectrum to a Python Spectrum."""
110- extended_properties = {}
111- for key , value in message .attributes .items ():
112- attr_type = value .WhichOneof ("attribute" )
113- if attr_type is None :
114- raise ValueError ("Could not determine the datatype of 'attribute'." )
115- extended_properties [key ] = getattr (value , attr_type )
116-
163+ extended_properties = _attributes_to_extended_properties (message .attributes )
117164 return Spectrum .from_array_1d (
118165 message .data ,
119166 dtype = np .float64 ,
@@ -123,6 +170,19 @@ def float64_spectrum_from_protobuf(message: DoubleSpectrum, /) -> Spectrum[np.fl
123170 )
124171
125172
173+ def _attributes_to_extended_properties (
174+ attributes : Mapping [str , WaveformAttributeValue ],
175+ ) -> Mapping [str , ExtendedPropertyValue ]:
176+ extended_properties = {}
177+ for key , value in attributes .items ():
178+ attr_type = value .WhichOneof ("attribute" )
179+ if attr_type is None :
180+ raise ValueError ("Could not determine the datatype of 'attribute'." )
181+ extended_properties [key ] = getattr (value , attr_type )
182+
183+ return extended_properties
184+
185+
126186def _extended_properties_to_attributes (
127187 extended_properties : ExtendedPropertyDictionary ,
128188) -> Mapping [str , WaveformAttributeValue ]:
@@ -143,3 +203,72 @@ def _value_to_attribute(value: ExtendedPropertyValue) -> WaveformAttributeValue:
143203 raise TypeError (f"Unexpected type for extended property value { type (value )} " )
144204
145205 return attr_value
206+
207+
208+ def _t0_from_waveform (
209+ waveform : AnalogWaveform [Any ] | ComplexWaveform [Any ],
210+ ) -> PrecisionTimestamp | None :
211+ if waveform .timing .has_start_time :
212+ bin_datetime = convert_datetime (bt .DateTime , waveform .timing .start_time )
213+ return bintime_datetime_to_protobuf (bin_datetime )
214+ else :
215+ return None
216+
217+
218+ def _time_interval_from_waveform (waveform : AnalogWaveform [Any ] | ComplexWaveform [Any ]) -> float :
219+ if waveform .timing .has_sample_interval :
220+ return waveform .timing .sample_interval .total_seconds ()
221+ else :
222+ return 0
223+
224+
225+ def _timing_from_waveform_message (
226+ message : DoubleAnalogWaveform | DoubleComplexWaveform | I16ComplexWaveform ,
227+ ) -> Timing [bt .DateTime | dt .datetime ]:
228+ # Declare timing to accept both bintime and dt.datetime to satisfy mypy.
229+ timing : Timing [bt .DateTime | dt .datetime ]
230+ if not message .dt and not message .HasField ("t0" ):
231+ # If both dt and t0 are unset, use Timing.empty.
232+ timing = Timing .empty
233+ else :
234+ # Timestamp
235+ bin_datetime = bintime_datetime_from_protobuf (message .t0 )
236+
237+ # Sample Interval
238+ if not message .dt :
239+ timing = Timing .create_with_no_interval (timestamp = bin_datetime )
240+ else :
241+ sample_interval = ht .timedelta (seconds = message .dt )
242+ timing = Timing .create_with_regular_interval (
243+ sample_interval = sample_interval ,
244+ timestamp = bin_datetime ,
245+ )
246+
247+ return timing
248+
249+
250+ def _scale_from_waveform (waveform : AnalogWaveform [Any ] | ComplexWaveform [Any ]) -> Scale | None :
251+ if isinstance (waveform .scale_mode , LinearScaleMode ):
252+ linear_scale = LinearScale (gain = waveform .scale_mode .gain , offset = waveform .scale_mode .offset )
253+ return Scale (linear_scale = linear_scale )
254+ elif isinstance (waveform .scale_mode , NoneScaleMode ):
255+ return None
256+ else :
257+ raise ValueError (f"The waveform scale mode { waveform .scale_mode } is not supported." )
258+
259+
260+ def _scale_mode_from_waveform_message (
261+ message : I16ComplexWaveform ,
262+ ) -> LinearScaleMode | NoneScaleMode :
263+ if message .HasField ("scale" ):
264+ mode = message .scale .WhichOneof ("mode" )
265+ if mode is None :
266+ raise ValueError ("Could not determine waveform scale mode." )
267+ elif mode == "linear_scale" :
268+ return LinearScaleMode (
269+ message .scale .linear_scale .gain , message .scale .linear_scale .offset
270+ )
271+ else :
272+ raise ValueError (f"The waveform scale mode { mode !r} is not supported." )
273+
274+ return NoneScaleMode ()
0 commit comments