|
6 | 6 | import inspect |
7 | 7 | import json |
8 | 8 | import math |
| 9 | +import os |
9 | 10 | import re |
10 | 11 | import sqlite3 |
11 | 12 | import sys |
|
17 | 18 | from pathlib import Path |
18 | 19 | from typing import Any |
19 | 20 |
|
| 21 | +import yaml |
| 22 | + |
20 | 23 | from nanonis_qcodes_controller.client import create_client, probe_host_ports, report_to_dict |
21 | 24 | from nanonis_qcodes_controller.client.errors import ( |
22 | 25 | NanonisCommandUnavailableError, |
|
57 | 60 | re.IGNORECASE, |
58 | 61 | ) |
59 | 62 |
|
| 63 | +_TRUE_BOOL_TOKENS = frozenset({"1", "true", "yes", "on"}) |
| 64 | +_FALSE_BOOL_TOKENS = frozenset({"0", "false", "no", "off"}) |
| 65 | + |
60 | 66 |
|
61 | 67 | @dataclass(frozen=True) |
62 | 68 | class ActionDescriptor: |
@@ -365,6 +371,23 @@ def _build_parser() -> argparse.ArgumentParser: |
365 | 371 | parser_policy_show.add_argument("--config-file") |
366 | 372 | parser_policy_show.set_defaults(handler=_cmd_policy_show) |
367 | 373 |
|
| 374 | + parser_policy_set = policy_subparsers.add_parser( |
| 375 | + "set", help="Set runtime policy write/dry-run flags." |
| 376 | + ) |
| 377 | + _add_json_arg(parser_policy_set) |
| 378 | + parser_policy_set.add_argument( |
| 379 | + "--allow-writes", |
| 380 | + type=_parse_cli_bool_arg, |
| 381 | + help="Override safety.allow_writes (1/0, true/false, yes/no, on/off).", |
| 382 | + ) |
| 383 | + parser_policy_set.add_argument( |
| 384 | + "--dry-run", |
| 385 | + type=_parse_cli_bool_arg, |
| 386 | + help="Override safety.dry_run (1/0, true/false, yes/no, on/off).", |
| 387 | + ) |
| 388 | + parser_policy_set.add_argument("--config-file") |
| 389 | + parser_policy_set.set_defaults(handler=_cmd_policy_set) |
| 390 | + |
368 | 391 | parser_trajectory = subparsers.add_parser("trajectory", help="Trajectory log utilities.") |
369 | 392 | trajectory_subparsers = parser_trajectory.add_subparsers( |
370 | 393 | dest="trajectory_command", required=True |
@@ -826,6 +849,44 @@ def _cmd_policy_show(args: argparse.Namespace) -> int: |
826 | 849 | return EXIT_OK |
827 | 850 |
|
828 | 851 |
|
| 852 | +def _cmd_policy_set(args: argparse.Namespace) -> int: |
| 853 | + allow_writes = args.allow_writes |
| 854 | + dry_run = args.dry_run |
| 855 | + if allow_writes is None and dry_run is None: |
| 856 | + raise ValueError("Provide at least one policy flag: --allow-writes and/or --dry-run.") |
| 857 | + |
| 858 | + config_path = _resolve_policy_config_path(args.config_file) |
| 859 | + config_values = _load_runtime_config_yaml(config_path) |
| 860 | + safety_section = config_values.get("safety") |
| 861 | + if safety_section is None: |
| 862 | + updated_safety: dict[str, Any] = {} |
| 863 | + elif isinstance(safety_section, Mapping): |
| 864 | + updated_safety = dict(safety_section) |
| 865 | + else: |
| 866 | + raise ValueError("Runtime config section 'safety' must be a mapping.") |
| 867 | + |
| 868 | + if allow_writes is not None: |
| 869 | + updated_safety["allow_writes"] = bool(allow_writes) |
| 870 | + if dry_run is not None: |
| 871 | + updated_safety["dry_run"] = bool(dry_run) |
| 872 | + config_values["safety"] = updated_safety |
| 873 | + |
| 874 | + config_path.parent.mkdir(parents=True, exist_ok=True) |
| 875 | + with config_path.open("w", encoding="utf-8") as handle: |
| 876 | + yaml.safe_dump(config_values, handle, sort_keys=False) |
| 877 | + |
| 878 | + settings = load_settings(config_file=config_path) |
| 879 | + payload = { |
| 880 | + "allow_writes": settings.safety.allow_writes, |
| 881 | + "dry_run": settings.safety.dry_run, |
| 882 | + "default_ramp_interval_s": settings.safety.default_ramp_interval_s, |
| 883 | + "config_file": str(config_path), |
| 884 | + "timestamp_utc": _now_utc_iso(), |
| 885 | + } |
| 886 | + _print_payload(payload, as_json=args.json) |
| 887 | + return EXIT_OK |
| 888 | + |
| 889 | + |
829 | 890 | def _cmd_parameters_discover(args: argparse.Namespace) -> int: |
830 | 891 | commands = list(_discover_nanonis_spm_commands(args.match)) |
831 | 892 | if args.limit > 0: |
@@ -1428,6 +1489,42 @@ def _parse_float_arg(*, name: str, raw_value: str) -> float: |
1428 | 1489 | raise ValueError(f"{name} must be numeric.") from exc |
1429 | 1490 |
|
1430 | 1491 |
|
| 1492 | +def _parse_cli_bool_arg(raw_value: object) -> bool: |
| 1493 | + if isinstance(raw_value, bool): |
| 1494 | + return raw_value |
| 1495 | + |
| 1496 | + normalized = str(raw_value).strip().lower() |
| 1497 | + if normalized in _TRUE_BOOL_TOKENS: |
| 1498 | + return True |
| 1499 | + if normalized in _FALSE_BOOL_TOKENS: |
| 1500 | + return False |
| 1501 | + |
| 1502 | + raise argparse.ArgumentTypeError("Expected boolean value: 1/0, true/false, yes/no, on/off.") |
| 1503 | + |
| 1504 | + |
| 1505 | +def _resolve_policy_config_path(config_file: str | None) -> Path: |
| 1506 | + if config_file is not None: |
| 1507 | + return Path(config_file).expanduser() |
| 1508 | + env_override = os.environ.get("NANONIS_CONFIG_FILE") |
| 1509 | + if env_override: |
| 1510 | + return Path(env_override).expanduser() |
| 1511 | + return Path("config/default_runtime.yaml") |
| 1512 | + |
| 1513 | + |
| 1514 | +def _load_runtime_config_yaml(config_path: Path) -> dict[str, Any]: |
| 1515 | + if not config_path.exists(): |
| 1516 | + return {} |
| 1517 | + |
| 1518 | + with config_path.open("r", encoding="utf-8") as handle: |
| 1519 | + loaded = yaml.safe_load(handle) |
| 1520 | + |
| 1521 | + if loaded is None: |
| 1522 | + return {} |
| 1523 | + if not isinstance(loaded, Mapping): |
| 1524 | + raise ValueError("Runtime config file must contain a top-level mapping.") |
| 1525 | + return dict(loaded) |
| 1526 | + |
| 1527 | + |
1431 | 1528 | def _parse_positive_float_arg(*, name: str, raw_value: str) -> float: |
1432 | 1529 | value = _parse_float_arg(name=name, raw_value=raw_value) |
1433 | 1530 | if value <= 0: |
|
0 commit comments