Skip to content

Commit 2f44967

Browse files
jpshackelfordenystopenhands-agent
authored
feat(websocket): add after_timestamp filter for bi-directional event loading (#1880)
Co-authored-by: Engel Nyst <engel.nyst@gmail.com> Co-authored-by: openhands <openhands@all-hands.dev>
1 parent 3abc11c commit 2f44967

File tree

5 files changed

+634
-351
lines changed

5 files changed

+634
-351
lines changed

openhands-agent-server/openhands/agent_server/event_router.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,25 @@
3838

3939
def normalize_datetime_to_server_timezone(dt: datetime) -> datetime:
4040
"""
41-
Normalize datetime to server timezone for consistent comparison.
41+
Normalize datetime to server timezone for consistent comparison with events.
4242
43-
If the datetime has timezone info, convert to server native timezone.
43+
Event timestamps are stored as naive datetimes in server local time.
44+
This function ensures filter datetimes are also naive in server local time
45+
so they can be compared correctly.
46+
47+
If the datetime has timezone info, convert to server native timezone and
48+
strip the tzinfo to make it naive.
4449
If it's naive (no timezone), assume it's already in server timezone.
4550
4651
Args:
4752
dt: Input datetime (may be timezone-aware or naive)
4853
4954
Returns:
50-
Datetime in server native timezone (timezone-aware)
55+
Naive datetime in server local time
5156
"""
5257
if dt.tzinfo is not None:
53-
# Timezone-aware: convert to server native timezone
54-
return dt.astimezone(None)
58+
# Timezone-aware: convert to server native timezone, then make naive
59+
return dt.astimezone(None).replace(tzinfo=None)
5560
else:
5661
# Naive datetime: assume it's already in server timezone
5762
return dt

openhands-agent-server/openhands/agent_server/sockets.py

Lines changed: 120 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
import logging
1111
from dataclasses import dataclass
12-
from typing import Annotated
12+
from datetime import datetime
13+
from typing import Annotated, Literal
1314
from uuid import UUID
1415

1516
from fastapi import (
@@ -24,6 +25,7 @@
2425
from openhands.agent_server.conversation_service import (
2526
get_default_conversation_service,
2627
)
28+
from openhands.agent_server.event_router import normalize_datetime_to_server_timezone
2729
from openhands.agent_server.models import BashEventBase, ExecuteBashRequest
2830
from openhands.agent_server.pub_sub import Subscriber
2931
from openhands.sdk import Event, Message
@@ -91,9 +93,54 @@ async def events_socket(
9193
conversation_id: UUID,
9294
websocket: WebSocket,
9395
session_api_key: Annotated[str | None, Query(alias="session_api_key")] = None,
94-
resend_all: Annotated[bool, Query()] = False,
96+
resend_mode: Annotated[
97+
Literal["all", "since"] | None,
98+
Query(
99+
description=(
100+
"Mode for resending historical events on connect. "
101+
"'all' sends all events, 'since' sends events after 'after_timestamp'."
102+
)
103+
),
104+
] = None,
105+
after_timestamp: Annotated[
106+
datetime | None,
107+
Query(
108+
description=(
109+
"Required when resend_mode='since'. Events with timestamp >= this "
110+
"value will be sent. Accepts ISO 8601 format. Timezone-aware "
111+
"datetimes are converted to server local time; naive datetimes "
112+
"assumed in server timezone."
113+
)
114+
),
115+
] = None,
116+
# Deprecated parameter - kept for backward compatibility
117+
resend_all: Annotated[
118+
bool,
119+
Query(
120+
include_in_schema=False,
121+
deprecated=True,
122+
),
123+
] = False,
95124
):
96-
"""WebSocket endpoint for conversation events."""
125+
"""WebSocket endpoint for conversation events.
126+
127+
Args:
128+
conversation_id: The conversation ID to subscribe to.
129+
websocket: The WebSocket connection.
130+
session_api_key: Optional API key for authentication.
131+
resend_mode: Mode for resending historical events on connect.
132+
- 'all': Resend all existing events
133+
- 'since': Resend events after 'after_timestamp' (requires after_timestamp)
134+
- None: Don't resend, just subscribe to new events
135+
after_timestamp: Required when resend_mode='since'. Events with
136+
timestamp >= this value will be sent. Timestamps are interpreted in
137+
server local time. Timezone-aware datetimes are converted to server
138+
timezone. Enables efficient bi-directional loading where REST fetches
139+
historical events and WebSocket handles events after a specific point.
140+
resend_all: DEPRECATED. Use resend_mode='all' instead. Kept for
141+
backward compatibility - if True and resend_mode is None, behaves
142+
as resend_mode='all'.
143+
"""
97144
if not await _accept_authenticated_websocket(websocket, session_api_key):
98145
return
99146

@@ -108,12 +155,44 @@ async def events_socket(
108155
_WebSocketSubscriber(websocket)
109156
)
110157

158+
# Determine effective resend mode (handle deprecated resend_all)
159+
effective_mode = resend_mode
160+
if effective_mode is None and resend_all:
161+
logger.warning(
162+
"resend_all is deprecated, use resend_mode='all' instead: "
163+
f"{conversation_id}"
164+
)
165+
effective_mode = "all"
166+
167+
# Normalize timezone-aware datetimes to server timezone
168+
normalized_after_timestamp = (
169+
normalize_datetime_to_server_timezone(after_timestamp)
170+
if after_timestamp
171+
else None
172+
)
173+
111174
try:
112-
# Resend all existing events if requested
113-
if resend_all:
114-
logger.info(f"Resending events: {conversation_id}")
175+
# Resend existing events based on mode
176+
if effective_mode == "all":
177+
logger.info(f"Resending all events: {conversation_id}")
115178
async for event in page_iterator(event_service.search_events):
116179
await _send_event(event, websocket)
180+
elif effective_mode == "since":
181+
if not normalized_after_timestamp:
182+
logger.warning(
183+
f"resend_mode='since' requires after_timestamp, "
184+
f"no events will be resent: {conversation_id}"
185+
)
186+
else:
187+
logger.info(
188+
f"Resending events since {normalized_after_timestamp}: "
189+
f"{conversation_id}"
190+
)
191+
async for event in page_iterator(
192+
event_service.search_events,
193+
timestamp__gte=normalized_after_timestamp,
194+
):
195+
await _send_event(event, websocket)
117196

118197
# Listen for messages over the socket
119198
while True:
@@ -140,19 +219,51 @@ async def events_socket(
140219
async def bash_events_socket(
141220
websocket: WebSocket,
142221
session_api_key: Annotated[str | None, Query(alias="session_api_key")] = None,
143-
resend_all: Annotated[bool, Query()] = False,
222+
resend_mode: Annotated[
223+
Literal["all"] | None,
224+
Query(
225+
description=(
226+
"Mode for resending historical events on connect. "
227+
"'all' sends all events."
228+
)
229+
),
230+
] = None,
231+
# Deprecated parameter - kept for backward compatibility
232+
resend_all: Annotated[
233+
bool,
234+
Query(
235+
include_in_schema=False,
236+
deprecated=True,
237+
),
238+
] = False,
144239
):
145-
"""WebSocket endpoint for bash events."""
240+
"""WebSocket endpoint for bash events.
241+
242+
Args:
243+
websocket: The WebSocket connection.
244+
session_api_key: Optional API key for authentication.
245+
resend_mode: Mode for resending historical events on connect.
246+
- 'all': Resend all existing bash events
247+
- None: Don't resend, just subscribe to new events
248+
resend_all: DEPRECATED. Use resend_mode='all' instead.
249+
"""
146250
if not await _accept_authenticated_websocket(websocket, session_api_key):
147251
return
148252

149253
logger.info("Bash Websocket Connected")
150254
subscriber_id = await bash_event_service.subscribe_to_events(
151255
_BashWebSocketSubscriber(websocket)
152256
)
257+
258+
# Determine effective resend mode (handle deprecated resend_all)
259+
effective_mode = resend_mode
260+
if effective_mode is None and resend_all:
261+
logger.warning("resend_all is deprecated, use resend_mode='all' instead")
262+
effective_mode = "all"
263+
153264
try:
154265
# Resend all existing events if requested
155-
if resend_all:
266+
if effective_mode == "all":
156267
logger.info("Resending bash events")
157268
async for event in page_iterator(bash_event_service.search_bash_events):
158269
await _send_bash_event(event, websocket)

tests/agent_server/test_event_router.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tests for event_router.py endpoints."""
22

3+
from datetime import UTC, datetime, timedelta, timezone
34
from pathlib import Path
45
from typing import cast
56
from unittest.mock import AsyncMock, MagicMock
@@ -10,14 +11,56 @@
1011
from fastapi.testclient import TestClient
1112

1213
from openhands.agent_server.dependencies import get_event_service
13-
from openhands.agent_server.event_router import event_router
14+
from openhands.agent_server.event_router import (
15+
event_router,
16+
normalize_datetime_to_server_timezone,
17+
)
1418
from openhands.agent_server.event_service import EventService
1519
from openhands.agent_server.models import SendMessageRequest
1620
from openhands.sdk import Message
1721
from openhands.sdk.event.llm_convertible.message import MessageEvent
1822
from openhands.sdk.llm.message import ImageContent, TextContent
1923

2024

25+
def test_normalize_datetime_naive_passthrough():
26+
"""Naive datetimes should be returned unchanged."""
27+
naive_dt = datetime(2025, 1, 15, 10, 30, 0)
28+
result = normalize_datetime_to_server_timezone(naive_dt)
29+
30+
assert result == naive_dt
31+
assert result.tzinfo is None
32+
33+
34+
def test_normalize_datetime_utc_converted_to_naive():
35+
"""UTC datetime should be converted to server local time and made naive."""
36+
utc_dt = datetime(2025, 1, 15, 10, 30, 0, tzinfo=UTC)
37+
result = normalize_datetime_to_server_timezone(utc_dt)
38+
39+
assert result.tzinfo is None
40+
expected = utc_dt.astimezone(None).replace(tzinfo=None)
41+
assert result == expected
42+
43+
44+
def test_normalize_datetime_preserves_microseconds():
45+
"""Microseconds should be preserved through conversion."""
46+
utc_dt = datetime(2025, 1, 15, 10, 30, 0, 123456, tzinfo=UTC)
47+
result = normalize_datetime_to_server_timezone(utc_dt)
48+
49+
assert result.microsecond == 123456
50+
51+
52+
def test_normalize_datetime_fixed_offset_timezone():
53+
"""Test with a specific fixed offset timezone (UTC+5:30)."""
54+
ist = timezone(timedelta(hours=5, minutes=30))
55+
ist_dt = datetime(2025, 1, 15, 16, 0, 0, tzinfo=ist)
56+
57+
result = normalize_datetime_to_server_timezone(ist_dt)
58+
59+
assert result.tzinfo is None
60+
expected = ist_dt.astimezone(None).replace(tzinfo=None)
61+
assert result == expected
62+
63+
2164
@pytest.fixture
2265
def client():
2366
"""Create a test client for the FastAPI app without authentication."""

0 commit comments

Comments
 (0)