diff --git a/configs/config_app_example.toml b/configs/config_app_example.toml index 84c8a813..a7c7f428 100644 --- a/configs/config_app_example.toml +++ b/configs/config_app_example.toml @@ -20,6 +20,7 @@ bell_state = 0 [chsh_settings] hwp = ["provider", "instrument_hwp"] # Replace with actual HWP names request_hwp = ["provider", "instrument_hwp"] +expectation_signs = [-1, 1, 1, 1] # This is for HV + VH. for HH + VV, the signs are [1, -1, 1, 1] # CHSH measurement configuration [chsh_settings.measurement_config] @@ -42,4 +43,12 @@ integration_time_s = 5 binwidth = 500 channel1 = 1 channel2 = 2 -dark_count = 0 \ No newline at end of file +dark_count = 0 + +# Daily CHSH report settings (for automated Slack reporting) +[daily_report] +slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL" # Get from https://api.slack.com/apps +api_url = "http://localhost:8000" # API endpoint (usually localhost if running on same machine) +timetagger_address = "127.0.0.1:8000" # TimeTagger address +follower_node_address = "192.168.1.100:9000" # Replace with actual follower node address +basis = [0, 22.5] # CHSH basis angles to use for daily measurements \ No newline at end of file diff --git a/scripts/chsh_daily_report.py b/scripts/chsh_daily_report.py new file mode 100755 index 00000000..9e445748 --- /dev/null +++ b/scripts/chsh_daily_report.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +""" +CHSH Daily Report Script. + +Runs CHSH measurement and posts results to Slack. + +Reads all configuration from config.toml (including Slack webhook URL). 
+ +Usage: + uv run scripts/chsh_daily_report.py +""" + +import logging +import sys +import tomllib +from datetime import UTC +from datetime import datetime +from pathlib import Path + +import httpx +from pydantic import ValidationError + +from pqnstack.app.api.routes.chsh import ChshResult +from pqnstack.app.core.config import DailyReportConfig + +logger = logging.getLogger(__name__) + + +def load_config() -> DailyReportConfig: + """Load and validate the [daily_report] section from config.toml.""" + config_path = Path(__file__).parent.parent / "config.toml" + + if not config_path.exists(): + logger.error("config.toml not found at %s", config_path) + logger.error("Please create config.toml from configs/config_app_example.toml") + sys.exit(1) + + with config_path.open("rb") as f: + raw = tomllib.load(f) + + daily_report_data = raw.get("daily_report") + if not daily_report_data: + logger.error("[daily_report] section not found in config.toml") + logger.error("Please add it following the example in configs/config_app_example.toml") + sys.exit(1) + + try: + return DailyReportConfig.model_validate(daily_report_data) + except ValidationError: + logger.exception("Invalid [daily_report] configuration") + sys.exit(1) + + +def run_chsh_measurement(config: DailyReportConfig) -> ChshResult: + """Run CHSH measurement via API.""" + logger.info("Starting CHSH measurement at %s", datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")) + logger.info("Basis: %s", config.basis) + logger.info("Follower: %s", config.follower_node_address) + logger.info("TimeTagger: %s", config.timetagger_address) + + try: + with httpx.Client(timeout=600.0) as client: + response = client.post( + f"{config.api_url}/chsh/", + params={ + "follower_node_address": config.follower_node_address, + "timetagger_address": config.timetagger_address, + }, + json=config.basis, + ) + response.raise_for_status() + return ChshResult.model_validate(response.json()) + + except httpx.HTTPError: + logger.exception("Failed to 
contact CHSH API") + sys.exit(1) + + +def post_to_slack(webhook_url: str, chsh_data: ChshResult, config: DailyReportConfig) -> None: + """Post CHSH results to Slack.""" + # Determine emoji based on Bell inequality violation (CHSH > classical limit) + bell_inequality_classical_limit = 2 + emoji = ":sparkles:" if chsh_data.chsh_value > bell_inequality_classical_limit else ":thinking_face:" + + # Build fields dynamically from all returned data + fields = [] + for key, value in chsh_data.model_dump().items(): + field_name = key.replace("_", " ").title() + + if isinstance(value, float): + formatted_value = f"{value:.4f}" + elif isinstance(value, list): + if all(isinstance(x, (int, float)) for x in value): + formatted_value = "[" + ", ".join(f"{x:.4f}" if isinstance(x, float) else str(x) for x in value) + "]" + else: + formatted_value = str(value) + else: + formatted_value = str(value) + + fields.append({"type": "mrkdwn", "text": f"*{field_name}:*\n`{formatted_value}`"}) + + # Create sections with 2 fields each (Slack limit) + sections = [] + for i in range(0, len(fields), 2): + section_fields = fields[i : i + 2] + sections.append({"type": "section", "fields": section_fields}) + + # Add configuration info section + sections.append( + { + "type": "section", + "fields": [ + {"type": "mrkdwn", "text": f"*Basis:*\n`{config.basis}`"}, + {"type": "mrkdwn", "text": f"*Timestamp:*\n{datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S')}"}, + ], + } + ) + + # Format Slack message using Block Kit + slack_message = { + "blocks": [ + { + "type": "header", + "text": {"type": "plain_text", "text": f"{emoji} CHSH Daily Measurement Report", "emoji": True}, + }, + *sections, + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": f"Follower: `{config.follower_node_address}` | TimeTagger: `{config.timetagger_address}`", + } + ], + }, + ] + } + + logger.info("Posting to Slack...") + + try: + with httpx.Client() as client: + response = client.post(webhook_url, 
json=slack_message) + + if response.text == "ok": + logger.info("Successfully posted to Slack") + else: + logger.error("Failed to post to Slack: %s", response.text) + sys.exit(1) + + except httpx.HTTPError: + logger.exception("Failed to post to Slack") + sys.exit(1) + + +def post_error_to_slack(webhook_url: str, error_message: str) -> None: + """Post error message to Slack.""" + slack_message = { + "text": f":x: CHSH Daily Report Failed\n*Error:* {error_message}\n*Time:* {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S')}" + } + + try: + with httpx.Client() as client: + client.post(webhook_url, json=slack_message) + except httpx.HTTPError: + logger.debug("Failed to post error notification to Slack") + + +def main() -> None: + """Execute the CHSH daily report.""" + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + + try: + config = load_config() + + chsh_data = run_chsh_measurement(config) + + logger.info("CHSH measurement completed") + logger.info("Value: %.4f ± %.4f", chsh_data.chsh_value, chsh_data.chsh_error) + + post_to_slack(config.slack_webhook_url, chsh_data, config) + + logger.info("CHSH daily report completed successfully") + + except Exception as e: + logger.exception("Unexpected error") + + # Try to post error to Slack if possible + try: + config = load_config() + post_error_to_slack(config.slack_webhook_url, str(e)) + except Exception: # noqa: BLE001 + logger.debug("Failed to post error notification to Slack") + + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/pqnstack/app/api/routes/chsh.py b/src/pqnstack/app/api/routes/chsh.py index 9a12b7af..c5250aa3 100644 --- a/src/pqnstack/app/api/routes/chsh.py +++ b/src/pqnstack/app/api/routes/chsh.py @@ -1,13 +1,19 @@ +import asyncio +import json import logging +from collections.abc import AsyncGenerator from typing import TYPE_CHECKING from typing import cast from fastapi import APIRouter from fastapi import HTTPException from fastapi import status 
+from fastapi.responses import StreamingResponse +from pydantic import BaseModel from pqnstack.app.api.deps import ClientDep from pqnstack.app.api.deps import StateDep +from pqnstack.app.core.config import chsh_progress_event from pqnstack.app.core.config import settings from pqnstack.app.core.models import calculate_chsh_expectation_error from pqnstack.network.client import Client @@ -17,17 +23,74 @@ logger = logging.getLogger(__name__) + +class ChshResult(BaseModel): + chsh_value: float + chsh_error: float + expectation_values: list[float] + expectation_errors: list[float] + expectation_values_sign_fixed: list[float] + + router = APIRouter(prefix="/chsh", tags=["chsh"]) +@router.get("/progress") +async def chsh_progress(state: StateDep) -> StreamingResponse: + """SSE endpoint for streaming CHSH measurement progress to frontend.""" + + async def event_generator() -> AsyncGenerator[str, None]: + try: + # Send initial connection event + yield f"data: {json.dumps({'event': 'connected'})}\n\n" + + while True: + event_sent = False + + # Check for progress event + try: + await asyncio.wait_for(chsh_progress_event.wait(), timeout=1.0) + yield f"data: {json.dumps({'event': 'chsh_progress', 'current': state.chsh_progress_current, 'total': state.chsh_progress_total, 'running': state.chsh_running})}\n\n" + chsh_progress_event.clear() + event_sent = True + except TimeoutError: + pass + + # Send heartbeat if no event was sent to keep connection alive + if not event_sent: + yield ":\n" + + await asyncio.sleep(1.0) + + except asyncio.CancelledError: + logger.info("CHSH SSE connection closed by client") + raise + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + async def _chsh( # Complexity is high due to the nature of the CHSH experiment. 
basis: tuple[float, float], follower_node_address: str, http_client: ClientDep, timetagger_address: str, -) -> tuple[float, float]: + state: StateDep, +) -> ChshResult: logger.debug("Starting CHSH") + # Initialize progress tracking + state.chsh_running = True + state.chsh_progress_current = 0 + state.chsh_progress_total = 16 # 2 basis x 2 follower x 2 angles x 2 perp + chsh_progress_event.set() + logger.debug("Instantiating client") client = Client(host=settings.router_address, port=settings.router_port, timeout=600_000) @@ -74,6 +137,10 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment. count = cast("int", count_ret.json()) counts.append(count) + # Update progress + state.chsh_progress_current += 1 + chsh_progress_event.set() + # Calculating expectation value numerator = counts[0] - counts[1] - counts[2] + counts[3] denominator = sum(counts) - 4 * settings.chsh_settings.measurement_config.dark_count @@ -96,16 +163,26 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment. logger.info("Expectation errors: %s", expectation_errors) # FIXME: This is a temporary fix for handling impossible expectation values. We should not have to rely on the settings for this. - expectation_values = [ + expectation_values_sign_fixed = [ x * y for x, y in zip(expectation_values, settings.chsh_settings.expectation_signs, strict=False) ] logger.info("What are you settings? 
%s", settings.chsh_settings.expectation_signs) - logger.info("After passing signed calculation: %s", expectation_values) - chsh_value = sum(x for x in expectation_values) + logger.info("After passing signed calculation: %s", expectation_values_sign_fixed) + chsh_value = abs(sum(x for x in expectation_values_sign_fixed)) chsh_error = sum(x**2 for x in expectation_errors) ** 0.5 - return chsh_value, chsh_error + # Mark CHSH as complete + state.chsh_running = False + chsh_progress_event.set() + + return ChshResult( + chsh_value=chsh_value, + chsh_error=chsh_error, + expectation_values=expectation_values, + expectation_errors=expectation_errors, + expectation_values_sign_fixed=expectation_values_sign_fixed, + ) @router.post("/") @@ -114,15 +191,10 @@ async def chsh( follower_node_address: str, http_client: ClientDep, timetagger_address: str, -) -> dict[str, float]: + state: StateDep, +) -> ChshResult: logger.info("Starting CHSH experiment with basis: %s", basis) - - chsh_value, chsh_error = await _chsh(basis, follower_node_address, http_client, timetagger_address) - - return { - "chsh_value": chsh_value, - "chsh_error": chsh_error, - } + return await _chsh(basis, follower_node_address, http_client, timetagger_address, state) @router.post("/request-angle-by-basis") diff --git a/src/pqnstack/app/api/routes/coordination.py b/src/pqnstack/app/api/routes/coordination.py index 4c368807..ecd9fb8f 100644 --- a/src/pqnstack/app/api/routes/coordination.py +++ b/src/pqnstack/app/api/routes/coordination.py @@ -1,5 +1,7 @@ import asyncio +import json import logging +from collections.abc import AsyncGenerator from fastapi import APIRouter from fastapi import HTTPException @@ -7,12 +9,14 @@ from fastapi import WebSocket from fastapi import WebSocketDisconnect from fastapi import status +from fastapi.responses import StreamingResponse from pydantic import BaseModel from pqnstack.app.api.deps import ClientDep from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config 
import NodeRole from pqnstack.app.core.config import ask_user_for_follow_event +from pqnstack.app.core.config import protocol_cancelled_event from pqnstack.app.core.config import settings from pqnstack.app.core.config import user_replied_event @@ -31,14 +35,44 @@ class ResetCoordinationStateResponse(BaseModel): message: str = "Coordination state reset successfully" +class ProtocolCancellationNotification(BaseModel): + reason: str = "Protocol cancelled by peer" + cancelled_by_role: str + + router = APIRouter(prefix="/coordination", tags=["coordination"]) # TODO: Send a disconnection message if I was following/leading someone. # FIXME: This is technically resetting more than just coordination state. including qkd. @router.post("/reset_coordination_state") -async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateResponse: +async def reset_coordination_state(state: StateDep, http_client: ClientDep) -> ResetCoordinationStateResponse: """Reset the coordination state of the node.""" + # Notify peer node BEFORE resetting state + peer_address = None + current_role = state.role + + if state.role == NodeRole.LEADER and state.followers_address: + peer_address = state.followers_address + elif state.role == NodeRole.FOLLOWER and state.leaders_address: + peer_address = state.leaders_address + + # Try to notify peer (best-effort, don't fail if peer is unreachable) + if peer_address: + try: + logger.info("Notifying peer at %s about protocol cancellation", peer_address) + await http_client.post( + f"http://{peer_address}/coordination/protocol_cancelled", + json={"reason": "Protocol cancelled by user", "cancelled_by_role": current_role.value}, + timeout=5.0, # Short timeout to avoid hanging + ) + except Exception as e: # noqa: BLE001 + logger.warning("Failed to notify peer about cancellation: %s. 
Proceeding with reset.", str(e)) + + # Set local cancellation event to unblock any waiting operations + protocol_cancelled_event.set() + + # Reset state state.role = NodeRole.INDEPENDENT state.followers_address = "" state.following_requested = False @@ -55,9 +89,29 @@ async def reset_coordination_state(state: StateDep) -> ResetCoordinationStateRes state.qkd_request_basis_list = [] state.qkd_request_bit_list = [] state.qkd_n_matching_bits = -1 + + # Clear the cancellation event for next use + protocol_cancelled_event.clear() + return ResetCoordinationStateResponse() +@router.post("/protocol_cancelled") +async def protocol_cancelled( + notification: ProtocolCancellationNotification, +) -> dict[str, str]: + """Receive notification that peer node cancelled the protocol.""" + logger.info("Received protocol cancellation from %s: %s", notification.cancelled_by_role, notification.reason) + protocol_cancelled_event.set() + + # Give waiting operations a chance to wake up and handle the cancellation + # Then clear for the next operation + await asyncio.sleep(0.5) + protocol_cancelled_event.clear() + + return {"status": "acknowledged"} + + @router.post("/collect_follower") async def collect_follower( request: Request, address: str, state: StateDep, http_client: ClientDep @@ -127,7 +181,27 @@ async def follow_requested( ask_user_for_follow_event.set() logger.debug("Asking user to accept follow request from %s (%s)", leaders_name, leaders_address) - await user_replied_event.wait() # Wait for a state change event to see if user accepted + + # Wait for EITHER user reply OR cancellation + _done, pending = await asyncio.wait( + [asyncio.create_task(user_replied_event.wait()), asyncio.create_task(protocol_cancelled_event.wait())], + return_when=asyncio.FIRST_COMPLETED, + ) + + # Cancel pending tasks + for task in pending: + task.cancel() + + # Check if protocol was cancelled + if protocol_cancelled_event.is_set(): + logger.warning("Follow request cancelled") + # Clean up state + 
state.leaders_address = "" + state.leaders_name = "" + state.following_requested = False + state.following_requested_user_response = None + return FollowRequestResponse(accepted=False) + user_replied_event.clear() # Reset the event for the next change if state.following_requested_user_response: logger.debug("Follow request from %s accepted.", leaders_address) @@ -193,3 +267,44 @@ async def client_message_handler() -> None: finally: state_change_task.cancel() client_message_task.cancel() + + +@router.get("/state_events") +async def state_events(state: StateDep) -> StreamingResponse: + """SSE endpoint for streaming state change events to frontend.""" + + async def event_generator() -> AsyncGenerator[str, None]: + try: + # Send initial connection event + yield f"data: {json.dumps({'event': 'connected', 'role': state.role.value})}\n\n" + + while True: + event_sent = False + + # Check for cancellation event + try: + await asyncio.wait_for(protocol_cancelled_event.wait(), timeout=1.0) + yield f"data: {json.dumps({'event': 'protocol_cancelled', 'reason': 'Protocol cancelled by peer or user'})}\n\n" + protocol_cancelled_event.clear() + event_sent = True + except TimeoutError: + pass + + # Send heartbeat if no event was sent to keep connection alive + if not event_sent: + yield ":\n" + + await asyncio.sleep(1.0) + + except asyncio.CancelledError: + logger.info("SSE connection closed by client") + raise + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) diff --git a/src/pqnstack/app/api/routes/qkd.py b/src/pqnstack/app/api/routes/qkd.py index 456ac4d9..fc090bc9 100644 --- a/src/pqnstack/app/api/routes/qkd.py +++ b/src/pqnstack/app/api/routes/qkd.py @@ -15,6 +15,7 @@ from pqnstack.app.api.deps import StateDep from pqnstack.app.core.config import NodeRole from pqnstack.app.core.config import NodeState +from pqnstack.app.core.config import protocol_cancelled_event 
from pqnstack.app.core.config import qkd_result_received_event from pqnstack.app.core.config import settings from pqnstack.constants import BasisBool @@ -296,8 +297,21 @@ async def submit_result(result: QKDResult, state: StateDep) -> None: async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncClient) -> None: """Poll the follower until it's ready, checking every 0.5 seconds.""" ready = False + first_503 = True while not ready: + # Check if protocol was cancelled + if protocol_cancelled_event.is_set(): + logger.warning("Protocol cancelled while waiting for follower ready") + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Protocol cancelled by peer or user") + r = await http_client.get(f"http://{state.followers_address}/qkd/is_follower_ready") + + # If the follower disconnects while the leader is waiting, the 503 `Node is not a follower` error might come before we can handle the cancellation event. + if r.status_code == status.HTTP_503_SERVICE_UNAVAILABLE and first_503: + logger.warning("Received QKD result from follower: %s", r) + first_503 = False + continue + if r.status_code != status.HTTP_200_OK: logger.error("Failed to check if follower is ready: %s", r.text) raise HTTPException( @@ -308,9 +322,9 @@ async def _wait_for_follower_ready(state: NodeState, http_client: httpx.AsyncCli ready = r.json() if not ready: logger.info("Follower is not ready yet, waiting.") - await asyncio.sleep(0.5) + await asyncio.sleep(0.4) # Make sure this is smaller than the protocol_cancelled event clear timer. - logger.info("Follower ready is ready") + logger.info("Follower is ready") async def _submit_result_to_follower(state: NodeState, http_client: httpx.AsyncClient, qkd_result: QKDResult) -> None: @@ -352,8 +366,23 @@ async def _submit_basis_list_follower(state: NodeState, basis_list: list[QKDEnco # don't wait for the event if the result is already set. 
This avoids deadlocks in case the result was set before this function is called. if state.qkd_n_matching_bits == -1: - # Wait until the leader submits the QKD result - await qkd_result_received_event.wait() + # Wait for EITHER result OR cancellation + _done, pending = await asyncio.wait( + [ + asyncio.create_task(qkd_result_received_event.wait()), + asyncio.create_task(protocol_cancelled_event.wait()), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + + # Cancel pending tasks + for task in pending: + task.cancel() + + # Check if protocol was cancelled + if protocol_cancelled_event.is_set(): + logger.warning("Protocol cancelled while waiting for QKD result") + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Protocol cancelled by peer or user") # Reassemble the QKDResult object from the state qkd_result = QKDResult( diff --git a/src/pqnstack/app/api/routes/rng.py b/src/pqnstack/app/api/routes/rng.py index db9d27e8..200ac0ad 100644 --- a/src/pqnstack/app/api/routes/rng.py +++ b/src/pqnstack/app/api/routes/rng.py @@ -1,4 +1,7 @@ +import asyncio +import json import logging +from collections.abc import AsyncGenerator from typing import Annotated from typing import Any @@ -6,14 +9,58 @@ from fastapi import HTTPException from fastapi import Query from fastapi import status +from fastapi.responses import StreamingResponse from pqnstack.app.api.deps import ClientDep +from pqnstack.app.api.deps import StateDep +from pqnstack.app.core.config import rng_progress_event logger = logging.getLogger(__name__) router = APIRouter(prefix="/rng", tags=["rng"]) +@router.get("/progress") +async def rng_progress(state: StateDep) -> StreamingResponse: + """SSE endpoint for streaming RNG fortune measurement progress to frontend.""" + + async def event_generator() -> AsyncGenerator[str, None]: + try: + # Send initial connection event + yield f"data: {json.dumps({'event': 'connected'})}\n\n" + + while True: + event_sent = False + + # Check for progress event + try: + await 
asyncio.wait_for(rng_progress_event.wait(), timeout=1.0) + yield f"data: {json.dumps({'event': 'rng_progress', 'current': state.rng_progress_current, 'total': state.rng_progress_total, 'running': state.rng_running})}\n\n" + rng_progress_event.clear() + event_sent = True + except TimeoutError: + pass + + # Send heartbeat if no event was sent to keep connection alive + if not event_sent: + yield ":\n" + + await asyncio.sleep(1.0) + + except asyncio.CancelledError: + logger.info("RNG SSE connection closed by client") + raise + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + @router.get("/singles_parity") async def singles_parity( timetagger_address: str, @@ -52,17 +99,24 @@ async def singles_parity( @router.get("/fortune") -async def fortune( +async def fortune( # noqa: PLR0913 timetagger_address: str, integration_time_s: float, fortune_size: int, http_client: ClientDep, + state: StateDep, channels: Annotated[list[int], Query()], ) -> list[int]: """Run singles parity `fortune_size` times and, per channel, interpret the result in bitstring as a decimal number.""" if fortune_size <= 0: raise HTTPException(status_code=400, detail="fortune_size must be a positive integer") + # Initialize progress tracking + state.rng_running = True + state.rng_progress_current = 0 + state.rng_progress_total = fortune_size + rng_progress_event.set() + trials: list[list[int]] = [] for _ in range(fortune_size): params: list[tuple[str, str | int | float | bool | None]] = [ @@ -75,6 +129,10 @@ async def fortune( parities = await http_client.get(url, params=params) trials.append(parities.json()) + # Update progress + state.rng_progress_current += 1 + rng_progress_event.set() + results: list[int] = [] for bits_for_channel in zip(*trials, strict=True): value = 0 @@ -88,4 +146,9 @@ async def fortune( fortune_size, results, ) + + # Mark RNG as complete + state.rng_running = False 
+ rng_progress_event.set() + return results diff --git a/src/pqnstack/app/core/config.py b/src/pqnstack/app/core/config.py index 61f9f1c7..0505b17c 100644 --- a/src/pqnstack/app/core/config.py +++ b/src/pqnstack/app/core/config.py @@ -17,6 +17,14 @@ logger = logging.getLogger(__name__) +class DailyReportConfig(BaseModel): + slack_webhook_url: str + follower_node_address: str + api_url: str = "http://localhost:8000" + timetagger_address: str = "127.0.0.1:8000" + basis: list[float] = Field(default_factory=lambda: [0.0, 22.5]) + + class CHSHSettings(BaseModel): # Specifies which half waveplate to use for the CHSH experiment. First value is the provider's name, second is the motor name. hwp: tuple[str, str] = ("", "") @@ -47,7 +55,12 @@ class Settings(BaseSettings): rotary_encoder_address: str = "/dev/ttyACM0" virtual_rotator: bool = False # If True, use terminal input instead of hardware rotary encoder - model_config = SettingsConfigDict(toml_file="./config.toml", env_file=".env", env_file_encoding="utf-8") + model_config = SettingsConfigDict( + toml_file="./config.toml", + env_file=".env", + env_file_encoding="utf-8", + extra="ignore", # Allow extra fields in config.toml (e.g., daily_report) + ) @classmethod def settings_customise_sources( @@ -102,11 +115,17 @@ class NodeState(BaseModel): # CHSH state chsh_request_basis: list[float] = [22.5, 67.5] + chsh_progress_current: int = 0 # Current iteration in CHSH measurement + chsh_progress_total: int = 16 # Total iterations (2 basis x 2 follower x 2 angles x 2 perp) + chsh_running: bool = False # Whether CHSH measurement is currently running # QKD state # FIXME: At the moment the reset_coordination_state resets this, probably want to refactor that function out. 
qkd_question_order: list[int] = [] # Order of questions for QKD qkd_emoji_pick: str = "" # Emoji chosen for QKD + qkd_progress_current: int = 0 # Current iteration in QKD measurement + qkd_progress_total: int = 11 # Total iterations (bitstring length) + qkd_running: bool = False # Whether QKD measurement is currently running qkd_leader_basis_list: list[QKDEncodingBasis] = [ QKDEncodingBasis.DA, QKDEncodingBasis.DA, @@ -128,11 +147,19 @@ class NodeState(BaseModel): qkd_request_bit_list: list[int] = [] qkd_n_matching_bits: int = -1 # Leaders populate this value after qkd is done. Same with the emoji + # RNG state + rng_progress_current: int = 0 # Current iteration in RNG fortune measurement + rng_progress_total: int = 0 # Total iterations (fortune_size) + rng_running: bool = False # Whether RNG fortune measurement is currently running + state = NodeState() ask_user_for_follow_event = asyncio.Event() user_replied_event = asyncio.Event() qkd_result_received_event = asyncio.Event() +protocol_cancelled_event = asyncio.Event() +chsh_progress_event = asyncio.Event() +rng_progress_event = asyncio.Event() def get_state() -> NodeState: