From 79c692c01c4f787d381d796cafa52048e8de3c49 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Tue, 25 Nov 2025 12:27:50 +0100 Subject: [PATCH 01/15] Somewhat working version of invoke --- src/bloqade/analysis/fidelity/__init__.py | 6 +- src/bloqade/analysis/fidelity/analysis.py | 148 +++++++++++++++--- src/bloqade/analysis/fidelity/impls.py | 97 ++++++++++++ src/bloqade/squin/__init__.py | 1 + .../squin/analysis/fidelity/__init__.py | 0 src/bloqade/squin/analysis/fidelity/impls.py | 38 +++++ test/analysis/fidelity/test_fidelity.py | 20 ++- 7 files changed, 285 insertions(+), 25 deletions(-) create mode 100644 src/bloqade/analysis/fidelity/impls.py create mode 100644 src/bloqade/squin/analysis/fidelity/__init__.py create mode 100644 src/bloqade/squin/analysis/fidelity/impls.py diff --git a/src/bloqade/analysis/fidelity/__init__.py b/src/bloqade/analysis/fidelity/__init__.py index 496b1b5df..1d9f76450 100644 --- a/src/bloqade/analysis/fidelity/__init__.py +++ b/src/bloqade/analysis/fidelity/__init__.py @@ -1 +1,5 @@ -from .analysis import FidelityAnalysis as FidelityAnalysis +from . import impls as impls +from .analysis import ( + FidelityFrame as FidelityFrame, + FidelityAnalysis as FidelityAnalysis, +) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 815b57256..73efe5f98 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -1,15 +1,40 @@ -from typing import Any -from dataclasses import field +from typing import Any, Sequence +from dataclasses import field, dataclass from kirin import ir +from kirin.interp import InterpreterError from kirin.lattice import EmptyLattice -from kirin.analysis import Forward +from kirin.analysis import ForwardExtra +from kirin.dialects import py from kirin.analysis.forward import ForwardFrame -from ..address import Address, AddressAnalysis +from ..address import Address, AddressReg, AddressAnalysis -class FidelityAnalysis(Forward): +def init_nested_dict(): + return dict(dict()) + + +@dataclass +class FidelityFrame(ForwardFrame[EmptyLattice]): + gate_fidelities: list[list[float]] = field(init=False) + """Gate fidelities of each qubit as (min, max) pairs to provide a range""" + + const_values: dict[ir.SSAValue, Any] = field(default_factory=dict) + current_addresses: dict[ir.SSAValue, Address] = field(default_factory=dict) + + parent_stmt: ir.Statement | None = None + + def update_fidelities(self, fidelity: float, addresses: AddressReg): + """short-hand to update both (min, max) values""" + + for idx in addresses.data: + self.gate_fidelities[idx][0] *= fidelity + self.gate_fidelities[idx][1] *= fidelity + + +@dataclass +class FidelityAnalysis(ForwardExtra[FidelityFrame, EmptyLattice]): """ This analysis pass can be used to track the global addresses of qubits and wires. @@ -52,31 +77,116 @@ def main(): The probabilities that each of the atoms in the register survive the duration of the analysed program. The order of the list follows the order they are in the register. """ - addr_frame: ForwardFrame[Address] = field(init=False) + default_probabilities: dict[str, tuple[float, ...]] = field(default_factory=dict) + """Default probabilities for noise statements where the probabilities are runtime values. The key must be equal to `stmt.name` and the number of values must match the probabilities. + + Example: + + ```python + from bloqade import squin + + @squin.kernel + def main(): + ... + + analysis = FidelityAnalysis(main.dialects, default_probabilities = {'single_qubit_pauli_channel': [1e-4, 1e-4, 1e-4]}) + ``` + """ + + addr_frame: ForwardFrame[Address] | None = None + addr_analysis: AddressAnalysis = field(init=False) + + collected_address: dict[ir.Statement, dict[ir.SSAValue, Address]] = field( + default_factory=init_nested_dict + ) + + n_qubits: int | None = None + + const_values: dict[ir.SSAValue, Any] = field(default_factory=dict) + # def initialize(self): + # super().initialize() + # self._current_gate_fidelity = 1.0 + # self._current_atom_survival_probability = [ + # 1.0 for _ in range(len(self.atom_survival_probability)) + # ] + # return self - def initialize(self): - super().initialize() - self._current_gate_fidelity = 1.0 - self._current_atom_survival_probability = [ - 1.0 for _ in range(len(self.atom_survival_probability)) - ] - return self + def initialize_frame( + self, node: ir.Statement, *, has_parent_access: bool = False + ) -> FidelityFrame: + frame = FidelityFrame(node, has_parent_access=has_parent_access) + + if self.n_qubits is not None: + frame.gate_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] + + if self.addr_frame is not None: + frame.current_addresses = self.addr_frame.entries + + return frame + + def eval_fallback(self, frame: FidelityFrame, node: ir.Statement): + + if isinstance(node, py.Constant): + # TODO: make sure this is a PyAttr + frame.const_values[node.result] = node.value.data - def eval_fallback(self, frame: ForwardFrame, node: ir.Statement): # NOTE: default is to conserve fidelity, so do nothing here - return + return tuple(self.lattice.bottom() for _ in range(len(node.results))) - def run(self, method: ir.Method, *args, **kwargs) -> tuple[ForwardFrame, Any]: + def run( + self, method: ir.Method, *args, **kwargs + ) -> tuple[FidelityFrame, EmptyLattice]: self._run_address_analysis(method) + + assert self.n_qubits is not None + return super().run(method, *args, **kwargs) def _run_address_analysis(self, method: ir.Method): - addr_analysis = AddressAnalysis(self.dialects) - addr_frame, _ = addr_analysis.run(method=method) + self.addr_analysis = AddressAnalysis(self.dialects) + addr_frame, _ = self.addr_analysis.run(method=method) self.addr_frame = addr_frame + self.n_qubits = self.addr_analysis.qubit_count + + return addr_frame + # NOTE: make sure we have as many probabilities as we have addresses - self.atom_survival_probability = [1.0] * addr_analysis.qubit_count + # self.atom_survival_probability = [1.0] * addr_analysis.qubit_count def method_self(self, method: ir.Method) -> EmptyLattice: return self.lattice.bottom() + + def get_address(self, stmt: ir.Statement, key: ir.SSAValue): + addr = None + + if self.addr_frame is not None: + addr = self.addr_frame.entries.get(key) + + if addr is not None: + return addr + + collected_addr = self.collected_address.get(stmt) + if collected_addr is not None: + addr = collected_addr.get(key) + + if addr is None: + # for stmt_key, _addresses in self.collected_address.items(): + # addr = _addresses.get(key) + # if addr is not None: + # return addr + + raise InterpreterError(f"Address of {key} at statement {stmt} not found!") + + return addr + + def get_addresses(self, stmt: ir.Statement, keys: Sequence[ir.SSAValue]): + return tuple(self.get_address(stmt, key) for key in keys) + + def collect_addresses( + self, stmt: ir.Statement, addresses: dict[ir.SSAValue, Address] + ): + if stmt in self.collected_address: + self.collected_address[stmt].update(addresses) + else: + self.collected_address[stmt] = addresses diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py new file mode 100644 index 000000000..f81167236 --- /dev/null +++ b/src/bloqade/analysis/fidelity/impls.py @@ -0,0 +1,97 @@ +from kirin import interp +from kirin.dialects import scf, func + +from .analysis import FidelityFrame, FidelityAnalysis + + +@scf.dialect.register(key="circuit.fidelity") +class __ScfMethods(interp.MethodTable): + @interp.impl(scf.IfElse) + def if_else( + self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: scf.IfElse + ): + # TODO: check if the condition is constant and fix the branch in that case + # run both branches + with interp_.new_frame(stmt, has_parent_access=True) as then_frame: + interp_.frame_call_region( + then_frame, + stmt, + stmt.then_body, + ) + then_fids = then_frame.gate_fidelities + + with interp_.new_frame(stmt, has_parent_access=True) as else_frame: + interp_.frame_call_region( + else_frame, + stmt, + stmt.else_body, + ) + + else_fids = else_frame.gate_fidelities + + assert (n_qubits := interp_.n_qubits) is not None + for i in range(n_qubits): + min_fid = min(then_fids[i][0], else_fids[i][0]) + max_fid = max(then_fids[i][1], else_fids[i][1]) + frame.gate_fidelities[i][0] *= min_fid + frame.gate_fidelities[i][1] *= max_fid + + +@func.dialect.register(key="circuit.fidelity") +class __FuncMethods(interp.MethodTable): + @interp.impl(func.Invoke) + def invoke_( + self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: func.Invoke + ): + # ret, _ = interp_.call(stmt.callee.code, *frame.get_values(stmt.inputs)) + if stmt.callee.sym_name == "h": + pass + + print(stmt.callee.sym_name) + + # TODO: we need to know the address of qubit lists created inside the invoke + # since the address analysis impl for invoke does this by calling the method + # the frame containing the info we need does not exist at the top level + # Q: does that mean we need to re-run the address analysis on the body? + + parent_stmt = frame.parent_stmt or stmt + + addr_frame, addr_ret = interp_.addr_analysis.call( + stmt.callee, + interp_.addr_analysis.method_self(stmt.callee), + *interp_.get_addresses(parent_stmt, stmt.inputs), + ) + interp_.collect_addresses(stmt, addr_frame.entries) + + with interp_.new_frame(stmt.callee.code, has_parent_access=True) as body_frame: + for arg, input in zip( + stmt.callee.callable_region.blocks[0].args[ + 1: + ], # NOTE: skip method_self + stmt.inputs, + ): + const_value = frame.const_values.get(input) + if const_value is not None: + body_frame.const_values[arg] = const_value + + addr = interp_.get_address(parent_stmt, input) + interp_.collect_addresses(stmt, {arg: addr}) + if addr is not None: + # TODO: this also includes constant values, let's just use the Address lattice element for that + body_frame.current_addresses[arg] = addr + + body_frame.parent_stmt = stmt + + ret = interp_.frame_call( + body_frame, + stmt.callee.code, + interp_.method_self(stmt.callee), + *frame.get_values(stmt.inputs), + # *args + ) + + for i, (fid0, fid1) in enumerate(body_frame.gate_fidelities): + frame.gate_fidelities[i][0] *= fid0 + frame.gate_fidelities[i][1] *= fid1 + + return (ret,) diff --git a/src/bloqade/squin/__init__.py b/src/bloqade/squin/__init__.py index fba2659c3..9a8ffc7d4 100644 --- a/src/bloqade/squin/__init__.py +++ b/src/bloqade/squin/__init__.py @@ -47,6 +47,7 @@ two_qubit_pauli_channel as two_qubit_pauli_channel, single_qubit_pauli_channel as single_qubit_pauli_channel, ) +from .analysis.fidelity import impls as impls # NOTE: it's important to keep these imports here since they import squin.kernel # we skip isort here diff --git a/src/bloqade/squin/analysis/fidelity/__init__.py b/src/bloqade/squin/analysis/fidelity/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py new file mode 100644 index 000000000..b612a140c --- /dev/null +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -0,0 +1,38 @@ +from typing import TypeVar, cast + +from kirin import interp + +from bloqade.squin import noise +from bloqade.analysis.address import AddressReg +from bloqade.analysis.fidelity import FidelityFrame, FidelityAnalysis + +T = TypeVar("T") + + +@noise.dialect.register(key="circuit.fidelity") +class __NoiseMethods(interp.MethodTable): + + @interp.impl(noise.stmts.SingleQubitPauliChannel) + def single_qubit_pauli_channel( + self, + interp_: FidelityAnalysis, + frame: FidelityFrame, + stmt: noise.stmts.SingleQubitPauliChannel, + ): + + defaults = interp_.default_probabilities.get(stmt.name, (None,) * 3) + px = frame.const_values.get(stmt.px, defaults[0]) + py = frame.const_values.get(stmt.py, defaults[1]) + pz = frame.const_values.get(stmt.pz, defaults[2]) + + if any((px is None, py is None, pz is None)): + return + + addresses = frame.current_addresses.get(stmt.qubits) + assert isinstance(addresses, AddressReg) + + fidelity = cast(float, 1 - (px + py + pz)) # type: ignore -- NOTE: the linter doesn't understand the above if + print(fidelity) + frame.update_fidelities(fidelity, addresses) + + return () diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 78ca8f26c..6ba9126eb 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -1,8 +1,6 @@ import math -import pytest - -from bloqade import qasm2 +from bloqade import qasm2, squin from bloqade.qasm2 import noise from bloqade.analysis.fidelity import FidelityAnalysis from bloqade.qasm2.passes.noise import NoisePass @@ -80,7 +78,6 @@ def parallel_cz_errors(self, ctrls, qargs, rest): return {(0.01, 0.01, 0.01, 0.01): ctrls + qargs + rest} -@pytest.mark.xfail def test_if(): @qasm2.extended @@ -137,7 +134,6 @@ def main_if(): ) -@pytest.mark.xfail def test_for(): @qasm2.extended @@ -201,3 +197,17 @@ def main_for(): == fid_analysis.atom_survival_probability[0] < 1 ) + + +@squin.kernel +def main(): + q = squin.qalloc(2) + squin.h(q[0]) + squin.single_qubit_pauli_channel(0.1, 0.2, 0.3, q[0]) + squin.cx(q[0], q[1]) + + +fid_analysis = FidelityAnalysis(main.dialects) +frame, _ = fid_analysis.run(main) + +print(frame.gate_fidelities) From 02edebdb5deabf53be0400e157da9dcf3648df71 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Tue, 25 Nov 2025 12:57:23 +0100 Subject: [PATCH 02/15] Simplify frame --- src/bloqade/analysis/fidelity/analysis.py | 45 ++++++++------------ src/bloqade/analysis/fidelity/impls.py | 25 ++--------- src/bloqade/squin/analysis/fidelity/impls.py | 16 +++---- 3 files changed, 28 insertions(+), 58 deletions(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 73efe5f98..02af785b8 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -1,14 +1,13 @@ -from typing import Any, Sequence +from typing import Sequence from dataclasses import field, dataclass from kirin import ir from kirin.interp import InterpreterError from kirin.lattice import EmptyLattice -from kirin.analysis import ForwardExtra -from kirin.dialects import py +from kirin.analysis import ForwardExtra, const from kirin.analysis.forward import ForwardFrame -from ..address import Address, AddressReg, AddressAnalysis +from ..address import Address, AddressReg, ConstResult, AddressAnalysis def init_nested_dict(): @@ -20,9 +19,6 @@ class FidelityFrame(ForwardFrame[EmptyLattice]): gate_fidelities: list[list[float]] = field(init=False) """Gate fidelities of each qubit as (min, max) pairs to provide a range""" - const_values: dict[ir.SSAValue, Any] = field(default_factory=dict) - current_addresses: dict[ir.SSAValue, Address] = field(default_factory=dict) - parent_stmt: ir.Statement | None = None def update_fidelities(self, fidelity: float, addresses: AddressReg): @@ -96,13 +92,12 @@ def main(): addr_frame: ForwardFrame[Address] | None = None addr_analysis: AddressAnalysis = field(init=False) - collected_address: dict[ir.Statement, dict[ir.SSAValue, Address]] = field( + collected_addresses: dict[ir.Statement, dict[ir.SSAValue, Address]] = field( default_factory=init_nested_dict ) n_qubits: int | None = None - const_values: dict[ir.SSAValue, Any] = field(default_factory=dict) # def initialize(self): # super().initialize() # self._current_gate_fidelity = 1.0 @@ -119,17 +114,9 @@ def initialize_frame( if self.n_qubits is not None: frame.gate_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] - if self.addr_frame is not None: - frame.current_addresses = self.addr_frame.entries - return frame def eval_fallback(self, frame: FidelityFrame, node: ir.Statement): - - if isinstance(node, py.Constant): - # TODO: make sure this is a PyAttr - frame.const_values[node.result] = node.value.data - # NOTE: default is to conserve fidelity, so do nothing here return tuple(self.lattice.bottom() for _ in range(len(node.results))) @@ -166,16 +153,11 @@ def get_address(self, stmt: ir.Statement, key: ir.SSAValue): if addr is not None: return addr - collected_addr = self.collected_address.get(stmt) + collected_addr = self.collected_addresses.get(stmt) if collected_addr is not None: addr = collected_addr.get(key) if addr is None: - # for stmt_key, _addresses in self.collected_address.items(): - # addr = _addresses.get(key) - # if addr is not None: - # return addr - raise InterpreterError(f"Address of {key} at statement {stmt} not found!") return addr @@ -183,10 +165,19 @@ def get_address(self, stmt: ir.Statement, key: ir.SSAValue): def get_addresses(self, stmt: ir.Statement, keys: Sequence[ir.SSAValue]): return tuple(self.get_address(stmt, key) for key in keys) - def collect_addresses( + def store_addresses( self, stmt: ir.Statement, addresses: dict[ir.SSAValue, Address] ): - if stmt in self.collected_address: - self.collected_address[stmt].update(addresses) + if stmt in self.collected_addresses: + self.collected_addresses[stmt].update(addresses) else: - self.collected_address[stmt] = addresses + self.collected_addresses[stmt] = addresses + + def get_const(self, stmt: ir.Statement, key: ir.SSAValue): + # NOTE: we rely on the address analysis to fetch constants and re-use the corresponding lattice element + addr = self.get_address(stmt, key) + + assert isinstance(addr, ConstResult) + assert isinstance(result := addr.result, const.Value) + + return result.data diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index f81167236..ce4e96836 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -43,25 +43,14 @@ class __FuncMethods(interp.MethodTable): def invoke_( self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: func.Invoke ): - # ret, _ = interp_.call(stmt.callee.code, *frame.get_values(stmt.inputs)) - if stmt.callee.sym_name == "h": - pass - - print(stmt.callee.sym_name) - - # TODO: we need to know the address of qubit lists created inside the invoke - # since the address analysis impl for invoke does this by calling the method - # the frame containing the info we need does not exist at the top level - # Q: does that mean we need to re-run the address analysis on the body? - parent_stmt = frame.parent_stmt or stmt - addr_frame, addr_ret = interp_.addr_analysis.call( + addr_frame, _ = interp_.addr_analysis.call( stmt.callee, interp_.addr_analysis.method_self(stmt.callee), *interp_.get_addresses(parent_stmt, stmt.inputs), ) - interp_.collect_addresses(stmt, addr_frame.entries) + interp_.store_addresses(stmt, addr_frame.entries) with interp_.new_frame(stmt.callee.code, has_parent_access=True) as body_frame: for arg, input in zip( @@ -70,15 +59,8 @@ def invoke_( ], # NOTE: skip method_self stmt.inputs, ): - const_value = frame.const_values.get(input) - if const_value is not None: - body_frame.const_values[arg] = const_value - addr = interp_.get_address(parent_stmt, input) - interp_.collect_addresses(stmt, {arg: addr}) - if addr is not None: - # TODO: this also includes constant values, let's just use the Address lattice element for that - body_frame.current_addresses[arg] = addr + interp_.store_addresses(parent_stmt, {arg: addr}) body_frame.parent_stmt = stmt @@ -87,7 +69,6 @@ def invoke_( stmt.callee.code, interp_.method_self(stmt.callee), *frame.get_values(stmt.inputs), - # *args ) for i, (fid0, fid1) in enumerate(body_frame.gate_fidelities): diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py index b612a140c..394ef1347 100644 --- a/src/bloqade/squin/analysis/fidelity/impls.py +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -19,20 +19,18 @@ def single_qubit_pauli_channel( frame: FidelityFrame, stmt: noise.stmts.SingleQubitPauliChannel, ): + parent_stmt = frame.parent_stmt or stmt + px = interp_.get_const(parent_stmt, stmt.px) + py = interp_.get_const(parent_stmt, stmt.py) + pz = interp_.get_const(parent_stmt, stmt.pz) - defaults = interp_.default_probabilities.get(stmt.name, (None,) * 3) - px = frame.const_values.get(stmt.px, defaults[0]) - py = frame.const_values.get(stmt.py, defaults[1]) - pz = frame.const_values.get(stmt.pz, defaults[2]) + # if any((px is None, py is None, pz is None)): + # return - if any((px is None, py is None, pz is None)): - return - - addresses = frame.current_addresses.get(stmt.qubits) + addresses = interp_.get_address(parent_stmt, stmt.qubits) assert isinstance(addresses, AddressReg) fidelity = cast(float, 1 - (px + py + pz)) # type: ignore -- NOTE: the linter doesn't understand the above if - print(fidelity) frame.update_fidelities(fidelity, addresses) return () From c48b222e1a4e00a8544cfb99ecf7c508669e64ca Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Tue, 25 Nov 2025 13:10:14 +0100 Subject: [PATCH 03/15] Some more cleanup work --- src/bloqade/analysis/fidelity/analysis.py | 28 ++++++++++++++++---- src/bloqade/analysis/fidelity/impls.py | 16 ++++++----- src/bloqade/squin/analysis/fidelity/impls.py | 12 +++------ test/analysis/fidelity/test_fidelity.py | 23 +++++++++------- 4 files changed, 49 insertions(+), 30 deletions(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 02af785b8..b135c63e4 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -144,7 +144,7 @@ def _run_address_analysis(self, method: ir.Method): def method_self(self, method: ir.Method) -> EmptyLattice: return self.lattice.bottom() - def get_address(self, stmt: ir.Statement, key: ir.SSAValue): + def _get_address(self, stmt: ir.Statement, key: ir.SSAValue): addr = None if self.addr_frame is not None: @@ -162,10 +162,26 @@ def get_address(self, stmt: ir.Statement, key: ir.SSAValue): return addr - def get_addresses(self, stmt: ir.Statement, keys: Sequence[ir.SSAValue]): - return tuple(self.get_address(stmt, key) for key in keys) + def get_address(self, frame: FidelityFrame, stmt: ir.Statement, key: ir.SSAValue): + parent_stmt = frame.parent_stmt or stmt + return self._get_address(parent_stmt, key) + + def get_addresses( + self, frame: FidelityFrame, stmt: ir.Statement, keys: Sequence[ir.SSAValue] + ): + parent_stmt = frame.parent_stmt or stmt + return tuple(self._get_address(parent_stmt, key) for key in keys) def store_addresses( + self, + frame: FidelityFrame, + stmt: ir.Statement, + addresses: dict[ir.SSAValue, Address], + ): + parent_stmt = frame.parent_stmt or stmt + self._store_addresses(parent_stmt, addresses) + + def _store_addresses( self, stmt: ir.Statement, addresses: dict[ir.SSAValue, Address] ): if stmt in self.collected_addresses: @@ -173,9 +189,11 @@ def store_addresses( else: self.collected_addresses[stmt] = addresses - def get_const(self, stmt: ir.Statement, key: ir.SSAValue): + def get_const(self, frame: FidelityFrame, stmt: ir.Statement, key: ir.SSAValue): + parent_stmt = frame.parent_stmt or stmt + # NOTE: we rely on the address analysis to fetch constants and re-use the corresponding lattice element - addr = self.get_address(stmt, key) + addr = self._get_address(parent_stmt, key) assert isinstance(addr, ConstResult) assert isinstance(result := addr.result, const.Value) diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index ce4e96836..da1b7689d 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -43,27 +43,29 @@ class __FuncMethods(interp.MethodTable): def invoke_( self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: func.Invoke ): - parent_stmt = frame.parent_stmt or stmt - + # NOTE: re-run address analysis on the invoke body so we have the addresses of the values inside the body, not just the return addr_frame, _ = interp_.addr_analysis.call( stmt.callee, interp_.addr_analysis.method_self(stmt.callee), - *interp_.get_addresses(parent_stmt, stmt.inputs), + *interp_.get_addresses(frame, stmt, stmt.inputs), ) - interp_.store_addresses(stmt, addr_frame.entries) + interp_.store_addresses(frame, stmt, addr_frame.entries) with interp_.new_frame(stmt.callee.code, has_parent_access=True) as body_frame: + body_frame.parent_stmt = stmt + for arg, input in zip( stmt.callee.callable_region.blocks[0].args[ 1: ], # NOTE: skip method_self stmt.inputs, ): - addr = interp_.get_address(parent_stmt, input) - interp_.store_addresses(parent_stmt, {arg: addr}) - body_frame.parent_stmt = stmt + # NOTE: assign address of input to blockargument + addr = interp_.get_address(frame, stmt, input) + interp_.store_addresses(body_frame, stmt, {arg: addr}) + # NOTE: actually call the invoke to evaluate fidelity ret = interp_.frame_call( body_frame, stmt.callee.code, diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py index 394ef1347..2f67c1ac8 100644 --- a/src/bloqade/squin/analysis/fidelity/impls.py +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -19,15 +19,11 @@ def single_qubit_pauli_channel( frame: FidelityFrame, stmt: noise.stmts.SingleQubitPauliChannel, ): - parent_stmt = frame.parent_stmt or stmt - px = interp_.get_const(parent_stmt, stmt.px) - py = interp_.get_const(parent_stmt, stmt.py) - pz = interp_.get_const(parent_stmt, stmt.pz) + px = interp_.get_const(frame, stmt, stmt.px) + py = interp_.get_const(frame, stmt, stmt.py) + pz = interp_.get_const(frame, stmt, stmt.pz) - # if any((px is None, py is None, pz is None)): - # return - - addresses = interp_.get_address(parent_stmt, stmt.qubits) + addresses = interp_.get_address(frame, stmt, stmt.qubits) assert isinstance(addresses, AddressReg) fidelity = cast(float, 1 - (px + py + pz)) # type: ignore -- NOTE: the linter doesn't understand the above if diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 6ba9126eb..d104b0dca 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -199,15 +199,18 @@ def main_for(): ) -@squin.kernel -def main(): - q = squin.qalloc(2) - squin.h(q[0]) - squin.single_qubit_pauli_channel(0.1, 0.2, 0.3, q[0]) - squin.cx(q[0], q[1]) - +def test_basic_pauli(): + @squin.kernel + def main(): + q = squin.qalloc(2) + squin.h(q[0]) + squin.single_qubit_pauli_channel(0.1, 0.2, 0.3, q[0]) + squin.cx(q[0], q[1]) -fid_analysis = FidelityAnalysis(main.dialects) -frame, _ = fid_analysis.run(main) + fid_analysis = FidelityAnalysis(main.dialects) + frame, _ = fid_analysis.run(main) -print(frame.gate_fidelities) + assert len(frame.gate_fidelities) == 2 + assert math.isclose(frame.gate_fidelities[0][0], 0.4) + assert math.isclose(frame.gate_fidelities[0][1], 0.4) + assert frame.gate_fidelities[1] == [1.0, 1.0] From 6142600493980ea7f61de57b183b48c89b030816 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Tue, 25 Nov 2025 13:26:59 +0100 Subject: [PATCH 04/15] Implement depolarize and if-stmt --- src/bloqade/analysis/fidelity/impls.py | 2 ++ src/bloqade/squin/analysis/fidelity/impls.py | 21 +++++++++++++++++-- test/analysis/fidelity/test_fidelity.py | 22 +++++++++++++++++++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index da1b7689d..3144ab45f 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -17,6 +17,7 @@ def if_else( then_frame, stmt, stmt.then_body, + *(interp_.lattice.bottom() for _ in range(len(stmt.args))), ) then_fids = then_frame.gate_fidelities @@ -25,6 +26,7 @@ def if_else( else_frame, stmt, stmt.else_body, + *(interp_.lattice.bottom() for _ in range(len(stmt.args))), ) else_fids = else_frame.gate_fidelities diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py index 2f67c1ac8..54dc695df 100644 --- a/src/bloqade/squin/analysis/fidelity/impls.py +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -1,4 +1,4 @@ -from typing import TypeVar, cast +from typing import TypeVar from kirin import interp @@ -26,7 +26,24 @@ def single_qubit_pauli_channel( addresses = interp_.get_address(frame, stmt, stmt.qubits) assert isinstance(addresses, AddressReg) - fidelity = cast(float, 1 - (px + py + pz)) # type: ignore -- NOTE: the linter doesn't understand the above if + fidelity = 1 - (px + py + pz) + frame.update_fidelities(fidelity, addresses) + + return () + + @interp.impl(noise.stmts.Depolarize) + def depolarize( + self, + interp_: FidelityAnalysis, + frame: FidelityFrame, + stmt: noise.stmts.Depolarize, + ): + p = interp_.get_const(frame, stmt, stmt.p) + + addresses = interp_.get_address(frame, stmt, stmt.qubits) + assert isinstance(addresses, AddressReg) + + fidelity = 1 - p frame.update_fidelities(fidelity, addresses) return () diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index d104b0dca..80f8ef1e7 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -199,7 +199,7 @@ def main_for(): ) -def test_basic_pauli(): +def test_stdlib_call(): @squin.kernel def main(): q = squin.qalloc(2) @@ -214,3 +214,23 @@ def main(): assert math.isclose(frame.gate_fidelities[0][0], 0.4) assert math.isclose(frame.gate_fidelities[0][1], 0.4) assert frame.gate_fidelities[1] == [1.0, 1.0] + + +def test_squin_if(): + + @squin.kernel + def main(): + q = squin.qalloc(2) + squin.h(q[0]) + m = squin.measure(q[0]) + + if m: + qarg = [q[0]] + squin.depolarize(0.1, qarg[0]) + else: + squin.depolarize(0.2, q[1]) + + fidelity_analysis = FidelityAnalysis(main.dialects) + frame, _ = fidelity_analysis.run(main) + + assert frame.gate_fidelities == [[0.9, 1.0], [0.8, 1.0]] From 6b08b9792fbc8bedec9d5195388bfc753194d0bd Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Tue, 25 Nov 2025 13:54:45 +0100 Subject: [PATCH 05/15] Qubit loss --- src/bloqade/analysis/fidelity/analysis.py | 12 +++++- src/bloqade/analysis/fidelity/impls.py | 11 ++++++ src/bloqade/squin/analysis/fidelity/impls.py | 39 +++++++++++++++++++- test/analysis/fidelity/test_fidelity.py | 7 ++++ 4 files changed, 66 insertions(+), 3 deletions(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index b135c63e4..cc5f646af 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -19,15 +19,24 @@ class FidelityFrame(ForwardFrame[EmptyLattice]): gate_fidelities: list[list[float]] = field(init=False) """Gate fidelities of each qubit as (min, max) pairs to provide a range""" + qubit_survival_fidelities: list[list[float]] = field(init=False) + """Qubit survival fidelity given as (min, max) pairs""" + parent_stmt: ir.Statement | None = None - def update_fidelities(self, fidelity: float, addresses: AddressReg): + def update_gate_fidelities(self, fidelity: float, addresses: AddressReg): """short-hand to update both (min, max) values""" for idx in addresses.data: self.gate_fidelities[idx][0] *= fidelity self.gate_fidelities[idx][1] *= fidelity + def update_survival_fidelities(self, survival: float, addresses: AddressReg): + """short-hand to update both (min, max) values""" + for idx in addresses.data: + self.qubit_survival_fidelities[idx][0] *= survival + self.qubit_survival_fidelities[idx][1] *= survival + @dataclass class FidelityAnalysis(ForwardExtra[FidelityFrame, EmptyLattice]): @@ -113,6 +122,7 @@ def initialize_frame( if self.n_qubits is not None: frame.gate_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] + frame.qubit_survival_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] return frame diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index 3144ab45f..2d150f9a1 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -20,6 +20,7 @@ def if_else( *(interp_.lattice.bottom() for _ in range(len(stmt.args))), ) then_fids = then_frame.gate_fidelities + then_survival = then_frame.qubit_survival_fidelities with interp_.new_frame(stmt, has_parent_access=True) as else_frame: interp_.frame_call_region( @@ -30,6 +31,7 @@ def if_else( ) else_fids = else_frame.gate_fidelities + else_survival = else_frame.qubit_survival_fidelities assert (n_qubits := interp_.n_qubits) is not None for i in range(n_qubits): @@ -38,6 +40,11 @@ def if_else( frame.gate_fidelities[i][0] *= min_fid frame.gate_fidelities[i][1] *= max_fid + min_survival = min(then_survival[i][0], else_survival[i][0]) + max_survival = max(then_survival[i][1], else_survival[i][1]) + frame.qubit_survival_fidelities[i][0] *= min_survival + frame.qubit_survival_fidelities[i][1] *= max_survival + @func.dialect.register(key="circuit.fidelity") class __FuncMethods(interp.MethodTable): @@ -79,4 +86,8 @@ def invoke_( frame.gate_fidelities[i][0] *= fid0 frame.gate_fidelities[i][1] *= fid1 + for i, (fid0, fid1) in enumerate(body_frame.qubit_survival_fidelities): + frame.qubit_survival_fidelities[i][0] *= fid0 + frame.qubit_survival_fidelities[i][1] *= fid1 + return (ret,) diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py index 54dc695df..77ed2aa0b 100644 --- a/src/bloqade/squin/analysis/fidelity/impls.py +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -27,10 +27,20 @@ def single_qubit_pauli_channel( assert isinstance(addresses, AddressReg) fidelity = 1 - (px + py + pz) - frame.update_fidelities(fidelity, addresses) + frame.update_gate_fidelities(fidelity, addresses) return () + @interp.impl(noise.stmts.TwoQubitPauliChannel) + def two_qubit_pauli_channel( + self, + interp_: FidelityAnalysis, + frame: FidelityFrame, + stmt: noise.stmts.TwoQubitPauliChannel, + ): + stmt.probabilities + raise NotImplementedError("TODO") + @interp.impl(noise.stmts.Depolarize) def depolarize( self, @@ -44,6 +54,31 @@ def depolarize( assert isinstance(addresses, AddressReg) fidelity = 1 - p - frame.update_fidelities(fidelity, addresses) + frame.update_gate_fidelities(fidelity, addresses) return () + + @interp.impl(noise.stmts.Depolarize2) + def depolarize2( + self, + interp_: FidelityAnalysis, + frame: FidelityFrame, + stmt: noise.stmts.Depolarize2, + ): + stmt.p + raise NotImplementedError("TODO") + + @interp.impl(noise.stmts.QubitLoss) + def qubit_loss( + self, + interp_: FidelityAnalysis, + frame: FidelityFrame, + stmt: noise.stmts.QubitLoss, + ): + p = interp_.get_const(frame, stmt, stmt.p) + survival = 1 - p + + addresses = interp_.get_address(frame, stmt, stmt.qubits) + assert isinstance(addresses, AddressReg) + + frame.update_survival_fidelities(survival, addresses) diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 80f8ef1e7..78b3f117f 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -206,6 +206,7 @@ def main(): squin.h(q[0]) squin.single_qubit_pauli_channel(0.1, 0.2, 0.3, q[0]) squin.cx(q[0], q[1]) + squin.qubit_loss(0.1, q[1]) fid_analysis = FidelityAnalysis(main.dialects) frame, _ = fid_analysis.run(main) @@ -215,6 +216,8 @@ def main(): assert math.isclose(frame.gate_fidelities[0][1], 0.4) assert frame.gate_fidelities[1] == [1.0, 1.0] + assert frame.qubit_survival_fidelities == [[1.0, 1.0], [0.9, 0.9]] + def test_squin_if(): @@ -227,10 +230,14 @@ def main(): if m: qarg = [q[0]] squin.depolarize(0.1, qarg[0]) + squin.qubit_loss(0.25, q[1]) else: squin.depolarize(0.2, q[1]) + squin.qubit_loss(0.15, q[0]) fidelity_analysis = FidelityAnalysis(main.dialects) frame, _ = fidelity_analysis.run(main) assert frame.gate_fidelities == [[0.9, 1.0], [0.8, 1.0]] + + assert frame.qubit_survival_fidelities == [[0.85, 1.0], [0.75, 1.0]] From c440be8efc0194b6ea4777ba0070c152bf4d5394 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Tue, 25 Nov 2025 15:49:41 +0100 Subject: [PATCH 06/15] WIP on for loop --- src/bloqade/analysis/fidelity/impls.py | 44 +++++++++++++++++++++++++ test/analysis/fidelity/test_fidelity.py | 16 +++++++++ 2 files changed, 60 insertions(+) diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index 2d150f9a1..8d282bc00 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -45,9 +45,48 @@ def if_else( frame.qubit_survival_fidelities[i][0] *= min_survival frame.qubit_survival_fidelities[i][1] *= max_survival + # TODO: re-use address analysis? + @interp.impl(scf.For) + def for_loop(self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: scf.For): + loop_vars_addr = interp_.get_addresses(frame, stmt, stmt.initializers) + loop_vars = tuple(interp_.lattice.bottom() for _ in range(len(stmt.results))) + + iter_type, iterable = interp_.addr_analysis.unpack_iterable(interp_.get_address(frame, stmt, stmt.iterable)) + + if iter_type is None: + return interp_.eval_fallback(frame, stmt) + + for value in iterable: + with interp_.addr_analysis.new_frame(stmt) as body_addr_frame: + loop_vars_addr = interp_.addr_analysis.frame_call_region( + body_addr_frame, stmt, stmt.body, value, *loop_vars_addr + ) + + with interp_.new_frame(stmt, has_parent_access=True) as body_frame: + body_frame.parent_stmt = stmt + interp_.store_addresses(body_frame, stmt, body_addr_frame.entries) + + for (arg, input) in zip(stmt.args, (stmt.iterable, *stmt.initializers)): + # NOTE: assign address of input to blockargument + addr = interp_.get_address(frame, stmt, input) + interp_.store_addresses(body_frame, stmt, {arg: addr}) + + interp_.frame_call_region( + body_frame, stmt, stmt.body, interp_.lattice.bottom(), *loop_vars + ) + + if loop_vars_addr is None: + loop_vars_addr = () + elif isinstance(loop_vars_addr, interp.ReturnValue): + break + + return loop_vars + @func.dialect.register(key="circuit.fidelity") class __FuncMethods(interp.MethodTable): + # TODO: re-use address analysis method table + # and re-use the address lattice so we just get addresses in the frame @interp.impl(func.Invoke) def invoke_( self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: func.Invoke @@ -60,6 +99,11 @@ def invoke_( ) interp_.store_addresses(frame, stmt, addr_frame.entries) + # TODO: + # * Store address analysis results for the body on the body_frame + # * Try to run address analysis in parallel with fidelity analysis (in order!) + # * Maybe re-use the address analysis lattice + with interp_.new_frame(stmt.callee.code, has_parent_access=True) as body_frame: body_frame.parent_stmt = stmt diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 78b3f117f..9789d3f54 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -241,3 +241,19 @@ def main(): assert frame.gate_fidelities == [[0.9, 1.0], [0.8, 1.0]] assert frame.qubit_survival_fidelities == [[0.85, 1.0], [0.75, 1.0]] + + +def test_squin_for(): + @squin.kernel + def main(): + q = squin.qalloc(4) + + for i in range(4): + squin.depolarize(0.01 * i, q[i]) + + fidelity_analysis = FidelityAnalysis(main.dialects) + frame, _ = fidelity_analysis.run(main) + + print(frame.gate_fidelities) + +test_squin_for() \ No newline at end of file From a731b1d6f2157adf3eeee0c7fe43c6882e7163d6 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Wed, 26 Nov 2025 11:00:06 +0100 Subject: [PATCH 07/15] WIP --- src/bloqade/analysis/fidelity/analysis.py | 111 ++++------ src/bloqade/analysis/fidelity/impls.py | 208 +++++++++++-------- src/bloqade/squin/analysis/fidelity/impls.py | 6 +- test/analysis/fidelity/test_fidelity.py | 4 +- 4 files changed, 164 insertions(+), 165 deletions(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index cc5f646af..24c2e4eea 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -3,43 +3,53 @@ from kirin import ir from kirin.interp import InterpreterError -from kirin.lattice import EmptyLattice from kirin.analysis import ForwardExtra, const from kirin.analysis.forward import ForwardFrame from ..address import Address, AddressReg, ConstResult, AddressAnalysis -def init_nested_dict(): - return dict(dict()) - @dataclass -class FidelityFrame(ForwardFrame[EmptyLattice]): - gate_fidelities: list[list[float]] = field(init=False) +class FidelityFrame(ForwardFrame[Address]): + gate_fidelities: list[list[float]] = field(init=False, default_factory=list) """Gate fidelities of each qubit as (min, max) pairs to provide a range""" - qubit_survival_fidelities: list[list[float]] = field(init=False) + qubit_survival_fidelities: list[list[float]] = field(init=False, default_factory=list) """Qubit survival fidelity given as (min, max) pairs""" parent_stmt: ir.Statement | None = None - def update_gate_fidelities(self, fidelity: float, addresses: AddressReg): + def extend_fidelity_lengths(self, n_qubits: int): + """make sure there are at least n_qubits fidelity pairs""" + + self.gate_fidelities.extend([[1.0, 1.0] for _ in range(n_qubits - len(self.gate_fidelities))]) + self.qubit_survival_fidelities.extend([[1.0, 1.0] for _ in range(n_qubits - len(self.qubit_survival_fidelities))]) + + def update_gate_fidelities(self, n_qubits: int, fidelity: float, addresses: AddressReg): """short-hand to update both (min, max) values""" + self.extend_fidelity_lengths(n_qubits) + + print(n_qubits) + print(self.gate_fidelities) + for idx in addresses.data: self.gate_fidelities[idx][0] *= fidelity self.gate_fidelities[idx][1] *= fidelity - def update_survival_fidelities(self, survival: float, addresses: AddressReg): + def update_survival_fidelities(self, n_qubits: int, survival: float, addresses: AddressReg): """short-hand to update both (min, max) values""" + + self.extend_fidelity_lengths(n_qubits) + for idx in addresses.data: self.qubit_survival_fidelities[idx][0] *= survival self.qubit_survival_fidelities[idx][1] *= survival @dataclass -class FidelityAnalysis(ForwardExtra[FidelityFrame, EmptyLattice]): +class FidelityAnalysis(AddressAnalysis): """ This analysis pass can be used to track the global addresses of qubits and wires. @@ -69,8 +79,8 @@ def main(): ``` """ - keys = ["circuit.fidelity"] - lattice = EmptyLattice + keys = ("circuit.fidelity", "qubit.address") + lattice = Address gate_fidelity: float = 1.0 """ @@ -101,11 +111,6 @@ def main(): addr_frame: ForwardFrame[Address] | None = None addr_analysis: AddressAnalysis = field(init=False) - collected_addresses: dict[ir.Statement, dict[ir.SSAValue, Address]] = field( - default_factory=init_nested_dict - ) - - n_qubits: int | None = None # def initialize(self): # super().initialize() @@ -120,57 +125,42 @@ def initialize_frame( ) -> FidelityFrame: frame = FidelityFrame(node, has_parent_access=has_parent_access) - if self.n_qubits is not None: - frame.gate_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] - frame.qubit_survival_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] + # if self.n_qubits is not None: + # frame.gate_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] + # frame.qubit_survival_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] return frame - def eval_fallback(self, frame: FidelityFrame, node: ir.Statement): - # NOTE: default is to conserve fidelity, so do nothing here - return tuple(self.lattice.bottom() for _ in range(len(node.results))) + # def eval_fallback(self, frame: FidelityFrame, node: ir.Statement): + # # NOTE: default is to conserve fidelity, so do nothing here + # return tuple(self.lattice.bottom() for _ in range(len(node.results))) - def run( - self, method: ir.Method, *args, **kwargs - ) -> tuple[FidelityFrame, EmptyLattice]: - self._run_address_analysis(method) + # def run( + # self, method: ir.Method, *args, **kwargs + # ) -> tuple[FidelityFrame, Address]: + # self._run_address_analysis(method) - assert self.n_qubits is not None + # assert self.n_qubits is not None - return super().run(method, *args, **kwargs) + # return super().run(method, *args, **kwargs) - def _run_address_analysis(self, method: ir.Method): - self.addr_analysis = AddressAnalysis(self.dialects) - addr_frame, _ = self.addr_analysis.run(method=method) - self.addr_frame = addr_frame + # def _run_address_analysis(self, method: ir.Method): + # self.addr_analysis = AddressAnalysis(self.dialects) + # addr_frame, _ = self.addr_analysis.run(method=method) + # self.addr_frame = addr_frame - self.n_qubits = self.addr_analysis.qubit_count + # # self.n_qubits = self.addr_analysis.qubit_count - return addr_frame + # return addr_frame # NOTE: make sure we have as many probabilities as we have addresses # self.atom_survival_probability = [1.0] * addr_analysis.qubit_count - def method_self(self, method: ir.Method) -> EmptyLattice: + def method_self(self, method: ir.Method) -> Address: return self.lattice.bottom() def _get_address(self, stmt: ir.Statement, key: ir.SSAValue): - addr = None - - if self.addr_frame is not None: - addr = self.addr_frame.entries.get(key) - - if addr is not None: - return addr - - collected_addr = self.collected_addresses.get(stmt) - if collected_addr is not None: - addr = collected_addr.get(key) - - if addr is None: - raise InterpreterError(f"Address of {key} at statement {stmt} not found!") - - return addr + return self.state.current_frame.get(key) def get_address(self, frame: FidelityFrame, stmt: ir.Statement, key: ir.SSAValue): parent_stmt = frame.parent_stmt or stmt @@ -182,23 +172,6 @@ def get_addresses( parent_stmt = frame.parent_stmt or stmt return tuple(self._get_address(parent_stmt, key) for key in keys) - def store_addresses( - self, - frame: FidelityFrame, - stmt: ir.Statement, - addresses: dict[ir.SSAValue, Address], - ): - parent_stmt = frame.parent_stmt or stmt - self._store_addresses(parent_stmt, addresses) - - def _store_addresses( - self, stmt: ir.Statement, addresses: dict[ir.SSAValue, Address] - ): - if stmt in self.collected_addresses: - self.collected_addresses[stmt].update(addresses) - else: - self.collected_addresses[stmt] = addresses - def get_const(self, frame: FidelityFrame, stmt: ir.Statement, key: ir.SSAValue): parent_stmt = frame.parent_stmt or stmt diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index 8d282bc00..db806d06b 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -1,6 +1,8 @@ from kirin import interp from kirin.dialects import scf, func +from bloqade.analysis.address.impls import Func as AddressFuncMethods + from .analysis import FidelityFrame, FidelityAnalysis @@ -33,8 +35,8 @@ def if_else( else_fids = else_frame.gate_fidelities else_survival = else_frame.qubit_survival_fidelities - assert (n_qubits := interp_.n_qubits) is not None - for i in range(n_qubits): + frame.extend_fidelity_lengths(interp_.qubit_count) + for i in range(interp_.qubit_count): min_fid = min(then_fids[i][0], else_fids[i][0]) max_fid = max(then_fids[i][1], else_fids[i][1]) frame.gate_fidelities[i][0] *= min_fid @@ -45,93 +47,115 @@ def if_else( frame.qubit_survival_fidelities[i][0] *= min_survival frame.qubit_survival_fidelities[i][1] *= max_survival - # TODO: re-use address analysis? - @interp.impl(scf.For) - def for_loop(self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: scf.For): - loop_vars_addr = interp_.get_addresses(frame, stmt, stmt.initializers) - loop_vars = tuple(interp_.lattice.bottom() for _ in range(len(stmt.results))) - - iter_type, iterable = interp_.addr_analysis.unpack_iterable(interp_.get_address(frame, stmt, stmt.iterable)) - - if iter_type is None: - return interp_.eval_fallback(frame, stmt) - - for value in iterable: - with interp_.addr_analysis.new_frame(stmt) as body_addr_frame: - loop_vars_addr = interp_.addr_analysis.frame_call_region( - body_addr_frame, stmt, stmt.body, value, *loop_vars_addr - ) - - with interp_.new_frame(stmt, has_parent_access=True) as body_frame: - body_frame.parent_stmt = stmt - interp_.store_addresses(body_frame, stmt, body_addr_frame.entries) - - for (arg, input) in zip(stmt.args, (stmt.iterable, *stmt.initializers)): - # NOTE: assign address of input to blockargument - addr = interp_.get_address(frame, stmt, input) - interp_.store_addresses(body_frame, stmt, {arg: addr}) - - interp_.frame_call_region( - body_frame, stmt, stmt.body, interp_.lattice.bottom(), *loop_vars - ) - - if loop_vars_addr is None: - loop_vars_addr = () - elif isinstance(loop_vars_addr, interp.ReturnValue): - break - - return loop_vars - - -@func.dialect.register(key="circuit.fidelity") -class __FuncMethods(interp.MethodTable): - # TODO: re-use address analysis method table - # and re-use the address lattice so we just get addresses in the frame - @interp.impl(func.Invoke) - def invoke_( - self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: func.Invoke - ): - # NOTE: re-run address analysis on the invoke body so we have the addresses of the values inside the body, not just the return - addr_frame, _ = interp_.addr_analysis.call( - stmt.callee, - interp_.addr_analysis.method_self(stmt.callee), - *interp_.get_addresses(frame, stmt, stmt.inputs), - ) - interp_.store_addresses(frame, stmt, addr_frame.entries) - - # TODO: - # * Store address analysis results for the body on the body_frame - # * Try to run address analysis in parallel with fidelity analysis (in order!) - # * Maybe re-use the address analysis lattice - - with interp_.new_frame(stmt.callee.code, has_parent_access=True) as body_frame: - body_frame.parent_stmt = stmt - - for arg, input in zip( - stmt.callee.callable_region.blocks[0].args[ - 1: - ], # NOTE: skip method_self - stmt.inputs, - ): - - # NOTE: assign address of input to blockargument - addr = interp_.get_address(frame, stmt, input) - interp_.store_addresses(body_frame, stmt, {arg: addr}) - - # NOTE: actually call the invoke to evaluate fidelity - ret = interp_.frame_call( - body_frame, - stmt.callee.code, - interp_.method_self(stmt.callee), - *frame.get_values(stmt.inputs), - ) - - for i, (fid0, fid1) in enumerate(body_frame.gate_fidelities): - frame.gate_fidelities[i][0] *= fid0 - frame.gate_fidelities[i][1] *= fid1 - - for i, (fid0, fid1) in enumerate(body_frame.qubit_survival_fidelities): - frame.qubit_survival_fidelities[i][0] *= fid0 - frame.qubit_survival_fidelities[i][1] *= fid1 - - return (ret,) + # # TODO: re-use address analysis? + # @interp.impl(scf.For) + # def for_loop(self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: scf.For): + # loop_vars_addr = interp_.get_addresses(frame, stmt, stmt.initializers) + # loop_vars = tuple(interp_.lattice.bottom() for _ in range(len(stmt.results))) + + # iter_type, iterable = interp_.addr_analysis.unpack_iterable(interp_.get_address(frame, stmt, stmt.iterable)) + + # if iter_type is None: + # return interp_.eval_fallback(frame, stmt) + + # for value in iterable: + # with interp_.addr_analysis.new_frame(stmt) as body_addr_frame: + # loop_vars_addr = interp_.addr_analysis.frame_call_region( + # body_addr_frame, stmt, stmt.body, value, *loop_vars_addr + # ) + + # with interp_.new_frame(stmt, has_parent_access=True) as body_frame: + # body_frame.parent_stmt = stmt + # interp_.store_addresses(body_frame, stmt, body_addr_frame.entries) + + # for (arg, input) in zip(stmt.args, (stmt.iterable, *stmt.initializers)): + # # NOTE: assign address of input to blockargument + # addr = interp_.get_address(frame, stmt, input) + # interp_.store_addresses(body_frame, stmt, {arg: addr}) + + # interp_.frame_call_region( + # body_frame, stmt, stmt.body, interp_.lattice.bottom(), *loop_vars + # ) + + # if loop_vars_addr is None: + # loop_vars_addr = () + # elif isinstance(loop_vars_addr, interp.ReturnValue): + # break + + # return loop_vars + + +# @func.dialect.register(key="circuit.fidelity") +# class __FuncMethods(interp.MethodTable): + +# @interp.impl(func.Invoke) +# def invoke( +# self, +# interp_: FidelityAnalysis, +# frame: FidelityFrame, +# stmt: func.Invoke, +# ): + +# addr_frame, ret = interp_.call( +# stmt.callee.code, +# interp_.method_self(stmt.callee), +# *frame.get_values(stmt.inputs), +# ) + +# frame.update_gate_fidelities + + +# return (ret,) + +# @func.dialect.register(key="circuit.fidelity") +# class __FuncMethods(interp.MethodTable): +# # TODO: re-use address analysis method table +# # and re-use the address lattice so we just get addresses in the frame +# @interp.impl(func.Invoke) +# def invoke_( +# self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: func.Invoke +# ): +# # NOTE: re-run address analysis on the invoke body so we have the addresses of the values inside the body, not just the return +# addr_frame, _ = interp_.addr_analysis.call( +# stmt.callee, +# interp_.addr_analysis.method_self(stmt.callee), +# *interp_.get_addresses(frame, stmt, stmt.inputs), +# ) +# interp_.store_addresses(frame, stmt, addr_frame.entries) + +# # TODO: +# # * Store address analysis results for the body on the body_frame +# # * Try to run address analysis in parallel with fidelity analysis (in order!) +# # * Maybe re-use the address analysis lattice + +# with interp_.new_frame(stmt.callee.code, has_parent_access=True) as body_frame: +# body_frame.parent_stmt = stmt + +# for arg, input in zip( +# stmt.callee.callable_region.blocks[0].args[ +# 1: +# ], # NOTE: skip method_self +# stmt.inputs, +# ): + +# # NOTE: assign address of input to blockargument +# addr = interp_.get_address(frame, stmt, input) +# interp_.store_addresses(body_frame, stmt, {arg: addr}) + +# # NOTE: actually call the invoke to evaluate fidelity +# ret = interp_.frame_call( +# body_frame, +# stmt.callee.code, +# interp_.method_self(stmt.callee), +# *frame.get_values(stmt.inputs), +# ) + +# for i, (fid0, fid1) in enumerate(body_frame.gate_fidelities): +# frame.gate_fidelities[i][0] *= fid0 +# frame.gate_fidelities[i][1] *= fid1 + +# for i, (fid0, fid1) in enumerate(body_frame.qubit_survival_fidelities): +# frame.qubit_survival_fidelities[i][0] *= fid0 +# frame.qubit_survival_fidelities[i][1] *= fid1 + +# return (ret,) diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py index 77ed2aa0b..643c478e2 100644 --- a/src/bloqade/squin/analysis/fidelity/impls.py +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -27,7 +27,7 @@ def single_qubit_pauli_channel( assert isinstance(addresses, AddressReg) fidelity = 1 - (px + py + pz) - frame.update_gate_fidelities(fidelity, addresses) + frame.update_gate_fidelities(interp_.qubit_count, fidelity, addresses) return () @@ -54,7 +54,7 @@ def depolarize( assert isinstance(addresses, AddressReg) fidelity = 1 - p - frame.update_gate_fidelities(fidelity, addresses) + frame.update_gate_fidelities(interp_.qubit_count, fidelity, addresses) return () @@ -81,4 +81,4 @@ def qubit_loss( addresses = interp_.get_address(frame, stmt, stmt.qubits) assert isinstance(addresses, AddressReg) - frame.update_survival_fidelities(survival, addresses) + frame.update_survival_fidelities(interp_.qubit_count, survival, addresses) diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 9789d3f54..171f7826f 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -211,6 +211,8 @@ def main(): fid_analysis = FidelityAnalysis(main.dialects) frame, _ = fid_analysis.run(main) + print(frame.gate_fidelities) + assert len(frame.gate_fidelities) == 2 assert math.isclose(frame.gate_fidelities[0][0], 0.4) assert math.isclose(frame.gate_fidelities[0][1], 0.4) @@ -256,4 +258,4 @@ def main(): print(frame.gate_fidelities) -test_squin_for() \ No newline at end of file +test_stdlib_call() \ No newline at end of file From f69d4b19d6ff4bdddf1098a5dc4a79ea70414f6e Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Wed, 26 Nov 2025 12:42:29 +0100 Subject: [PATCH 08/15] Implement SCF for fidelity analysis re-using AddressAnalysis --- src/bloqade/analysis/address/analysis.py | 13 +- src/bloqade/analysis/fidelity/__init__.py | 5 +- src/bloqade/analysis/fidelity/analysis.py | 179 +++++------------- src/bloqade/analysis/fidelity/impls.py | 187 ++++++------------- src/bloqade/squin/analysis/fidelity/impls.py | 27 +-- test/analysis/fidelity/test_fidelity.py | 23 ++- 6 files changed, 140 insertions(+), 294 deletions(-) diff --git a/src/bloqade/analysis/address/analysis.py b/src/bloqade/analysis/address/analysis.py index 921f4b3b5..d35d53a57 100644 --- a/src/bloqade/analysis/address/analysis.py +++ b/src/bloqade/analysis/address/analysis.py @@ -18,11 +18,20 @@ class AddressAnalysis(Forward[Address]): keys = ("qubit.address",) _const_prop: const.Propagate lattice = Address - next_address: int = field(init=False) + _next_address: int = field(init=False) + + # NOTE: the following are properties so we can hook into the setter in FidelityAnalysis + @property + def next_address(self) -> int: + return self._next_address + + @next_address.setter + def next_address(self, value: int): + self._next_address = value def initialize(self): super().initialize() - self.next_address: int = 0 + self.next_address = 0 self._const_prop = const.Propagate(self.dialects) self._const_prop.initialize() return self diff --git a/src/bloqade/analysis/fidelity/__init__.py b/src/bloqade/analysis/fidelity/__init__.py index 1d9f76450..5e57cf291 100644 --- a/src/bloqade/analysis/fidelity/__init__.py +++ b/src/bloqade/analysis/fidelity/__init__.py @@ -1,5 +1,2 @@ from . import impls as impls -from .analysis import ( - FidelityFrame as FidelityFrame, - FidelityAnalysis as FidelityAnalysis, -) +from .analysis import FidelityAnalysis as FidelityAnalysis diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 24c2e4eea..dab8e55d8 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -1,53 +1,12 @@ -from typing import Sequence from dataclasses import field, dataclass from kirin import ir -from kirin.interp import InterpreterError -from kirin.analysis import ForwardExtra, const +from kirin.analysis import const from kirin.analysis.forward import ForwardFrame from ..address import Address, AddressReg, ConstResult, AddressAnalysis - -@dataclass -class FidelityFrame(ForwardFrame[Address]): - gate_fidelities: list[list[float]] = field(init=False, default_factory=list) - """Gate fidelities of each qubit as (min, max) pairs to provide a range""" - - qubit_survival_fidelities: list[list[float]] = field(init=False, default_factory=list) - """Qubit survival fidelity given as (min, max) pairs""" - - parent_stmt: ir.Statement | None = None - - def extend_fidelity_lengths(self, n_qubits: int): - """make sure there are at least n_qubits fidelity pairs""" - - self.gate_fidelities.extend([[1.0, 1.0] for _ in range(n_qubits - len(self.gate_fidelities))]) - self.qubit_survival_fidelities.extend([[1.0, 1.0] for _ in range(n_qubits - len(self.qubit_survival_fidelities))]) - - def update_gate_fidelities(self, n_qubits: int, fidelity: float, addresses: AddressReg): - """short-hand to update both (min, max) values""" - - self.extend_fidelity_lengths(n_qubits) - - print(n_qubits) - print(self.gate_fidelities) - - for idx in addresses.data: - self.gate_fidelities[idx][0] *= fidelity - self.gate_fidelities[idx][1] *= fidelity - - def update_survival_fidelities(self, n_qubits: int, survival: float, addresses: AddressReg): - """short-hand to update both (min, max) values""" - - self.extend_fidelity_lengths(n_qubits) - - for idx in addresses.data: - self.qubit_survival_fidelities[idx][0] *= survival - self.qubit_survival_fidelities[idx][1] *= survival - - @dataclass class FidelityAnalysis(AddressAnalysis): """ @@ -82,103 +41,61 @@ def main(): keys = ("circuit.fidelity", "qubit.address") lattice = Address - gate_fidelity: float = 1.0 - """ - The fidelity of the gate set described by the analysed program. It reduces whenever a noise channel is encountered. - """ - - atom_survival_probability: list[float] = field(init=False) - """ - The probabilities that each of the atoms in the register survive the duration of the analysed program. The order of the list follows the order they are in the register. - """ - - default_probabilities: dict[str, tuple[float, ...]] = field(default_factory=dict) - """Default probabilities for noise statements where the probabilities are runtime values. The key must be equal to `stmt.name` and the number of values must match the probabilities. - - Example: - - ```python - from bloqade import squin - - @squin.kernel - def main(): - ... - - analysis = FidelityAnalysis(main.dialects, default_probabilities = {'single_qubit_pauli_channel': [1e-4, 1e-4, 1e-4]}) - ``` - """ - - addr_frame: ForwardFrame[Address] | None = None - addr_analysis: AddressAnalysis = field(init=False) - - - # def initialize(self): - # super().initialize() - # self._current_gate_fidelity = 1.0 - # self._current_atom_survival_probability = [ - # 1.0 for _ in range(len(self.atom_survival_probability)) - # ] - # return self - - def initialize_frame( - self, node: ir.Statement, *, has_parent_access: bool = False - ) -> FidelityFrame: - frame = FidelityFrame(node, has_parent_access=has_parent_access) - - # if self.n_qubits is not None: - # frame.gate_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] - # frame.qubit_survival_fidelities = [[1.0, 1.0] for _ in range(self.n_qubits)] - - return frame - - # def eval_fallback(self, frame: FidelityFrame, node: ir.Statement): - # # NOTE: default is to conserve fidelity, so do nothing here - # return tuple(self.lattice.bottom() for _ in range(len(node.results))) - - # def run( - # self, method: ir.Method, *args, **kwargs - # ) -> tuple[FidelityFrame, Address]: - # self._run_address_analysis(method) - - # assert self.n_qubits is not None - - # return super().run(method, *args, **kwargs) - - # def _run_address_analysis(self, method: ir.Method): - # self.addr_analysis = AddressAnalysis(self.dialects) - # addr_frame, _ = self.addr_analysis.run(method=method) - # self.addr_frame = addr_frame + gate_fidelities: list[list[float]] = field(init=False, default_factory=list) + """Gate fidelities of each qubit as (min, max) pairs to provide a range""" - # # self.n_qubits = self.addr_analysis.qubit_count + qubit_survival_fidelities: list[list[float]] = field( + init=False, default_factory=list + ) + """Qubit survival fidelity given as (min, max) pairs""" - # return addr_frame + @property + def next_address(self) -> int: + return self._next_address + + @next_address.setter + def next_address(self, value: int): + # NOTE: hook into setter to make sure we always have fidelities of the correct length + self._next_address = value + self.extend_fidelities() + + def extend_fidelities(self): + n = self.qubit_count + self.gate_fidelities.extend( + [[1.0, 1.0] for _ in range(n - len(self.gate_fidelities))] + ) + self.qubit_survival_fidelities.extend( + [[1.0, 1.0] for _ in range(n - len(self.qubit_survival_fidelities))] + ) + + def reset_fidelities(self): + self.gate_fidelities = [[1.0, 1.0] for _ in range(self.qubit_count)] + self.qubit_survival_fidelities = [[1.0, 1.0] for _ in range(self.qubit_count)] + + def update_gate_fidelities(self, fidelity: float, addresses: AddressReg): + """short-hand to update both (min, max) values""" - # NOTE: make sure we have as many probabilities as we have addresses - # self.atom_survival_probability = [1.0] * addr_analysis.qubit_count + for idx in addresses.data: + self.gate_fidelities[idx][0] *= fidelity + self.gate_fidelities[idx][1] *= fidelity - def method_self(self, method: ir.Method) -> Address: - return self.lattice.bottom() + def update_survival_fidelities(self, survival: float, addresses: AddressReg): + """short-hand to update both (min, max) values""" - def _get_address(self, stmt: ir.Statement, key: ir.SSAValue): - return self.state.current_frame.get(key) + for idx in addresses.data: + self.qubit_survival_fidelities[idx][0] *= survival + self.qubit_survival_fidelities[idx][1] *= survival - def get_address(self, frame: FidelityFrame, stmt: ir.Statement, key: ir.SSAValue): - parent_stmt = frame.parent_stmt or stmt - return self._get_address(parent_stmt, key) + def initialize(self): + super().initialize() + self.gate_fidelities = [] + self.qubit_survival_fidelities = [] + return self - def get_addresses( - self, frame: FidelityFrame, stmt: ir.Statement, keys: Sequence[ir.SSAValue] + def get_const( + self, frame: ForwardFrame[Address], stmt: ir.Statement, key: ir.SSAValue ): - parent_stmt = frame.parent_stmt or stmt - return tuple(self._get_address(parent_stmt, key) for key in keys) - - def get_const(self, frame: FidelityFrame, stmt: ir.Statement, key: ir.SSAValue): - parent_stmt = frame.parent_stmt or stmt - - # NOTE: we rely on the address analysis to fetch constants and re-use the corresponding lattice element - addr = self._get_address(parent_stmt, key) - + addr = frame.get(key) assert isinstance(addr, ConstResult) assert isinstance(result := addr.result, const.Value) - return result.data diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index db806d06b..22e1e5a6f 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -1,30 +1,42 @@ from kirin import interp -from kirin.dialects import scf, func +from kirin.analysis import ForwardFrame +from kirin.dialects import scf -from bloqade.analysis.address.impls import Func as AddressFuncMethods +from bloqade.analysis.address import Address -from .analysis import FidelityFrame, FidelityAnalysis +from .analysis import FidelityAnalysis @scf.dialect.register(key="circuit.fidelity") class __ScfMethods(interp.MethodTable): @interp.impl(scf.IfElse) def if_else( - self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: scf.IfElse + self, interp_: FidelityAnalysis, frame: ForwardFrame[Address], stmt: scf.IfElse ): + + # NOTE: store a copy of the fidelities + current_gate_fidelities = interp_.gate_fidelities + current_survival_fidelities = interp_.qubit_survival_fidelities + # TODO: check if the condition is constant and fix the branch in that case # run both branches with interp_.new_frame(stmt, has_parent_access=True) as then_frame: + # NOTE: reset fidelities before stepping into the then-body + interp_.reset_fidelities() + interp_.frame_call_region( then_frame, stmt, stmt.then_body, *(interp_.lattice.bottom() for _ in range(len(stmt.args))), ) - then_fids = then_frame.gate_fidelities - then_survival = then_frame.qubit_survival_fidelities + then_fids = interp_.gate_fidelities + then_survival = interp_.qubit_survival_fidelities with interp_.new_frame(stmt, has_parent_access=True) as else_frame: + # NOTE: reset again before stepping into else-body + interp_.reset_fidelities() + interp_.frame_call_region( else_frame, stmt, @@ -32,130 +44,41 @@ def if_else( *(interp_.lattice.bottom() for _ in range(len(stmt.args))), ) - else_fids = else_frame.gate_fidelities - else_survival = else_frame.qubit_survival_fidelities - - frame.extend_fidelity_lengths(interp_.qubit_count) - for i in range(interp_.qubit_count): - min_fid = min(then_fids[i][0], else_fids[i][0]) - max_fid = max(then_fids[i][1], else_fids[i][1]) - frame.gate_fidelities[i][0] *= min_fid - frame.gate_fidelities[i][1] *= max_fid - - min_survival = min(then_survival[i][0], else_survival[i][0]) - max_survival = max(then_survival[i][1], else_survival[i][1]) - frame.qubit_survival_fidelities[i][0] *= min_survival - frame.qubit_survival_fidelities[i][1] *= max_survival - - # # TODO: re-use address analysis? - # @interp.impl(scf.For) - # def for_loop(self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: scf.For): - # loop_vars_addr = interp_.get_addresses(frame, stmt, stmt.initializers) - # loop_vars = tuple(interp_.lattice.bottom() for _ in range(len(stmt.results))) - - # iter_type, iterable = interp_.addr_analysis.unpack_iterable(interp_.get_address(frame, stmt, stmt.iterable)) - - # if iter_type is None: - # return interp_.eval_fallback(frame, stmt) - - # for value in iterable: - # with interp_.addr_analysis.new_frame(stmt) as body_addr_frame: - # loop_vars_addr = interp_.addr_analysis.frame_call_region( - # body_addr_frame, stmt, stmt.body, value, *loop_vars_addr - # ) - - # with interp_.new_frame(stmt, has_parent_access=True) as body_frame: - # body_frame.parent_stmt = stmt - # interp_.store_addresses(body_frame, stmt, body_addr_frame.entries) - - # for (arg, input) in zip(stmt.args, (stmt.iterable, *stmt.initializers)): - # # NOTE: assign address of input to blockargument - # addr = interp_.get_address(frame, stmt, input) - # interp_.store_addresses(body_frame, stmt, {arg: addr}) - - # interp_.frame_call_region( - # body_frame, stmt, stmt.body, interp_.lattice.bottom(), *loop_vars - # ) - - # if loop_vars_addr is None: - # loop_vars_addr = () - # elif isinstance(loop_vars_addr, interp.ReturnValue): - # break - - # return loop_vars - - -# @func.dialect.register(key="circuit.fidelity") -# class __FuncMethods(interp.MethodTable): - -# @interp.impl(func.Invoke) -# def invoke( -# self, -# interp_: FidelityAnalysis, -# frame: FidelityFrame, -# stmt: func.Invoke, -# ): - -# addr_frame, ret = interp_.call( -# stmt.callee.code, -# interp_.method_self(stmt.callee), -# *frame.get_values(stmt.inputs), -# ) - -# frame.update_gate_fidelities - - -# return (ret,) - -# @func.dialect.register(key="circuit.fidelity") -# class __FuncMethods(interp.MethodTable): -# # TODO: re-use address analysis method table -# # and re-use the address lattice so we just get addresses in the frame -# @interp.impl(func.Invoke) -# def invoke_( -# self, interp_: FidelityAnalysis, frame: FidelityFrame, stmt: func.Invoke -# ): -# # NOTE: re-run address analysis on the invoke body so we have the addresses of the values inside the body, not just the return -# addr_frame, _ = interp_.addr_analysis.call( -# stmt.callee, -# interp_.addr_analysis.method_self(stmt.callee), -# *interp_.get_addresses(frame, stmt, stmt.inputs), -# ) -# interp_.store_addresses(frame, stmt, addr_frame.entries) - -# # TODO: -# # * Store address analysis results for the body on the body_frame -# # * Try to run address analysis in parallel with fidelity analysis (in order!) -# # * Maybe re-use the address analysis lattice - -# with interp_.new_frame(stmt.callee.code, has_parent_access=True) as body_frame: -# body_frame.parent_stmt = stmt - -# for arg, input in zip( -# stmt.callee.callable_region.blocks[0].args[ -# 1: -# ], # NOTE: skip method_self -# stmt.inputs, -# ): - -# # NOTE: assign address of input to blockargument -# addr = interp_.get_address(frame, stmt, input) -# interp_.store_addresses(body_frame, stmt, {arg: addr}) - -# # NOTE: actually call the invoke to evaluate fidelity -# ret = interp_.frame_call( -# body_frame, -# stmt.callee.code, -# interp_.method_self(stmt.callee), -# *frame.get_values(stmt.inputs), -# ) - -# for i, (fid0, fid1) in enumerate(body_frame.gate_fidelities): -# frame.gate_fidelities[i][0] *= fid0 -# frame.gate_fidelities[i][1] *= fid1 - -# for i, (fid0, fid1) in enumerate(body_frame.qubit_survival_fidelities): -# frame.qubit_survival_fidelities[i][0] *= fid0 -# frame.qubit_survival_fidelities[i][1] *= fid1 + else_fids = interp_.gate_fidelities + else_survival = interp_.qubit_survival_fidelities + + # NOTE: reset one last time to the state before + interp_.reset_fidelities() + + # TODO: maybe combine this with interp.extend_fidelities? + # NOTE: make sure they are all of the same length + n = interp_.qubit_count + current_gate_fidelities.extend( + [[1.0, 1.0] for _ in range(n - len(current_gate_fidelities))] + ) + current_survival_fidelities.extend( + [[1.0, 1.0] for _ in range(n - len(current_survival_fidelities))] + ) + then_fids.extend([[1.0, 1.0] for _ in range(n - len(then_fids))]) + else_fids.extend([[1.0, 1.0] for _ in range(n - len(else_fids))]) + + # NOTE: now we update min / max accordingly + for i, (current_fid, then_fid, else_fid) in enumerate( + zip(current_gate_fidelities, then_fids, else_fids) + ): + interp_.gate_fidelities[i][0] = current_fid[0] * min( + then_fid[0], else_fid[0] + ) + interp_.gate_fidelities[i][1] = current_fid[1] * max( + then_fid[1], else_fid[1] + ) -# return (ret,) + for i, (current_surv, then_surv, else_surv) in enumerate( + zip(current_survival_fidelities, then_survival, else_survival) + ): + interp_.qubit_survival_fidelities[i][0] = current_surv[0] * min( + then_surv[0], else_surv[0] + ) + interp_.qubit_survival_fidelities[i][1] = current_surv[1] * max( + then_surv[1], else_surv[1] + ) diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py index 643c478e2..152a80441 100644 --- a/src/bloqade/squin/analysis/fidelity/impls.py +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -1,10 +1,11 @@ from typing import TypeVar from kirin import interp +from kirin.analysis import ForwardFrame from bloqade.squin import noise -from bloqade.analysis.address import AddressReg -from bloqade.analysis.fidelity import FidelityFrame, FidelityAnalysis +from bloqade.analysis.address import Address, AddressReg +from bloqade.analysis.fidelity import FidelityAnalysis T = TypeVar("T") @@ -16,18 +17,18 @@ class __NoiseMethods(interp.MethodTable): def single_qubit_pauli_channel( self, interp_: FidelityAnalysis, - frame: FidelityFrame, + frame: ForwardFrame[Address], stmt: noise.stmts.SingleQubitPauliChannel, ): px = interp_.get_const(frame, stmt, stmt.px) py = interp_.get_const(frame, stmt, stmt.py) pz = interp_.get_const(frame, stmt, stmt.pz) - addresses = interp_.get_address(frame, stmt, stmt.qubits) + addresses = frame.get(stmt.qubits) assert isinstance(addresses, AddressReg) fidelity = 1 - (px + py + pz) - frame.update_gate_fidelities(interp_.qubit_count, fidelity, addresses) + interp_.update_gate_fidelities(fidelity, addresses) return () @@ -35,7 +36,7 @@ def single_qubit_pauli_channel( def two_qubit_pauli_channel( self, interp_: FidelityAnalysis, - frame: FidelityFrame, + frame: ForwardFrame[Address], stmt: noise.stmts.TwoQubitPauliChannel, ): stmt.probabilities @@ -45,16 +46,16 @@ def two_qubit_pauli_channel( def depolarize( self, interp_: FidelityAnalysis, - frame: FidelityFrame, + frame: ForwardFrame[Address], stmt: noise.stmts.Depolarize, ): p = interp_.get_const(frame, stmt, stmt.p) - addresses = interp_.get_address(frame, stmt, stmt.qubits) + addresses = frame.get(stmt.qubits) assert isinstance(addresses, AddressReg) fidelity = 1 - p - frame.update_gate_fidelities(interp_.qubit_count, fidelity, addresses) + interp_.update_gate_fidelities(fidelity, addresses) return () @@ -62,7 +63,7 @@ def depolarize( def depolarize2( self, interp_: FidelityAnalysis, - frame: FidelityFrame, + frame: ForwardFrame[Address], stmt: noise.stmts.Depolarize2, ): stmt.p @@ -72,13 +73,13 @@ def depolarize2( def qubit_loss( self, interp_: FidelityAnalysis, - frame: FidelityFrame, + frame: ForwardFrame[Address], stmt: noise.stmts.QubitLoss, ): p = interp_.get_const(frame, stmt, stmt.p) survival = 1 - p - addresses = interp_.get_address(frame, stmt, stmt.qubits) + addresses = frame.get(stmt.qubits) assert isinstance(addresses, AddressReg) - frame.update_survival_fidelities(interp_.qubit_count, survival, addresses) + interp_.update_survival_fidelities(survival, addresses) diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 171f7826f..5bb9e54a9 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -211,14 +211,14 @@ def main(): fid_analysis = FidelityAnalysis(main.dialects) frame, _ = fid_analysis.run(main) - print(frame.gate_fidelities) + print(fid_analysis.gate_fidelities) - assert len(frame.gate_fidelities) == 2 - assert math.isclose(frame.gate_fidelities[0][0], 0.4) - assert math.isclose(frame.gate_fidelities[0][1], 0.4) - assert frame.gate_fidelities[1] == [1.0, 1.0] + assert len(fid_analysis.gate_fidelities) == 2 + assert math.isclose(fid_analysis.gate_fidelities[0][0], 0.4) + assert math.isclose(fid_analysis.gate_fidelities[0][1], 0.4) + assert fid_analysis.gate_fidelities[1] == [1.0, 1.0] - assert frame.qubit_survival_fidelities == [[1.0, 1.0], [0.9, 0.9]] + assert fid_analysis.qubit_survival_fidelities == [[1.0, 1.0], [0.9, 0.9]] def test_squin_if(): @@ -240,9 +240,8 @@ def main(): fidelity_analysis = FidelityAnalysis(main.dialects) frame, _ = fidelity_analysis.run(main) - assert frame.gate_fidelities == [[0.9, 1.0], [0.8, 1.0]] - - assert frame.qubit_survival_fidelities == [[0.85, 1.0], [0.75, 1.0]] + assert fidelity_analysis.gate_fidelities == [[0.9, 1.0], [0.8, 1.0]] + assert fidelity_analysis.qubit_survival_fidelities == [[0.85, 1.0], [0.75, 1.0]] def test_squin_for(): @@ -256,6 +255,6 @@ def main(): fidelity_analysis = FidelityAnalysis(main.dialects) frame, _ = fidelity_analysis.run(main) - print(frame.gate_fidelities) - -test_stdlib_call() \ No newline at end of file + assert fidelity_analysis.gate_fidelities == [ + [1.0 - i * 0.01, 1.0 - i * 0.01] for i in range(4) + ] From d4ba4de2dad4f354f64ba3d0abf1371788de78f6 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Wed, 26 Nov 2025 13:08:26 +0100 Subject: [PATCH 09/15] Simplify IfElse impl --- src/bloqade/analysis/fidelity/analysis.py | 31 ++++++++++++++---- src/bloqade/analysis/fidelity/impls.py | 40 +++++------------------ 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index dab8e55d8..5d33a1663 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -60,13 +60,12 @@ def next_address(self, value: int): self.extend_fidelities() def extend_fidelities(self): + self.extend_fidelity(self.gate_fidelities) + self.extend_fidelity(self.qubit_survival_fidelities) + + def extend_fidelity(self, fidelities: list[list[float]]): n = self.qubit_count - self.gate_fidelities.extend( - [[1.0, 1.0] for _ in range(n - len(self.gate_fidelities))] - ) - self.qubit_survival_fidelities.extend( - [[1.0, 1.0] for _ in range(n - len(self.qubit_survival_fidelities))] - ) + fidelities.extend([[1.0, 1.0] for _ in range(n - len(fidelities))]) def reset_fidelities(self): self.gate_fidelities = [[1.0, 1.0] for _ in range(self.qubit_count)] @@ -86,6 +85,26 @@ def update_survival_fidelities(self, survival: float, addresses: AddressReg): self.qubit_survival_fidelities[idx][0] *= survival self.qubit_survival_fidelities[idx][1] *= survival + def update_branched_fidelities( + self, + fidelities: list[list[float]], + current_fidelities: list[list[float]], + then_fidelities: list[list[float]], + else_fidelities: list[list[float]], + ): + # NOTE: make sure they are all of the same length + map( + self.extend_fidelity, + (fidelities, current_fidelities, then_fidelities, else_fidelities), + ) + + # NOTE: now we update min / max accordingly + for fid, current_fid, then_fid, else_fid in zip( + fidelities, current_fidelities, then_fidelities, else_fidelities + ): + fid[0] = current_fid[0] * min(then_fid[0], else_fid[0]) + fid[1] = current_fid[1] * max(then_fid[1], else_fid[1]) + def initialize(self): super().initialize() self.gate_fidelities = [] diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index 22e1e5a6f..13a39d3f2 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -47,38 +47,16 @@ def if_else( else_fids = interp_.gate_fidelities else_survival = interp_.qubit_survival_fidelities - # NOTE: reset one last time to the state before + # NOTE: reset one last time interp_.reset_fidelities() - # TODO: maybe combine this with interp.extend_fidelities? - # NOTE: make sure they are all of the same length - n = interp_.qubit_count - current_gate_fidelities.extend( - [[1.0, 1.0] for _ in range(n - len(current_gate_fidelities))] + # NOTE: now update min / max pairs accordingly + interp_.update_branched_fidelities( + interp_.gate_fidelities, current_gate_fidelities, then_fids, else_fids ) - current_survival_fidelities.extend( - [[1.0, 1.0] for _ in range(n - len(current_survival_fidelities))] + interp_.update_branched_fidelities( + interp_.qubit_survival_fidelities, + current_survival_fidelities, + then_survival, + else_survival, ) - then_fids.extend([[1.0, 1.0] for _ in range(n - len(then_fids))]) - else_fids.extend([[1.0, 1.0] for _ in range(n - len(else_fids))]) - - # NOTE: now we update min / max accordingly - for i, (current_fid, then_fid, else_fid) in enumerate( - zip(current_gate_fidelities, then_fids, else_fids) - ): - interp_.gate_fidelities[i][0] = current_fid[0] * min( - then_fid[0], else_fid[0] - ) - interp_.gate_fidelities[i][1] = current_fid[1] * max( - then_fid[1], else_fid[1] - ) - - for i, (current_surv, then_surv, else_surv) in enumerate( - zip(current_survival_fidelities, then_survival, else_survival) - ): - interp_.qubit_survival_fidelities[i][0] = current_surv[0] * min( - then_surv[0], else_surv[0] - ) - interp_.qubit_survival_fidelities[i][1] = current_surv[1] * max( - then_surv[1], else_surv[1] - ) From 3cb28082cb1e89e2cf3c4371bf5c3b8f42f8cf37 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Wed, 26 Nov 2025 15:28:03 +0100 Subject: [PATCH 10/15] Clean up asserts, constant fetching and implement missing methods for squin --- src/bloqade/analysis/address/analysis.py | 4 +- src/bloqade/analysis/fidelity/analysis.py | 33 +--- src/bloqade/squin/analysis/fidelity/impls.py | 157 +++++++++++++++++-- test/analysis/fidelity/test_fidelity.py | 60 +++++++ 4 files changed, 213 insertions(+), 41 deletions(-) diff --git a/src/bloqade/analysis/address/analysis.py b/src/bloqade/analysis/address/analysis.py index d35d53a57..6fefa08e4 100644 --- a/src/bloqade/analysis/address/analysis.py +++ b/src/bloqade/analysis/address/analysis.py @@ -136,7 +136,9 @@ def run_lattice( case _: return Address.top() - def get_const_value(self, addr: Address, typ: Type[T]) -> T | None: + def get_const_value( + self, addr: Address, typ: Type[T] | tuple[Type[T], ...] + ) -> T | None: if not isinstance(addr, ConstResult): return None diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 5d33a1663..8c3de885f 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -1,10 +1,6 @@ from dataclasses import field, dataclass -from kirin import ir -from kirin.analysis import const -from kirin.analysis.forward import ForwardFrame - -from ..address import Address, AddressReg, ConstResult, AddressAnalysis +from ..address import Address, AddressReg, AddressAnalysis @dataclass @@ -71,19 +67,15 @@ def reset_fidelities(self): self.gate_fidelities = [[1.0, 1.0] for _ in range(self.qubit_count)] self.qubit_survival_fidelities = [[1.0, 1.0] for _ in range(self.qubit_count)] - def update_gate_fidelities(self, fidelity: float, addresses: AddressReg): - """short-hand to update both (min, max) values""" - - for idx in addresses.data: - self.gate_fidelities[idx][0] *= fidelity - self.gate_fidelities[idx][1] *= fidelity - - def update_survival_fidelities(self, survival: float, addresses: AddressReg): + @staticmethod + def update_fidelities( + fidelities: list[list[float]], fidelity: float, addresses: AddressReg + ): """short-hand to update both (min, max) values""" for idx in addresses.data: - self.qubit_survival_fidelities[idx][0] *= survival - self.qubit_survival_fidelities[idx][1] *= survival + fidelities[idx][0] *= fidelity + fidelities[idx][1] *= fidelity def update_branched_fidelities( self, @@ -107,14 +99,5 @@ def update_branched_fidelities( def initialize(self): super().initialize() - self.gate_fidelities = [] - self.qubit_survival_fidelities = [] + self.reset_fidelities() return self - - def get_const( - self, frame: ForwardFrame[Address], stmt: ir.Statement, key: ir.SSAValue - ): - addr = frame.get(key) - assert isinstance(addr, ConstResult) - assert isinstance(result := addr.result, const.Value) - return result.data diff --git a/src/bloqade/squin/analysis/fidelity/impls.py b/src/bloqade/squin/analysis/fidelity/impls.py index 152a80441..6cd899dde 100644 --- a/src/bloqade/squin/analysis/fidelity/impls.py +++ b/src/bloqade/squin/analysis/fidelity/impls.py @@ -2,10 +2,12 @@ from kirin import interp from kirin.analysis import ForwardFrame +from kirin.dialects import ilist from bloqade.squin import noise from bloqade.analysis.address import Address, AddressReg from bloqade.analysis.fidelity import FidelityAnalysis +from bloqade.analysis.address.lattice import StaticContainer T = TypeVar("T") @@ -20,15 +22,19 @@ def single_qubit_pauli_channel( frame: ForwardFrame[Address], stmt: noise.stmts.SingleQubitPauliChannel, ): - px = interp_.get_const(frame, stmt, stmt.px) - py = interp_.get_const(frame, stmt, stmt.py) - pz = interp_.get_const(frame, stmt, stmt.pz) + px = interp_.get_const_value(frame.get(stmt.px), float) + py = interp_.get_const_value(frame.get(stmt.py), float) + pz = interp_.get_const_value(frame.get(stmt.pz), float) + + if px is None or py is None or pz is None: + return () addresses = frame.get(stmt.qubits) - assert isinstance(addresses, AddressReg) + if not isinstance(addresses, AddressReg): + return () fidelity = 1 - (px + py + pz) - interp_.update_gate_fidelities(fidelity, addresses) + interp_.update_fidelities(interp_.gate_fidelities, fidelity, addresses) return () @@ -39,8 +45,66 @@ def two_qubit_pauli_channel( frame: ForwardFrame[Address], stmt: noise.stmts.TwoQubitPauliChannel, ): - stmt.probabilities - raise NotImplementedError("TODO") + probabilities = interp_.get_const_value( + frame.get(stmt.probabilities), (list, tuple, ilist.IList) + ) + + if probabilities is None: + return () + + control_addresses = frame.get(stmt.controls) + target_addresses = frame.get(stmt.targets) + + if not isinstance(control_addresses, AddressReg) or not isinstance( + target_addresses, AddressReg + ): + return () + + # NOTE: total noise probability is the sum over all probabilities where non-identity is applied + p_control = 0.0 + p_target = 0.0 + + # NOTE: not elegant, but easy to ensure correctness + for i, (p, pauli_op) in enumerate( + zip( + probabilities, + ( + "IX", + "IY", + "IZ", + "XI", + "XX", + "XY", + "XZ", + "YI", + "YX", + "YY", + "YZ", + "ZI", + "ZX", + "ZY", + "ZZ", + ), + ) + ): + + if pauli_op[0] != "I": + p_control += p + + if pauli_op[1] != "I": + p_target += p + + fidelity_control = 1 - p_control + fidelity_target = 1 - p_target + + interp_.update_fidelities( + interp_.gate_fidelities, fidelity_control, control_addresses + ) + interp_.update_fidelities( + interp_.gate_fidelities, fidelity_target, target_addresses + ) + + return () @interp.impl(noise.stmts.Depolarize) def depolarize( @@ -49,13 +113,17 @@ def depolarize( frame: ForwardFrame[Address], stmt: noise.stmts.Depolarize, ): - p = interp_.get_const(frame, stmt, stmt.p) + p = interp_.get_const_value(frame.get(stmt.p), float) + + if p is None: + return () addresses = frame.get(stmt.qubits) - assert isinstance(addresses, AddressReg) + if not isinstance(addresses, AddressReg): + return () fidelity = 1 - p - interp_.update_gate_fidelities(fidelity, addresses) + interp_.update_fidelities(interp_.gate_fidelities, fidelity, addresses) return () @@ -66,8 +134,28 @@ def depolarize2( frame: ForwardFrame[Address], stmt: noise.stmts.Depolarize2, ): - stmt.p - raise NotImplementedError("TODO") + p = interp_.get_const_value(frame.get(stmt.p), float) + + if p is None: + return () + + control_addresses = frame.get(stmt.controls) + target_addresses = frame.get(stmt.targets) + + if not isinstance(control_addresses, AddressReg) or not isinstance( + target_addresses, AddressReg + ): + return () + + # NOTE: there are 15 potential noise operators, 3 of which apply identity to the first and 3 that apply identity to the second qubit + # leaving 12 / 15 noise channels for each qubit to decrease the fidelity + + fidelity = 1 - 12.0 * p / 15.0 + + interp_.update_fidelities(interp_.gate_fidelities, fidelity, control_addresses) + interp_.update_fidelities(interp_.gate_fidelities, fidelity, target_addresses) + + return () @interp.impl(noise.stmts.QubitLoss) def qubit_loss( @@ -76,10 +164,49 @@ def qubit_loss( frame: ForwardFrame[Address], stmt: noise.stmts.QubitLoss, ): - p = interp_.get_const(frame, stmt, stmt.p) + p = interp_.get_const_value(frame.get(stmt.p), float) + + if p is None: + return () + survival = 1 - p + addresses = frame.get(stmt.qubits) + if not isinstance(addresses, AddressReg): + return () + + interp_.update_fidelities( + interp_.qubit_survival_fidelities, survival, addresses + ) + + return () + + @interp.impl(noise.stmts.CorrelatedQubitLoss) + def correlated_qubit_loss( + self, + interp_: FidelityAnalysis, + frame: ForwardFrame[Address], + stmt: noise.stmts.CorrelatedQubitLoss, + ): + p = interp_.get_const_value(frame.get(stmt.p), float) + + if p is None: + return () addresses = frame.get(stmt.qubits) - assert isinstance(addresses, AddressReg) - interp_.update_survival_fidelities(survival, addresses) + if not isinstance(addresses, StaticContainer): + return () + + # NOTE: p is the probability with which an entire atom group is lost + # therefore, the fidelity of each atom decreases according to the following + fidelity = 1 - p + + for address in addresses.data: + if not isinstance(address, AddressReg): + continue + + interp_.update_fidelities( + interp_.qubit_survival_fidelities, fidelity, address + ) + + return () diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 5bb9e54a9..c79340aa8 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -1,5 +1,7 @@ import math +from kirin.dialects import ilist + from bloqade import qasm2, squin from bloqade.qasm2 import noise from bloqade.analysis.fidelity import FidelityAnalysis @@ -258,3 +260,61 @@ def main(): assert fidelity_analysis.gate_fidelities == [ [1.0 - i * 0.01, 1.0 - i * 0.01] for i in range(4) ] + + +def test_all_noise_channels(): + @squin.kernel + def main(): + q = squin.qalloc(6) + squin.single_qubit_pauli_channel(0.15, 0.2, 0.25, q[0]) + squin.depolarize(0.2, q[1]) + squin.qubit_loss(0.1, q[0]) + + squin.two_qubit_pauli_channel( + ilist.IList( + [ + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + 0.01, + ] + ), + q[2], + q[3], + ) + + squin.depolarize2(0.15, q[4], q[5]) + + squin.correlated_qubit_loss(0.13, ilist.IList([q[2], q[3], q[4], q[5]])) + + fidelity_analysis = FidelityAnalysis(main.dialects) + frame, _ = fidelity_analysis.run(main) + + assert fidelity_analysis.gate_fidelities == [ + [0.4, 0.4], # squin.single_qubit_pauli_channel(0.15, 0.2, 0.25, q[0]) + [0.8, 0.8], # squin.depolarize(0.2, q[1]) + [1 - 12 * 0.01, 1 - 12 * 0.01], # squin.two_qubit_pauli_channel(..., q[2]) + [1 - 12 * 0.01, 1 - 12 * 0.01], # squin.two_qubit_pauli_channel(..., q[3]) + [0.88, 0.88], # squin.depolarize2(0.15, q[4]) + [0.88, 0.88], # squin.depolarize2(0.15, q[5]) + ] + + assert ( + fidelity_analysis.qubit_survival_fidelities + == [ + [0.9, 0.9], # squin.qubit_loss(0.1, q[0]) + [1.0, 1.0], + ] + + [[0.87, 0.87]] * 4 + ) # squin.correlated_qubit_loss From 26240d229758f1af22b814969a60effcbf607330 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Wed, 26 Nov 2025 15:47:18 +0100 Subject: [PATCH 11/15] Introduce FidelityRange data class to store values --- src/bloqade/analysis/fidelity/__init__.py | 5 ++- src/bloqade/analysis/fidelity/analysis.py | 42 ++++++++++++------- test/analysis/fidelity/test_fidelity.py | 49 +++++++++++++++-------- 3 files changed, 63 insertions(+), 33 deletions(-) diff --git a/src/bloqade/analysis/fidelity/__init__.py b/src/bloqade/analysis/fidelity/__init__.py index 5e57cf291..c5bf623ea 100644 --- a/src/bloqade/analysis/fidelity/__init__.py +++ b/src/bloqade/analysis/fidelity/__init__.py @@ -1,2 +1,5 @@ from . import impls as impls -from .analysis import FidelityAnalysis as FidelityAnalysis +from .analysis import ( + FidelityRange as FidelityRange, + FidelityAnalysis as FidelityAnalysis, +) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 8c3de885f..cb3667a27 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -3,6 +3,14 @@ from ..address import Address, AddressReg, AddressAnalysis +@dataclass +class FidelityRange: + """Range of fidelity for a qubit as pair of (min, max) values""" + + min: float + max: float + + @dataclass class FidelityAnalysis(AddressAnalysis): """ @@ -37,10 +45,10 @@ def main(): keys = ("circuit.fidelity", "qubit.address") lattice = Address - gate_fidelities: list[list[float]] = field(init=False, default_factory=list) + gate_fidelities: list[FidelityRange] = field(init=False, default_factory=list) """Gate fidelities of each qubit as (min, max) pairs to provide a range""" - qubit_survival_fidelities: list[list[float]] = field( + qubit_survival_fidelities: list[FidelityRange] = field( init=False, default_factory=list ) """Qubit survival fidelity given as (min, max) pairs""" @@ -59,30 +67,34 @@ def extend_fidelities(self): self.extend_fidelity(self.gate_fidelities) self.extend_fidelity(self.qubit_survival_fidelities) - def extend_fidelity(self, fidelities: list[list[float]]): + def extend_fidelity(self, fidelities: list[FidelityRange]): n = self.qubit_count - fidelities.extend([[1.0, 1.0] for _ in range(n - len(fidelities))]) + fidelities.extend([FidelityRange(1.0, 1.0) for _ in range(n - len(fidelities))]) def reset_fidelities(self): - self.gate_fidelities = [[1.0, 1.0] for _ in range(self.qubit_count)] - self.qubit_survival_fidelities = [[1.0, 1.0] for _ in range(self.qubit_count)] + self.gate_fidelities = [ + FidelityRange(1.0, 1.0) for _ in range(self.qubit_count) + ] + self.qubit_survival_fidelities = [ + FidelityRange(1.0, 1.0) for _ in range(self.qubit_count) + ] @staticmethod def update_fidelities( - fidelities: list[list[float]], fidelity: float, addresses: AddressReg + fidelities: list[FidelityRange], fidelity: float, addresses: AddressReg ): """short-hand to update both (min, max) values""" for idx in addresses.data: - fidelities[idx][0] *= fidelity - fidelities[idx][1] *= fidelity + fidelities[idx].min *= fidelity + fidelities[idx].max *= fidelity def update_branched_fidelities( self, - fidelities: list[list[float]], - current_fidelities: list[list[float]], - then_fidelities: list[list[float]], - else_fidelities: list[list[float]], + fidelities: list[FidelityRange], + current_fidelities: list[FidelityRange], + then_fidelities: list[FidelityRange], + else_fidelities: list[FidelityRange], ): # NOTE: make sure they are all of the same length map( @@ -94,8 +106,8 @@ def update_branched_fidelities( for fid, current_fid, then_fid, else_fid in zip( fidelities, current_fidelities, then_fidelities, else_fidelities ): - fid[0] = current_fid[0] * min(then_fid[0], else_fid[0]) - fid[1] = current_fid[1] * max(then_fid[1], else_fid[1]) + fid.min = current_fid.min * min(then_fid.min, else_fid.min) + fid.max = current_fid.max * max(then_fid.max, else_fid.max) def initialize(self): super().initialize() diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index c79340aa8..746f6d647 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -4,7 +4,7 @@ from bloqade import qasm2, squin from bloqade.qasm2 import noise -from bloqade.analysis.fidelity import FidelityAnalysis +from bloqade.analysis.fidelity import FidelityRange, FidelityAnalysis from bloqade.qasm2.passes.noise import NoisePass @@ -216,11 +216,14 @@ def main(): print(fid_analysis.gate_fidelities) assert len(fid_analysis.gate_fidelities) == 2 - assert math.isclose(fid_analysis.gate_fidelities[0][0], 0.4) - assert math.isclose(fid_analysis.gate_fidelities[0][1], 0.4) - assert fid_analysis.gate_fidelities[1] == [1.0, 1.0] + assert math.isclose(fid_analysis.gate_fidelities[0].max, 0.4) + assert math.isclose(fid_analysis.gate_fidelities[0].min, 0.4) + assert fid_analysis.gate_fidelities[1] == FidelityRange(1.0, 1.0) - assert fid_analysis.qubit_survival_fidelities == [[1.0, 1.0], [0.9, 0.9]] + assert fid_analysis.qubit_survival_fidelities == [ + FidelityRange(1.0, 1.0), + FidelityRange(0.9, 0.9), + ] def test_squin_if(): @@ -242,8 +245,14 @@ def main(): fidelity_analysis = FidelityAnalysis(main.dialects) frame, _ = fidelity_analysis.run(main) - assert fidelity_analysis.gate_fidelities == [[0.9, 1.0], [0.8, 1.0]] - assert fidelity_analysis.qubit_survival_fidelities == [[0.85, 1.0], [0.75, 1.0]] + assert fidelity_analysis.gate_fidelities == [ + FidelityRange(0.9, 1.0), + FidelityRange(0.8, 1.0), + ] + assert fidelity_analysis.qubit_survival_fidelities == [ + FidelityRange(0.85, 1.0), + FidelityRange(0.75, 1.0), + ] def test_squin_for(): @@ -258,7 +267,7 @@ def main(): frame, _ = fidelity_analysis.run(main) assert fidelity_analysis.gate_fidelities == [ - [1.0 - i * 0.01, 1.0 - i * 0.01] for i in range(4) + FidelityRange(1.0 - i * 0.01, 1.0 - i * 0.01) for i in range(4) ] @@ -302,19 +311,25 @@ def main(): frame, _ = fidelity_analysis.run(main) assert fidelity_analysis.gate_fidelities == [ - [0.4, 0.4], # squin.single_qubit_pauli_channel(0.15, 0.2, 0.25, q[0]) - [0.8, 0.8], # squin.depolarize(0.2, q[1]) - [1 - 12 * 0.01, 1 - 12 * 0.01], # squin.two_qubit_pauli_channel(..., q[2]) - [1 - 12 * 0.01, 1 - 12 * 0.01], # squin.two_qubit_pauli_channel(..., q[3]) - [0.88, 0.88], # squin.depolarize2(0.15, q[4]) - [0.88, 0.88], # squin.depolarize2(0.15, q[5]) + FidelityRange( + 0.4, 0.4 + ), # squin.single_qubit_pauli_channel(0.15, 0.2, 0.25, q[0]) + FidelityRange(0.8, 0.8), # squin.depolarize(0.2, q[1]) + FidelityRange( + 1 - 12 * 0.01, 1 - 12 * 0.01 + ), # squin.two_qubit_pauli_channel(..., q[2]) + FidelityRange( + 1 - 12 * 0.01, 1 - 12 * 0.01 + ), # squin.two_qubit_pauli_channel(..., q[3]) + FidelityRange(0.88, 0.88), # squin.depolarize2(0.15, q[4]) + FidelityRange(0.88, 0.88), # squin.depolarize2(0.15, q[5]) ] assert ( fidelity_analysis.qubit_survival_fidelities == [ - [0.9, 0.9], # squin.qubit_loss(0.1, q[0]) - [1.0, 1.0], + FidelityRange(0.9, 0.9), # squin.qubit_loss(0.1, q[0]) + FidelityRange(1.0, 1.0), ] - + [[0.87, 0.87]] * 4 + + [FidelityRange(0.87, 0.87)] * 4 ) # squin.correlated_qubit_loss From a2cb44c4d1267fd1572dcd79e12653434f686890 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Wed, 26 Nov 2025 16:48:37 +0100 Subject: [PATCH 12/15] Fix qasm2 impls --- src/bloqade/qasm2/dialects/noise/fidelity.py | 78 ++++++++++++++------ test/analysis/fidelity/test_fidelity.py | 55 ++++++++------ 2 files changed, 88 insertions(+), 45 deletions(-) diff --git a/src/bloqade/qasm2/dialects/noise/fidelity.py b/src/bloqade/qasm2/dialects/noise/fidelity.py index acd17ac9c..9ed8beff7 100644 --- a/src/bloqade/qasm2/dialects/noise/fidelity.py +++ b/src/bloqade/qasm2/dialects/noise/fidelity.py @@ -1,6 +1,7 @@ from kirin import interp -from kirin.lattice import EmptyLattice +from kirin.analysis import ForwardFrame +from bloqade.analysis.address import Address, AddressReg from bloqade.analysis.fidelity import FidelityAnalysis from .stmts import PauliChannel, CZPauliChannel, AtomLossChannel @@ -11,37 +12,68 @@ class FidelityMethodTable(interp.MethodTable): @interp.impl(PauliChannel) - @interp.impl(CZPauliChannel) def pauli_channel( self, - interp: FidelityAnalysis, - frame: interp.Frame[EmptyLattice], - stmt: PauliChannel | CZPauliChannel, + interp_: FidelityAnalysis, + frame: ForwardFrame[Address], + stmt: PauliChannel, + ): + (ps,) = stmt.probabilities + fidelity = 1 - sum(ps) + + addresses = frame.get(stmt.qargs) + + if not isinstance(addresses, AddressReg): + return () + + interp_.update_fidelities(interp_.gate_fidelities, fidelity, addresses) + + return () + + @interp.impl(CZPauliChannel) + def cz_pauli_channel( + self, + interp_: FidelityAnalysis, + frame: ForwardFrame[Address], + stmt: CZPauliChannel, ): - probs = stmt.probabilities - try: - ps, ps_ctrl = probs - except ValueError: - (ps,) = probs - ps_ctrl = () + ps_ctrl, ps_target = stmt.probabilities + + fidelity_ctrl = 1 - sum(ps_ctrl) + fidelity_target = 1 - sum(ps_target) + + addresses_ctrl = frame.get(stmt.ctrls) + addresses_target = frame.get(stmt.qargs) - p = sum(ps) - p_ctrl = sum(ps_ctrl) + if not isinstance(addresses_ctrl, AddressReg) or not isinstance( + addresses_target, AddressReg + ): + return () - # NOTE: fidelity is just the inverse probability of any noise to occur - fid = (1 - p) * (1 - p_ctrl) + interp_.update_fidelities( + interp_.gate_fidelities, fidelity_ctrl, addresses_ctrl + ) + interp_.update_fidelities( + interp_.gate_fidelities, fidelity_target, addresses_target + ) - interp.gate_fidelity *= fid + return () @interp.impl(AtomLossChannel) def atom_loss( self, - interp: FidelityAnalysis, - frame: interp.Frame[EmptyLattice], + interp_: FidelityAnalysis, + frame: ForwardFrame[Address], stmt: AtomLossChannel, ): - # NOTE: since AtomLossChannel acts on IList[Qubit], we know the assigned address is a tuple - addresses = interp.addr_frame.get(stmt.qargs) - # NOTE: get the corresponding index and reduce survival probability accordingly - for index in addresses.data: - interp.atom_survival_probability[index] *= 1 - stmt.prob + addresses = frame.get(stmt.qargs) + + if not isinstance(addresses, AddressReg): + return () + + fidelity = 1 - stmt.prob + interp_.update_fidelities( + interp_.qubit_survival_fidelities, fidelity, addresses + ) + + return () diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 746f6d647..6d3ef71ab 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -21,9 +21,11 @@ def main(): fid_analysis = FidelityAnalysis(main.dialects) fid_analysis.run(main) - assert fid_analysis.gate_fidelity == fid_analysis._current_gate_fidelity == 1 - assert fid_analysis.atom_survival_probability[0] == 1 - p_loss - assert fid_analysis.atom_survival_probability[1] == 1 + assert fid_analysis.gate_fidelities == [FidelityRange(1.0, 1.0)] * 2 + assert fid_analysis.qubit_survival_fidelities == [ + FidelityRange(1 - p_loss, 1 - p_loss), + FidelityRange(1.0, 1.0), + ] def test_cz_noise(): @@ -51,9 +53,12 @@ def main(): fid_analysis = FidelityAnalysis(main.dialects) fid_analysis.run(main) - expected_fidelity = (1 - 3 * p_ch) ** 2 + expected_fidelity = 1 - 3 * p_ch - assert math.isclose(fid_analysis.gate_fidelity, expected_fidelity) + assert ( + fid_analysis.gate_fidelities + == [FidelityRange(expected_fidelity, expected_fidelity)] * 2 + ) def test_single_qubit_noise(): @@ -72,7 +77,10 @@ def main(): expected_fidelity = 1 - 3 * p_ch - assert math.isclose(fid_analysis.gate_fidelity, expected_fidelity) + assert fid_analysis.gate_fidelities == [ + FidelityRange(expected_fidelity, expected_fidelity), + FidelityRange(1.0, 1.0), + ] class NoiseTestModel(noise.MoveNoiseModelABC): @@ -113,6 +121,7 @@ def main_if(): model = NoiseTestModel( global_loss_prob=p_loss, + local_loss_prob=p_loss, global_px=px, global_py=py, global_pz=pz, @@ -122,18 +131,21 @@ def main_if(): fid_analysis = FidelityAnalysis(main.dialects) fid_analysis.run(main) - model = NoiseTestModel() NoisePass(main_if.dialects, noise_model=model)(main_if) fid_if_analysis = FidelityAnalysis(main_if.dialects) fid_if_analysis.run(main_if) - assert 0 < fid_if_analysis.gate_fidelity == fid_analysis.gate_fidelity < 1 - assert ( - 0 - < fid_if_analysis.atom_survival_probability[0] - == fid_analysis.atom_survival_probability[0] - < 1 - ) + main.print() + main_if.print() + + fidelity_if = fid_if_analysis.gate_fidelities[0] + fidelity = fid_analysis.gate_fidelities[0] + + survival = fid_analysis.qubit_survival_fidelities[0] + survival_if = fid_if_analysis.qubit_survival_fidelities[0] + + assert 0 < fidelity_if.min == fidelity.min == fidelity.max < fidelity_if.max < 1 + assert 0 < survival_if.min == survival.min == survival.max < survival_if.max < 1 def test_for(): @@ -184,7 +196,6 @@ def main_for(): fid_analysis = FidelityAnalysis(main.dialects) fid_analysis.run(main) - model = NoiseTestModel() NoisePass(main_for.dialects, noise_model=model)(main_for) main_for.print() @@ -192,13 +203,13 @@ def main_for(): fid_for_analysis = FidelityAnalysis(main_for.dialects) fid_for_analysis.run(main_for) - assert 0 < fid_for_analysis.gate_fidelity == fid_analysis.gate_fidelity < 1 - assert ( - 0 - < fid_for_analysis.atom_survival_probability[0] - == fid_analysis.atom_survival_probability[0] - < 1 - ) + fid = fid_analysis.gate_fidelities[0] + fid_for = fid_for_analysis.gate_fidelities[0] + survival = fid_analysis.qubit_survival_fidelities[0] + survival_for = fid_for_analysis.qubit_survival_fidelities[0] + + assert 0 < fid.min == fid.max == fid_for.min == fid_for.max < 1 + assert 0 < survival.min == survival.max == survival_for.min == survival_for.max < 1 def test_stdlib_call(): From 9d0c67c7ba097804aa6cf8456c0e51549a137017 Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Wed, 26 Nov 2025 16:57:06 +0100 Subject: [PATCH 13/15] Update some docstrings --- src/bloqade/analysis/fidelity/analysis.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index cb3667a27..7a00f1cbe 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -1,6 +1,6 @@ from dataclasses import field, dataclass -from ..address import Address, AddressReg, AddressAnalysis +from ..address import AddressReg, AddressAnalysis @dataclass @@ -37,13 +37,12 @@ def main(): fid_analysis = FidelityAnalysis(main.dialects) fid_analysis.run_analysis(main, no_raise=False) - gate_fidelity = fid_analysis.gate_fidelity - atom_survival_probs = fid_analysis.atom_survival_probability + gate_fidelities = fid_analysis.gate_fidelities + qubit_survival_probs = fid_analysis.qubit_survival_fidelities ``` """ keys = ("circuit.fidelity", "qubit.address") - lattice = Address gate_fidelities: list[FidelityRange] = field(init=False, default_factory=list) """Gate fidelities of each qubit as (min, max) pairs to provide a range""" @@ -64,14 +63,20 @@ def next_address(self, value: int): self.extend_fidelities() def extend_fidelities(self): + """Extend both fidelity lists so their length matches the number of qubits""" + self.extend_fidelity(self.gate_fidelities) self.extend_fidelity(self.qubit_survival_fidelities) def extend_fidelity(self, fidelities: list[FidelityRange]): + """Extend a list of fidelities so its length matches the number of qubits""" + n = self.qubit_count fidelities.extend([FidelityRange(1.0, 1.0) for _ in range(n - len(fidelities))]) def reset_fidelities(self): + """Reset fidelities to unity for all qubits""" + self.gate_fidelities = [ FidelityRange(1.0, 1.0) for _ in range(self.qubit_count) ] @@ -96,6 +101,7 @@ def update_branched_fidelities( then_fidelities: list[FidelityRange], else_fidelities: list[FidelityRange], ): + """Update fidelity (min, max) values after evaluating differing branches such as IfElse""" # NOTE: make sure they are all of the same length map( self.extend_fidelity, From b422d66c1c0354940cc5883600c265a8af721beb Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Thu, 27 Nov 2025 10:05:13 +0100 Subject: [PATCH 14/15] Fix IfElse impl and test known branch execution --- src/bloqade/analysis/fidelity/analysis.py | 18 ++++------ src/bloqade/analysis/fidelity/impls.py | 40 ++++++++++++++++++----- test/analysis/fidelity/test_fidelity.py | 38 +++++++++++++++++++++ 3 files changed, 76 insertions(+), 20 deletions(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 7a00f1cbe..764c1b436 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -19,23 +19,17 @@ class FidelityAnalysis(AddressAnalysis): ## Usage examples ``` - from bloqade import qasm2 - from bloqade.noise import native - from bloqade.analysis.fidelity import FidelityAnalysis - from bloqade.qasm2.passes.noise import NoisePass + from bloqade import squin - noise_main = qasm2.extended.add(native.dialect) - - @noise_main + @squin.kernel def main(): - q = qasm2.qreg(2) - qasm2.x(q[0]) + q = squin.qalloc(1) + squin.x(q[0]) + squin.depolarize(q[0]) return q - NoisePass(main.dialects)(main) - fid_analysis = FidelityAnalysis(main.dialects) - fid_analysis.run_analysis(main, no_raise=False) + fid_analysis.run(main) gate_fidelities = fid_analysis.gate_fidelities qubit_survival_probs = fid_analysis.qubit_survival_fidelities diff --git a/src/bloqade/analysis/fidelity/impls.py b/src/bloqade/analysis/fidelity/impls.py index 13a39d3f2..b6c515fff 100644 --- a/src/bloqade/analysis/fidelity/impls.py +++ b/src/bloqade/analysis/fidelity/impls.py @@ -1,8 +1,8 @@ from kirin import interp -from kirin.analysis import ForwardFrame +from kirin.analysis import ForwardFrame, const from kirin.dialects import scf -from bloqade.analysis.address import Address +from bloqade.analysis.address import Address, ConstResult from .analysis import FidelityAnalysis @@ -18,17 +18,27 @@ def if_else( current_gate_fidelities = interp_.gate_fidelities current_survival_fidelities = interp_.qubit_survival_fidelities - # TODO: check if the condition is constant and fix the branch in that case - # run both branches + address_cond = frame.get(stmt.cond) + + # NOTE: if the condition is known at compile time, run specific branch + if isinstance(address_cond, ConstResult) and isinstance( + const_cond := address_cond.result, const.Value + ): + body = stmt.then_body if const_cond.data else stmt.else_body + with interp_.new_frame(stmt, has_parent_access=True) as body_frame: + ret = interp_.frame_call_region(body_frame, stmt, body, address_cond) + return ret + + # NOTE: runtime condition, evaluate both with interp_.new_frame(stmt, has_parent_access=True) as then_frame: # NOTE: reset fidelities before stepping into the then-body interp_.reset_fidelities() - interp_.frame_call_region( + then_results = interp_.frame_call_region( then_frame, stmt, stmt.then_body, - *(interp_.lattice.bottom() for _ in range(len(stmt.args))), + address_cond, ) then_fids = interp_.gate_fidelities then_survival = interp_.qubit_survival_fidelities @@ -37,11 +47,11 @@ def if_else( # NOTE: reset again before stepping into else-body interp_.reset_fidelities() - interp_.frame_call_region( + else_results = interp_.frame_call_region( else_frame, stmt, stmt.else_body, - *(interp_.lattice.bottom() for _ in range(len(stmt.args))), + address_cond, ) else_fids = interp_.gate_fidelities @@ -60,3 +70,17 @@ def if_else( then_survival, else_survival, ) + + # TODO: pick the non-return value + if isinstance(then_results, interp.ReturnValue) and isinstance( + else_results, interp.ReturnValue + ): + return interp.ReturnValue(then_results.value.join(else_results.value)) + elif isinstance(then_results, interp.ReturnValue): + ret = else_results + elif isinstance(else_results, interp.ReturnValue): + ret = then_results + else: + ret = interp_.join_results(then_results, else_results) + + return ret diff --git a/test/analysis/fidelity/test_fidelity.py b/test/analysis/fidelity/test_fidelity.py index 6d3ef71ab..047ffa4d0 100644 --- a/test/analysis/fidelity/test_fidelity.py +++ b/test/analysis/fidelity/test_fidelity.py @@ -344,3 +344,41 @@ def main(): ] + [FidelityRange(0.87, 0.87)] * 4 ) # squin.correlated_qubit_loss + + +def test_squin_know_if(): + @squin.kernel + def main(): + x = True + q = squin.qalloc(4) + + if x: + squin.depolarize(0.1, q[0]) + squin.qubit_loss(0.1, q[0]) + else: + squin.depolarize(0.1, q[1]) + squin.qubit_loss(0.1, q[1]) + + if not x: + squin.depolarize(0.2, q[2]) + squin.qubit_loss(0.2, q[2]) + else: + squin.depolarize(0.2, q[3]) + squin.qubit_loss(0.2, q[3]) + + fidelity_analysis = FidelityAnalysis(main.dialects) + fidelity_analysis.run(main) + + assert fidelity_analysis.gate_fidelities == [ + FidelityRange(0.9, 0.9), + FidelityRange(1.0, 1.0), + FidelityRange(1.0, 1.0), + FidelityRange(0.8, 0.8), + ] + + assert fidelity_analysis.qubit_survival_fidelities == [ + FidelityRange(0.9, 0.9), + FidelityRange(1.0, 1.0), + FidelityRange(1.0, 1.0), + FidelityRange(0.8, 0.8), + ] From 8767fc3c7c1464a803493076572e696764d5ffba Mon Sep 17 00:00:00 2001 From: David Plankensteiner Date: Thu, 27 Nov 2025 10:07:02 +0100 Subject: [PATCH 15/15] Fix docstring example --- src/bloqade/analysis/fidelity/analysis.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/bloqade/analysis/fidelity/analysis.py b/src/bloqade/analysis/fidelity/analysis.py index 764c1b436..5b3b878f0 100644 --- a/src/bloqade/analysis/fidelity/analysis.py +++ b/src/bloqade/analysis/fidelity/analysis.py @@ -20,12 +20,13 @@ class FidelityAnalysis(AddressAnalysis): ``` from bloqade import squin + from bloqade.analysis.fidelity import FidelityAnalysis @squin.kernel def main(): q = squin.qalloc(1) squin.x(q[0]) - squin.depolarize(q[0]) + squin.depolarize(0.1, q[0]) return q fid_analysis = FidelityAnalysis(main.dialects)