diff --git a/CHANGELOG.md b/CHANGELOG.md index a6bbfe8..6b5d361 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ### 1.5.0 - +- feat: #133: Fix simultaneous queries error when iteration is interrupted - feat: #130: Add `distributed_migrations` database setting to support distributed migration queries. - feat: #129: Add `toYearWeek` datetime functionality diff --git a/clickhouse_backend/driver/pool.py b/clickhouse_backend/driver/pool.py index 4c51cd4..cf4c8fc 100644 --- a/clickhouse_backend/driver/pool.py +++ b/clickhouse_backend/driver/pool.py @@ -105,6 +105,11 @@ def push( raise InterfaceError("trying to put unkeyed client") if len(self._pool) < self.connections_min and not close: # TODO: verify connection still valid + + # If the connection is currently executing a query, it shouldn't be reused. + # Explicitly disconnect it instead. + if client.connection.is_query_executing: + client.disconnect() if client.connection.connected: self._pool.append(client) else: diff --git a/tests/backends/clickhouse/test_driver.py b/tests/backends/clickhouse/test_driver.py index cc29e71..215294d 100644 --- a/tests/backends/clickhouse/test_driver.py +++ b/tests/backends/clickhouse/test_driver.py @@ -1,7 +1,10 @@ +from django.db import connection from django.test import TestCase from clickhouse_backend.driver import connect +from .. import models + class Tests(TestCase): def test_pool_size(self): @@ -9,3 +12,41 @@ def test_pool_size(self): assert conn.pool.connections_min == 2 assert conn.pool.connections_max == 4 assert len(conn.pool._pool) == 2 + + +class IterationTests(TestCase): + """ + Testing connection behaviour when iterating over queryset is interrupted. + """ + + @classmethod + def setUpTestData(cls): + cls.a1, cls.a2, cls.a3 = models.Author.objects.bulk_create( + [ + models.Author(name="a1"), + models.Author(name="a2"), + models.Author(name="a3"), + ] + ) + + def test_connection_not_reused_when_iteration_interrupted(self): + """ + This test demonstrates that if a queryset is iterated over and the + iteration is interrupted (e.g. via a break statement), the connection + used for that iteration is disconnected and not returned to the pool. + """ + pool = connection.connection.pool + + connection_count_before = len(pool._pool) + assert connection_count_before == 1 + + authors = models.Author.objects.all() + for author in authors.iterator(1): + author = author.name + break + + connection_count_after_iterator = len(pool._pool) + # Connection was closed and not returned to pool + assert connection_count_after_iterator == 0 + + author = authors.get(id=self.a1.id)