Skip to content

Commit 03b9adb

Browse files
Fix for simultaneous queries issue when iterating (jayvynl#133)
* Demonstrating breaking connection * Stop reusing broken connection * Added changelog and explanatory comment to test
1 parent 0abe336 commit 03b9adb

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
### 1.5.0
2-
2+
- feat: #133: Fix simultaneous queries error when iteration is interrupted
33
- feat: #130: Add `distributed_migrations` database setting to support distributed migration queries.
44
- feat: #129: Add `toYearWeek` datetime functionality
55

clickhouse_backend/driver/pool.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ def push(
105105
raise InterfaceError("trying to put unkeyed client")
106106
if len(self._pool) < self.connections_min and not close:
107107
# TODO: verify connection still valid
108+
109+
# If the connection is currently executing a query, it shouldn't be reused.
110+
# Explicitly disconnect it instead.
111+
if client.connection.is_query_executing:
112+
client.disconnect()
108113
if client.connection.connected:
109114
self._pool.append(client)
110115
else:
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,52 @@
1+
from django.db import connection
12
from django.test import TestCase
23

34
from clickhouse_backend.driver import connect
45

6+
from .. import models
7+
58

69
class Tests(TestCase):
710
def test_pool_size(self):
811
conn = connect(host="localhost", connections_min=2, connections_max=4)
912
assert conn.pool.connections_min == 2
1013
assert conn.pool.connections_max == 4
1114
assert len(conn.pool._pool) == 2
15+
16+
17+
class IterationTests(TestCase):
18+
"""
19+
Testing connection behaviour when iterating over queryset is interrupted.
20+
"""
21+
22+
@classmethod
23+
def setUpTestData(cls):
24+
cls.a1, cls.a2, cls.a3 = models.Author.objects.bulk_create(
25+
[
26+
models.Author(name="a1"),
27+
models.Author(name="a2"),
28+
models.Author(name="a3"),
29+
]
30+
)
31+
32+
def test_connection_not_reused_when_iteration_interrupted(self):
33+
"""
34+
This test demonstrates that if a queryset is iterated over and the
35+
iteration is interrupted (e.g. via a break statement), the connection
36+
used for that iteration is disconnected and not returned to the pool.
37+
"""
38+
pool = connection.connection.pool
39+
40+
connection_count_before = len(pool._pool)
41+
assert connection_count_before == 1
42+
43+
authors = models.Author.objects.all()
44+
for author in authors.iterator(1):
45+
author = author.name
46+
break
47+
48+
connection_count_after_iterator = len(pool._pool)
49+
# Connection was closed and not returned to pool
50+
assert connection_count_after_iterator == 0
51+
52+
author = authors.get(id=self.a1.id)

0 commit comments

Comments
 (0)