Skip to content

Commit 2fb2183

Browse files
committed
Ensure connections always reconnect deterministically.
1 parent 84385f1 commit 2fb2183

File tree

2 files changed

+47
-17
lines changed

2 files changed

+47
-17
lines changed

CHANGES.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@ Changelog
44
4.3 (unreleased)
55
----------------
66

7+
- Ensure connections always reconnect deterministically after a server
8+
disconnect (or crash).
9+
Previously the pool may have harboured long-running connections that only
10+
got reconnected when the pool felt like returning it to the application.
11+
Those connections can easily have lingered for a long time, causing
12+
user-visible errors long after the original problem was fixed, e.g. when
13+
the server has restarted a couple of hours ago.
14+
715
- Add support for Python 3.13.
816

917
- Drop support for Python 3.7 and 3.8.

src/Products/ZPsycopgDA/db.py

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class DB(TM):
4545
_p_oid = _p_changed = None
4646
_registered = False
4747
_sort_key = '1'
48+
_conn = None
4849

4950
def __init__(self, dsn, tilevel, typecasts, enc='utf-8'):
5051
self.dsn = dsn
@@ -58,31 +59,52 @@ def __init__(self, dsn, tilevel, typecasts, enc='utf-8'):
5859
self.calls = 0
5960
self.make_mappings()
6061

61-
def getconn(self, init='ignored', retry=100):
62-
conn = pool.getconn(self.dsn)
63-
_pool = pool.getpool(self.dsn, create=False)
64-
if id(conn) not in _pool._initialized:
62+
def getconn(self, init='ignored'):
63+
if self._conn:
64+
# As the TM is a short-lived object, keep a reference to the
65+
# connection - we really expect the pool's getconn() to reliably
66+
# return the same connection anyway. This is needed to
67+
# differentiate between an initial connection where its fine to
68+
# keep going through the pool and fetching a new one versus an
69+
# existing connection in a running transaction where e.g. a failure
70+
# on the server side has aborted the transaction and we can't e.g.
71+
# run `select 1` any longer but we also need the original
72+
# connection to clean up correctly within Zope's transaction
73+
# management. Theoretically this could be placed in `_register()`
74+
# but I'm not 100% sure someone might be using getconn() without
75+
# the TM integration thus going around `_register()` ...
76+
return self._conn
77+
78+
_pool = pool.getpool(self.dsn, create=True)
79+
80+
# Loop to support cleaning up potentially all `maxconn` faulty
81+
# connections. Add 1 more to force a fresh connection at least once.
82+
tries = max([_pool.maxconn + 1, 1])
83+
for _ in range(tries):
84+
conn = pool.getconn(self.dsn, create=False)
6585
try:
66-
conn.set_session(isolation_level=int(self.tilevel))
67-
except psycopg2.InterfaceError:
68-
# we got a closed connection from a poisoned pool ->
69-
# close it and retry:
70-
pool.putconn(self.dsn, conn, True)
71-
if retry <= 0:
72-
raise ConflictError("InterfaceError from psycopg2")
73-
return self.getconn(retry=retry - 1)
86+
cursor = conn.cursor()
87+
cursor.execute("SELECT 1")
88+
conn.rollback()
89+
break
90+
except (psycopg2.InterfaceError, psycopg2.OperationalError):
91+
pool.putconn(self.dsn, conn, close=True)
92+
93+
if id(conn) not in _pool._initialized:
94+
conn.set_session(isolation_level=int(self.tilevel))
7495
conn.set_client_encoding(self.encoding)
7596
for tc in self.typecasts:
7697
register_type(tc, conn)
7798
_pool._initialized.add(id(conn))
99+
100+
self._conn = conn
78101
return conn
79102

80103
def putconn(self, close=False):
81-
try:
82-
conn = pool.getconn(self.dsn, False)
83-
except AttributeError:
84-
pass
85-
pool.putconn(self.dsn, conn, close)
104+
if not self._conn:
105+
return
106+
pool.putconn(self.dsn, self._conn, close)
107+
self._conn = None
86108

87109
def getcursor(self):
88110
conn = self.getconn()

0 commit comments

Comments
 (0)