-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsender.py
More file actions
141 lines (119 loc) · 5.39 KB
/
sender.py
File metadata and controls
141 lines (119 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Async priority send queue with token-bucket rate limiting.
priority=0 protocol traffic (PONG, CAP, NICK, QUIT) — bypass bucket
priority=1 normal output (PRIVMSG, NOTICE, JOIN) — subject to bucket
Burst: 5 tokens. Refill: 1 per 1.5s (~40 msg/min sustained).
Thread-safe: enqueue() can be called from any thread (module handlers
run in asyncio.to_thread).
"""
from __future__ import annotations
import asyncio
import logging
import threading
log = logging.getLogger("internets.sender")


class Sender:
    """Async priority send queue with token-bucket rate limiting.

    Queue items are ``(priority, seq, msg)`` tuples, so lower priority
    numbers are sent first and messages of equal priority keep their
    enqueue order.

    Priority 0 bypasses the bucket (protocol traffic). Any priority above 0
    spends one token per message (~40 msg/min sustained, 5 burst with the
    default ``CAPACITY`` / ``REFILL``).

    ``enqueue()`` is thread-safe — modules call it from worker threads.
    ``start()`` and ``stop()`` are meant to be called from the event loop.
    """

    CAPACITY: int = 5      # bucket size, i.e. maximum burst of rate-limited lines
    REFILL: float = 1.5    # seconds needed to earn one token back
    MAX_QUEUE: int = 200  # BUG-056: Bound queue to prevent OOM during disconnects

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        """Create an idle sender bound to *loop*; nothing is sent until ``start()``."""
        self._loop = loop
        self._q: asyncio.PriorityQueue[tuple[int, int, str]] = asyncio.PriorityQueue(
            maxsize=self.MAX_QUEUE
        )
        self._seq = 0
        self._seq_lk = threading.Lock()  # protects _seq from concurrent enqueue
        self._writer: asyncio.StreamWriter | None = None
        self._task: asyncio.Task[None] | None = None

    def start(self, writer: asyncio.StreamWriter) -> None:
        """Begin draining the queue to *writer*. Call from the event loop.

        The queue and the sequence counter are reset, so anything still
        queued from before this call is discarded.
        """
        if self._task is not None and not self._task.done():
            # A second start() without stop() would otherwise leave two drain
            # tasks consuming the same queue, each with its own token bucket.
            self._task.cancel()
        self._writer = writer
        self._q = asyncio.PriorityQueue(maxsize=self.MAX_QUEUE)
        with self._seq_lk:
            self._seq = 0
        self._task = asyncio.create_task(self._drain(), name="sender")

    async def stop(self) -> None:
        """Cancel the drain task and wait for it to finish. Call from the event loop."""
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None

    def _safe_put(self, item: tuple[int, int, str]) -> None:
        """Put an item on the queue, dropping if full. Runs in event loop thread."""
        try:
            self._q.put_nowait(item)
        except asyncio.QueueFull:
            log.warning("Send queue full — dropping message")

    def enqueue(self, msg: str, priority: int = 1) -> None:
        """Thread-safe enqueue. Safe to call from any thread.

        Args:
            msg: One IRC line, without the trailing CR-LF.
            priority: 0 bypasses rate limiting; values above 0 are rate limited.
        """
        with self._seq_lk:
            self._seq += 1
            seq = self._seq
        # asyncio queues are not thread-safe, so the put itself is handed to
        # the loop thread instead of being done here.
        self._loop.call_soon_threadsafe(self._safe_put, (priority, seq, msg))

    # Prefixes of outgoing IRC commands whose arguments contain secrets.
    _REDACT_OUT: tuple[str, ...] = (
        "PASS ", "OPER ", "PRIVMSG NickServ :IDENTIFY ", "AUTHENTICATE ",
    )
    # Maximum IRC line length including \r\n (RFC 2812 §2.3).
    _MAX_IRC_LINE = 512
    # Deletion table for characters that must never appear inside a line.
    _STRIP_CHARS = str.maketrans("", "", "\r\n\x00")

    def _write_line(self, msg: str) -> None:
        """Sanitize, log, and buffer a single IRC line. NOT async — just buffers.

        The line is stripped of CR/LF/NUL, truncated to fit the 512-byte IRC
        limit, logged at debug level with credentials redacted, and written
        to the stream writer if one is attached and not closing. Write errors
        are logged as warnings, never raised.
        """
        # Strip embedded CR/LF/NUL to prevent protocol injection.
        msg = msg.translate(self._STRIP_CHARS)

        # BUG-026: Enforce 512-byte IRC line limit (including \r\n).
        limit = self._MAX_IRC_LINE - 2  # reserve 2 for \r\n
        encoded = msg.encode("utf-8", errors="replace")
        if len(encoded) > limit:
            # The bytes are valid UTF-8 up to the cut, so errors="ignore" drops
            # only a multi-byte character sliced in half. Decoding with
            # "replace" would turn the dangling lead byte into U+FFFD, which
            # re-encodes to 3 bytes and can push the line past the limit.
            msg = encoded[:limit].decode("utf-8", errors="ignore")

        # Redact credentials from logs.
        log_msg = msg
        upper = msg.upper()
        for prefix in self._REDACT_OUT:
            if upper.startswith(prefix.upper()):
                log_msg = prefix + "[REDACTED]"
                break
        log.debug(">> %s", log_msg)

        try:
            if self._writer and not self._writer.is_closing():
                self._writer.write((msg + "\r\n").encode("utf-8", errors="replace"))
        except Exception as e:  # best effort: a failed write must not kill the drain task
            log.warning("Send error: %s", e)

    async def _drain(self) -> None:
        """Consume the queue, apply token-bucket rate limiting, write + drain.

        Runs until cancelled.
        """
        tokens = float(self.CAPACITY)
        last = self._loop.time()

        def refill() -> None:
            """Credit tokens for the time elapsed since the previous refill."""
            nonlocal tokens, last
            now = self._loop.time()
            tokens = min(self.CAPACITY, tokens + (now - last) / self.REFILL)
            last = now

        while True:
            try:
                pri, _, msg = await asyncio.wait_for(self._q.get(), timeout=0.25)
            except asyncio.TimeoutError:
                # Replenish tokens even when idle.
                refill()
                continue

            refill()
            if pri > 0:
                # Normal traffic — wait for a token.
                while tokens < 1.0:
                    await asyncio.sleep(0.05)
                    refill()
                tokens -= 1.0

            self._write_line(msg)

            # Flush the write buffer to the OS.
            try:
                if self._writer and not self._writer.is_closing():
                    await self._writer.drain()
            except Exception as e:  # best effort: keep draining after a failed flush
                log.warning("Drain error: %s", e)