Skip to content

Commit 1cb6d15

Browse files
PlinerCopilot
andauthored
SELECT 1 only if we haven't seen any events (#222)
* SELECT 1 only if we haven't seen any events * CHANGELOG * Update asyncpg_listen/listener.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update asyncpg_listen/listener.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Fix * Remove unused tasks --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 553aefb commit 1cb6d15

File tree

4 files changed

+66
-37
lines changed

4 files changed

+66
-37
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## unreleased
2+
3+
# [SELECT 1 only if we haven't seen any events](https://github.com/anna-money/asyncpg-listen/pull/222)
4+
5+
16
## v0.0.8 (2025-05-25)
27

38
* Support python 3.13

asyncpg_listen/listener.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import enum
44
import logging
55
import sys
6+
import time
67
from typing import Any, Callable, Coroutine
78

89
import asyncpg
@@ -31,6 +32,7 @@ class Notification:
3132
NotificationHandler = Callable[[NotificationOrTimeout], Coroutine]
3233

3334
NO_TIMEOUT: float = -1
35+
MAX_SILENCED_FAILED_CONNECT_ATTEMPTS = 3
3436

3537

3638
def connect_func(*args: Any, **kwargs: Any) -> ConnectFunc:
@@ -63,7 +65,8 @@ async def run(
6365
async with asyncio.TaskGroup() as tg:
6466
tg.create_task(
6567
self.__read_notifications(
66-
queue_per_channel=queue_per_channel, check_interval=max(1.0, notification_timeout / 3.0)
68+
queue_per_channel=queue_per_channel,
69+
notification_timeout=notification_timeout,
6770
),
6871
name=__package__,
6972
)
@@ -128,31 +131,45 @@ async def run_coro(c: Coroutine) -> None:
128131
logger.exception("Failed to handle %s", notification)
129132

130133
async def __read_notifications(
131-
self, queue_per_channel: dict[str, asyncio.Queue[Notification]], check_interval: float
134+
self, queue_per_channel: dict[str, asyncio.Queue[Notification]], notification_timeout: float
132135
) -> None:
133136
failed_connect_attempts = 0
137+
per_attempt_keep_alive_budget = max(1.0, notification_timeout / 3.0)
134138
while True:
135139
try:
136140
connection = await self.__connect()
137141
failed_connect_attempts = 0
138142
try:
143+
event = asyncio.Event()
139144
for channel, queue in queue_per_channel.items():
140-
await connection.add_listener(channel, self.__get_push_callback(queue))
141-
142-
while True:
143-
await asyncio.sleep(check_interval)
144-
await connection.execute("SELECT 1")
145+
await connection.add_listener(channel, self.__get_push_callback(queue, event))
146+
147+
while not connection.is_closed():
148+
started_at = time.monotonic()
149+
if not event.is_set():
150+
await connection.execute("SELECT 1", timeout=per_attempt_keep_alive_budget)
151+
event.clear()
152+
finished_at = time.monotonic()
153+
elapsed = finished_at - started_at
154+
if elapsed < per_attempt_keep_alive_budget:
155+
await asyncio.sleep(per_attempt_keep_alive_budget - elapsed)
156+
logger.warning("Connection was lost")
145157
finally:
146158
await asyncio.shield(connection.close())
147159
except Exception:
148-
logger.exception("Connection was lost or not established")
149-
160+
if failed_connect_attempts < MAX_SILENCED_FAILED_CONNECT_ATTEMPTS:
161+
logger.warning("Connection was lost or not established", exc_info=True)
162+
else:
163+
logger.exception("Connection was lost or not established")
150164
await asyncio.sleep(self.__reconnect_delay * failed_connect_attempts)
151165
failed_connect_attempts += 1
152166

153167
@staticmethod
154-
def __get_push_callback(queue: asyncio.Queue[Notification]) -> Callable[[Any, Any, Any, Any], None]:
168+
def __get_push_callback(
169+
queue: asyncio.Queue[Notification], event: asyncio.Event
170+
) -> Callable[[Any, Any, Any, Any], None]:
155171
def _push(_: Any, __: Any, channel: Any, payload: Any) -> None:
156172
queue.put_nowait(Notification(channel, payload))
173+
event.set()
157174

158175
return _push

requirements-dev.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
pytest==8.3.4
22
pytest-aiohttp==1.0.5
3-
pytest-pg==0.0.21
3+
pytest-pg==0.0.25
44
pytest-runner==6.0.1
55
pytest-asyncio==0.24.0
66
isort==5.13.2

tests/test_listener.py

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import asyncio
22
import contextlib
33
import dataclasses
4-
from typing import Any, Awaitable, Callable, List
4+
import logging
5+
from typing import Awaitable, Callable
56

67
import asyncpg
8+
import pytest
79
import pytest_pg
810

911
import asyncpg_listen
@@ -14,7 +16,7 @@
1416
class Handler:
1517
def __init__(self, delay: float = 0) -> None:
1618
self.delay = delay
17-
self.notifications: List[asyncpg_listen.NotificationOrTimeout] = []
19+
self.notifications: list[asyncpg_listen.NotificationOrTimeout] = []
1820

1921
async def handle(self, notification: asyncpg_listen.NotificationOrTimeout) -> None:
2022
await asyncio.sleep(self.delay)
@@ -27,10 +29,10 @@ async def cancel_and_wait(future: "asyncio.Future[None]") -> None:
2729
await future
2830

2931

30-
async def test_two_inactive_channels(pg_11: pytest_pg.PG) -> None:
32+
async def test_two_inactive_channels(pg_14: pytest_pg.PG) -> None:
3133
handler_1 = Handler()
3234
handler_2 = Handler()
33-
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_11)))
35+
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_14)))
3436
listener_task = asyncio.create_task(
3537
listener.run({"inactive_1": handler_1.handle, "inactive_2": handler_2.handle}, notification_timeout=1)
3638
)
@@ -42,14 +44,14 @@ async def test_two_inactive_channels(pg_11: pytest_pg.PG) -> None:
4244
assert handler_2.notifications == [asyncpg_listen.Timeout("inactive_2")]
4345

4446

45-
async def test_one_active_channel_and_one_passive_channel(pg_11: pytest_pg.PG) -> None:
47+
async def test_one_active_channel_and_one_passive_channel(pg_14: pytest_pg.PG) -> None:
4648
active_handler = Handler()
4749
inactive_handler = Handler()
48-
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_11)))
50+
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_14)))
4951
listener_task = asyncio.create_task(
5052
listener.run({"active": active_handler.handle, "inactive": inactive_handler.handle}, notification_timeout=1)
5153
)
52-
connection = await asyncpg.connect(**dataclasses.asdict(pg_11))
54+
connection = await asyncpg.connect(**dataclasses.asdict(pg_14))
5355
try:
5456
await asyncio.sleep(0.75)
5557
await connection.execute("NOTIFY active, '1'")
@@ -69,16 +71,16 @@ async def test_one_active_channel_and_one_passive_channel(pg_11: pytest_pg.PG) -
6971
]
7072

7173

72-
async def test_two_active_channels(pg_11: pytest_pg.PG) -> None:
74+
async def test_two_active_channels(pg_14: pytest_pg.PG) -> None:
7375
handler_1 = Handler()
7476
handler_2 = Handler()
75-
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_11)))
77+
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_14)))
7678
listener_task = asyncio.create_task(
7779
listener.run({"active_1": handler_1.handle, "active_2": handler_2.handle}, notification_timeout=1)
7880
)
7981
await asyncio.sleep(0.1)
8082

81-
connection = await asyncpg.connect(**dataclasses.asdict(pg_11))
83+
connection = await asyncpg.connect(**dataclasses.asdict(pg_14))
8284
try:
8385
await connection.execute("NOTIFY active_1, '1'")
8486
await connection.execute("NOTIFY active_2, '2'")
@@ -100,15 +102,15 @@ async def test_two_active_channels(pg_11: pytest_pg.PG) -> None:
100102
]
101103

102104

103-
async def test_listen_policy_last(pg_11: pytest_pg.PG) -> None:
105+
async def test_listen_policy_last(pg_14: pytest_pg.PG) -> None:
104106
handler = Handler(delay=0.1)
105-
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_11)))
107+
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_14)))
106108
listener_task = asyncio.create_task(
107109
listener.run({"simple": handler.handle}, policy=asyncpg_listen.ListenPolicy.LAST, notification_timeout=1)
108110
)
109111
await asyncio.sleep(0.1)
110112

111-
connection = await asyncpg.connect(**dataclasses.asdict(pg_11))
113+
connection = await asyncpg.connect(**dataclasses.asdict(pg_14))
112114
try:
113115
for i in range(10):
114116
await connection.execute(f"NOTIFY simple, '{i}'")
@@ -124,13 +126,13 @@ async def test_listen_policy_last(pg_11: pytest_pg.PG) -> None:
124126
]
125127

126128

127-
async def test_listen_policy_all(pg_11: pytest_pg.PG) -> None:
129+
async def test_listen_policy_all(pg_14: pytest_pg.PG) -> None:
128130
handler = Handler(delay=0.05)
129-
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_11)))
131+
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_14)))
130132
listener_task = asyncio.create_task(listener.run({"simple": handler.handle}, notification_timeout=1))
131133
await asyncio.sleep(0.1)
132134

133-
connection = await asyncpg.connect(**dataclasses.asdict(pg_11))
135+
connection = await asyncpg.connect(**dataclasses.asdict(pg_14))
134136
try:
135137
for i in range(10):
136138
await connection.execute(f"NOTIFY simple, '{i}'")
@@ -171,16 +173,16 @@ async def connect() -> asyncpg.Connection:
171173
assert handler.notifications == []
172174

173175

174-
async def test_failing_handler(pg_11: pytest_pg.PG) -> None:
176+
async def test_failing_handler(pg_14: pytest_pg.PG) -> None:
175177
async def handle(_: asyncpg_listen.NotificationOrTimeout) -> None:
176178
raise RuntimeError("Oops")
177179

178-
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_11)))
180+
listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func(**dataclasses.asdict(pg_14)))
179181
listener_task = asyncio.create_task(listener.run({"simple": handle}, notification_timeout=1))
180182

181183
await asyncio.sleep(0.1)
182184

183-
connection = await asyncpg.connect(**dataclasses.asdict(pg_11))
185+
connection = await asyncpg.connect(**dataclasses.asdict(pg_14))
184186
try:
185187
await connection.execute("NOTIFY simple")
186188
await connection.execute("NOTIFY simple")
@@ -198,22 +200,23 @@ async def handle(_: asyncpg_listen.NotificationOrTimeout) -> None:
198200
async def test_reconnect(
199201
tcp_proxy: Callable[[int, int], Awaitable[TcpProxy]],
200202
unused_port: Callable[[], int],
201-
pg_11: pytest_pg.PG,
202-
caplog: Any,
203+
pg_14: pytest_pg.PG,
204+
caplog: pytest.LogCaptureFixture,
203205
) -> None:
206+
caplog.set_level(logging.WARNING)
204207
proxy_port = unused_port()
205208

206209
handler = Handler()
207-
proxy = await tcp_proxy(proxy_port, pg_11.port)
210+
proxy = await tcp_proxy(proxy_port, pg_14.port)
208211
listener = asyncpg_listen.NotificationListener(
209-
asyncpg_listen.connect_func(**{**(dataclasses.asdict(pg_11)), **{"port": proxy_port}})
212+
asyncpg_listen.connect_func(**{**(dataclasses.asdict(pg_14)), **{"port": proxy_port}})
210213
)
211214

212215
listener_task = asyncio.create_task(listener.run({"simple": handler.handle}, notification_timeout=1))
213216

214217
await asyncio.sleep(0.5)
215218

216-
connection = await asyncpg.connect(**dataclasses.asdict(pg_11))
219+
connection = await asyncpg.connect(**dataclasses.asdict(pg_14))
217220
try:
218221
await connection.execute("NOTIFY simple, 'before'")
219222
finally:
@@ -223,7 +226,7 @@ async def test_reconnect(
223226
await proxy.drop_connections()
224227
await asyncio.sleep(2)
225228

226-
connection = await asyncpg.connect(**dataclasses.asdict(pg_11))
229+
connection = await asyncpg.connect(**dataclasses.asdict(pg_14))
227230
try:
228231
await connection.execute("NOTIFY simple, 'after'")
229232
finally:
@@ -235,4 +238,8 @@ async def test_reconnect(
235238
assert asyncpg_listen.Notification("simple", "before") in handler.notifications
236239
assert asyncpg_listen.Notification("simple", "after") in handler.notifications
237240

238-
assert any(record for record in caplog.records if "Connection was lost or not established" in record.message)
241+
assert any(
242+
record
243+
for record in caplog.records
244+
if "Connection was lost or not established" in record.message or "Connection was lost" in record.message
245+
)

0 commit comments

Comments
 (0)