diff --git a/.github/badges/pylint.svg b/.github/badges/pylint.svg
index 01063c10..f95053fe 100644
--- a/.github/badges/pylint.svg
+++ b/.github/badges/pylint.svg
@@ -17,7 +17,7 @@
pylint
- 9.29
- 9.29
+ 9.31
+ 9.31
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3ad1a19a..885ddc31 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,10 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.52.0-beta.0] - Unreleased
### Added
+- `grouping=2` input parameter in the `ZurichInstruments_Hdawg` to enable initialization directly in wanted grouping mode.
+- Addition of `set_awg_module_index` method to set the AWG module index number.
### Changed
+- All ZI grouping modes are now allowed in methods.
+- For all calls using the "/awgs/n/" the AWG core number (n) is checked based on current grouping mode.
### Fixed
+- Now referring to "AWG core", and using `awg_core` in methods where erroneously was referred to "AWG index" and `awg_index`.
### Removed
diff --git a/qmi/instruments/zurich_instruments/hdawg.py b/qmi/instruments/zurich_instruments/hdawg.py
index 6f4559a8..37194ac1 100644
--- a/qmi/instruments/zurich_instruments/hdawg.py
+++ b/qmi/instruments/zurich_instruments/hdawg.py
@@ -9,7 +9,7 @@
from typing import TYPE_CHECKING, Any
from qmi.core.context import QMI_Context
-from qmi.core.exceptions import QMI_ApplicationException
+from qmi.core.exceptions import QMI_ApplicationException, QMI_TimeoutException
from qmi.core.instrument import QMI_Instrument
from qmi.core.rpc import rpc_method
@@ -74,35 +74,41 @@ class ZurichInstruments_HDAWG(QMI_Instrument):
NUM_CHANNELS_PER_AWG: The number of AWG channels per AWG core.
"""
- _rpc_constants = ["COMPILE_TIMEOUT", "UPLOAD_TIMEOUT"]
+ _rpc_constants = ["COMPILE_TIMEOUT", "UPLOAD_TIMEOUT", "NUM_AWGS", "NUM_CHANNELS"]
COMPILE_TIMEOUT = 30
UPLOAD_TIMEOUT = 30
- # Class constants
NUM_AWGS = 4
NUM_CHANNELS = 8
+ # Class constants
NUM_CHANNELS_PER_AWG = 2
- def __init__(self, context: QMI_Context, name: str, server_host: str, server_port: int, device_name: str) -> None:
+ def __init__(
+ self, context: QMI_Context, name: str, server_host: str, server_port: int, device_name: str, grouping: int = 2
+ ) -> None:
"""Initialize driver.
- We connect to a specific HDAWG via a Data Server, which is a process running on some computer.
-
Parameters:
- name: Name for this instrument instance.
- server_host: Host where the ziDataServer process is running.
- server_port: TCP port there the ziDataServer process can be reached.
- device_name: Name of the HDAWG device (typically "devNNNN").
+ name: Name for this instrument instance.
+ server_host: Host where the ziDataServer process is running.
+ server_port: TCP port there the ziDataServer process can be reached.
+ device_name: Name of the HDAWG device (typically "devNNNN").
+ grouping: The grouping to use. Options are 0 (4x2), 1 (2x4) and 2 (1x8, default).
"""
+ if grouping not in range(3):
+ raise ValueError(f"Invalid grouping number: {grouping}")
+
super().__init__(context, name)
# Class attributes
self._server_host = server_host
self._server_port = server_port
self._device_name = device_name
+ self._grouping = grouping
# ZI HDAWG server, module and device
self._daq_server: None | ziDAQServer = None
self._awg_module: None | AwgModule = None
+ self._awg_channel_map: list[int] = []
- # Import the "zhinst" module.
+ # Import the "zhinst" modules.
_import_modules()
# Flags
@@ -118,6 +124,14 @@ def awg_module(self) -> AwgModule:
assert self._awg_module is not None
return self._awg_module
+ @property
+ def awg_channel_map(self) -> list[int]:
+ """Channel map for AWG channels to correct core index, based on current grouping and channel range."""
+ return [
+ (awg_channel // (2 ** (self._grouping + 1))) *
+ (self._grouping % 2 + 1) for awg_channel in range(self.NUM_CHANNELS)
+ ]
+
@staticmethod
def _process_parameter_replacements(sequencer_program: str, replacements: dict[str, str | int | float]) -> str:
"""Process parameter replacements for sequencer code.
@@ -136,7 +150,7 @@ def _process_parameter_replacements(sequencer_program: str, replacements: dict[s
# At this point, the replacement value should be a string.
if not isinstance(replacement, str):
- raise ValueError("Cannot handle replacement value of type {!r}".format(type(replacement)))
+ raise ValueError(f"Cannot handle replacement value of type {type(replacement)!r}.")
# Perform the replacement.
if SEQC_PAR_PATTERN.fullmatch(parameter):
@@ -144,14 +158,15 @@ def _process_parameter_replacements(sequencer_program: str, replacements: dict[s
parameter_pattern = f"\\{parameter}\\b" # escape the '$' and add word boundary match
sequencer_program = re.sub(parameter_pattern, replacement, sequencer_program)
else:
- raise NameError("Replacement parameter has an invalid name: {}".format(parameter))
+ raise NameError(f"Replacement parameter has an invalid name: {parameter}.")
# Check if there are any unreplaced parameters left in the source code; this will not compile.
leftover_parameters = SEQC_PAR_PATTERN.findall(sequencer_program)
if leftover_parameters:
- raise KeyError("Variables left in sequencer program that were not in replacement dictionary: {}".format(
- ', '.join(leftover_parameters)
- ))
+ raise KeyError(
+ "Variables left in sequencer program that were not in replacement dictionary: " +
+ f"{', '.join(leftover_parameters)}."
+ )
return sequencer_program
@@ -166,13 +181,13 @@ def _check_program_not_empty(sequencer_program: str):
# Check if there are any lines left (we do not check if that is executable code; the compiler will do that).
if len(seqc_statements) == 0:
- raise QMI_ApplicationException("Source string does not contain executable statements")
+ raise QMI_ApplicationException("Source string does not contain executable statements.")
def _get_int(self, node_path: str) -> int:
"""Get an integer value from the nodetree.
Parameters:
- node_path: The path to the node to be queried.
+ node_path: The path to the node to be queried.
Returns:
integer value from node tree.
@@ -183,7 +198,7 @@ def _get_double(self, node_path: str) -> float:
"""Get a double value from the nodetree.
Parameters:
- node_path: The path to the node to be queried.
+ node_path: The path to the node to be queried.
Returns:
double value from node tree.
@@ -194,7 +209,7 @@ def _get_string(self, node_path: str) -> str:
"""Get a string value from the nodetree.
Parameters:
- node_path: The path to the node to be queried.
+ node_path: The path to the node to be queried.
Returns:
string value from node tree.
@@ -205,8 +220,8 @@ def _set_value(self, node_path: str, value: str | int | float) -> None:
"""Set a value in the nodetree. Can be a string, integer, or a floating point number.
Parameters:
- node_path: The path to the node to be queried.
- value: Value to set for the node.
+ node_path: The path to the node to be queried.
+ value: Value to set for the node.
"""
self.daq_server.set('/' + self._device_name + '/' + node_path, value)
@@ -232,13 +247,13 @@ def _wait_compile(self, sequencer_program: str) -> CompilerStatus:
"""Start a compilation of a sequencer program and wait until the compilation is done or timeout.
Parameters:
- sequencer_program: A sequencer program as a string.
+ sequencer_program: A sequencer program as a string.
Raises:
- RuntimeError: If the compilation does now within the 'self.COMPILE_TIMEOUT period.
+ RuntimeError: If the compilation does now within the 'self.COMPILE_TIMEOUT' period.
Returns:
- compilation_status: the obtained compiler status after compiler was finished.
+ compilation_status: The obtained compiler status after compiler was finished.
"""
# Compile the sequencer program with replacements made.
self.awg_module.set("compiler/sourcestring", sequencer_program)
@@ -253,7 +268,7 @@ def _wait_compile(self, sequencer_program: str) -> CompilerStatus:
time.sleep(0.1)
compilation_status = CompilerStatus(self.awg_module.getInt("compiler/status"))
if time.monotonic() - compilation_start_time > self.COMPILE_TIMEOUT:
- raise RuntimeError("Compilation process timed out (timeout={})".format(self.COMPILE_TIMEOUT))
+ raise QMI_TimeoutException(f"Compilation process timed out (timeout={self.COMPILE_TIMEOUT})")
compilation_end_time = time.monotonic()
_logger.debug(
@@ -305,14 +320,15 @@ def _wait_upload(self) -> UploadStatus:
# Poll the AWG module to check ELF upload progress.
upload_start_time = time.monotonic()
_logger.debug("Polling ELF upload status ...")
+ awg_core = self.awg_module.getInt("index") * (self._grouping % 2 + 1)
upload_progress = self.awg_module.getDouble("progress")
- upload_status = self.daq_server.getInt(f"/{self._device_name:s}/awgs/0/ready")
+ upload_status = self.daq_server.getInt(f"/{self._device_name:s}/awgs/{awg_core}/ready")
while upload_progress < 1.0 and upload_status == 0:
time.sleep(0.1)
upload_progress = self.awg_module.getDouble("progress")
- upload_status = self.daq_server.getInt(f"/{self._device_name:s}/awgs/0/ready")
+ upload_status = self.daq_server.getInt(f"/{self._device_name:s}/awgs/{awg_core}/ready")
if time.monotonic() - upload_start_time > self.UPLOAD_TIMEOUT:
- raise RuntimeError("Upload process timed out (timeout={})".format(self.UPLOAD_TIMEOUT))
+ raise QMI_TimeoutException(f"Upload process timed out (timeout={self.UPLOAD_TIMEOUT})")
upload_end_time = time.monotonic()
_logger.debug(
@@ -333,7 +349,6 @@ def _interpret_upload_result_is_ok(self, upload_result: UploadStatus) -> bool:
Returns:
ok_to_proceed: Result as True (OK) if upload was successful, else False.
"""
-
self._check_is_open()
if upload_result == UploadStatus.DONE:
@@ -348,12 +363,22 @@ def _interpret_upload_result_is_ok(self, upload_result: UploadStatus) -> bool:
_logger.error("ELF upload in progress but aborted for unknown reason.")
ok_to_proceed = False
else:
- raise ValueError("Unknown upload status: {}".format(upload_result))
+ raise ValueError(f"Unknown upload status: {upload_result}")
return ok_to_proceed
+ def _verify_awg_module_state(self):
+ # TODO: In other than 1x8 mode all AWG cores should be checked, not just the currently selected one?
+ # Verify that the AWG thread is not running.
+ assert self.awg_module.finished()
+ # Start the AWG thread.
+ self.awg_module.execute()
+ # Verify that the AWG thread is running.
+ assert not self.awg_module.finished()
+
@rpc_method
def open(self) -> None:
+ """We connect to a specific HDAWG via a DAQ Server, which is a process running on some computer."""
self._check_is_closed()
_logger.info("[%s] Opening connection to instrument", self._name)
@@ -367,24 +392,16 @@ def open(self) -> None:
self._awg_module = self.daq_server.awgModule()
self.awg_module.set("device", self._device_name)
- self.awg_module.set("index", 0) # only support 1x8 mode, so only one AWG module
-
- # Verify that the AWG thread is not running.
- assert self.awg_module.finished()
-
- # Start the AWG thread.
- self.awg_module.execute()
-
- # Verify that the AWG thread is running.
- assert not self.awg_module.finished()
-
super().open()
+ self.set_channel_grouping(self._grouping)
+ self.set_awg_module_index(0) # Set initially as 0, 0 is valid for all grouping modes.
@rpc_method
def close(self) -> None:
self._check_is_open()
_logger.info("[%s] Closing connection to instrument", self._name)
+ # TODO: In other than 1x8 mode all AWG cores should be checked, not just the currently selected one?
# Verify that the AWG thread is running.
assert not self.awg_module.finished()
@@ -403,11 +420,10 @@ def close(self) -> None:
@rpc_method
def get_node_string(self, node_path: str) -> str:
- """
- Get a string value for the node.
+ """Get a string value for the node.
Parameters:
- node_path: The node to query.
+ node_path: The node to query.
Returns:
string value for the given node.
@@ -468,27 +484,88 @@ def set_node_double(self, node_path: str, value: float) -> None:
self._set_double(node_path, value)
@rpc_method
- def set_channel_grouping(self, value: int) -> None:
+ def set_channel_grouping(self, grouping: int) -> None:
"""Set the channel grouping of the device.
- This QMI driver currently supports only channel grouping 2 (1x8 channels).
+ Parameters:
+ grouping: Channel grouping to set. Possible values are:
+ 0 = 4x2 channels;
+ 1 = 2x4 channels;
+ 2 = 1x8 channels.
+ """
+ if grouping not in range(3):
+ raise ValueError(f"Unsupported channel grouping: {grouping}.")
+
+ self._check_is_open()
+ self._set_int("system/awg/channelgrouping", grouping)
+ self._grouping = grouping
+
+ @rpc_method
+ def set_awg_module_index(self, index: int):
+ """Set the AWG module index. Possible values are dependent on grouping.
+
+ Parameters:
+ index: AWG module index number. Possible values are:
+ 0 for 1x8 group mode;
+ 0,1 for 2x4 group mode;
+ 0,1,2,3 for 4x2 group mode.
+ """
+ if index != 0:
+ # Then we need to check
+ if (
+ (self._grouping == 0 and index not in range(1, 4)) or
+ (self._grouping == 1 and index > 1) or
+ self._grouping == 2
+ ):
+ raise ValueError(f"Invalid index, {index}, for group mode {self._grouping}.")
+
+ self.awg_module.set("index", index)
+ self._verify_awg_module_state()
+
+ @rpc_method
+ def get_awg_module_enabled(self, awg_core: int) -> int:
+ """Return the current enable status of the AWG module.
+
+ Parameters:
+ awg_core: AWG core number.
+
+ Raises:
+ ValueError: AWG core number is invalid.
+
+ Returns:
+ 1 - If the AWG sequencer is currently running.
+ 0 - If the AWG sequencer is not running.
+ """
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}")
+
+ self._check_is_open()
+ return self._get_int(f"awgs/{awg_core}/enable")
+
+ @rpc_method
+ def set_awg_module_enabled(self, value: int) -> None:
+ """Enable or disable the AWG module.
+
+ Enabling the AWG starts execution of the currently loaded sequencer program. This is equal to calling
+ 'awg_module.execute()'. Disabling is equal to 'awg_module.finish()'.
+ Uploading a sequencer program or waveform is only possible while the AWG module is disabled.
Parameters:
- value: Channel grouping to set.
- 0 = 4x2 channels;
- 1 = 2x4 channels;
- 2 = 1x8 channels.
+ value: 1 to enable the AWG module, 0 to disable.
+
+ Raises:
+ ValueError: AWG module enable value is invalid.
"""
- if value != 2:
- raise ValueError("Unsupported channel grouping")
+ if value not in (0, 1):
+ raise ValueError("Invalid value")
+
self._check_is_open()
- self._set_int("system/awg/channelgrouping", value)
+ self.awg_module.set("awg/enable", value)
@rpc_method
- def compile_and_upload(self,
- sequencer_program: str,
- replacements: None | dict[str, str | int | float] = None
- ) -> None:
+ def compile_and_upload(
+ self, sequencer_program: str, replacements: None | dict[str, str | int | float] = None
+ ) -> None:
"""Compile and upload the sequencer_program, after performing textual replacements.
This function combines compilation followed by upload to the AWG if compilation was successful. This is forced
@@ -544,7 +621,7 @@ def compilation_successful(self) -> bool:
@rpc_method
def upload_waveform(
self,
- awg_index: int,
+ awg_core: int,
waveform_index: int,
wave1: np.ndarray,
wave2: None | np.ndarray = None,
@@ -562,7 +639,7 @@ def upload_waveform(
This can be inspected in the "Waveform Viewer" in the LabOne user interface.
Parameters:
- awg_index: 0-based index of the AWG (group of 2 channels).
+ awg_core: 0-based AWG core number.
waveform_index: 0-based index of the waveform array.
wave1: Array containing floating point samples
in range -1.0 .. +1.0 for the first waveform.
@@ -573,8 +650,9 @@ def upload_waveform(
represent the 4 marker channels.
"""
self._check_is_open()
+ assert awg_core in self.awg_channel_map, f"Invalid AWG core ({awg_core}) for group mode {self._grouping}."
waveform_data = zhinst.utils.convert_awg_waveform(wave1, wave2, markers)
- waveform_address = f"/{self._device_name}/awgs/{awg_index}/waveform/waves/{waveform_index}"
+ waveform_address = f"/{self._device_name}/awgs/{awg_core}/waveform/waves/{waveform_index}"
self.daq_server.setVector(waveform_address, waveform_data)
@rpc_method
@@ -588,8 +666,12 @@ def upload_waveforms(
Large sets of waveforms need to be batched to avoid running out of memory.
+ The loop that creates the unpacked waveforms loops such that:
+ - outer loop: for waveform_index, sequence in enumerate(waveforms)
+ - inner loop: for awg_core in range(4)
+
Parameters:
- unpacked_waveforms: List of tuples, each tuple is a collection of awg_index, waveform sequence index,
+ unpacked_waveforms: List of tuples, each tuple is a collection of AWG core number, waveform sequence index,
wave1, wave2 and markers.
batch_size: Large sets of waveforms take plenty of memory. This makes the waveforms to be sent with
maximum sized batches. Default size is 50 waveform entries.
@@ -598,9 +680,9 @@ def upload_waveforms(
waves_set = []
for wf_count, sequence in enumerate(unpacked_waveforms):
- awg_index, waveform_index, wave1, wave2, markers = sequence
+ awg_core, waveform_index, wave1, wave2, markers = sequence
wave_raw = zhinst.utils.convert_awg_waveform(wave1, wave2, markers)
- waveform_address = f'/{self._device_name}/awgs/{awg_index}/waveform/waves/{waveform_index}'
+ waveform_address = f"/{self._device_name}/awgs/{awg_core}/waveform/waves/{waveform_index}"
waves_set.append((waveform_address, wave_raw))
# Check set size against batch size
if wf_count % batch_size == batch_size - 1:
@@ -614,7 +696,7 @@ def upload_waveforms(
@rpc_method
def upload_command_table(
- self, awg_index: int, command_table_entries: list[dict[str, Any]], save_as_file: bool = False
+ self, awg_core: int, command_table_entries: list[dict[str, Any]], save_as_file: bool = False
) -> None:
"""Upload a new command table to the AWG.
@@ -630,28 +712,27 @@ def upload_command_table(
https://json-schema.org/draft-07.
Parameters:
- awg_index: 0-based index of the AWG core to apply the table to (0 .. 3).
- command_table_entries: Actual command table as a list of entries (dicts).
- save_as_file: Set to True to save the validated JSON command table in a file. Default is False.
+ awg_core: 0-based AWG core number to apply the table to (0 .. 3).
+ command_table_entries: Actual command table as a list of entries (dicts).
+ save_as_file: Set to True to save the validated JSON command table in a file. Default is False.
Raises:
- ValueError: Invalid schema used (should not happen as it is obtained from the device).
- ValueError: Validation of the command table failed.
- ValueError: Invalid value in the command table despite successful validation.
- RuntimeError: If the upload of command table on core {awg_index} failed.
- RuntimeError: If the upload process timed out.
+ ValueError: Invalid AWG core number.
+ ValueError: Provided schema is not valid.
+ ValueError: Validation of the command table failed.
+ ValueError: Invalid value in the command table despite successful validation.
+ RuntimeError: If the upload of command table on core {awg_index} failed.
+ QMI_TimeoutException: If the upload process timed out.
"""
self._check_is_open()
- # Check AWG core index.
- if awg_index not in (0, 1, 2, 3):
- raise ValueError("AWG index must be in 0 .. 3")
+ # Check AWG core number.
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
# Get schema from the device
- schema_node = f"/{self._device_name:s}/awgs/0/commandtable/schema"
- schema = json.loads(
- self.daq_server.get(schema_node, flat=True)[schema_node][0]["vector"]
- )
+ schema_node = f"/{self._device_name:s}/awgs/{awg_core}/commandtable/schema"
+ schema = json.loads(self.daq_server.get(schema_node, flat=True)[schema_node][0]["vector"])
# Create the command table from the provided entries.
command_table = {
"header": {
@@ -664,38 +745,38 @@ def upload_command_table(
jsonschema.validate(instance=command_table, schema=schema, cls=jsonschema.Draft7Validator)
except jsonschema.exceptions.SchemaError as exc:
_logger.exception("The provided schema is invalid", exc_info=exc)
- raise ValueError("Invalid schema") from exc
+ raise ValueError("Invalid schema.") from exc
except jsonschema.exceptions.ValidationError as exc:
_logger.exception("The provided command table is not valid", exc_info=exc)
- raise ValueError("Invalid command table") from exc
+ raise ValueError("Invalid command table.") from exc
# Convert the command table to JSON and upload.
try:
command_table_as_json = json.dumps(command_table, allow_nan=False, separators=(",", ":"))
except (TypeError, ValueError) as exc:
- raise ValueError("Invalid value in command table") from exc
+ raise ValueError("Invalid value in command table.") from exc
if save_as_file:
- with open(f"{os.path.dirname(__file__)}/cmd_table_{awg_index}.json", "w") as out:
+ # Save as a file
+ with open(os.path.join(os.path.dirname(__file__), f"cmd_table_{awg_core}.json"), "w") as out:
out.write(command_table_as_json)
self.daq_server.setVector(
- "/{}/awgs/{}/commandtable/data".format(self._device_name, awg_index),
- command_table_as_json
+ f"/{self._device_name:s}/awgs/{awg_core}/commandtable/data", command_table_as_json
)
upload_start_time = time.monotonic()
while True:
time.sleep(0.01)
- status = self.daq_server.getInt(f"/{self._device_name:s}/awgs/{awg_index}/commandtable/status")
+ status = self.daq_server.getInt(f"/{self._device_name:s}/awgs/{awg_core}/commandtable/status")
if status & 0b1:
# Upload successful, move on the next core
break
if status & 0b1000:
# Error in command table
- raise RuntimeError(f"The upload of command table on core {awg_index} failed.")
+ raise RuntimeError(f"The upload of command table on core {awg_core} failed.")
if time.monotonic() - upload_start_time > self.UPLOAD_TIMEOUT:
- raise RuntimeError("Upload process timed out (timeout={})".format(self.UPLOAD_TIMEOUT))
+ raise QMI_TimeoutException(f"Upload process timed out (timeout={self.UPLOAD_TIMEOUT})")
upload_end_time = time.monotonic()
_logger.debug(
@@ -729,7 +810,7 @@ def set_reference_clock_source(self, value: int) -> None:
ValueError: By invalid reference clock source input parameter.
"""
if value not in (0, 1, 2):
- raise ValueError("Unsupported reference clock source")
+ raise ValueError("Unsupported reference clock source.")
self._check_is_open()
_logger.info("[%s] Setting reference clock source to [%d]", self._name, value)
@@ -805,12 +886,12 @@ def set_marker_source(self, trigger: int, value: int) -> None:
ValueError: By invalid marker source value.
"""
if trigger < 0 or trigger >= self.NUM_CHANNELS:
- raise ValueError("Invalid trigger index")
+ raise ValueError("Invalid trigger index.")
if value not in range(16) and value not in (17, 18):
- raise ValueError("Invalid marker source: {}".format(value))
+ raise ValueError(f"Invalid marker source: {value}.")
self._check_is_open()
- self._set_int("triggers/out/{}/source".format(trigger), value)
+ self._set_int(f"triggers/out/{trigger}/source", value)
@rpc_method
def set_marker_delay(self, trigger: int, value: float) -> None:
@@ -825,11 +906,11 @@ def set_marker_delay(self, trigger: int, value: float) -> None:
Raises:
ValueError: By invalid trigger index value.
"""
- if trigger < 0 or trigger >= self.NUM_CHANNELS:
- raise ValueError("Invalid trigger index")
+ if trigger not in range(self.NUM_CHANNELS):
+ raise ValueError("Invalid trigger index.")
self._check_is_open()
- self._set_double("triggers/out/{}/delay".format(trigger), value)
+ self._set_double(f"triggers/out/{trigger}/delay", value)
@rpc_method
def set_trigger_level(self, trigger: int, value: float) -> None:
@@ -843,13 +924,13 @@ def set_trigger_level(self, trigger: int, value: float) -> None:
ValueError: By invalid trigger index value.
ValueError: By invalid trigger level value.
"""
- if trigger < 0 or trigger >= self.NUM_CHANNELS:
- raise ValueError("Invalid trigger index")
+ if trigger not in range(self.NUM_CHANNELS):
+ raise ValueError("Invalid trigger index.")
if not -10.0 < value < 10.0:
- raise ValueError("Invalid trigger level")
+ raise ValueError("Invalid trigger level.")
self._check_is_open()
- self._set_double("triggers/in/{}/level".format(trigger), value)
+ self._set_double(f"triggers/in/{trigger}/level", value)
@rpc_method
def set_trigger_impedance(self, trigger: int, value: int) -> None:
@@ -863,10 +944,10 @@ def set_trigger_impedance(self, trigger: int, value: int) -> None:
ValueError: By invalid trigger index value.
ValueError: By invalid trigger impedance setting.
"""
- if trigger < 0 or trigger >= self.NUM_CHANNELS:
- raise ValueError("Invalid trigger index")
+ if trigger not in range(self.NUM_CHANNELS):
+ raise ValueError("Invalid trigger index.")
if value not in (0, 1):
- raise ValueError("Invalid impedance setting")
+ raise ValueError("Invalid impedance setting.")
self._check_is_open()
_logger.info(
@@ -875,175 +956,175 @@ def set_trigger_impedance(self, trigger: int, value: int) -> None:
trigger,
"50 Ohm" if value else "1 kOhm",
)
- self._set_int("triggers/in/{}/imp50".format(trigger), value)
+ self._set_int(f"triggers/in/{trigger}/imp50", value)
@rpc_method
- def set_dig_trigger_source(self, awg: int, trigger: int, value: int) -> None:
+ def set_dig_trigger_source(self, awg_core: int, trigger: int, value: int) -> None:
"""Select the source of a specific digital trigger channel.
There are two digital trigger channels which can be accessed in the
sequencer program via calls to "waitDigTrigger()" and "playWaveDigTrigger()".
Parameters:
- awg: AWG module index (must be 0 in 1x8 channel mode).
- trigger: Digital trigger index in the range 0 to 1, corresponding to digital triggers 1 and 2.
- value: Trigger source in the range 0 to 7, corresponding to trigger inputs 1 to 8 on the front panel.
+ awg_core: AWG core number.
+ trigger: Digital trigger index in the range 0 to 1, corresponding to digital triggers 1 and 2.
+ value: Trigger source in the range 0 to 7,
+ corresponding to trigger inputs 1 to 8 on the front panel.
Raises:
- ValueError: AWG index number is invalid.
+ ValueError: AWG core number is invalid.
ValueError: Digital trigger index is invalid.
ValueError: Trigger source value is invalid.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
if trigger < 0 or trigger > 1:
- raise ValueError("Invalid digital trigger index")
- if value < 0 or value >= self.NUM_CHANNELS:
- raise ValueError("Invalid trigger source")
+ raise ValueError("Invalid digital trigger index.")
+ if value not in range(self.NUM_CHANNELS):
+ raise ValueError("Invalid trigger source.")
self._check_is_open()
- self._set_int("awgs/{}/auxtriggers/{}/channel".format(awg, trigger), value)
+ self._set_int(f"awgs/{awg_core}/auxtriggers/{trigger}/channel", value)
@rpc_method
- def set_dig_trigger_slope(self, awg: int, trigger: int, value: int) -> None:
+ def set_dig_trigger_slope(self, awg_core: int, trigger: int, value: int) -> None:
"""Set the trigger slope of a specific digital trigger channel.
There are two digital trigger channels which can be accessed in the
sequencer program vi calls to "waitDigTrigger()" and "playWaveDigTrigger()".
Parameters:
- awg: AWG module index (must be 0 in 1x8 channel mode).
- trigger: Digital trigger index in the range 0 to 1, corresponding to digital triggers 1 and 2.
- value: Trigger slope.
- 0 - level sensitive (trigger on high signal);
- 1 - trigger on rising edge;
- 2 - trigger on falling edge;
- 3 - trigger on rising and falling edge.
+ awg_core: AWG core number.
+ trigger: Digital trigger index in the range 0 to 1, corresponding to digital triggers 1 and 2.
+ value: Trigger slope.
+ 0 - level sensitive (trigger on high signal);
+ 1 - trigger on rising edge;
+ 2 - trigger on falling edge;
+ 3 - trigger on rising and falling edge.
Raises:
- ValueError: AWG index number is invalid.
+ ValueError: AWG core number is invalid.
ValueError: Digital trigger index is invalid.
ValueError: Trigger slope value is invalid.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
if trigger < 0 or trigger > 1:
- raise ValueError("Invalid digital trigger index")
+ raise ValueError("Invalid digital trigger index.")
if value < 0 or value > 3:
- raise ValueError("Invalid trigger slope")
+ raise ValueError("Invalid trigger slope.")
self._check_is_open()
- self._set_int("awgs/{}/auxtriggers/{}/slope".format(awg, trigger), value)
+ self._set_int(f"awgs/{awg_core}/auxtriggers/{trigger}/slope", value)
@rpc_method
- def get_output_amplitude(self, awg: int, channel: int) -> float:
+ def get_output_amplitude(self, awg_core: int, channel: int) -> float:
"""Get the output amplitude scaling factor of the specified channel.
Parameters:
- awg: AWG module index in the range 0 to 3. The AWG index selects a pair of output channels.
- Index 0 selects output channels 1 and 2, index 1 selects channels 2 and 3 etc.
- channel: Channel index in the range 0 to 1, selecting the first or second output channel
- within the selected channel pair.
+ awg_core: AWG core number in the range 0 to 3. The core number selects a pair of output channels.
+ Core 0 selects output channels 1 and 2, core 1 selects channels 3 and 4 etc.
+ channel: Channel index in the range 0 to 1, selecting the first or second output channel
+ within the selected channel pair.
Raises:
- ValueError: AWG index number is invalid.
- ValueError: AWG channel number is invalid.
+ ValueError: AWG core number is invalid.
+ ValueError: AWG channel pair index is invalid.
Returns:
amplitude: A dimensionless scaling factor applied to the digital signal.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
- if channel < 0 or channel >= self.NUM_CHANNELS_PER_AWG:
- raise ValueError("Invalid channel index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
+ if channel not in range(self.NUM_CHANNELS_PER_AWG):
+ raise ValueError("Invalid channel pair index.")
self._check_is_open()
- return self._get_double(f"awgs/{awg}/outputs/{channel}/amplitude")
+ return self._get_double(f"awgs/{awg_core}/outputs/{channel}/amplitude")
@rpc_method
- def set_output_amplitude(self, awg: int, channel: int, value: float) -> None:
+ def set_output_amplitude(self, awg_core: int, channel: int, value: float) -> None:
"""Set the output scaling factor of the specified channel.
The amplitude is a dimensionless scaling factor applied to the digital signal.
Parameters:
- awg: AWG module index in the range 0 to 3. The AWG index selects a pair of output channels.
- Index 0 selects output channels 1 and 2, index 1 selects channels 2 and 3 etc.
- channel: Channel index in the range 0 to 1, selecting the first or second output channel
- within the selected channel pair.
- value: Amplitude scale factor.
+ awg_core: AWG core number in the range 0 to 3. The core number selects a pair of output channels.
+ Core 0 selects output channels 1 and 2, core 1 selects channels 3 and 4 etc.
+ channel: Channel index in the range 0 to 1, selecting the first or second output channel
+ within the selected channel pair.
+ value: Amplitude scale factor.
Raises:
- ValueError: AWG index number is invalid.
- ValueError: AWG channel number is invalid.
+ ValueError: AWG core number is invalid.
+ ValueError: AWG channel pair index is invalid.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
- if channel < 0 or channel >= self.NUM_CHANNELS_PER_AWG:
- raise ValueError("Invalid channel index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
+ if channel not in range(self.NUM_CHANNELS_PER_AWG):
+ raise ValueError("Invalid channel pair index.")
self._check_is_open()
- self._set_double("awgs/{}/outputs/{}/amplitude".format(awg, channel), value)
+ self._set_double(f"awgs/{awg_core}/outputs/{channel}/amplitude", value)
@rpc_method
- def get_output_channel_hold(self, awg: int, channel: int) -> int:
+ def get_output_channel_hold(self, awg_core: int, channel: int) -> int:
"""Get whether the last sample is held for the specified channel.
Parameters:
- awg: AWG module index in the range 0 to 3. The AWG index selects a pair of output channels.
- Index 0 selects output channels 1 and 2, index 1 selects channels 2 and 3 etc.
- channel: Channel index in the range 0 to 1, selecting the first or second output channel
- within the selected channel pair.
+ awg_core: AWG core number in the range 0 to 3. The core number selects a pair of output channels.
+ Core 0 selects output channels 1 and 2, core 1 selects channels 3 and 4 etc.
+ channel: Channel index in the range 0 to 1, selecting the first or second output channel
+ within the selected channel pair.
Raises:
- ValueError: AWG index number is invalid.
- ValueError: AWG channel number is invalid.
+ ValueError: AWG core number is invalid.
+ ValueError: AWG channel pair index is invalid.
Returns:
0: If last sample is not held.
1: If last sample is held.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
- if channel < 0 or channel >= self.NUM_CHANNELS_PER_AWG:
- raise ValueError("Invalid channel index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}")
+ if channel not in range(self.NUM_CHANNELS_PER_AWG):
+ raise ValueError("Invalid channel pair index.")
self._check_is_open()
- return self._get_int(f"awgs/{awg}/outputs/{channel}/hold")
+ return self._get_int(f"awgs/{awg_core}/outputs/{channel}/hold")
@rpc_method
- def set_output_channel_hold(self, awg: int, channel: int, value: int) -> None:
+ def set_output_channel_hold(self, awg_core: int, channel: int, value: int) -> None:
"""Set whether the last sample should be held for the specified channel.
Parameters:
- awg: AWG module index in the range 0 to 3.
- The AWG index selects a pair of output channels.
- Index 0 selects output channels 1 and 2, index 1 selects channels 2 and 3 etc.
- channel: Channel index in the range 0 to 1, selecting the first or second output channel
- within the selected channel pair.
- value: Hold state; 0 = False, 1 = True.
+ awg_core: AWG core number in the range 0 to 3. The core number selects a pair of output channels.
+ Core 0 selects output channels 1 and 2, core 1 selects channels 3 and 4 etc.
+ channel: Channel index in the range 0 to 1, selecting the first or second output channel
+ within the selected channel pair.
+ value: Hold state; 0 = False, 1 = True.
Raises:
- ValueError: AWG index number is invalid.
- ValueError: AWG channel number is invalid.
+ ValueError: AWG core number is invalid.
+ ValueError: AWG channel pair index is invalid.
ValueError: Invalid hold state value.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
- if channel < 0 or channel >= self.NUM_CHANNELS_PER_AWG:
- raise ValueError("Invalid channel index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
+ if channel not in range(self.NUM_CHANNELS_PER_AWG):
+ raise ValueError("Invalid channel pair index.")
if value not in (0, 1):
- raise ValueError("Invalid hold state")
+ raise ValueError("Invalid hold state.")
self._check_is_open()
- self._set_int(f"awgs/{awg}/outputs/{channel}/hold", value)
+ self._set_int(f"awgs/{awg_core}/outputs/{channel}/hold", value)
@rpc_method
- def get_output_channel_on(self, output_channel: int) -> int:
+ def get_output_channel_on(self, awg_channel: int) -> int:
"""Get the specified wave output channel state.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
Returns:
channel_state: 0 = output off, 1 = output on.
@@ -1051,80 +1132,80 @@ def get_output_channel_on(self, output_channel: int) -> int:
Raises:
ValueError: Output channel number is invalid.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
self._check_is_open()
- return self._get_int("sigouts/{}/on".format(output_channel))
+ return self._get_int(f"sigouts/{awg_channel}/on")
@rpc_method
- def set_output_channel_on(self, output_channel: int, value: int) -> None:
+ def set_output_channel_on(self, awg_channel: int, value: int) -> None:
"""Set the specified wave output channel on or off.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
- value: Output state; 0 = output off, 1 = output on.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ value: Output state; 0 = output off, 1 = output on.
Raises:
ValueError: Output channel number is invalid.
ValueError: Invalid output channel state value.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
if value not in (0, 1):
- raise ValueError("Invalid on/off state")
+ raise ValueError("Invalid on/off state.")
self._check_is_open()
- self._set_int("sigouts/{}/on".format(output_channel), value)
+ self._set_int(f"sigouts/{awg_channel}/on", value)
@rpc_method
- def set_output_channel_range(self, output_channel: int, value: float) -> None:
+ def set_output_channel_range(self, awg_channel: int, value: float) -> None:
"""Set voltage range of the specified wave output channel.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
- value: Output range in Volt. The instrument selects the next higher available range.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ value: Output range in Volt. The instrument selects the next higher available range.
Raises:
ValueError: Output channel number is invalid.
ValueError: Voltage range is invalid.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
if value < 0 or value > 5.0:
- raise ValueError("Invalid output range")
+ raise ValueError("Invalid output range.")
self._check_is_open()
- self._set_double("sigouts/{}/range".format(output_channel), value)
+ self._set_double(f"sigouts/{awg_channel}/range", value)
@rpc_method
- def set_output_channel_offset(self, output_channel: int, value: float) -> None:
+ def set_output_channel_offset(self, awg_channel: int, value: float) -> None:
"""Set the DC offset voltage for the specified wave output channel.
The DC offset is only active in amplified mode, not in direct mode.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
- value: Offset in Volt, in range -1.25 to +1.25 V.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ value: Offset in Volt, in range -1.25 to +1.25 V.
Raises:
ValueError: Output channel number is invalid.
ValueError: Voltage offset is invalid.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
if not -1.25 <= value <= 1.25:
- raise ValueError("Invalid offset value")
+ raise ValueError("Invalid offset value.")
self._check_is_open()
- self._set_double("sigouts/{}/offset".format(output_channel), value)
+ self._set_double(f"sigouts/{awg_channel}/offset", value)
@rpc_method
- def get_output_channel_delay(self, output_channel: int) -> float:
+ def get_output_channel_delay(self, awg_channel: int) -> float:
"""Return output delay for fine alignment of the specified wave output channel.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
Raises:
ValueError: Output channel number is invalid.
@@ -1132,76 +1213,76 @@ def get_output_channel_delay(self, output_channel: int) -> float:
Returns:
Delay in seconds.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
self._check_is_open()
- return self._get_double("sigouts/{}/delay".format(output_channel))
+ return self._get_double(f"sigouts/{awg_channel}/delay")
@rpc_method
- def set_output_channel_delay(self, output_channel: int, value: float) -> None:
+ def set_output_channel_delay(self, awg_channel: int, value: float) -> None:
"""Set output delay for fine alignment of the specified wave output channel.
Changing the delay setting may take several seconds.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
- value: Delay in seconds, range 0 to 26e-9.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ value: Delay in seconds, range 0 to 26e-9.
Raises:
ValueError: Output channel number is invalid.
ValueError: Delay value is invalid.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
if not 0 <= value < 26e-9:
- raise ValueError("Invalid delay setting")
+ raise ValueError("Invalid delay setting.")
self._check_is_open()
- self._set_double("sigouts/{}/delay".format(output_channel), value)
+ self._set_double(f"sigouts/{awg_channel}/delay", value)
@rpc_method
- def set_output_channel_direct(self, output_channel: int, value: int) -> None:
+ def set_output_channel_direct(self, awg_channel: int, value: int) -> None:
"""Enable or disable the direct output path for the specified wave output channel.
The direct output path bypasses the output amplifier and offset circuits,
and fixes the output range to 800 mV.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
- value: 1 to enable the direct output path, 0 to disable.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ value: 1 to enable the direct output path, 0 to disable.
Raises:
ValueError: Output channel number is invalid.
ValueError: Direct value is invalid.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
if value not in (0, 1):
- raise ValueError("Invalid value")
+ raise ValueError("Invalid value.")
self._check_is_open()
- self._set_int("sigouts/{}/direct".format(output_channel), value)
+ self._set_int(f"sigouts/{awg_channel}/direct", value)
@rpc_method
- def set_output_channel_filter(self, output_channel: int, value: int) -> None:
+ def set_output_channel_filter(self, awg_channel: int, value: int) -> None:
"""Enable or disable the analog output filter for the specified wave output channel.
Parameters:
- output_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
- value: 1 to enable the output filter, 0 to disable.
+ awg_channel: Channel index in the range 0 to 7, corresponding to wave outputs 1 to 8 on the front panel.
+ value: 1 to enable the output filter, 0 to disable.
Raises:
ValueError: Output channel number is invalid.
ValueError: Filter value is invalid.
"""
- if output_channel < 0 or output_channel >= self.NUM_CHANNELS:
- raise ValueError("Invalid channel index")
+ if awg_channel not in range(self.NUM_CHANNELS):
+ raise ValueError(f"Invalid output channel: {awg_channel}.")
if value not in (0, 1):
- raise ValueError("Invalid value")
+ raise ValueError("Invalid value.")
self._check_is_open()
- self._set_int("sigouts/{}/filter".format(output_channel), value)
+ self._set_int(f"sigouts/{awg_channel}/filter", value)
@rpc_method
def set_dio_mode(self, mode: int) -> None:
@@ -1221,7 +1302,7 @@ def set_dio_mode(self, mode: int) -> None:
ValueError: If mode is not valid.
"""
if not 0 <= mode <= 3:
- raise ValueError("Invalid DIO mode")
+ raise ValueError("Invalid DIO mode.")
self._check_is_open()
self._set_int("dios/0/mode", mode)
@@ -1240,64 +1321,64 @@ def set_dio_drive(self, mask: int) -> None:
ValueError: If DIO signals direction mask is not valid.
"""
if not 0 <= mask <= 15:
- raise ValueError("Invalid value for mask")
+ raise ValueError("Invalid value for mask.")
self._check_is_open()
self._set_int("dios/0/drive", mask)
@rpc_method
- def set_dio_strobe_index(self, awg: int, value: int) -> None:
+ def set_dio_strobe_index(self, awg_core: int, value: int) -> None:
"""Select the DIO index to be used as a trigger for playback.
The sequencer program uses this trigger by calling "playWaveDIO()".
Parameters:
- awg: AWG module index (must be 0 in 1x8 channel mode).
- value: DIO bit index in the range 0 to 31.
+ awg_core: AWG core number.
+ value: DIO bit index in the range 0 to 31.
Raises:
ValueError: AWG index number is invalid.
ValueError: DIO bit index is invalid.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
if value < 0 or value > 31:
- raise ValueError("Invalid DIO bit index")
+ raise ValueError("Invalid DIO bit index.")
self._check_is_open()
- self._set_int("awgs/{}/dio/strobe/index".format(awg), value)
+ self._set_int(f"awgs/{awg_core}/dio/strobe/index", value)
@rpc_method
- def set_dio_strobe_slope(self, awg: int, value: int) -> None:
+ def set_dio_strobe_slope(self, awg_core: int, value: int) -> None:
"""Select the signal edge that should activate the strobe trigger.
Parameters:
- awg: AWG module index (must be 0 in 1x8 channel mode).
- value: Slope type.
- 0 - off;
- 1 - trigger on rising edge;
- 2 - trigger on falling edge;
- 3 - trigger on rising and falling edge.
+ awg_core: AWG core number.
+ value: Slope type.
+ 0 - off;
+ 1 - trigger on rising edge;
+ 2 - trigger on falling edge;
+ 3 - trigger on rising and falling edge.
Raises:
ValueError: AWG index number is invalid.
ValueError: DIO strobe slope value is invalid.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
if value < 0 or value > 3:
- raise ValueError("Invalid slope")
+ raise ValueError("Invalid slope.")
self._check_is_open()
- self._set_int("awgs/{}/dio/strobe/slope".format(awg), value)
+ self._set_int(f"awgs/{awg_core}/dio/strobe/slope", value)
@rpc_method
- def get_user_register(self, awg: int, reg: int) -> int:
+ def get_user_register(self, awg_core: int, reg: int) -> int:
"""Return the value of the specified user register.
Parameters:
- awg: AWG module index (must be 0 in 1x8 channel mode).
- reg: Register index in the range 0 to 15.
+ awg_core: AWG core number.
+ reg: Register index in the range 0 to 15.
Raises:
ValueError: AWG index number is invalid.
@@ -1306,70 +1387,31 @@ def get_user_register(self, awg: int, reg: int) -> int:
Returns:
value: User register value.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
if not 0 <= reg <= 15:
- raise ValueError("Invalid register index")
+ raise ValueError("Invalid register index.")
self._check_is_open()
- return self._get_int("awgs/{}/userregs/{}".format(awg, reg))
+ return self._get_int(f"awgs/{awg_core}/userregs/{reg}")
@rpc_method
- def set_user_register(self, awg: int, reg: int, value: int) -> None:
+ def set_user_register(self, awg_core: int, reg: int, value: int) -> None:
"""Change the value of the specified user register.
Parameters:
- awg: AWG module index (must be 0 in 1x8 channel mode).
- reg: Register index in the range 0 to 15.
- value: Integer value to write to the register.
+ awg_core: AWG core number.
+ reg: Register index in the range 0 to 15.
+ value: Integer value to write to the register.
Raises:
ValueError: AWG index number is invalid.
ValueError: Register index is invalid.
"""
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
+ if awg_core not in self.awg_channel_map:
+ raise ValueError(f"Invalid AWG core ({awg_core}) for group mode {self._grouping}.")
if not 0 <= reg <= 15:
- raise ValueError("Invalid register index")
+ raise ValueError("Invalid register index.")
self._check_is_open()
- self._set_int("awgs/{}/userregs/{}".format(awg, reg), value)
-
- @rpc_method
- def get_awg_module_enabled(self, awg: int) -> int:
- """Return the current enable status of the AWG module.
-
- Parameters:
- awg: AWG module index (must be 0 in 1x8 channel mode).
-
- Raises:
- ValueError: AWG index number is invalid.
-
- Returns:
- 1 - If the AWG sequencer is currently running.
- 0 - If the AWG sequencer is not running.
- """
- if awg < 0 or awg >= self.NUM_AWGS:
- raise ValueError("Invalid AWG index")
-
- self._check_is_open()
- return self._get_int("awgs/{}/enable".format(awg))
-
- @rpc_method
- def set_awg_module_enabled(self, value: int) -> None:
- """Enable or disable the AWG module.
-
- Enabling the AWG starts execution of the currently loaded sequencer program.
- Uploading a sequencer program or waveform is only possible while the AWG module is disabled.
-
- Parameters:
- value: 1 to enable the AWG module, 0 to disable.
-
- Raises:
- ValueError: AWG module enable value is invalid.
- """
- if value not in (0, 1):
- raise ValueError("Invalid value")
-
- self._check_is_open()
- self.awg_module.set("awg/enable", value)
+ self._set_int(f"awgs/{awg_core}/userregs/{reg}", value)
diff --git a/tests/instruments/zurich_instruments/test_hdawg.py b/tests/instruments/zurich_instruments/test_hdawg.py
index 74fbe29b..feba39bc 100644
--- a/tests/instruments/zurich_instruments/test_hdawg.py
+++ b/tests/instruments/zurich_instruments/test_hdawg.py
@@ -10,7 +10,7 @@
import numpy as np
from qmi.instruments.zurich_instruments import ZurichInstruments_Hdawg
-from qmi.core.exceptions import QMI_InvalidOperationException, QMI_ApplicationException
+from qmi.core.exceptions import QMI_InvalidOperationException, QMI_ApplicationException, QMI_TimeoutException
from tests.patcher import PatcherQmiContext as QMI_Context
_DEVICE_HOST = "localhost"
@@ -249,7 +249,7 @@ def setUp(self):
self.ctx = QMI_Context("test_hdawg_openclose")
self._awg_module = PropertyMock()
- self._awg_module.finished.side_effect = [True, False, False, True]
+ self._awg_module.finished.side_effect = [True, False, False, True] * 2
self._daq_server = PropertyMock()
self._daq_server.awgModule = Mock(return_value=self._awg_module)
@@ -285,10 +285,13 @@ def test_openclose(self):
self.hdawg.open()
self.hdawg.close()
+ self.assertListEqual([0 for _ in range(ZurichInstruments_Hdawg.NUM_CHANNELS)], self.hdawg.awg_channel_map)
+
expected_daq_server_calls = [
# open()
call.connectDevice(_DEVICE_NAME, "1GbE"),
call.awgModule(),
+ call.setInt(f"/{_DEVICE_NAME}/system/awg/channelgrouping", 2), # 2 is the default group
# close()
call.disconnect()
]
@@ -308,6 +311,63 @@ def test_openclose(self):
]
self.assertEqual(self._awg_module.mock_calls, expected_awg_module_calls)
+ def test_openclose_grouping_modes_0_1(self):
+ """open/close sequence with 4x2 and 2x4 grouping modes."""
+
+ for grouping in (0, 1):
+ with unittest.mock.patch(
+ "qmi.instruments.zurich_instruments.hdawg.ziDAQServer", return_value=self._daq_server
+ ), unittest.mock.patch(
+ "qmi.instruments.zurich_instruments.hdawg.AwgModule", return_value=self._awg_module
+ ), unittest.mock.patch(
+ "qmi.instruments.zurich_instruments.hdawg.zhinst"
+ ), unittest.mock.patch(
+ "qmi.instruments.zurich_instruments.hdawg.zhinst.core"
+ ) as core_patch:
+ core_patch.ziDAQServer = Mock(return_value=self._daq_server)
+ core_patch.AwgModule = Mock(return_value=self._awg_module)
+ self.hdawg = ZurichInstruments_Hdawg(
+ self.ctx,
+ "HDAWG",
+ server_host=_DEVICE_HOST,
+ server_port=_DEVICE_PORT,
+ device_name=_DEVICE_NAME,
+ grouping=grouping
+ )
+ self.hdawg.open()
+ self.hdawg.close()
+
+ awg_cores_map = [
+ n // (2 ** (grouping + 1)) * (grouping % 2 + 1) for n in range(ZurichInstruments_Hdawg.NUM_CHANNELS)
+ ]
+ self.assertListEqual(awg_cores_map, self.hdawg.awg_channel_map)
+
+ expected_daq_server_calls = [
+ # open()
+ call.connectDevice(_DEVICE_NAME, "1GbE"),
+ call.awgModule(),
+ call.setInt(f"/{_DEVICE_NAME}/system/awg/channelgrouping", grouping),
+ # close()
+ call.disconnect()
+ ]
+ self.assertEqual(expected_daq_server_calls, self._daq_server.mock_calls)
+
+ expected_awg_module_calls = [
+ # open()
+ call.set("device", _DEVICE_NAME),
+ call.set("index", 0),
+ call.finished(),
+ call.execute(),
+ call.finished(),
+ # close()
+ call.finished(),
+ call.finish(),
+ call.finished()
+ ]
+ self.assertEqual(expected_awg_module_calls, self._awg_module.mock_calls)
+ self._daq_server.reset_mock()
+ self._awg_module.reset_mock()
+
def test_failed_open1(self):
"""Failed open sequence."""
self._awg_module.finished.side_effect = [False]
@@ -335,7 +395,8 @@ def test_failed_open1(self):
expected_daq_server_calls = [
call.connectDevice(_DEVICE_NAME, "1GbE"),
- call.awgModule()
+ call.awgModule(),
+ call.setInt(f"/{_DEVICE_NAME}/system/awg/channelgrouping", 2) # 2 is the default group
]
self.assertEqual(self._daq_server.mock_calls, expected_daq_server_calls)
@@ -351,7 +412,7 @@ def test_failed_open2(self):
self._awg_module.finished.side_effect = [True, True]
with unittest.mock.patch(
- "qmi.instruments.zurich_instruments.hdawg.ziDAQServer", return_value=self._daq_server
+ "qmi.instruments.zurich_instruments.hdawg.ziDAQServer", return_value=self._daq_server
), unittest.mock.patch(
"qmi.instruments.zurich_instruments.hdawg.AwgModule", return_value=self._awg_module
), unittest.mock.patch(
@@ -375,7 +436,8 @@ def test_failed_open2(self):
expected_daq_server_calls = [
call.connectDevice(_DEVICE_NAME, "1GbE"),
- call.awgModule()
+ call.awgModule(),
+ call.setInt(f"/{_DEVICE_NAME}/system/awg/channelgrouping", 2) # 2 is the default group
]
self.assertEqual(self._daq_server.mock_calls, expected_daq_server_calls)
@@ -417,7 +479,8 @@ def test_failed_close1(self):
expected_daq_server_calls = [
# open()
call.connectDevice(_DEVICE_NAME, "1GbE"),
- call.awgModule()
+ call.awgModule(),
+ call.setInt(f"/{_DEVICE_NAME}/system/awg/channelgrouping", 2) # 2 is the default group
]
self.assertEqual(self._daq_server.mock_calls, expected_daq_server_calls)
@@ -462,7 +525,8 @@ def test_failed_close2(self):
expected_daq_server_calls = [
# open()
call.connectDevice(_DEVICE_NAME, "1GbE"),
- call.awgModule()
+ call.awgModule(),
+ call.setInt(f"/{_DEVICE_NAME}/system/awg/channelgrouping", 2) # 2 is the default group
]
self.assertEqual(self._daq_server.mock_calls, expected_daq_server_calls)
@@ -563,7 +627,8 @@ def setUp(self):
"HDAWG",
server_host=_DEVICE_HOST,
server_port=_DEVICE_PORT,
- device_name=_DEVICE_NAME
+ device_name=_DEVICE_NAME,
+ grouping=0
)
self.hdawg.open()
@@ -688,12 +753,13 @@ def test_compile_and_upload(self):
}
"""
- self._awg_module.getInt.side_effect = [0, 0] # successful compilation; upload completed
+ self._awg_module.getInt.side_effect = [0, 0, 0] # compiler status; get index; elf status
self._awg_module.getDouble.side_effect = [1.0] # upload progress
expected_calls = [
call.set("compiler/sourcestring", expected_source),
call.getInt("compiler/status"),
+ call.getInt("index"),
call.getDouble("progress"),
call.getInt("elf/status"),
]
@@ -741,7 +807,7 @@ def test_valid_replacements(self):
for source, replacements, expected_source in cases:
with self.subTest(source=source):
- self._awg_module.getInt.side_effect = [0, 0] # successful compilation; upload completed
+ self._awg_module.getInt.side_effect = [0, 0, 0] # successful compilation; get index; upload completed
self._awg_module.getDouble.side_effect = [1.0] # upload progress
self.hdawg.compile_and_upload(source, replacements)
@@ -777,7 +843,7 @@ def test_replacement_value_good_types(self):
for value, expected_source in cases:
with self.subTest(value=value):
- self._awg_module.getInt.side_effect = [0, 0] # successful compilation; upload completed
+ self._awg_module.getInt.side_effect = [0, 0, 0] # successful compilation; get index; upload completed
self._awg_module.getDouble.side_effect = [1.0] # upload progress
self.hdawg.compile_and_upload(source, {"$PAR": value})
@@ -824,7 +890,7 @@ def test_compile_timeout(self):
]
self.hdawg.COMPILE_TIMEOUT = 0.1
self.hdawg.UPLOAD_TIMEOUT = 0.1
- with self.assertRaises(RuntimeError):
+ with self.assertRaises(QMI_TimeoutException):
self.hdawg.compile_and_upload(source)
self.assertEqual(self._awg_module.mock_calls, expected_calls)
@@ -833,12 +899,13 @@ def test_elf_upload_error(self):
"""Test failed ELF upload sequence."""
source = "while(true) {}"
- self._awg_module.getInt.side_effect = [0, 1] # successful compilation; upload timed out
+ self._awg_module.getInt.side_effect = [0, 0, 1] # successful compilation; get index; upload timed out
self._awg_module.getDouble.side_effect = [0.3] # upload progress
expected_calls = [
call.set("compiler/sourcestring", source),
call.getInt("compiler/status"),
+ call.getInt("index"),
call.getDouble("progress"),
call.getInt("elf/status"),
]
@@ -859,7 +926,7 @@ def test_elf_upload_timeout(self):
expected_calls = [
call.set("compiler/sourcestring", source),
call.getInt("compiler/status"),
- # call.getInt("compiler/status"),
+ call.getInt("index"),
call.getDouble("progress"),
# call.getInt("elf/status"),
call.getDouble("progress"),
@@ -867,7 +934,7 @@ def test_elf_upload_timeout(self):
]
self.hdawg.UPLOAD_TIMEOUT = 0.1
- with self.assertRaises(RuntimeError) as err:
+ with self.assertRaises(QMI_TimeoutException) as err:
self.hdawg.compile_and_upload(source)
self.assertEqual(f"Upload process timed out (timeout={self.hdawg.UPLOAD_TIMEOUT})", str(err.exception))
@@ -1025,12 +1092,12 @@ def test_upload_command_table_invalid_schema(self):
with self.assertRaises(ValueError) as verr_2:
self.hdawg.upload_command_table(0, command_table_entries)
- self.assertEqual("Invalid schema", str(verr_2.exception))
+ self.assertEqual("Invalid schema.", str(verr_2.exception))
def test_upload_empty_command_table(self):
"""Test command table upload."""
table = []
- awg_index = 1
+ awg_index = 0
# Create the command table from the provided entries.
command_table = {
"header": {"version":"0.4"}, "table": table
@@ -1040,6 +1107,19 @@ def test_upload_empty_command_table(self):
self.hdawg.upload_command_table(awg_index, table)
self._daq_server.setVector.assert_called_once_with(set_vector, json.dumps(command_table, allow_nan=False).replace(" ", ""))
+ def test_upload_empty_command_table_invalid_index_error(self):
+ """Test command table upload."""
+ table = []
+ awg_index = 1
+ # Create the command table from the provided entries.
+ command_table = {
+ "header": {"version":"0.4"}, "table": table
+ }
+ set_vector = "/{}/awgs/{}/commandtable/data".format(_DEVICE_NAME, awg_index)
+
+ with self.assertRaises(KeyError):
+ self.hdawg.upload_command_table(awg_index, table)
+
def test_upload_invalid_command_table(self):
"""Test invalid command table upload."""
command_table_entries = [
@@ -1065,8 +1145,8 @@ def test_upload_invalid_command_table(self):
with self.assertRaises(ValueError) as verr_2:
self.hdawg.upload_command_table(0, command_table_entries)
- self.assertEqual("Invalid command table", str(verr.exception))
- self.assertEqual("Invalid value in command table", str(verr_2.exception))
+ self.assertEqual("Invalid command table.", str(verr.exception))
+ self.assertEqual("Invalid value in command table.", str(verr_2.exception))
def test_upload_command_table_wrong_awg(self):
"""Test invalid command table upload."""
@@ -1085,13 +1165,65 @@ def test_sync(self):
]
self.assertEqual(self._daq_server.mock_calls, expected_daq_server_calls)
+ def test_set_awg_module_index(self):
+ """Test setting AWG module index with good values."""
+ side_effect = [True, False] * 7 # group 0 = 4x, group 1 = 2x, group 2 = 1x
+ self._awg_module.finished.side_effect = side_effect + [False, True] # For close
+ groupings = list(range(3))
+ for grouping in groupings:
+ self.hdawg._grouping = grouping
+ ok_indexes = list(range((2 - grouping) * 2))
+ ok_indexes = [0] if not ok_indexes else ok_indexes
+ for index in ok_indexes:
+ self.hdawg.set_awg_module_index(index)
+
+ def test_set_awg_module_index_exceptions(self):
+ """Test setting AWG module index with wrong values w.r.t. grouping."""
+ self._awg_module.finished.side_effect = [False, True] # For close
+ groupings = list(range(3))
+ for grouping in groupings:
+ self.hdawg._grouping = grouping
+ nok_indexes = list(range((3 - grouping), grouping * 2 + 2))
+ nok_indexes = [-1, 4] if not nok_indexes else nok_indexes
+ for index in nok_indexes:
+ with self.assertRaises(ValueError):
+ self.hdawg.set_awg_module_index(index)
+
+ def test_awg_module_enabled(self):
+ """Test AWG enable on/off."""
+ self.hdawg.get_awg_module_enabled(0)
+ self._check_get_value_int("awgs/0/enable")
+ self.hdawg.get_awg_module_enabled(3)
+ self._check_get_value_int("awgs/3/enable")
+
+ with self.assertRaises(ValueError):
+ self.hdawg.get_awg_module_enabled(-1)
+
+ with self.assertRaises(ValueError):
+ self.hdawg.get_awg_module_enabled(4)
+
+ self.hdawg.set_awg_module_enabled(0)
+ self.hdawg.set_awg_module_enabled(1)
+
+ expected_awg_module_calls = [
+ call.set("awg/enable", 0),
+ call.set("awg/enable", 1)
+ ]
+ self.assertEqual(expected_awg_module_calls, self._awg_module.mock_calls)
+
+ with self.assertRaises(ValueError):
+ self.hdawg.set_awg_module_enabled(-1)
+
+ with self.assertRaises(ValueError):
+ self.hdawg.set_awg_module_enabled(2)
+
def test_set_channel_grouping(self):
- """Test channel grouping setting."""
- self.hdawg.set_channel_grouping(2)
- self._check_set_value_int("system/awg/channelgrouping", 2)
+ """Test setting channel grouping."""
+ self.hdawg.set_channel_grouping(0)
+ self._check_set_value_int("system/awg/channelgrouping", 0)
with self.assertRaises(ValueError):
- self.hdawg.set_channel_grouping(1)
+ self.hdawg.set_channel_grouping(3)
def test_set_reference_clock_source(self):
"""Test reference clock source setting."""
@@ -1675,34 +1807,6 @@ def test_set_user_register(self):
with self.assertRaises(ValueError):
self.hdawg.set_user_register(0, 16, 789)
- def test_awg_module_enabled(self):
- """Test AWG enable on/off."""
- self.hdawg.get_awg_module_enabled(0)
- self._check_get_value_int("awgs/0/enable")
- self.hdawg.get_awg_module_enabled(3)
- self._check_get_value_int("awgs/3/enable")
-
- with self.assertRaises(ValueError):
- self.hdawg.get_awg_module_enabled(-1)
-
- with self.assertRaises(ValueError):
- self.hdawg.get_awg_module_enabled(4)
-
- self.hdawg.set_awg_module_enabled(0)
- self.hdawg.set_awg_module_enabled(1)
-
- expected_awg_module_calls = [
- call.set("awg/enable", 0),
- call.set("awg/enable", 1)
- ]
- self.assertEqual(expected_awg_module_calls, self._awg_module.mock_calls)
-
- with self.assertRaises(ValueError):
- self.hdawg.set_awg_module_enabled(-1)
-
- with self.assertRaises(ValueError):
- self.hdawg.set_awg_module_enabled(2)
-
if __name__ == '__main__':
unittest.main()