From 562a7e5b1948b1483386c7239c5d480e27d389cd Mon Sep 17 00:00:00 2001 From: "cmccaffrey@cloudsmith.io" Date: Thu, 18 Sep 2025 16:59:00 +0100 Subject: [PATCH 1/3] Demonstrating breaking connection --- tests/backends/clickhouse/test_driver.py | 63 ++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/tests/backends/clickhouse/test_driver.py b/tests/backends/clickhouse/test_driver.py index cc29e71..bf48031 100644 --- a/tests/backends/clickhouse/test_driver.py +++ b/tests/backends/clickhouse/test_driver.py @@ -1,7 +1,15 @@ +from clickhouse_driver.dbapi.errors import ( + OperationalError as ch_driver_OperationalError, +) +from clickhouse_driver.errors import PartiallyConsumedQueryError +from django.db import connection +from django.db.utils import OperationalError as django_OperationalError from django.test import TestCase from clickhouse_backend.driver import connect +from .. import models + class Tests(TestCase): def test_pool_size(self): @@ -9,3 +17,58 @@ def test_pool_size(self): assert conn.pool.connections_min == 2 assert conn.pool.connections_max == 4 assert len(conn.pool._pool) == 2 + + +class IterationTests(TestCase): + """ + Testing connection behaviour when iterating over queryset is interrupted. + """ + + @classmethod + def setUpTestData(cls): + cls.a1, cls.a2, cls.a3 = models.Author.objects.bulk_create( + [ + models.Author(name="a1"), + models.Author(name="a2"), + models.Author(name="a3"), + ] + ) + + def test_connection_unusable_when_iteration_interrupted(self): + """ + This test demonstrates that if a queryset is iterated over and the iteration + is interrupted (e.g. via a break statement), the connection used for that + iteration is not cleaned up and is left in a broken state. Any subsequent + queries using that connection will fail. + """ + pool = connection.connection.pool + connection_count_before = len(pool._pool) + + assert connection_count_before == 1 + + # Asserts most recent exception is Django OperationalError + with self.assertRaises(django_OperationalError) as ex_context: + # Get queryset + authors = models.Author.objects.all() + # Access iterator, but break after first item + for author in authors.iterator(1): + author = author.name + break + + # Assert connection pool size is unchanged despite broken connection + connection_count_after_iterator = len(pool._pool) + assert connection_count_after_iterator == 1 + + # Try to access queryset again, which won't work via same connection + author = authors.get(id=self.a1.id) + + # Caused by ch driver driver Operational error + self.assertIsInstance( + ex_context.exception.__cause__, ch_driver_OperationalError + ) + + # ...The context of which is a PartiallyConsumedQueryError + # https://github.com/mymarilyn/clickhouse-driver/blob/master/clickhouse_driver/connection.py#L801 + self.assertIsInstance( + ex_context.exception.__cause__.__context__, PartiallyConsumedQueryError + ) From 3652ef2852f9eb93a6c7bc4b32c98906736c3d59 Mon Sep 17 00:00:00 2001 From: "cmccaffrey@cloudsmith.io" Date: Thu, 18 Sep 2025 17:03:22 +0100 Subject: [PATCH 2/3] Stop reusing broken connection --- clickhouse_backend/driver/pool.py | 5 +++ tests/backends/clickhouse/test_driver.py | 47 +++++------------------- 2 files changed, 15 insertions(+), 37 deletions(-) diff --git a/clickhouse_backend/driver/pool.py b/clickhouse_backend/driver/pool.py index 4c51cd4..cf4c8fc 100644 --- a/clickhouse_backend/driver/pool.py +++ b/clickhouse_backend/driver/pool.py @@ -105,6 +105,11 @@ def push( raise InterfaceError("trying to put unkeyed client") if len(self._pool) < self.connections_min and not close: # TODO: verify connection still valid + + # If the connection is currently executing a query, it shouldn't be reused. + # Explicitly disconnect it instead. + if client.connection.is_query_executing: + client.disconnect() if client.connection.connected: self._pool.append(client) else: diff --git a/tests/backends/clickhouse/test_driver.py b/tests/backends/clickhouse/test_driver.py index bf48031..a749a3e 100644 --- a/tests/backends/clickhouse/test_driver.py +++ b/tests/backends/clickhouse/test_driver.py @@ -1,9 +1,4 @@ -from clickhouse_driver.dbapi.errors import ( - OperationalError as ch_driver_OperationalError, -) -from clickhouse_driver.errors import PartiallyConsumedQueryError from django.db import connection -from django.db.utils import OperationalError as django_OperationalError from django.test import TestCase from clickhouse_backend.driver import connect @@ -34,41 +29,19 @@ def setUpTestData(cls): ] ) - def test_connection_unusable_when_iteration_interrupted(self): - """ - This test demonstrates that if a queryset is iterated over and the iteration - is interrupted (e.g. via a break statement), the connection used for that - iteration is not cleaned up and is left in a broken state. Any subsequent - queries using that connection will fail. - """ + def test_connection_not_reused_when_iteration_interrupted(self): pool = connection.connection.pool - connection_count_before = len(pool._pool) + connection_count_before = len(pool._pool) assert connection_count_before == 1 - # Asserts most recent exception is Django OperationalError - with self.assertRaises(django_OperationalError) as ex_context: - # Get queryset - authors = models.Author.objects.all() - # Access iterator, but break after first item - for author in authors.iterator(1): - author = author.name - break - - # Assert connection pool size is unchanged despite broken connection - connection_count_after_iterator = len(pool._pool) - assert connection_count_after_iterator == 1 + authors = models.Author.objects.all() + for author in authors.iterator(1): + author = author.name + break - # Try to access queryset again, which won't work via same connection - author = authors.get(id=self.a1.id) + connection_count_after_iterator = len(pool._pool) + # Connection was closed and not returned to pool + assert connection_count_after_iterator == 0 - # Caused by ch driver driver Operational error - self.assertIsInstance( - ex_context.exception.__cause__, ch_driver_OperationalError - ) - - # ...The context of which is a PartiallyConsumedQueryError - # https://github.com/mymarilyn/clickhouse-driver/blob/master/clickhouse_driver/connection.py#L801 - self.assertIsInstance( - ex_context.exception.__cause__.__context__, PartiallyConsumedQueryError - ) + author = authors.get(id=self.a1.id) From 552a483ef027abfda9e0ab3a423d998f111be2c2 Mon Sep 17 00:00:00 2001 From: "cmccaffrey@cloudsmith.io" Date: Thu, 18 Sep 2025 17:22:10 +0100 Subject: [PATCH 3/3] Added changelog and explanatory comment to test --- CHANGELOG.md | 2 +- tests/backends/clickhouse/test_driver.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6bbfe8..6b5d361 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ### 1.5.0 - +- feat: #133: Fix simultaneous queries error when iteration is interrupted - feat: #130: Add `distributed_migrations` database setting to support distributed migration queries. - feat: #129: Add `toYearWeek` datetime functionality diff --git a/tests/backends/clickhouse/test_driver.py b/tests/backends/clickhouse/test_driver.py index a749a3e..215294d 100644 --- a/tests/backends/clickhouse/test_driver.py +++ b/tests/backends/clickhouse/test_driver.py @@ -30,6 +30,11 @@ def setUpTestData(cls): ) def test_connection_not_reused_when_iteration_interrupted(self): + """ + This test demonstrates that if a queryset is iterated over and the + iteration is interrupted (e.g. via a break statement), the connection + used for that iteration is disconnected and not returned to the pool. + """ pool = connection.connection.pool connection_count_before = len(pool._pool)