
Commit 492d77a

Fix Q+A log interleaving in high concurrency by printing them together
1 parent 126455d commit 492d77a

File tree

1 file changed: +78 -4 lines

codex_gateway/server.py

Lines changed: 78 additions & 4 deletions
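In outline, the problem this commit fixes is that the question panel was printed when a request arrived and the answer panel only after the model call finished, so with several requests in flight other output could land between the two. The fix stashes each question keyed by response id and prints Question and Answer as one grouped renderable once the answer exists. Below is a minimal, self-contained sketch of that pattern, not the gateway code itself: it assumes rich is installed, and the names pending, handle_grouped, handle_interleaved, and the simulated sleep are illustrative.

import asyncio
import random

from rich.console import Console, Group
from rich.panel import Panel

console = Console(stderr=True)

# Stand-in for the commit's _pending_questions: question text keyed by request id.
pending: dict[str, str] = {}


async def handle_interleaved(req_id: str, question: str) -> None:
    # Printing Q now and A later leaves a window (the awaited model call)
    # where other requests can print, splitting the pair in the log.
    console.print(Panel(question, title=f"Q [{req_id}]"))
    await asyncio.sleep(random.random())  # simulated model call
    console.print(Panel(f"answer to {question!r}", title=f"A [{req_id}]"))


async def handle_grouped(req_id: str, question: str) -> None:
    # Defer the question, then emit Q and A as one renderable in a single print.
    pending[req_id] = question
    await asyncio.sleep(random.random())  # simulated model call
    answer = f"answer to {question!r}"
    stored_q = pending.pop(req_id, "")
    console.print(Group(
        Panel(stored_q, title=f"Q [{req_id}]"),
        Panel(answer, title=f"A [{req_id}]"),
    ))


async def main() -> None:
    # Swap handle_grouped for handle_interleaved to see the pairs break apart.
    await asyncio.gather(*(handle_grouped(f"r{i}", f"question {i}") for i in range(3)))


if __name__ == "__main__":
    asyncio.run(main())

Because both panels are passed to a single console.print call as one Group, a concurrently finishing request cannot write between them.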
@@ -298,6 +298,9 @@ def reset(self) -> "RequestStats":
 _request_stats = RequestStats()
 _STATS_INTERVAL_SECONDS = 60  # Report every 60 seconds
 
+# Store pending questions for Q+A pairing in high concurrency
+_pending_questions: dict[str, str] = {}
+
 
 def _maybe_print_stats() -> None:
     """Print stats summary if interval has passed."""
@@ -407,6 +410,62 @@ def _maybe_print_markdown(
     return True
 
 
+def _print_qa_together(
+    resp_id: str,
+    question: str,
+    answer: str,
+    *,
+    duration_ms: int | None = None,
+    usage: dict[str, int] | None = None,
+) -> bool:
+    """
+    Print Question and Answer together in a single grouped panel.
+    This ensures Q and A stay together even under high concurrency.
+    """
+    if not settings.log_render_markdown:
+        return False
+    if not question and not answer:
+        return False
+    try:
+        from rich.console import Console
+        from rich.markdown import Markdown
+        from rich.panel import Panel
+        from rich.console import Group
+    except Exception:
+        return False
+
+    global _RICH_CONSOLE
+    if _RICH_CONSOLE is None:
+        _RICH_CONSOLE = Console(stderr=True)
+
+    console: Console = _RICH_CONSOLE  # type: ignore[assignment]
+    short = _short_id(resp_id)
+
+    # Prepare Q panel content
+    panel_limit = max(settings.log_max_chars * 10, 50000)
+    q_payload = question[:panel_limit] if len(question) > panel_limit else question
+    q_payload = q_payload.rstrip("\n")
+    q_title = f"📝 Question [{short}] 📏 {len(question):,} chars"
+    q_panel = Panel(Markdown(q_payload), title=q_title, border_style="cyan", expand=False)
+
+    # Prepare A panel content
+    a_payload = answer[:panel_limit] if len(answer) > panel_limit else answer
+    a_payload = a_payload.rstrip("\n")
+    parts = [f"✅ Answer [{short}]"]
+    if duration_ms:
+        parts.append(f"⏱️ {duration_ms/1000:.1f}s")
+    if usage:
+        total = usage.get("prompt_tokens", 0) + usage.get("completion_tokens", 0)
+        if total > 0:
+            parts.append(f"🔢 {total:,} tokens")
+    a_title = " ".join(parts)
+    a_panel = Panel(Markdown(a_payload), title=a_title, border_style="green", expand=False)
+
+    # Print both panels as a group (atomically)
+    console.print(Group(q_panel, a_panel))
+    return True
+
+
 def _print_error_panel(resp_id: str, error_msg: str, status_code: int = 500) -> None:
     """Print error in a red panel for visibility."""
     try:
@@ -918,8 +977,8 @@ def _normalize_effort(raw: str | None) -> str | None:
 
     q = "\n\n---\n\n".join(q_parts) if q_parts else ""
     if q:
-        if not _maybe_print_markdown(resp_id, "Q", q):
-            logger.info("[%s] Q:\n%s", resp_id, _truncate_for_log(q))
+        # Store question for later pairing with answer (avoids interleaving in high concurrency)
+        _pending_questions[resp_id] = q
     elif log_mode == "full":
         logger.info("[%s] PROMPT:\n%s", resp_id, _truncate_for_log(prompt))
 
@@ -1321,17 +1380,23 @@ async def _run_codex_once(model_id: str):
         _maybe_print_stats()
 
         if log_mode == "qa" and text:
-            if not _maybe_print_markdown(resp_id, "A", text, duration_ms=duration_ms, usage=usage):
+            # Retrieve stored question and print Q+A together
+            stored_q = _pending_questions.pop(resp_id, "")
+            if not _print_qa_together(resp_id, stored_q, text, duration_ms=duration_ms, usage=usage):
                 # Fallback to plain logging
                 usage_str = f" usage={usage}" if isinstance(usage, dict) else ""
                 logger.info("[%s] response status=200 duration_ms=%d chars=%d%s", resp_id, duration_ms, len(text), usage_str)
+                if stored_q:
+                    logger.info("[%s] Q:\n%s", resp_id, _truncate_for_log(stored_q))
                 logger.info("[%s] A:\n%s", resp_id, _truncate_for_log(text))
         elif log_mode == "full" and text:
+            _pending_questions.pop(resp_id, None)  # Clean up
             if not _maybe_print_markdown(resp_id, "RESPONSE", text, duration_ms=duration_ms, usage=usage):
                 usage_str = f" usage={usage}" if isinstance(usage, dict) else ""
                 logger.info("[%s] response status=200 duration_ms=%d chars=%d%s", resp_id, duration_ms, len(text), usage_str)
                 logger.info("[%s] RESPONSE:\n%s", resp_id, _truncate_for_log(text))
         elif not settings.log_render_markdown:
+            _pending_questions.pop(resp_id, None)  # Clean up
             # Only log summary when not using rich panels
             usage_str = f" usage={usage}" if isinstance(usage, dict) else ""
             logger.info("[%s] response status=200 duration_ms=%d chars=%d%s", resp_id, duration_ms, len(text), usage_str)
@@ -1724,16 +1789,22 @@ async def _pump_events() -> None:
         _maybe_print_stats()
 
         if log_mode == "qa" and assembled:
-            if not _maybe_print_markdown(resp_id, "A", assembled, duration_ms=duration_ms, usage=stream_usage):
+            # Retrieve stored question and print Q+A together
+            stored_q = _pending_questions.pop(resp_id, "")
+            if not _print_qa_together(resp_id, stored_q, assembled, duration_ms=duration_ms, usage=stream_usage):
                 usage_str = f" usage={stream_usage}" if isinstance(stream_usage, dict) else ""
                 logger.info("[%s] response status=200 duration_ms=%d chars=%d%s", resp_id, duration_ms, len(assembled), usage_str)
+                if stored_q:
+                    logger.info("[%s] Q:\n%s", resp_id, _truncate_for_log(stored_q))
                 logger.info("[%s] A:\n%s", resp_id, _truncate_for_log(assembled))
         elif log_mode == "full" and assembled:
+            _pending_questions.pop(resp_id, None)  # Clean up
             if not _maybe_print_markdown(resp_id, "RESPONSE", assembled, duration_ms=duration_ms, usage=stream_usage):
                 usage_str = f" usage={stream_usage}" if isinstance(stream_usage, dict) else ""
                 logger.info("[%s] response status=200 duration_ms=%d chars=%d%s", resp_id, duration_ms, len(assembled), usage_str)
                 logger.info("[%s] RESPONSE:\n%s", resp_id, _truncate_for_log(assembled))
         elif not settings.log_render_markdown:
+            _pending_questions.pop(resp_id, None)  # Clean up
             usage_str = f" usage={stream_usage}" if isinstance(stream_usage, dict) else ""
             logger.info("[%s] response status=200 duration_ms=%d chars=%d%s", resp_id, duration_ms, len(assembled), usage_str)
 
@@ -1742,12 +1813,14 @@ async def _pump_events() -> None:
         error_msg = f"Request timed out after {settings.timeout_seconds}s"
         logger.error("[%s] error status=504 timeout_seconds=%d", resp_id, settings.timeout_seconds)
         _active_requests -= 1
+        _pending_questions.pop(resp_id, None)  # Clean up
         _request_stats.record_failure()
         _print_error_panel(resp_id, error_msg, 504)
         return _openai_error(error_msg, status_code=504)
     except HTTPException:
         # Let FastAPI handle already-structured HTTP errors (auth, validation, etc.).
         _active_requests -= 1
+        _pending_questions.pop(resp_id, None)  # Clean up
         _request_stats.record_failure()
         raise
     except Exception as e:
@@ -1756,6 +1829,7 @@ async def _pump_events() -> None:
         error_msg = str(e)
         logger.error("[%s] error status=%d %s", resp_id, status, _truncate_for_log(error_msg))
         _active_requests -= 1
+        _pending_questions.pop(resp_id, None)  # Clean up
         _request_stats.record_failure()
         _print_error_panel(resp_id, error_msg, status)
         return _openai_error(error_msg, status_code=status)
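Because questions now sit in a module-level dict until the answer arrives, the diff adds a pop on every branch that can exit without reaching the Q+A print: full mode, plain-summary mode, timeout, HTTPException, and the generic exception handler. As a point of comparison only, the same guarantee can be written once with try/finally; the sketch below is illustrative, uses stand-in names (pending_questions, run_model, handle), and is not the commit's structure.

import asyncio
import logging

logger = logging.getLogger("gateway-sketch")

# Stand-in for the commit's module-level _pending_questions dict.
pending_questions: dict[str, str] = {}


async def run_model(question: str) -> str:
    # Hypothetical placeholder for the actual Codex call.
    await asyncio.sleep(0.1)
    return f"echo: {question}"


async def handle(resp_id: str, question: str) -> str:
    pending_questions[resp_id] = question
    try:
        answer = await run_model(question)
        logger.info("[%s] Q:\n%s", resp_id, pending_questions.get(resp_id, ""))
        logger.info("[%s] A:\n%s", resp_id, answer)
        return answer
    finally:
        # Runs on success, timeout, and unexpected errors alike, so no exit
        # path can leak an entry the way a missed pop() would.
        pending_questions.pop(resp_id, None)

The commit's explicit per-branch pops achieve the same effect without restructuring the existing handlers.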
