Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions openhands-agent-server/openhands/agent_server/event_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,25 @@

def normalize_datetime_to_server_timezone(dt: datetime) -> datetime:
"""
Normalize datetime to server timezone for consistent comparison.
Normalize datetime to server timezone for consistent comparison with events.

If the datetime has timezone info, convert to server native timezone.
Event timestamps are stored as naive datetimes in server local time.
This function ensures filter datetimes are also naive in server local time
so they can be compared correctly.

If the datetime has timezone info, convert to server native timezone and
strip the tzinfo to make it naive.
If it's naive (no timezone), assume it's already in server timezone.

Args:
dt: Input datetime (may be timezone-aware or naive)

Returns:
Datetime in server native timezone (timezone-aware)
Naive datetime in server local time
"""
if dt.tzinfo is not None:
# Timezone-aware: convert to server native timezone
return dt.astimezone(None)
# Timezone-aware: convert to server native timezone, then make naive
return dt.astimezone(None).replace(tzinfo=None)
else:
# Naive datetime: assume it's already in server timezone
return dt
Expand Down
129 changes: 120 additions & 9 deletions openhands-agent-server/openhands/agent_server/sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import logging
from dataclasses import dataclass
from typing import Annotated
from datetime import datetime
from typing import Annotated, Literal
from uuid import UUID

from fastapi import (
Expand All @@ -24,6 +25,7 @@
from openhands.agent_server.conversation_service import (
get_default_conversation_service,
)
from openhands.agent_server.event_router import normalize_datetime_to_server_timezone
from openhands.agent_server.models import BashEventBase, ExecuteBashRequest
from openhands.agent_server.pub_sub import Subscriber
from openhands.sdk import Event, Message
Expand Down Expand Up @@ -91,9 +93,54 @@ async def events_socket(
conversation_id: UUID,
websocket: WebSocket,
session_api_key: Annotated[str | None, Query(alias="session_api_key")] = None,
resend_all: Annotated[bool, Query()] = False,
resend_mode: Annotated[
Literal["all", "since"] | None,
Query(
description=(
"Mode for resending historical events on connect. "
"'all' sends all events, 'since' sends events after 'after_timestamp'."
)
),
] = None,
after_timestamp: Annotated[
datetime | None,
Query(
description=(
"Required when resend_mode='since'. Events with timestamp >= this "
"value will be sent. Accepts ISO 8601 format. Timezone-aware "
"datetimes are converted to server local time; naive datetimes "
"assumed in server timezone."
)
),
] = None,
# Deprecated parameter - kept for backward compatibility
resend_all: Annotated[
bool,
Query(
include_in_schema=False,
deprecated=True,
),
] = False,
):
"""WebSocket endpoint for conversation events."""
"""WebSocket endpoint for conversation events.

Args:
conversation_id: The conversation ID to subscribe to.
websocket: The WebSocket connection.
session_api_key: Optional API key for authentication.
resend_mode: Mode for resending historical events on connect.
- 'all': Resend all existing events
- 'since': Resend events after 'after_timestamp' (requires after_timestamp)
- None: Don't resend, just subscribe to new events
after_timestamp: Required when resend_mode='since'. Events with
timestamp >= this value will be sent. Timestamps are interpreted in
server local time. Timezone-aware datetimes are converted to server
timezone. Enables efficient bi-directional loading where REST fetches
historical events and WebSocket handles events after a specific point.
resend_all: DEPRECATED. Use resend_mode='all' instead. Kept for
backward compatibility - if True and resend_mode is None, behaves
as resend_mode='all'.
"""
if not await _accept_authenticated_websocket(websocket, session_api_key):
return

Expand All @@ -108,12 +155,44 @@ async def events_socket(
_WebSocketSubscriber(websocket)
)

# Determine effective resend mode (handle deprecated resend_all)
effective_mode = resend_mode
if effective_mode is None and resend_all:
logger.warning(
"resend_all is deprecated, use resend_mode='all' instead: "
f"{conversation_id}"
)
effective_mode = "all"

# Normalize timezone-aware datetimes to server timezone
normalized_after_timestamp = (
normalize_datetime_to_server_timezone(after_timestamp)
if after_timestamp
else None
)

try:
# Resend all existing events if requested
if resend_all:
logger.info(f"Resending events: {conversation_id}")
# Resend existing events based on mode
if effective_mode == "all":
logger.info(f"Resending all events: {conversation_id}")
async for event in page_iterator(event_service.search_events):
await _send_event(event, websocket)
elif effective_mode == "since":
if not normalized_after_timestamp:
logger.warning(
f"resend_mode='since' requires after_timestamp, "
f"no events will be resent: {conversation_id}"
)
else:
logger.info(
f"Resending events since {normalized_after_timestamp}: "
f"{conversation_id}"
)
async for event in page_iterator(
event_service.search_events,
timestamp__gte=normalized_after_timestamp,
):
await _send_event(event, websocket)

# Listen for messages over the socket
while True:
Expand All @@ -140,19 +219,51 @@ async def events_socket(
async def bash_events_socket(
websocket: WebSocket,
session_api_key: Annotated[str | None, Query(alias="session_api_key")] = None,
resend_all: Annotated[bool, Query()] = False,
resend_mode: Annotated[
Literal["all"] | None,
Query(
description=(
"Mode for resending historical events on connect. "
"'all' sends all events."
)
),
] = None,
# Deprecated parameter - kept for backward compatibility
resend_all: Annotated[
bool,
Query(
include_in_schema=False,
deprecated=True,
),
] = False,
):
"""WebSocket endpoint for bash events."""
"""WebSocket endpoint for bash events.

Args:
websocket: The WebSocket connection.
session_api_key: Optional API key for authentication.
resend_mode: Mode for resending historical events on connect.
- 'all': Resend all existing bash events
- None: Don't resend, just subscribe to new events
resend_all: DEPRECATED. Use resend_mode='all' instead.
"""
if not await _accept_authenticated_websocket(websocket, session_api_key):
return

logger.info("Bash Websocket Connected")
subscriber_id = await bash_event_service.subscribe_to_events(
_BashWebSocketSubscriber(websocket)
)

# Determine effective resend mode (handle deprecated resend_all)
effective_mode = resend_mode
if effective_mode is None and resend_all:
logger.warning("resend_all is deprecated, use resend_mode='all' instead")
effective_mode = "all"

try:
# Resend all existing events if requested
if resend_all:
if effective_mode == "all":
logger.info("Resending bash events")
async for event in page_iterator(bash_event_service.search_bash_events):
await _send_bash_event(event, websocket)
Expand Down
45 changes: 44 additions & 1 deletion tests/agent_server/test_event_router.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for event_router.py endpoints."""

from datetime import UTC, datetime, timedelta, timezone
from pathlib import Path
from typing import cast
from unittest.mock import AsyncMock, MagicMock
Expand All @@ -10,14 +11,56 @@
from fastapi.testclient import TestClient

from openhands.agent_server.dependencies import get_event_service
from openhands.agent_server.event_router import event_router
from openhands.agent_server.event_router import (
event_router,
normalize_datetime_to_server_timezone,
)
from openhands.agent_server.event_service import EventService
from openhands.agent_server.models import SendMessageRequest
from openhands.sdk import Message
from openhands.sdk.event.llm_convertible.message import MessageEvent
from openhands.sdk.llm.message import ImageContent, TextContent


def test_normalize_datetime_naive_passthrough():
"""Naive datetimes should be returned unchanged."""
naive_dt = datetime(2025, 1, 15, 10, 30, 0)
result = normalize_datetime_to_server_timezone(naive_dt)

assert result == naive_dt
assert result.tzinfo is None


def test_normalize_datetime_utc_converted_to_naive():
"""UTC datetime should be converted to server local time and made naive."""
utc_dt = datetime(2025, 1, 15, 10, 30, 0, tzinfo=UTC)
result = normalize_datetime_to_server_timezone(utc_dt)

assert result.tzinfo is None
expected = utc_dt.astimezone(None).replace(tzinfo=None)
assert result == expected


def test_normalize_datetime_preserves_microseconds():
"""Microseconds should be preserved through conversion."""
utc_dt = datetime(2025, 1, 15, 10, 30, 0, 123456, tzinfo=UTC)
result = normalize_datetime_to_server_timezone(utc_dt)

assert result.microsecond == 123456


def test_normalize_datetime_fixed_offset_timezone():
"""Test with a specific fixed offset timezone (UTC+5:30)."""
ist = timezone(timedelta(hours=5, minutes=30))
ist_dt = datetime(2025, 1, 15, 16, 0, 0, tzinfo=ist)

result = normalize_datetime_to_server_timezone(ist_dt)

assert result.tzinfo is None
expected = ist_dt.astimezone(None).replace(tzinfo=None)
assert result == expected


@pytest.fixture
def client():
"""Create a test client for the FastAPI app without authentication."""
Expand Down
Loading
Loading