Skip to content

Commit bc2cf08

Browse files
grdsdevclaudeo-santi
authored
feat(realtime): add support for broadcast replay configuration (#1235)
Co-authored-by: Claude <[email protected]> Co-authored-by: Leonardo Santiago <[email protected]>
1 parent adce07c commit bc2cf08

File tree

2 files changed

+207
-2
lines changed

2 files changed

+207
-2
lines changed

src/realtime/src/realtime/types.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,15 @@ class PostgresChangesPayload(TypedDict):
107107
ids: List[int]
108108

109109

110+
class BroadcastMeta(TypedDict, total=False):
111+
replayed: bool
112+
id: str
113+
114+
110115
class BroadcastPayload(TypedDict):
111116
event: str
112117
payload: dict[str, Any]
118+
meta: NotRequired[BroadcastMeta]
113119

114120

115121
@dataclass(frozen=True)
@@ -172,9 +178,15 @@ def __init__(self, events: PresenceEvents):
172178

173179

174180
# TypedDicts
175-
class RealtimeChannelBroadcastConfig(TypedDict):
181+
class ReplayOption(TypedDict, total=False):
182+
since: int
183+
limit: int
184+
185+
186+
class RealtimeChannelBroadcastConfig(TypedDict, total=False):
176187
ack: bool
177188
self: bool
189+
replay: ReplayOption
178190

179191

180192
class RealtimeChannelPresenceConfig(TypedDict):

src/realtime/tests/test_connection.py

Lines changed: 194 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pytest
77
from dotenv import load_dotenv
88
from pydantic import BaseModel
9+
from websockets import broadcast
910

1011
from realtime import (
1112
AsyncRealtimeChannel,
@@ -297,7 +298,7 @@ def insert_callback(payload):
297298

298299
assert insert["data"]["record"]["id"] == created_todo_id
299300
assert insert["data"]["record"]["description"] == "Test todo"
300-
assert insert["data"]["record"]["is_completed"] == False
301+
assert insert["data"]["record"]["is_completed"] is False
301302

302303
assert received_events["insert"] == [insert, message_insert]
303304

@@ -488,3 +489,195 @@ async def test_send_message_reconnection(socket: AsyncRealtimeClient):
488489
await socket.send(message)
489490

490491
await socket.close()
492+
493+
494+
@pytest.mark.asyncio
495+
async def test_subscribe_to_private_channel_with_broadcast_replay(
496+
socket: AsyncRealtimeClient,
497+
):
498+
"""Test that channel subscription sends correct payload with broadcast replay configuration."""
499+
import json
500+
from unittest.mock import AsyncMock, patch
501+
502+
# Mock the websocket connection
503+
mock_ws = AsyncMock()
504+
socket._ws_connection = mock_ws
505+
506+
# Connect the socket (this will use our mock)
507+
await socket.connect()
508+
509+
# Calculate replay timestamp
510+
ten_mins_ago = datetime.datetime.now() - datetime.timedelta(minutes=10)
511+
ten_mins_ago_ms = int(ten_mins_ago.timestamp() * 1000)
512+
513+
# Create channel with broadcast replay configuration
514+
channel: AsyncRealtimeChannel = socket.channel(
515+
"test-private-channel",
516+
params={
517+
"config": {
518+
"private": True,
519+
"broadcast": {"replay": {"since": ten_mins_ago_ms, "limit": 100}},
520+
"presence": {"enabled": True, "key": ""},
521+
}
522+
},
523+
)
524+
525+
# Mock the subscription callback to be called immediately
526+
callback_called = False
527+
528+
def mock_callback(state, error):
529+
nonlocal callback_called
530+
callback_called = True
531+
532+
# Subscribe to the channel
533+
await channel.subscribe(mock_callback)
534+
535+
# Verify that send was called with the correct payload
536+
assert mock_ws.send.called, "WebSocket send should have been called"
537+
538+
# Get the sent message
539+
sent_message = mock_ws.send.call_args[0][0]
540+
message_data = json.loads(sent_message)
541+
542+
# Verify the message structure
543+
assert message_data["topic"] == "realtime:test-private-channel"
544+
assert message_data["event"] == "phx_join"
545+
assert "ref" in message_data
546+
assert "payload" in message_data
547+
548+
# Verify the payload contains the correct broadcast replay configuration
549+
payload = message_data["payload"]
550+
assert "config" in payload
551+
552+
config = payload["config"]
553+
assert config["private"] is True
554+
assert "broadcast" in config
555+
556+
broadcast_config = config["broadcast"]
557+
assert "replay" in broadcast_config
558+
559+
replay_config = broadcast_config["replay"]
560+
assert replay_config["since"] == ten_mins_ago_ms
561+
assert replay_config["limit"] == 100
562+
563+
# Verify postgres_changes array is present (even if empty)
564+
assert "postgres_changes" in config
565+
assert isinstance(config["postgres_changes"], list)
566+
567+
await socket.close()
568+
569+
570+
@pytest.mark.asyncio
571+
async def test_subscribe_to_channel_with_empty_replay_config(
572+
socket: AsyncRealtimeClient,
573+
):
574+
"""Test that channel subscription handles empty replay configuration correctly."""
575+
import json
576+
from unittest.mock import AsyncMock, patch
577+
578+
# Mock the websocket connection
579+
mock_ws = AsyncMock()
580+
socket._ws_connection = mock_ws
581+
582+
# Connect the socket
583+
await socket.connect()
584+
585+
# Create channel with empty replay configuration
586+
channel: AsyncRealtimeChannel = socket.channel(
587+
"test-empty-replay",
588+
params={
589+
"config": {
590+
"private": False,
591+
"broadcast": {"ack": True, "self": False, "replay": {}},
592+
"presence": {"enabled": True, "key": ""},
593+
}
594+
},
595+
)
596+
597+
# Mock the subscription callback
598+
callback_called = False
599+
600+
def mock_callback(state, error):
601+
nonlocal callback_called
602+
callback_called = True
603+
604+
# Subscribe to the channel
605+
await channel.subscribe(mock_callback)
606+
607+
# Verify that send was called
608+
assert mock_ws.send.called, "WebSocket send should have been called"
609+
610+
# Get the sent message
611+
sent_message = mock_ws.send.call_args[0][0]
612+
message_data = json.loads(sent_message)
613+
614+
# Verify the payload structure
615+
payload = message_data["payload"]
616+
config = payload["config"]
617+
618+
assert config["private"] is False
619+
assert "broadcast" in config
620+
621+
broadcast_config = config["broadcast"]
622+
assert broadcast_config["ack"] is True
623+
assert broadcast_config["self"] is False
624+
assert broadcast_config["replay"] == {}
625+
626+
await socket.close()
627+
628+
629+
@pytest.mark.asyncio
630+
async def test_subscribe_to_channel_without_replay_config(socket: AsyncRealtimeClient):
631+
"""Test that channel subscription works without replay configuration."""
632+
import json
633+
from unittest.mock import AsyncMock, patch
634+
635+
# Mock the websocket connection
636+
mock_ws = AsyncMock()
637+
socket._ws_connection = mock_ws
638+
639+
# Connect the socket
640+
await socket.connect()
641+
642+
# Create channel without replay configuration
643+
channel: AsyncRealtimeChannel = socket.channel(
644+
"test-no-replay",
645+
params={
646+
"config": {
647+
"private": False,
648+
"broadcast": {"ack": True, "self": True},
649+
"presence": {"enabled": True, "key": ""},
650+
}
651+
},
652+
)
653+
654+
# Mock the subscription callback
655+
callback_called = False
656+
657+
def mock_callback(state, error):
658+
nonlocal callback_called
659+
callback_called = True
660+
661+
# Subscribe to the channel
662+
await channel.subscribe(mock_callback)
663+
664+
# Verify that send was called
665+
assert mock_ws.send.called, "WebSocket send should have been called"
666+
667+
# Get the sent message
668+
sent_message = mock_ws.send.call_args[0][0]
669+
message_data = json.loads(sent_message)
670+
671+
# Verify the payload structure
672+
payload = message_data["payload"]
673+
config = payload["config"]
674+
675+
assert config["private"] is False
676+
assert "broadcast" in config
677+
678+
broadcast_config = config["broadcast"]
679+
assert broadcast_config["ack"] is True
680+
assert broadcast_config["self"] is True
681+
assert "replay" not in broadcast_config
682+
683+
await socket.close()

0 commit comments

Comments
 (0)