Skip to content

Commit 18083d5

Browse files
authored
Merge pull request #18 from dapper91/dev
- unix socket managers implemented. - hooks usage example added.
2 parents 970402d + 23dbad7 commit 18083d5

19 files changed

+396
-53
lines changed

CHANGELOG.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
Changelog
22
=========
33

4+
0.5.0 (2023-08-17)
5+
------------------
6+
7+
- unix socket managers implemented.
8+
9+
410
0.4.1 (2023-08-16)
511
------------------
612

docs/source/pages/api.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,6 @@ _______
3535

3636
.. automodule:: generic_connection_pool.contrib.unix
3737
:members:
38+
39+
.. automodule:: generic_connection_pool.contrib.unix_async
40+
:members:

docs/source/pages/quickstart.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@ The following example illustrate how to implement a custom connection manager fo
125125
:language: python
126126

127127

128+
Connection manager allows to define methods to be called on connection acquire, release or when
129+
a connection determined to be dead. That helps to log pool actions or collect metrics.
130+
The following examples illustrate how to collect pool metrics and export them to prometheus.
131+
132+
.. literalinclude:: ../../../examples/manager_hooks.py
133+
:language: python
134+
135+
128136
Examples
129137
________
130138

examples/manager_hooks.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import ssl
2+
import time
3+
from ssl import SSLSocket
4+
from typing import Any, Dict, Tuple
5+
6+
import prometheus_client as prom
7+
8+
from generic_connection_pool.contrib.socket import SslSocketConnectionManager
9+
from generic_connection_pool.threading import ConnectionPool
10+
11+
Hostname = str
12+
Port = int
13+
Endpoint = Tuple[Hostname, Port]
14+
15+
acquire_latency_hist = prom.Histogram('acquire_latency', 'Connections acquire latency', labelnames=['hostname'])
16+
acquire_total = prom.Counter('acquire_total', 'Connections acquire count', labelnames=['hostname'])
17+
dead_conn_total = prom.Counter('dead_conn_total', 'Dead connections count', labelnames=['hostname'])
18+
19+
20+
class ObservableConnectionManager(SslSocketConnectionManager):
21+
22+
def __init__(self, *args: Any, **kwargs: Any):
23+
super().__init__(*args, **kwargs)
24+
self._acquires: Dict[SSLSocket, float] = {}
25+
26+
def on_acquire(self, endpoint: Endpoint, conn: SSLSocket) -> None:
27+
hostname, port = endpoint
28+
29+
acquire_total.labels(hostname).inc()
30+
self._acquires[conn] = time.time()
31+
32+
def on_release(self, endpoint: Endpoint, conn: SSLSocket) -> None:
33+
hostname, port = endpoint
34+
35+
acquired_at = self._acquires.pop(conn)
36+
acquire_latency_hist.labels(hostname).observe(time.time() - acquired_at)
37+
38+
def on_connection_dead(self, endpoint: Endpoint, conn: SSLSocket) -> None:
39+
hostname, port = endpoint
40+
41+
dead_conn_total.labels(hostname).inc()
42+
43+
44+
http_pool = ConnectionPool[Endpoint, SSLSocket](
45+
ObservableConnectionManager(ssl.create_default_context()),
46+
)

examples/ssl_pool.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import socket
21
import ssl
32
import urllib.parse
43
from http.client import HTTPResponse
@@ -10,7 +9,7 @@
109
Hostname = str
1110
Port = int
1211
Endpoint = Tuple[Hostname, Port]
13-
Connection = socket.socket
12+
Connection = ssl.SSLSocket
1413

1514

1615
http_pool = ConnectionPool[Endpoint, Connection](

examples/tcp_pool.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import socket
2-
from ipaddress import IPv4Address
3-
from typing import Tuple
2+
from ipaddress import IPv4Address, IPv6Address
3+
from typing import Tuple, Union
44

55
from generic_connection_pool.contrib.socket import TcpSocketConnectionManager
66
from generic_connection_pool.threading import ConnectionPool
77

88
Port = int
9-
Endpoint = Tuple[IPv4Address, Port]
9+
IpAddress = Union[IPv4Address, IPv6Address]
10+
Endpoint = Tuple[IpAddress, Port]
1011
Connection = socket.socket
1112

1213

@@ -21,7 +22,7 @@
2122
)
2223

2324

24-
def command(addr: IPv4Address, port: int, cmd: str) -> None:
25+
def command(addr: IpAddress, port: int, cmd: str) -> None:
2526
with redis_pool.connection(endpoint=(addr, port), timeout=5.0) as sock:
2627
sock.sendall(cmd.encode() + b'\n')
2728
response = sock.recv(1024)

generic_connection_pool/contrib/socket_async.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ async def create(self, endpoint: TcpEndpoint) -> socket.socket:
6161
return sock
6262

6363
async def dispose(self, endpoint: TcpEndpoint, conn: socket.socket) -> None:
64-
conn.shutdown(socket.SHUT_RDWR)
64+
try:
65+
conn.shutdown(socket.SHUT_RDWR)
66+
except OSError:
67+
pass
68+
6569
conn.close()
6670

6771

generic_connection_pool/contrib/unix.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,29 @@
33
"""
44

55
import errno
6+
import pathlib
67
import socket
78
import sys
8-
from typing import Optional
9+
from typing import Generic, Optional
910

10-
from generic_connection_pool.contrib.socket import TcpEndpoint
11+
from generic_connection_pool.contrib.socket import BaseConnectionManager
12+
from generic_connection_pool.threading import EndpointT
1113

1214
from .socket import socket_timeout
1315

14-
if sys.platform not in ('linux', 'darwin'):
15-
raise AssertionError('this module is supported only on linux and darwin platform')
16+
if sys.platform not in ('linux', 'darwin', 'freebsd'):
17+
raise AssertionError('this module is supported only on unix platforms')
1618

1719

18-
class CheckSocketAlivenessMixin:
20+
UnixSocketEndpoint = pathlib.Path
21+
22+
23+
class CheckSocketAlivenessMixin(Generic[EndpointT]):
1924
"""
2025
Socket aliveness checking mixin.
2126
"""
2227

23-
def check_aliveness(self, endpoint: TcpEndpoint, conn: socket.socket, timeout: Optional[float] = None) -> bool:
28+
def check_aliveness(self, endpoint: EndpointT, conn: socket.socket, timeout: Optional[float] = None) -> bool:
2429
try:
2530
with socket_timeout(conn, timeout):
2631
resp = conn.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT)
@@ -33,3 +38,28 @@ def check_aliveness(self, endpoint: TcpEndpoint, conn: socket.socket, timeout: O
3338
return False
3439

3540
return True
41+
42+
43+
class UnixSocketConnectionManager(
44+
CheckSocketAlivenessMixin[UnixSocketEndpoint],
45+
BaseConnectionManager[UnixSocketEndpoint, socket.socket],
46+
):
47+
"""
48+
Unix socket connection manager.
49+
"""
50+
51+
def create(self, endpoint: UnixSocketEndpoint, timeout: Optional[float] = None) -> socket.socket:
52+
sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
53+
54+
with socket_timeout(sock, timeout):
55+
sock.connect(str(endpoint))
56+
57+
return sock
58+
59+
def dispose(self, endpoint: UnixSocketEndpoint, conn: socket.socket, timeout: Optional[float] = None) -> None:
60+
try:
61+
conn.shutdown(socket.SHUT_RDWR)
62+
except OSError:
63+
pass
64+
65+
conn.close()
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"""
2+
Asynchronous unix specific functionality.
3+
"""
4+
5+
import asyncio
6+
import pathlib
7+
import socket
8+
import sys
9+
10+
from generic_connection_pool.contrib.socket_async import BaseConnectionManager, SocketAlivenessCheckingMixin, Stream
11+
from generic_connection_pool.contrib.socket_async import StreamAlivenessCheckingMixin
12+
13+
if sys.platform not in ('linux', 'darwin', 'freebsd'):
14+
raise AssertionError('this module is supported only on unix platforms')
15+
16+
17+
UnixSocketEndpoint = pathlib.Path
18+
19+
20+
class UnixSocketConnectionManager(
21+
SocketAlivenessCheckingMixin[UnixSocketEndpoint],
22+
BaseConnectionManager[UnixSocketEndpoint, socket.socket],
23+
):
24+
"""
25+
Asynchronous unix socket connection manager.
26+
"""
27+
28+
async def create(self, endpoint: UnixSocketEndpoint) -> socket.socket:
29+
loop = asyncio.get_running_loop()
30+
31+
sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
32+
sock.setblocking(False)
33+
34+
await loop.sock_connect(sock, address=(str(endpoint)))
35+
36+
return sock
37+
38+
async def dispose(self, endpoint: UnixSocketEndpoint, conn: socket.socket) -> None:
39+
try:
40+
conn.shutdown(socket.SHUT_RDWR)
41+
except OSError:
42+
pass
43+
44+
conn.close()
45+
46+
47+
class UnixSocketStreamConnectionManager(
48+
StreamAlivenessCheckingMixin[UnixSocketEndpoint],
49+
BaseConnectionManager[UnixSocketEndpoint, Stream],
50+
):
51+
"""
52+
Asynchronous unix socket stream connection manager.
53+
"""
54+
55+
async def create(self, endpoint: UnixSocketEndpoint) -> Stream:
56+
reader, writer = await asyncio.open_unix_connection(path=endpoint)
57+
58+
return reader, writer
59+
60+
async def dispose(self, endpoint: UnixSocketEndpoint, conn: Stream) -> None:
61+
reader, writer = conn
62+
if writer.can_write_eof():
63+
writer.write_eof()
64+
65+
writer.close()
66+
await writer.wait_closed()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "generic-connection-pool"
3-
version = "0.4.1"
3+
version = "0.5.0"
44
description = "generic connection pool"
55
authors = ["Dmitry Pershin <[email protected]>"]
66
license = "Unlicense"

0 commit comments

Comments
 (0)