Skip to content

Commit d72e89a

Browse files
authored
feat: Allow customization of socket options (#385)
* feat: Allow customization of socket options This allows clients to configure (e.g.) socket keep alive probes * test: Prevent leaking modified _session * chore: Add a type ignore * feat: Add `disable_connection_reuse` config method Disables connection pooling * set_socket_options is idempotent
1 parent ce38fb2 commit d72e89a

File tree

3 files changed

+148
-13
lines changed

3 files changed

+148
-13
lines changed

posthog/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
InconclusiveMatchError as InconclusiveMatchError,
2323
RequiresServerEvaluation as RequiresServerEvaluation,
2424
)
25+
from posthog.request import (
26+
disable_connection_reuse as disable_connection_reuse,
27+
enable_keep_alive as enable_keep_alive,
28+
set_socket_options as set_socket_options,
29+
SocketOptions as SocketOptions,
30+
)
2531
from posthog.types import (
2632
FeatureFlag,
2733
FlagsAndPayloads,

posthog/request.py

Lines changed: 93 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,47 @@
11
import json
22
import logging
33
import re
4+
import socket
45
from dataclasses import dataclass
56
from datetime import date, datetime
67
from gzip import GzipFile
78
from io import BytesIO
8-
from typing import Any, Optional, Union
9+
from typing import Any, List, Optional, Tuple, Union
10+
911

1012
import requests
1113
from dateutil.tz import tzutc
14+
from requests.adapters import HTTPAdapter # type: ignore[import-untyped]
15+
from urllib3.connection import HTTPConnection
1216
from urllib3.util.retry import Retry
1317

1418
from posthog.utils import remove_trailing_slash
1519
from posthog.version import VERSION
1620

21+
SocketOptions = List[Tuple[int, int, Union[int, bytes]]]
22+
23+
KEEPALIVE_IDLE_SECONDS = 60
24+
KEEPALIVE_INTERVAL_SECONDS = 60
25+
KEEPALIVE_PROBE_COUNT = 3
26+
27+
# TCP keepalive probes idle connections to prevent them from being dropped.
28+
# SO_KEEPALIVE is cross-platform, but timing options vary:
29+
# - Linux: TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT
30+
# - macOS: only SO_KEEPALIVE (uses system defaults)
31+
# - Windows: TCP_KEEPIDLE, TCP_KEEPINTVL (since Windows 10 1709)
32+
KEEP_ALIVE_SOCKET_OPTIONS: SocketOptions = list(
33+
HTTPConnection.default_socket_options
34+
) + [
35+
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
36+
]
37+
for attr, value in [
38+
("TCP_KEEPIDLE", KEEPALIVE_IDLE_SECONDS),
39+
("TCP_KEEPINTVL", KEEPALIVE_INTERVAL_SECONDS),
40+
("TCP_KEEPCNT", KEEPALIVE_PROBE_COUNT),
41+
]:
42+
if hasattr(socket, attr):
43+
KEEP_ALIVE_SOCKET_OPTIONS.append((socket.SOL_TCP, getattr(socket, attr), value))
44+
1745

1846
def _mask_tokens_in_url(url: str) -> str:
1947
"""Mask token values in URLs for safe logging, keeping first 10 chars visible."""
@@ -29,17 +57,69 @@ class GetResponse:
2957
not_modified: bool = False
3058

3159

32-
# Retry on both connect and read errors
33-
# by default read errors will only retry idempotent HTTP methods (so not POST)
34-
adapter = requests.adapters.HTTPAdapter(
35-
max_retries=Retry(
36-
total=2,
37-
connect=2,
38-
read=2,
60+
class HTTPAdapterWithSocketOptions(HTTPAdapter):
61+
"""HTTPAdapter with configurable socket options."""
62+
63+
def __init__(self, *args, socket_options: Optional[SocketOptions] = None, **kwargs):
64+
self.socket_options = socket_options
65+
super().__init__(*args, **kwargs)
66+
67+
def init_poolmanager(self, *args, **kwargs):
68+
if self.socket_options is not None:
69+
kwargs["socket_options"] = self.socket_options
70+
super().init_poolmanager(*args, **kwargs)
71+
72+
73+
def _build_session(socket_options: Optional[SocketOptions] = None) -> requests.Session:
74+
adapter = HTTPAdapterWithSocketOptions(
75+
max_retries=Retry(
76+
total=2,
77+
connect=2,
78+
read=2,
79+
),
80+
socket_options=socket_options,
3981
)
40-
)
41-
_session = requests.sessions.Session()
42-
_session.mount("https://", adapter)
82+
session = requests.sessions.Session()
83+
session.mount("https://", adapter)
84+
return session
85+
86+
87+
_session = _build_session()
88+
_socket_options: Optional[SocketOptions] = None
89+
_pooling_enabled = True
90+
91+
92+
def _get_session() -> requests.Session:
93+
if _pooling_enabled:
94+
return _session
95+
return _build_session(_socket_options)
96+
97+
98+
def set_socket_options(socket_options: Optional[SocketOptions]) -> None:
99+
"""
100+
Configure socket options for all HTTP connections.
101+
102+
Example:
103+
from posthog import set_socket_options
104+
set_socket_options([(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)])
105+
"""
106+
global _session, _socket_options
107+
if socket_options == _socket_options:
108+
return
109+
_socket_options = socket_options
110+
_session = _build_session(socket_options)
111+
112+
113+
def enable_keep_alive() -> None:
114+
"""Enable TCP keepalive to prevent idle connections from being dropped."""
115+
set_socket_options(KEEP_ALIVE_SOCKET_OPTIONS)
116+
117+
118+
def disable_connection_reuse() -> None:
119+
"""Disable connection reuse, creating a fresh connection for each request."""
120+
global _pooling_enabled
121+
_pooling_enabled = False
122+
43123

44124
US_INGESTION_ENDPOINT = "https://us.i.posthog.com"
45125
EU_INGESTION_ENDPOINT = "https://eu.i.posthog.com"
@@ -85,7 +165,7 @@ def post(
85165
gz.write(data.encode("utf-8"))
86166
data = buf.getvalue()
87167

88-
res = _session.post(url, data=data, headers=headers, timeout=timeout)
168+
res = _get_session().post(url, data=data, headers=headers, timeout=timeout)
89169

90170
if res.status_code == 200:
91171
log.debug("data uploaded successfully")
@@ -200,7 +280,7 @@ def get(
200280
if etag:
201281
headers["If-None-Match"] = etag
202282

203-
res = _session.get(full_url, headers=headers, timeout=timeout)
283+
res = _get_session().get(full_url, headers=headers, timeout=timeout)
204284

205285
masked_url = _mask_tokens_in_url(full_url)
206286

posthog/test/test_request.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,21 @@
66
import pytest
77
import requests
88

9+
import posthog.request as request_module
910
from posthog.request import (
1011
APIError,
1112
DatetimeSerializer,
1213
GetResponse,
14+
KEEP_ALIVE_SOCKET_OPTIONS,
1315
QuotaLimitError,
1416
_mask_tokens_in_url,
1517
batch_post,
1618
decide,
1719
determine_server_host,
20+
disable_connection_reuse,
21+
enable_keep_alive,
1822
get,
23+
set_socket_options,
1924
)
2025
from posthog.test.test_utils import TEST_API_KEY
2126

@@ -344,3 +349,47 @@ def test_get_removes_trailing_slash_from_host(self, mock_get):
344349
)
345350
def test_routing_to_custom_host(host, expected):
346351
assert determine_server_host(host) == expected
352+
353+
354+
def test_enable_keep_alive_sets_socket_options():
355+
try:
356+
enable_keep_alive()
357+
from posthog.request import _session
358+
359+
adapter = _session.get_adapter("https://example.com")
360+
assert adapter.socket_options == KEEP_ALIVE_SOCKET_OPTIONS
361+
finally:
362+
set_socket_options(None)
363+
364+
365+
def test_set_socket_options_clears_with_none():
366+
try:
367+
enable_keep_alive()
368+
set_socket_options(None)
369+
from posthog.request import _session
370+
371+
adapter = _session.get_adapter("https://example.com")
372+
assert adapter.socket_options is None
373+
finally:
374+
set_socket_options(None)
375+
376+
377+
def test_disable_connection_reuse_creates_fresh_sessions():
378+
try:
379+
disable_connection_reuse()
380+
session1 = request_module._get_session()
381+
session2 = request_module._get_session()
382+
assert session1 is not session2
383+
finally:
384+
request_module._pooling_enabled = True
385+
386+
387+
def test_set_socket_options_is_idempotent():
388+
try:
389+
enable_keep_alive()
390+
session1 = request_module._session
391+
enable_keep_alive()
392+
session2 = request_module._session
393+
assert session1 is session2
394+
finally:
395+
set_socket_options(None)

0 commit comments

Comments
 (0)