Skip to content

Commit ae960ad

Browse files
grdsdevclaude
andcommitted
feat(realtime): add explicit REST API call for broadcast
Added new public method `http_send` for explicit usage of REST API for broadcast messages. This method always uses the REST API endpoint regardless of WebSocket connection state, giving users more control over message delivery. Changes: - Add `http_send()` method to AsyncRealtimeChannel for explicit REST delivery - Add deprecation warning to `send_broadcast()` when falling back to REST - Add httpx dependency for REST API calls - Add comprehensive test suite for http_send functionality Ported from supabase-js PR #1751 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent d741716 commit ae960ad

File tree

4 files changed

+357
-7
lines changed

4 files changed

+357
-7
lines changed

src/realtime/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies = [
1717
"websockets >=11,<16",
1818
"typing-extensions >=4.14.0",
1919
"pydantic (>=2.11.7,<3.0.0)",
20+
"httpx[http2] >=0.26,<0.29",
2021
]
2122

2223
[project.urls]

src/realtime/src/realtime/_async/channel.py

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
import asyncio
44
import json
55
import logging
6-
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional
6+
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Union
77

8+
import httpx
89
from typing_extensions import assert_never
910

1011
from realtime.types import (
@@ -488,11 +489,96 @@ async def send_broadcast(self, event: str, data: Any) -> None:
488489
:param event: The name of the broadcast event
489490
:param data: The payload to broadcast
490491
"""
492+
if not self._can_push():
493+
logger.warning(
494+
"Realtime send_broadcast() is automatically falling back to REST API. "
495+
"This behavior will be deprecated in the future. "
496+
"Please use http_send() explicitly for REST delivery."
497+
)
498+
491499
await self.push(
492500
ChannelEvents.broadcast,
493501
{"type": "broadcast", "event": event, "payload": data},
494502
)
495503

504+
async def http_send(
505+
self,
506+
event: str,
507+
payload: Any,
508+
timeout: Optional[int] = None,
509+
) -> Dict[str, Union[bool, int, str]]:
510+
"""
511+
Sends a broadcast message explicitly via REST API.
512+
513+
This method always uses the REST API endpoint regardless of WebSocket connection state.
514+
Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
515+
516+
:param event: The name of the broadcast event
517+
:param payload: Payload to be sent (required)
518+
:param timeout: Optional timeout in milliseconds
519+
:return: Dictionary with success status, and error details if failed
520+
:raises ValueError: If payload is None or undefined
521+
:raises Exception: If the HTTP request fails
522+
"""
523+
if payload is None:
524+
raise ValueError("Payload is required for http_send()")
525+
526+
authorization = (
527+
f"Bearer {self.socket.access_token}" if self.socket.access_token else ""
528+
)
529+
530+
headers = {
531+
"Authorization": authorization,
532+
"apikey": self.socket.apikey or "",
533+
"Content-Type": "application/json",
534+
}
535+
536+
config: RealtimeChannelConfig = self.params["config"]
537+
private = config.get("private", False)
538+
539+
body = {
540+
"messages": [
541+
{
542+
"topic": self.topic,
543+
"event": event,
544+
"payload": payload,
545+
"private": private,
546+
}
547+
]
548+
}
549+
550+
timeout_ms = timeout or self.timeout
551+
timeout_seconds = timeout_ms / 1000.0
552+
553+
try:
554+
async with httpx.AsyncClient(timeout=timeout_seconds) as client:
555+
response = await client.post(
556+
self.broadcast_endpoint_url,
557+
headers=headers,
558+
json=body,
559+
)
560+
561+
if response.status_code == 202:
562+
return {"success": True}
563+
564+
error_message = response.reason_phrase
565+
try:
566+
error_body = response.json()
567+
error_message = (
568+
error_body.get("error")
569+
or error_body.get("message")
570+
or error_message
571+
)
572+
except Exception:
573+
pass
574+
575+
raise Exception(error_message)
576+
577+
except httpx.TimeoutException as e:
578+
raise Exception(f"Request timeout: {str(e)}")
579+
except Exception as e:
580+
raise
581+
496582
# Internal methods
497583

498584
async def _resubscribe(self) -> None:
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
import os
2+
from unittest.mock import AsyncMock, patch
3+
4+
import httpx
5+
import pytest
6+
from dotenv import load_dotenv
7+
8+
from realtime import AsyncRealtimeChannel, AsyncRealtimeClient
9+
10+
load_dotenv()
11+
12+
URL = os.getenv("SUPABASE_URL") or "http://127.0.0.1:54321"
13+
ANON_KEY = (
14+
os.getenv("SUPABASE_ANON_KEY")
15+
or "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0"
16+
)
17+
18+
19+
@pytest.fixture
20+
def socket() -> AsyncRealtimeClient:
21+
url = f"{URL}/realtime/v1"
22+
key = ANON_KEY
23+
return AsyncRealtimeClient(url, key)
24+
25+
26+
def create_mock_response(status_code: int, reason_phrase: str = "OK", body: dict = None):
27+
"""Create a mock HTTP response."""
28+
from unittest.mock import Mock
29+
30+
mock_response = Mock()
31+
mock_response.status_code = status_code
32+
mock_response.reason_phrase = reason_phrase
33+
if body:
34+
mock_response.json = Mock(return_value=body)
35+
else:
36+
mock_response.json = Mock(side_effect=Exception("No JSON body"))
37+
return mock_response
38+
39+
40+
@pytest.mark.asyncio
41+
async def test_http_send_without_access_token():
42+
"""Test http_send with no access token."""
43+
# Create a client without setting access_token
44+
url = f"{URL}/realtime/v1"
45+
socket_no_token = AsyncRealtimeClient(url, None)
46+
channel: AsyncRealtimeChannel = socket_no_token.channel("test-topic")
47+
48+
mock_response = create_mock_response(202, "Accepted")
49+
50+
with patch("httpx.AsyncClient.post", return_value=mock_response) as mock_post:
51+
result = await channel.http_send("test-event", {"data": "test"})
52+
53+
assert result == {"success": True}
54+
assert mock_post.called
55+
call_args = mock_post.call_args
56+
57+
# Verify headers
58+
headers = call_args.kwargs["headers"]
59+
assert headers["Authorization"] == ""
60+
assert headers["apikey"] == ""
61+
assert headers["Content-Type"] == "application/json"
62+
63+
# Verify body
64+
body = call_args.kwargs["json"]
65+
assert body["messages"][0]["topic"] == "realtime:test-topic"
66+
assert body["messages"][0]["event"] == "test-event"
67+
assert body["messages"][0]["payload"] == {"data": "test"}
68+
assert body["messages"][0]["private"] is False
69+
70+
71+
@pytest.mark.asyncio
72+
async def test_http_send_with_access_token(socket: AsyncRealtimeClient):
73+
"""Test http_send with access token."""
74+
await socket.set_auth("token123")
75+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
76+
77+
mock_response = create_mock_response(202, "Accepted")
78+
79+
with patch("httpx.AsyncClient.post", return_value=mock_response) as mock_post:
80+
result = await channel.http_send("test-event", {"data": "test"})
81+
82+
assert result == {"success": True}
83+
assert mock_post.called
84+
call_args = mock_post.call_args
85+
86+
# Verify Authorization header includes token
87+
headers = call_args.kwargs["headers"]
88+
assert headers["Authorization"] == "Bearer token123"
89+
assert headers["apikey"] == ANON_KEY
90+
91+
92+
@pytest.mark.asyncio
93+
async def test_http_send_rejects_when_payload_is_none(socket: AsyncRealtimeClient):
94+
"""Test http_send raises ValueError when payload is None."""
95+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
96+
97+
with pytest.raises(ValueError, match="Payload is required for http_send"):
98+
await channel.http_send("test-event", None)
99+
100+
101+
@pytest.mark.asyncio
102+
async def test_http_send_handles_timeout_error(socket: AsyncRealtimeClient):
103+
"""Test http_send handles timeout errors."""
104+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
105+
106+
with patch(
107+
"httpx.AsyncClient.post", side_effect=httpx.TimeoutException("Request timeout")
108+
):
109+
with pytest.raises(Exception, match="Request timeout"):
110+
await channel.http_send("test-event", {"data": "test"})
111+
112+
113+
@pytest.mark.asyncio
114+
async def test_http_send_handles_non_202_status(socket: AsyncRealtimeClient):
115+
"""Test http_send handles non-202 status codes."""
116+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
117+
118+
mock_response = create_mock_response(
119+
500, "Internal Server Error", {"error": "Server error"}
120+
)
121+
122+
with patch("httpx.AsyncClient.post", return_value=mock_response):
123+
with pytest.raises(Exception, match="Server error"):
124+
await channel.http_send("test-event", {"data": "test"})
125+
126+
127+
@pytest.mark.asyncio
128+
async def test_http_send_uses_error_message_from_body(socket: AsyncRealtimeClient):
129+
"""Test http_send uses error message from response body."""
130+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
131+
132+
mock_response = create_mock_response(
133+
400, "Bad Request", {"message": "Invalid request"}
134+
)
135+
136+
with patch("httpx.AsyncClient.post", return_value=mock_response):
137+
with pytest.raises(Exception, match="Invalid request"):
138+
await channel.http_send("test-event", {"data": "test"})
139+
140+
141+
@pytest.mark.asyncio
142+
async def test_http_send_falls_back_to_reason_phrase(socket: AsyncRealtimeClient):
143+
"""Test http_send falls back to reason phrase when JSON parsing fails."""
144+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
145+
146+
mock_response = create_mock_response(503, "Service Unavailable")
147+
148+
with patch("httpx.AsyncClient.post", return_value=mock_response):
149+
with pytest.raises(Exception, match="Service Unavailable"):
150+
await channel.http_send("test-event", {"data": "test"})
151+
152+
153+
@pytest.mark.asyncio
154+
async def test_http_send_respects_custom_timeout(socket: AsyncRealtimeClient):
155+
"""Test http_send respects custom timeout option."""
156+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
157+
158+
mock_response = create_mock_response(202, "Accepted")
159+
160+
with patch("httpx.AsyncClient") as mock_client_class:
161+
mock_client = AsyncMock()
162+
mock_client.__aenter__.return_value = mock_client
163+
mock_client.__aexit__.return_value = None
164+
mock_client.post.return_value = mock_response
165+
mock_client_class.return_value = mock_client
166+
167+
await channel.http_send("test-event", {"data": "test"}, timeout=3000)
168+
169+
# Verify timeout was passed correctly (3000ms = 3.0s)
170+
assert mock_client_class.called
171+
call_args = mock_client_class.call_args
172+
assert call_args.kwargs["timeout"] == 3.0
173+
174+
175+
@pytest.mark.asyncio
176+
async def test_http_send_with_private_channel(socket: AsyncRealtimeClient):
177+
"""Test http_send with a private channel."""
178+
channel: AsyncRealtimeChannel = socket.channel(
179+
"test-topic", params={"config": {"private": True}}
180+
)
181+
182+
mock_response = create_mock_response(202, "Accepted")
183+
184+
with patch("httpx.AsyncClient.post", return_value=mock_response) as mock_post:
185+
result = await channel.http_send("test-event", {"data": "test"})
186+
187+
assert result == {"success": True}
188+
assert mock_post.called
189+
190+
# Verify private flag is set
191+
body = mock_post.call_args.kwargs["json"]
192+
assert body["messages"][0]["private"] is True
193+
194+
195+
@pytest.mark.asyncio
196+
async def test_http_send_uses_default_timeout(socket: AsyncRealtimeClient):
197+
"""Test http_send uses default timeout when not specified."""
198+
socket_with_custom_timeout = AsyncRealtimeClient(
199+
f"{URL}/realtime/v1", ANON_KEY, timeout=5000
200+
)
201+
channel: AsyncRealtimeChannel = socket_with_custom_timeout.channel("test-topic")
202+
203+
mock_response = create_mock_response(202, "Accepted")
204+
205+
with patch("httpx.AsyncClient") as mock_client_class:
206+
mock_client = AsyncMock()
207+
mock_client.__aenter__.return_value = mock_client
208+
mock_client.__aexit__.return_value = None
209+
mock_client.post.return_value = mock_response
210+
mock_client_class.return_value = mock_client
211+
212+
await channel.http_send("test-event", {"data": "test"})
213+
214+
# Verify default timeout was used (5000ms = 5.0s)
215+
assert mock_client_class.called
216+
call_args = mock_client_class.call_args
217+
assert call_args.kwargs["timeout"] == 5.0
218+
219+
220+
@pytest.mark.asyncio
221+
async def test_http_send_sends_correct_payload(socket: AsyncRealtimeClient):
222+
"""Test http_send sends the correct payload structure."""
223+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
224+
225+
mock_response = create_mock_response(202, "Accepted")
226+
227+
with patch("httpx.AsyncClient.post", return_value=mock_response) as mock_post:
228+
test_payload = {"key": "value", "nested": {"data": 123}}
229+
result = await channel.http_send("test-payload-event", test_payload)
230+
231+
assert result == {"success": True}
232+
assert mock_post.called
233+
234+
# Verify the exact payload structure
235+
body = mock_post.call_args.kwargs["json"]
236+
assert body["messages"][0]["topic"] == "realtime:test-topic"
237+
assert body["messages"][0]["event"] == "test-payload-event"
238+
assert body["messages"][0]["payload"] == test_payload
239+
240+
241+
@pytest.mark.asyncio
242+
async def test_send_broadcast_shows_warning_when_not_connected(
243+
socket: AsyncRealtimeClient, caplog
244+
):
245+
"""Test send_broadcast shows deprecation warning when not connected."""
246+
channel: AsyncRealtimeChannel = socket.channel("test-topic")
247+
248+
# Don't connect the socket, so _can_push() returns False
249+
# This will trigger the warning
250+
251+
with pytest.raises(Exception):
252+
# send_broadcast will fail because we're not subscribed, but we want to check the warning
253+
await channel.send_broadcast("test-event", {"data": "test"})
254+
255+
# Check that the warning was logged
256+
warning_found = any(
257+
"falling back to REST API" in record.message
258+
for record in caplog.records
259+
if record.levelname == "WARNING"
260+
)
261+
assert warning_found, "Expected deprecation warning was not logged"

0 commit comments

Comments
 (0)