Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion configs/config_app_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bell_state = 0
[chsh_settings]
hwp = ["provider", "instrument_hwp"] # Replace with actual HWP names
request_hwp = ["provider", "instrument_hwp"]
expectation_signs = [-1, 1, 1, 1] # This is for HV + VH. for HH + VV, the signs are [1, -1, 1, 1]

# CHSH measurement configuration
[chsh_settings.measurement_config]
Expand All @@ -42,4 +43,12 @@ integration_time_s = 5
binwidth = 500
channel1 = 1
channel2 = 2
dark_count = 0
dark_count = 0

# Daily CHSH report settings (for automated Slack reporting)
[daily_report]
slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL" # Get from https://api.slack.com/apps
api_url = "http://localhost:8000" # API endpoint (usually localhost if running on same machine)
timetagger_address = "127.0.0.1:8000" # TimeTagger address
follower_node_address = "192.168.1.100:9000" # Replace with actual follower node address
basis = [0, 22.5] # CHSH basis angles to use for daily measurements
199 changes: 199 additions & 0 deletions scripts/chsh_daily_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
CHSH Daily Report Script.

Runs CHSH measurement and posts results to Slack.

Reads all configuration from config.toml (including Slack webhook URL).

Usage:
uv run scripts/chsh_daily_report.py
"""

import logging
import sys
import tomllib
from datetime import UTC
from datetime import datetime
from pathlib import Path

import httpx
from pydantic import ValidationError

from pqnstack.app.api.routes.chsh import ChshResult
from pqnstack.app.core.config import DailyReportConfig

logger = logging.getLogger(__name__)


def load_config() -> DailyReportConfig:
"""Load and validate the [daily_report] section from config.toml."""
config_path = Path(__file__).parent.parent / "config.toml"

if not config_path.exists():
logger.error("config.toml not found at %s", config_path)
logger.error("Please create config.toml from configs/config_app_example.toml")
sys.exit(1)

with config_path.open("rb") as f:
raw = tomllib.load(f)

daily_report_data = raw.get("daily_report")
if not daily_report_data:
logger.error("[daily_report] section not found in config.toml")
logger.error("Please add it following the example in configs/config_app_example.toml")
sys.exit(1)

try:
return DailyReportConfig.model_validate(daily_report_data)
except ValidationError:
logger.exception("Invalid [daily_report] configuration")
sys.exit(1)


def run_chsh_measurement(config: DailyReportConfig) -> ChshResult:
"""Run CHSH measurement via API."""
logger.info("Starting CHSH measurement at %s", datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"))
logger.info("Basis: %s", config.basis)
logger.info("Follower: %s", config.follower_node_address)
logger.info("TimeTagger: %s", config.timetagger_address)

try:
with httpx.Client(timeout=600.0) as client:
response = client.post(
f"{config.api_url}/chsh/",
params={
"follower_node_address": config.follower_node_address,
"timetagger_address": config.timetagger_address,
},
json=config.basis,
)
response.raise_for_status()
return ChshResult.model_validate(response.json())

except httpx.HTTPError:
logger.exception("Failed to contact CHSH API")
sys.exit(1)


def post_to_slack(webhook_url: str, chsh_data: ChshResult, config: DailyReportConfig) -> None:
"""Post CHSH results to Slack."""
# Determine emoji based on Bell inequality violation (CHSH > classical limit)
bell_inequality_classical_limit = 2
emoji = ":sparkles:" if chsh_data.chsh_value > bell_inequality_classical_limit else ":thinking_face:"

# Build fields dynamically from all returned data
fields = []
for key, value in chsh_data.model_dump().items():
field_name = key.replace("_", " ").title()

if isinstance(value, float):
formatted_value = f"{value:.4f}"
elif isinstance(value, list):
if all(isinstance(x, (int, float)) for x in value):
formatted_value = "[" + ", ".join(f"{x:.4f}" if isinstance(x, float) else str(x) for x in value) + "]"
else:
formatted_value = str(value)
else:
formatted_value = str(value)

fields.append({"type": "mrkdwn", "text": f"*{field_name}:*\n`{formatted_value}`"})

# Create sections with 2 fields each (Slack limit)
sections = []
for i in range(0, len(fields), 2):
section_fields = fields[i : i + 2]
sections.append({"type": "section", "fields": section_fields})

# Add configuration info section
sections.append(
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Basis:*\n`{config.basis}`"},
{"type": "mrkdwn", "text": f"*Timestamp:*\n{datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S')}"},
],
}
)

# Format Slack message using Block Kit
slack_message = {
"blocks": [
{
"type": "header",
"text": {"type": "plain_text", "text": f"{emoji} CHSH Daily Measurement Report", "emoji": True},
},
*sections,
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": f"Follower: `{config.follower_node_address}` | TimeTagger: `{config.timetagger_address}`",
}
],
},
]
}

logger.info("Posting to Slack...")

try:
with httpx.Client() as client:
response = client.post(webhook_url, json=slack_message)

if response.text == "ok":
logger.info("Successfully posted to Slack")
else:
logger.error("Failed to post to Slack: %s", response.text)
sys.exit(1)

except httpx.HTTPError:
logger.exception("Failed to post to Slack")
sys.exit(1)


def post_error_to_slack(webhook_url: str, error_message: str) -> None:
"""Post error message to Slack."""
slack_message = {
"text": f":x: CHSH Daily Report Failed\n*Error:* {error_message}\n*Time:* {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S')}"
}

try:
with httpx.Client() as client:
client.post(webhook_url, json=slack_message)
except httpx.HTTPError:
logger.debug("Failed to post error notification to Slack")


def main() -> None:
"""Execute the CHSH daily report."""
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

try:
config = load_config()

chsh_data = run_chsh_measurement(config)

logger.info("CHSH measurement completed")
logger.info("Value: %.4f ± %.4f", chsh_data.chsh_value, chsh_data.chsh_error)

post_to_slack(config.slack_webhook_url, chsh_data, config)

logger.info("CHSH daily report completed successfully")

except Exception as e:
logger.exception("Unexpected error")

# Try to post error to Slack if possible
try:
config = load_config()
post_error_to_slack(config.slack_webhook_url, str(e))
except Exception: # noqa: BLE001
logger.debug("Failed to post error notification to Slack")

sys.exit(1)


if __name__ == "__main__":
main()
98 changes: 85 additions & 13 deletions src/pqnstack/app/api/routes/chsh.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import asyncio
import json
import logging
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING
from typing import cast

from fastapi import APIRouter
from fastapi import HTTPException
from fastapi import status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from pqnstack.app.api.deps import ClientDep
from pqnstack.app.api.deps import StateDep
from pqnstack.app.core.config import chsh_progress_event
from pqnstack.app.core.config import settings
from pqnstack.app.core.models import calculate_chsh_expectation_error
from pqnstack.network.client import Client
Expand All @@ -17,17 +23,74 @@

logger = logging.getLogger(__name__)


class ChshResult(BaseModel):
chsh_value: float
chsh_error: float
expectation_values: list[float]
expectation_errors: list[float]
expectation_values_sign_fixed: list[float]


router = APIRouter(prefix="/chsh", tags=["chsh"])


@router.get("/progress")
async def chsh_progress(state: StateDep) -> StreamingResponse:
"""SSE endpoint for streaming CHSH measurement progress to frontend."""

async def event_generator() -> AsyncGenerator[str, None]:
try:
# Send initial connection event
yield f"data: {json.dumps({'event': 'connected'})}\n\n"

while True:
event_sent = False

# Check for progress event
try:
await asyncio.wait_for(chsh_progress_event.wait(), timeout=1.0)
yield f"data: {json.dumps({'event': 'chsh_progress', 'current': state.chsh_progress_current, 'total': state.chsh_progress_total, 'running': state.chsh_running})}\n\n"
chsh_progress_event.clear()
event_sent = True
except TimeoutError:
pass

# Send heartbeat if no event was sent to keep connection alive
if not event_sent:
yield ":\n"

await asyncio.sleep(1.0)

except asyncio.CancelledError:
logger.info("CHSH SSE connection closed by client")
raise

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)


async def _chsh( # Complexity is high due to the nature of the CHSH experiment.
basis: tuple[float, float],
follower_node_address: str,
http_client: ClientDep,
timetagger_address: str,
) -> tuple[float, float]:
state: StateDep,
) -> ChshResult:
logger.debug("Starting CHSH")

# Initialize progress tracking
state.chsh_running = True
state.chsh_progress_current = 0
state.chsh_progress_total = 16 # 2 basis x 2 follower x 2 angles x 2 perp
chsh_progress_event.set()

logger.debug("Instantiating client")
client = Client(host=settings.router_address, port=settings.router_port, timeout=600_000)

Expand Down Expand Up @@ -74,6 +137,10 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment.
count = cast("int", count_ret.json())
counts.append(count)

# Update progress
state.chsh_progress_current += 1
chsh_progress_event.set()

# Calculating expectation value
numerator = counts[0] - counts[1] - counts[2] + counts[3]
denominator = sum(counts) - 4 * settings.chsh_settings.measurement_config.dark_count
Expand All @@ -96,16 +163,26 @@ async def _chsh( # Complexity is high due to the nature of the CHSH experiment.
logger.info("Expectation errors: %s", expectation_errors)

# FIXME: This is a temporary fix for handling impossible expectation values. We should not have to rely on the settings for this.
expectation_values = [
expectation_values_sign_fixed = [
x * y for x, y in zip(expectation_values, settings.chsh_settings.expectation_signs, strict=False)
]
logger.info("What are you settings? %s", settings.chsh_settings.expectation_signs)

logger.info("After passing signed calculation: %s", expectation_values)
chsh_value = sum(x for x in expectation_values)
logger.info("After passing signed calculation: %s", expectation_values_sign_fixed)
chsh_value = abs(sum(x for x in expectation_values_sign_fixed))
chsh_error = sum(x**2 for x in expectation_errors) ** 0.5

return chsh_value, chsh_error
# Mark CHSH as complete
state.chsh_running = False
chsh_progress_event.set()

return ChshResult(
chsh_value=chsh_value,
chsh_error=chsh_error,
expectation_values=expectation_values,
expectation_errors=expectation_errors,
expectation_values_sign_fixed=expectation_values_sign_fixed,
)


@router.post("/")
Expand All @@ -114,15 +191,10 @@ async def chsh(
follower_node_address: str,
http_client: ClientDep,
timetagger_address: str,
) -> dict[str, float]:
state: StateDep,
) -> ChshResult:
logger.info("Starting CHSH experiment with basis: %s", basis)

chsh_value, chsh_error = await _chsh(basis, follower_node_address, http_client, timetagger_address)

return {
"chsh_value": chsh_value,
"chsh_error": chsh_error,
}
return await _chsh(basis, follower_node_address, http_client, timetagger_address, state)


@router.post("/request-angle-by-basis")
Expand Down
Loading