Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions src/pqnstack/app/api/routes/serial.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
import logging
from typing import Annotated
from typing import cast

from fastapi import APIRouter
from fastapi import Depends
from pydantic import BaseModel

from pqnstack.app.core.config import settings
from pqnstack.pqn.drivers.rotaryencoder import MockRotaryEncoder
from pqnstack.pqn.drivers.rotaryencoder import RotaryEncoderInstrument
from pqnstack.pqn.drivers.rotaryencoder import SerialRotaryEncoder

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/serial", tags=["measure"])


def get_rotary_encoder() -> SerialRotaryEncoder:
def get_rotary_encoder() -> RotaryEncoderInstrument:
if settings.rotary_encoder is None:
rotary_encoder = SerialRotaryEncoder(
label="rotary_encoder", address=settings.rotary_encoder_address, offset_degrees=0.0
)
settings.rotary_encoder = rotary_encoder
if settings.virtual_rotator:
# Virtual rotator mode enabled, use mock with terminal input
logger.info("Virtual rotator mode enabled")
mock_encoder = MockRotaryEncoder()
settings.rotary_encoder = mock_encoder
else:
# Use the real serial rotary encoder
rotary_encoder = SerialRotaryEncoder(
label="rotary_encoder", address=settings.rotary_encoder_address, offset_degrees=0.0
)
settings.rotary_encoder = rotary_encoder

return settings.rotary_encoder


SERDep = Annotated[SerialRotaryEncoder, Depends(get_rotary_encoder)]
SERDep = Annotated[RotaryEncoderInstrument, Depends(get_rotary_encoder)]


class AngleResponse(BaseModel):
Expand All @@ -32,3 +42,16 @@ class AngleResponse(BaseModel):
@router.get("/")
async def read_angle(rotary_encoder: SERDep) -> AngleResponse:
return AngleResponse(theta=rotary_encoder.read())


@router.post("/debug_set_angle")
async def debug_set_angle(rotary_encoder: SERDep, angle: float) -> AngleResponse:
try:
rotary_encoder = cast("MockRotaryEncoder", rotary_encoder)
rotary_encoder.theta = angle
except AttributeError:
logger.exception("Attempted to set angle on non-mock rotary encoder")
raise

logger.info("Debug: Theta set to %s", rotary_encoder.theta)
return AngleResponse(theta=rotary_encoder.read())
Comment thread
marcosfrenkel marked this conversation as resolved.
5 changes: 3 additions & 2 deletions src/pqnstack/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from pqnstack.constants import BellState
from pqnstack.constants import QKDEncodingBasis
from pqnstack.pqn.drivers.rotaryencoder import SerialRotaryEncoder
from pqnstack.pqn.drivers.rotaryencoder import RotaryEncoderInstrument
from pqnstack.pqn.protocols.measurement import MeasurementConfig

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -40,8 +40,9 @@ class Settings(BaseSettings):
bell_state: BellState = BellState.Phi_plus
timetagger: tuple[str, str] | None = None # Name of the timetagger to use for the CHSH experiment.
rotary_encoder_address: str = "/dev/ttyACM0"
virtual_rotator: bool = False # If True, use terminal input instead of hardware rotary encoder

rotary_encoder: SerialRotaryEncoder | None = None
rotary_encoder: RotaryEncoderInstrument | None = None

model_config = SettingsConfigDict(toml_file="./config.toml", env_file=".env", env_file_encoding="utf-8")

Expand Down
17 changes: 17 additions & 0 deletions src/pqnstack/pqn/drivers/rotaryencoder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import atexit
from dataclasses import dataclass
from dataclasses import field
from typing import Protocol
from typing import runtime_checkable

import serial


@runtime_checkable
class RotaryEncoderInstrument(Protocol):
def read(self) -> float: ...


@dataclass(slots=True)
class SerialRotaryEncoder:
label: str
Expand All @@ -28,3 +35,13 @@ def read(self) -> float:
self._conn.write(b"ANGLE?\n")
angle = self._conn.readline().decode().strip()
return float(angle) + self.offset_degrees


@dataclass(slots=True)
class MockRotaryEncoder:
"""Mock rotary encoder for terminal input when hardware is not available."""

theta: float = 0.0

def read(self) -> float:
return self.theta