|
4 | 4 | import json |
5 | 5 | import logging |
6 | 6 | import os |
| 7 | +import socket |
| 8 | +import sys |
7 | 9 | import time |
8 | 10 | from concurrent.futures import ThreadPoolExecutor |
9 | 11 | from contextlib import nullcontext |
|
24 | 26 | SENTRY_ENABLED, |
25 | 27 | WEBSOCKET_PING_INTERVAL, |
26 | 28 | WEBSOCKET_PING_TIMEOUT, |
| 29 | + WEBSOCKET_TCP_KEEPALIVE_COUNT, |
| 30 | + WEBSOCKET_TCP_KEEPALIVE_ENABLED, |
| 31 | + WEBSOCKET_TCP_KEEPALIVE_IDLE, |
| 32 | + WEBSOCKET_TCP_KEEPALIVE_INTERVAL, WEBSOCKET_APP_KEEPALIVE_ENABLED, |
27 | 33 | ) |
28 | 34 | from robusta.core.playbooks.playbook_utils import to_safe_str |
29 | 35 | from robusta.core.playbooks.playbooks_event_handler import PlaybooksEventHandler |
|
42 | 48 | WEBSOCKET_THREADPOOL_SIZE = int(os.environ.get("WEBSOCKET_THREADPOOL_SIZE", 10)) |
43 | 49 |
|
44 | 50 |
|
def _get_tcp_keepalive_options() -> tuple:
    """Build the TCP keepalive socket options tuple for ``run_forever(sockopt=...)``.

    Returns:
        A tuple of ``(level, option, value)`` triples. The first triple turns
        ``SO_KEEPALIVE`` on; the remaining ones carry the idle, interval and
        count values from the ``WEBSOCKET_TCP_KEEPALIVE_*`` settings. A tunable
        whose option constant is not exposed by this interpreter's ``socket``
        module is left out instead of raising ``AttributeError``.
    """
    # TCP_KEEPIDLE is Linux-only; macOS uses TCP_KEEPALIVE (0x10) for the same purpose
    if sys.platform == "darwin":
        idle_option = 0x10
    else:
        idle_option = getattr(socket, "TCP_KEEPIDLE", None)

    # (option constant or None when unavailable, configured value)
    tunables = (
        (idle_option, WEBSOCKET_TCP_KEEPALIVE_IDLE),
        (getattr(socket, "TCP_KEEPINTVL", None), WEBSOCKET_TCP_KEEPALIVE_INTERVAL),
        (getattr(socket, "TCP_KEEPCNT", None), WEBSOCKET_TCP_KEEPALIVE_COUNT),
    )

    options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    options.extend(
        (socket.IPPROTO_TCP, option, value)
        for option, value in tunables
        if option is not None
    )
    return tuple(options)
| 65 | + |
| 66 | + |
45 | 67 | class ValidationResponse(BaseModel): |
46 | 68 | http_code: int = 200 |
47 | 69 | error_code: Optional[int] = None |
@@ -114,11 +136,22 @@ def start_receiver(self): |
114 | 136 |
|
def run_forever(self):
    """Run the relay websocket, reconnecting for as long as ``self.active`` is truthy.

    Each loop iteration blocks in ``self.ws.run_forever(...)``. When that call
    returns, the method logs the closure, sleeps for
    ``INCOMING_WEBSOCKET_RECONNECT_DELAY_SEC`` seconds and then re-checks
    ``self.active`` before connecting again.
    """
    logging.info("starting relay receiver")

    # TCP keepalive options are built once and reused for every reconnect.
    sockopt = None
    if WEBSOCKET_TCP_KEEPALIVE_ENABLED:
        sockopt = _get_tcp_keepalive_options()
        logging.info(
            f"TCP keepalive enabled: idle={WEBSOCKET_TCP_KEEPALIVE_IDLE}s, "
            f"interval={WEBSOCKET_TCP_KEEPALIVE_INTERVAL}s, count={WEBSOCKET_TCP_KEEPALIVE_COUNT}"
        )

    # Handles WEBSOCKET_PING_INTERVAL == 0: no ping timeout is passed when pings are off.
    # Both settings are constants, so this is computed once rather than per reconnect.
    ping_timeout = WEBSOCKET_PING_TIMEOUT if WEBSOCKET_PING_INTERVAL else None

    while self.active:
        logging.info("relay websocket starting")
        self.ws.run_forever(
            ping_interval=WEBSOCKET_PING_INTERVAL,
            ping_payload="p",
            ping_timeout=ping_timeout,
            sockopt=sockopt,
        )
        logging.info("relay websocket closed")
        time.sleep(INCOMING_WEBSOCKET_RECONNECT_DELAY_SEC)
|
0 commit comments