diff --git a/.github/workflows/base-test.yml b/.github/workflows/base-test.yml index c777dc5..9bed06f 100644 --- a/.github/workflows/base-test.yml +++ b/.github/workflows/base-test.yml @@ -8,16 +8,29 @@ on: jobs: test: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: fail-fast: false matrix: - python-version: ["3.10", "3.12"] - django-version: ["3.2", "4.0", "4.1", "4.2", "5.0", "5.1"] + python-version: ["3.10", "3.13"] + django-version: ["3.2", "4.0", "4.1", "4.2", "5.0", "5.1", "5.2"] clickhouse-version: ["23.8", "latest"] include: - python-version: "3.7" django-version: "3.2" + - python-version: "3.8" + django-version: "3.2" + - python-version: "3.8" + django-version: "4.0" + - python-version: "3.8" + django-version: "4.1" + - python-version: "3.8" + django-version: "4.2" + exclude: + - python-version: "3.13" + django-version: "3.2" + - python-version: "3.13" + django-version: "4.0" name: ClickHouse${{ matrix.clickhouse-version }} Python${{ matrix.python-version }} Django${{ matrix.django-version }} steps: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 49d88a4..557ca60 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -24,7 +24,7 @@ jobs: - name: Build a binary wheel and a source tarball run: python3 -m build - name: Store the distribution packages - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: python-package-distributions path: dist/ @@ -43,7 +43,7 @@ jobs: steps: - name: Download all the dists - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: python-package-distributions path: dist/ @@ -64,12 +64,12 @@ jobs: steps: - name: Download all the dists - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: python-package-distributions path: dist/ - name: Sign the dists with Sigstore - uses: sigstore/gh-action-sigstore-python@v2.1.1 + uses: sigstore/gh-action-sigstore-python@v3.0.0 with: inputs: >- ./dist/*.tar.gz diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d6bb67..6b5d361 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,22 @@ +### 1.5.0 +- feat: #133 Fix simultaneous queries error when iteration is interrupted. +- feat: #130 Add `distributed_migrations` database setting to support distributed migration queries. +- feat: #129 Add `toYearWeek` datetime function. + +### 1.4.0 + +- feat: #119 Allow query results to be returned in columns and deserialized to `numpy` objects. +- feat: #125 Add database functions `toStartOfMinute`, `toStartOfFiveMinutes`, `toStartOfTenMinutes`, `toStartOfFifteenMinutes` and `toStartOfHour`. +- feat: #122 Django 5.2 support. + +### 1.3.2 + +- feat(aggregation-function): add anyLast function. +- fix: pass DSN to clickhouse-client if configured. +- feat: #108 QuerySet.iterator uses clickhouse_driver.Client.execute_iter. +- chore: test on Python 3.13. +- refactor: use collections.abc.Iterable instead of deprecated django.utils.itercompat.is_iterable. + ### 1.3.1 - fix: #99 update value containing "where" cause exception.
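To make the `toStartOf*` and `toYearWeek` changelog entries above concrete, here is a minimal usage sketch. The `Event` model, its fields and the chosen arguments are illustrative assumptions, and it is assumed the functions are re-exported from `clickhouse_backend.models` the same way the existing tests use `models.toYYYYMM`; the function names and signatures follow `clickhouse_backend/models/functions/datetime.py` and the tests added in this diff.

```python
from clickhouse_backend import models


class Event(models.ClickhouseModel):
    # Hypothetical model used only to illustrate the new datetime functions.
    name = models.StringField(max_length=255)
    created_at = models.DateTime64Field()

    class Meta:
        engine = models.MergeTree(order_by=("created_at",))


# toStartOf* truncate a datetime to the start of the containing interval;
# toYearWeek accepts an optional mode and timezone, mirroring the SQL function.
buckets = Event.objects.annotate(
    five_minute_bucket=models.toStartOfFiveMinutes("created_at"),
    year_week=models.toYearWeek("created_at", 1, "UTC"),
).values("five_minute_bucket", "year_week")
```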
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b0c6e65..2d5a194 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -98,3 +98,7 @@ run tests for all supported Python and Django versions: ```shell tox ``` + +### Other + +- Don't forget to update the [Changelog](CHANGELOG.md). diff --git a/README.md b/README.md index aa93e87..20a88e2 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ Read [Documentation](https://github.com/jayvynl/django-clickhouse-backend/blob/m - Support most clickhouse data types. - Support [SETTINGS in SELECT Query](https://clickhouse.com/docs/en/sql-reference/statements/select/#settings-in-select-query). - Support [PREWHERE clause](https://clickhouse.com/docs/en/sql-reference/statements/select/prewhere). +- Support query results returned in columns and [deserialized to `numpy` objects](https://clickhouse-driver.readthedocs.io/en/latest/features.html#numpy-pandas-support). **Notes:** @@ -381,6 +382,60 @@ and [distributed table engine](https://clickhouse.com/docs/en/engines/table-engi The following example assumes that a cluster defined by [docker compose in this repository](https://github.com/jayvynl/django-clickhouse-backend/blob/main/compose.yaml) is used. The cluster name is `cluster`; it has 2 shards, and every shard has 2 replicas. +Query results returned as columns and/or deserialized into `numpy` objects +--- + +`clickhouse-driver` allows results to be returned as columns and/or deserialized into +`numpy` objects. This backend supports both options via the +`Cursor.set_query_execution_args()` context manager. + +```python +import numpy as np +from django.db import connection + +sql = """ + SELECT toDateTime32('2022-01-01 01:00:05', 'UTC'), number, number*2.5 + FROM system.numbers + LIMIT 3 +""" +with connection.cursor() as cursorWrapper: + with cursorWrapper.cursor.set_query_execution_args( + columnar=True, use_numpy=True + ) as cursor: + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchall(), + [ + np.array( + [ + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + ], + dtype="datetime64[s]", + ), + np.array([0, 1, 2], dtype=np.uint64), + np.array([0, 2.5, 5.0], dtype=np.float64), + ], + ) + + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchmany(2), + [ + np.array( + [ + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + ], + dtype="datetime64[s]", + ), + np.array([0, 1, 2], dtype=np.uint64), + ], + ) +``` + ### Configuration ```python @@ -470,6 +525,34 @@ Extra settings explanation: Do not hardcode the database name when you define a replicated table or distributed table, because the test database name is different from the deployed database name. +#### Clickhouse cluster behind a load balancer + +If your clickhouse cluster is running behind a load balancer, you can optionally set `distributed_migrations` to `True` under database `OPTIONS`. +Then a distributed migration table will be created on all nodes of the cluster, and all migration operations will be performed on this +distributed migration table instead of a local migration table. Otherwise, migrations run through the load balancer are only recorded on the node that handles them and have no effect on the other nodes.
+ +Configuration example: + +```python +DATABASES = { + "default": { + "HOST": "clickhouse-load-balancer", + "PORT": 9000, + "ENGINE": "clickhouse_backend.backend", + "OPTIONS": { + "migration_cluster": "cluster", + "distributed_migrations": True, + "settings": { + "mutations_sync": 2, + "insert_distributed_sync": 1, + "insert_quorum": 2, + "alter_sync": 2, + }, + }, + } +} +``` + ### Model `cluster` in `Meta` class will make models being created on cluster. diff --git a/clickhouse-config/node1/remote-servers.xml b/clickhouse-config/node1/remote-servers.xml index 710591f..7bbf2d5 100644 --- a/clickhouse-config/node1/remote-servers.xml +++ b/clickhouse-config/node1/remote-servers.xml @@ -7,10 +7,12 @@ node1 9000 + clickhouse_password node2 9000 + clickhouse_password @@ -18,12 +20,14 @@ node3 9000 + clickhouse_password node4 9000 + clickhouse_password - \ No newline at end of file + diff --git a/clickhouse-config/node2/remote-servers.xml b/clickhouse-config/node2/remote-servers.xml index 710591f..7bbf2d5 100644 --- a/clickhouse-config/node2/remote-servers.xml +++ b/clickhouse-config/node2/remote-servers.xml @@ -7,10 +7,12 @@ node1 9000 + clickhouse_password node2 9000 + clickhouse_password @@ -18,12 +20,14 @@ node3 9000 + clickhouse_password node4 9000 + clickhouse_password - \ No newline at end of file + diff --git a/clickhouse-config/node3/remote-servers.xml b/clickhouse-config/node3/remote-servers.xml index 710591f..7bbf2d5 100644 --- a/clickhouse-config/node3/remote-servers.xml +++ b/clickhouse-config/node3/remote-servers.xml @@ -7,10 +7,12 @@ node1 9000 + clickhouse_password node2 9000 + clickhouse_password @@ -18,12 +20,14 @@ node3 9000 + clickhouse_password node4 9000 + clickhouse_password - \ No newline at end of file + diff --git a/clickhouse-config/node4/remote-servers.xml b/clickhouse-config/node4/remote-servers.xml index 710591f..7bbf2d5 100644 --- a/clickhouse-config/node4/remote-servers.xml +++ b/clickhouse-config/node4/remote-servers.xml @@ -7,10 +7,12 @@ node1 9000 + clickhouse_password node2 9000 + clickhouse_password @@ -18,12 +20,14 @@ node3 9000 + clickhouse_password node4 9000 + clickhouse_password - \ No newline at end of file + diff --git a/clickhouse_backend/__init__.py b/clickhouse_backend/__init__.py index c6185ed..7b9b417 100644 --- a/clickhouse_backend/__init__.py +++ b/clickhouse_backend/__init__.py @@ -1,5 +1,5 @@ from clickhouse_backend.utils.version import get_version -VERSION = (1, 3, 1, "final", 0) +VERSION = (1, 4, 0, "final", 0) __version__ = get_version(VERSION) diff --git a/clickhouse_backend/backend/base.py b/clickhouse_backend/backend/base.py index 72610bc..152191b 100644 --- a/clickhouse_backend/backend/base.py +++ b/clickhouse_backend/backend/base.py @@ -163,6 +163,11 @@ def __init__(self, settings_dict, alias=DEFAULT_DB_ALIAS): self.migration_cluster = self.settings_dict["OPTIONS"].pop( "migration_cluster", None ) + self.distributed_migrations = self.settings_dict["OPTIONS"].pop( + "distributed_migrations", None + ) + # https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results + self.max_block_size = self.settings_dict["OPTIONS"].pop("max_block_size", 65409) if not self.settings_dict["NAME"]: self.settings_dict["NAME"] = "default" @@ -224,6 +229,12 @@ def init_connection_state(self): def create_cursor(self, name=None): return self.connection.cursor() + @async_unsafe + def chunked_cursor(self): + cursor = self._cursor() + cursor.cursor.set_stream_results(True, self.max_block_size) + return cursor + def _savepoint(self, 
sid): pass diff --git a/clickhouse_backend/backend/client.py b/clickhouse_backend/backend/client.py index 9904bf2..893d6f8 100644 --- a/clickhouse_backend/backend/client.py +++ b/clickhouse_backend/backend/client.py @@ -17,19 +17,23 @@ def settings_to_cmd_args_env(cls, settings_dict, parameters): user = settings_dict.get("USER") passwd = settings_dict.get("PASSWORD") secure = options.get("secure") + dsn = options.get("dsn") - if host: - args += ["-h", host] - if port: - args += ["--port", str(port)] - if user: - args += ["-u", user] - if passwd: - args += ["--password", passwd] - if dbname: - args += ["-d", dbname] - if secure: - args += ["--secure"] + if dsn: + args += [dsn] + else: + if host: + args += ["-h", host] + if port: + args += ["--port", str(port)] + if user: + args += ["-u", user] + if passwd: + args += ["--password", passwd] + if dbname: + args += ["-d", dbname] + if secure: + args += ["--secure"] args.extend(parameters) return args, None diff --git a/clickhouse_backend/driver/connection.py b/clickhouse_backend/driver/connection.py index f5ec18d..efc10af 100644 --- a/clickhouse_backend/driver/connection.py +++ b/clickhouse_backend/driver/connection.py @@ -1,8 +1,11 @@ import re +import typing as T +from contextlib import contextmanager from clickhouse_driver import connection from clickhouse_driver.dbapi import connection as dbapi_connection from clickhouse_driver.dbapi import cursor, errors +from clickhouse_driver.result import IterQueryResult, ProgressQueryResult, QueryResult from django.conf import settings from .escape import escape_params @@ -70,6 +73,10 @@ def send_query(self, query, query_id=None, params=None): class Cursor(cursor.Cursor): + # Whether to return data in columnar format. For backwards-compatibility, + # let's default to None. + columnar = None + def close(self): """Push client back to connection pool""" if self.closed: @@ -81,12 +88,64 @@ def close(self): def closed(self): return self._state == self._states.CURSOR_CLOSED + @property + def use_numpy(self): + return self._client.client_settings["use_numpy"] + + @use_numpy.setter + def use_numpy(self, value): + self._client.client_settings["use_numpy"] = value + if value: + try: + from clickhouse_driver.numpy.result import ( + NumpyIterQueryResult, + NumpyProgressQueryResult, + NumpyQueryResult, + ) + + self._client.query_result_cls = NumpyQueryResult + self._client.iter_query_result_cls = NumpyIterQueryResult + self._client.progress_query_result_cls = NumpyProgressQueryResult + except ImportError as e: + raise RuntimeError("Extras for NumPy must be installed") from e + else: + self._client.query_result_cls = QueryResult + self._client.iter_query_result_cls = IterQueryResult + self._client.progress_query_result_cls = ProgressQueryResult + + @contextmanager + def set_query_execution_args( + self, columnar: T.Optional[bool] = None, use_numpy: T.Optional[bool] = None + ): + original_use_numpy = self.use_numpy + if use_numpy is not None: + self.use_numpy = use_numpy + + original_columnar = self.columnar + if columnar is not None: + self.columnar = columnar + + yield self + + self.use_numpy = original_use_numpy + self.columnar = original_columnar + def __del__(self): # If someone forgets calling close method, # then release connection when gc happens. if not self.closed: self.close() + def _prepare(self): + """Override clickhouse_driver.Cursor._prepare() to add columnar kwargs. 
+ + See https://github.com/jayvynl/django-clickhouse-backend/issues/119 + """ + execute, execute_kwargs = super()._prepare() + if self.columnar is not None: + execute_kwargs["columnar"] = self.columnar + return execute, execute_kwargs + def execute(self, operation, parameters=None): """fix https://github.com/jayvynl/django-clickhouse-backend/issues/9""" if getattr( @@ -116,9 +175,9 @@ class Connection(dbapi_connection.Connection): """Connection class with support for connection pool.""" def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) kwargs.setdefault("connections_min", 10) kwargs.setdefault("connections_max", 100) + super().__init__(*args, **kwargs) self.pool = ClickhousePool( dsn=self.dsn, host=self.host, diff --git a/clickhouse_backend/driver/pool.py b/clickhouse_backend/driver/pool.py index 11a009b..cf4c8fc 100644 --- a/clickhouse_backend/driver/pool.py +++ b/clickhouse_backend/driver/pool.py @@ -17,8 +17,8 @@ def __init__(self, **kwargs): self.dsn = kwargs.pop("dsn", None) self.connections_min = kwargs.pop("connections_min", 10) self.connections_max = kwargs.pop("connections_max", 20) - - self.connection_args = {"host": kwargs.pop("host", "localhost"), **kwargs} + kwargs.setdefault("host", "localhost") + self.connection_args = kwargs self.closed = False self._pool = [] self._used = {} @@ -105,6 +105,11 @@ def push( raise InterfaceError("trying to put unkeyed client") if len(self._pool) < self.connections_min and not close: # TODO: verify connection still valid + + # If the connection is currently executing a query, it shouldn't be reused. + # Explicitly disconnect it instead. + if client.connection.is_query_executing: + client.disconnect() if client.connection.connected: self._pool.append(client) else: diff --git a/clickhouse_backend/models/engines.py b/clickhouse_backend/models/engines.py index 9b7df28..67e992d 100644 --- a/clickhouse_backend/models/engines.py +++ b/clickhouse_backend/models/engines.py @@ -1,5 +1,6 @@ +from collections.abc import Iterable + from django.db.models import Func, Value -from django.utils.itercompat import is_iterable __all__ = [ "Distributed", @@ -139,7 +140,7 @@ def __init__( for key in ["order_by", "primary_key", "partition_by"]: value = getattr(self, key) if value is not None: - if isinstance(value, str) or not is_iterable(value): + if isinstance(value, str) or not isinstance(value, Iterable): value = (value,) setattr(self, key, value) elif not isinstance(value, tuple): diff --git a/clickhouse_backend/models/fields/__init__.py b/clickhouse_backend/models/fields/__init__.py index 138e043..694012c 100644 --- a/clickhouse_backend/models/fields/__init__.py +++ b/clickhouse_backend/models/fields/__init__.py @@ -1,10 +1,10 @@ import ipaddress +from collections.abc import Iterable from datetime import datetime from django.core import checks, exceptions from django.db.models import IntegerChoices, fields from django.utils.functional import cached_property -from django.utils.itercompat import is_iterable from django.utils.translation import gettext_lazy as _ from clickhouse_backend.validators import MaxBytesValidator @@ -284,7 +284,7 @@ def _check_choices(self): id="fields.E005", ) ] - if not is_iterable(self.choices) or isinstance(self.choices, str): + if not isinstance(self.choices, Iterable) or isinstance(self.choices, str): return invalid_errors if not self.choices: diff --git a/clickhouse_backend/models/fields/array.py b/clickhouse_backend/models/fields/array.py index 55a4f0d..0eae21a 100644 --- 
a/clickhouse_backend/models/fields/array.py +++ b/clickhouse_backend/models/fields/array.py @@ -1,11 +1,11 @@ import json +from collections.abc import Iterable from django.contrib.postgres.utils import prefix_validation_error from django.contrib.postgres.validators import ArrayMaxLengthValidator from django.core import checks, exceptions from django.db.models import Field, Func, Value, lookups from django.db.models.fields.mixins import CheckFieldDefaultMixin -from django.utils.itercompat import is_iterable from django.utils.translation import gettext_lazy as _ from .base import FieldMixin @@ -97,7 +97,7 @@ def cast_db_type(self, connection): return "Array(%s)" % self.base_field.cast_db_type(connection) def get_db_prep_value(self, value, connection, prepared=False): - if is_iterable(value) and not isinstance(value, (str, bytes)): + if isinstance(value, Iterable) and not isinstance(value, (str, bytes)): return [ self.base_field.get_db_prep_value(i, connection, prepared=prepared) for i in value @@ -105,7 +105,7 @@ def get_db_prep_value(self, value, connection, prepared=False): return value def get_db_prep_save(self, value, connection): - if is_iterable(value) and not isinstance(value, (str, bytes)): + if isinstance(value, Iterable) and not isinstance(value, (str, bytes)): return [self.base_field.get_db_prep_save(i, connection) for i in value] return value diff --git a/clickhouse_backend/models/fields/tuple.py b/clickhouse_backend/models/fields/tuple.py index 4150d43..dd75d98 100644 --- a/clickhouse_backend/models/fields/tuple.py +++ b/clickhouse_backend/models/fields/tuple.py @@ -2,12 +2,12 @@ import collections.abc import copy import json +from collections.abc import Iterable from django.contrib.postgres.utils import prefix_validation_error from django.core import checks, exceptions from django.db.models import Field, Func, Value, lookups from django.utils.functional import cached_property -from django.utils.itercompat import is_iterable from django.utils.translation import gettext_lazy as _ from .base import FieldMixin @@ -38,7 +38,7 @@ def _check_base_fields(self, base_fields): "field instances or (field name, field instance) tuples, " "and field name must be valid python identifier." 
) - if not is_iterable(base_fields) or isinstance(base_fields, str): + if not isinstance(base_fields, Iterable) or isinstance(base_fields, str): raise invalid_error fields = [] @@ -192,7 +192,7 @@ def call_base_fields(self, func_name, value, *args, **kwargs): return values def get_db_prep_value(self, value, connection, prepared=False): - if is_iterable(value) and not isinstance(value, (str, bytes)): + if isinstance(value, Iterable) and not isinstance(value, (str, bytes)): values = self.call_base_fields( "get_db_prep_value", value, connection, prepared=prepared ) @@ -200,7 +200,7 @@ def get_db_prep_value(self, value, connection, prepared=False): return value def get_db_prep_save(self, value, connection): - if is_iterable(value) and not isinstance(value, (str, bytes)): + if isinstance(value, Iterable) and not isinstance(value, (str, bytes)): values = self.call_base_fields("get_db_prep_save", value, connection) return tuple(values) return value diff --git a/clickhouse_backend/models/functions/datetime.py b/clickhouse_backend/models/functions/datetime.py index 4929b11..637eca9 100644 --- a/clickhouse_backend/models/functions/datetime.py +++ b/clickhouse_backend/models/functions/datetime.py @@ -6,9 +6,15 @@ from .base import Func __all__ = [ + "toStartOfMinute", + "toStartOfFiveMinutes", + "toStartOfTenMinutes", + "toStartOfFifteenMinutes", + "toStartOfHour", "toYYYYMM", "toYYYYMMDD", "toYYYYMMDDhhmmss", + "toYearWeek", ] @@ -39,3 +45,58 @@ class toYYYYMMDD(toYYYYMM): class toYYYYMMDDhhmmss(toYYYYMM): output_field = fields.UInt64Field() + + +class toStartOfMinute(Func): + output_field = models.fields.DateTimeField() + + def __init__(self, *expressions): + arity = len(expressions) + if arity < 1 or arity > 1: + raise TypeError( + "'%s' takes 1 argument (%s given)" + % ( + self.__class__.__name__, + len(expressions), + ) + ) + + super().__init__(*expressions) + + +class toStartOfFiveMinutes(toStartOfMinute): + pass + + +class toStartOfTenMinutes(toStartOfMinute): + pass + + +class toStartOfFifteenMinutes(toStartOfMinute): + pass + + +class toStartOfHour(toStartOfMinute): + pass + + +class toYearWeek(Func): + output_field = fields.UInt32Field() + + def __init__(self, *expressions): + arity = len(expressions) + if arity < 1 or arity > 3: + raise TypeError( + "'%s' takes between 1 and 3 arguments (%s given)" + % ( + self.__class__.__name__, + len(expressions), + ) + ) + + expressions = ( + expressions[0], + *(models.Value(expr) for expr in expressions[1:]), + ) + + super().__init__(*expressions) diff --git a/clickhouse_backend/models/sql/compiler.py b/clickhouse_backend/models/sql/compiler.py index e54b801..a0569b8 100644 --- a/clickhouse_backend/models/sql/compiler.py +++ b/clickhouse_backend/models/sql/compiler.py @@ -286,27 +286,6 @@ def as_sql(self, with_limits=True, with_col_aliases=False): class SQLInsertCompiler(compiler.SQLInsertCompiler): - def field_as_sql(self, field, val): - """ - Take a field and a value intended to be saved on that field, and - return placeholder SQL and accompanying params. Check for raw values, - expressions, and fields with get_placeholder() defined in that order. - - When field is None, consider the value raw and use it as the - placeholder, with no corresponding parameters returned. - """ - if field is None: - # A field value of None means the value is raw. - sql, params = val, [] - elif hasattr(val, "as_sql"): - # This is an expression, let's compile it. 
- sql, params = self.compile(val) - else: - # Return the common case for the placeholder - sql, params = "%s", [val] - - return sql, params - def as_sql(self): # We don't need quote_name_unless_alias() here, since these are all # going to be column names (so we can avoid the extra overhead). diff --git a/clickhouse_backend/patch/migrations.py b/clickhouse_backend/patch/migrations.py index bdf2bd9..e4e3f15 100644 --- a/clickhouse_backend/patch/migrations.py +++ b/clickhouse_backend/patch/migrations.py @@ -1,7 +1,8 @@ from django.apps.registry import Apps +from django.db import DatabaseError from django.db import models as django_models from django.db.migrations import Migration -from django.db.migrations.exceptions import IrreversibleError +from django.db.migrations.exceptions import IrreversibleError, MigrationSchemaMissing from django.db.migrations.operations.fields import FieldOperation from django.db.migrations.operations.models import ( DeleteModel, @@ -15,6 +16,41 @@ __all__ = ["patch_migrations", "patch_migration_recorder", "patch_migration"] +def _should_distribute_migrations(connection): + """ + Check if the connection is configured for distributed migrations. + """ + return getattr(connection, "distributed_migrations", False) and getattr( + connection, "migration_cluster", None + ) + + +def _get_model_table_name(connection): + """ + Return the name of the table that will be used by the MigrationRecorder. + If distributed migrations are enabled, return the distributed table name. + Otherwise, return the regular django_migrations table name. + """ + if _should_distribute_migrations(connection): + return "distributed_django_migrations" + return "django_migrations" + + +def _check_replicas(connection): + """ + Check if the connection has replicas configured for the migration cluster. + """ + if hasattr(connection, "has_replicas"): + return connection.has_replicas + + with connection.cursor() as cursor: + cursor.execute( + f"select replica_num from system.clusters where cluster={connection.migration_cluster}" + ) + (replica_count,) = cursor.fetchone() + return replica_count >= 1 + + def patch_migrations(): patch_migration_recorder() patch_migration() @@ -29,22 +65,75 @@ def Migration(self): if self._migration_class is None: if self.connection.vendor == "clickhouse": from clickhouse_backend import models + from clickhouse_backend.models import currentDatabase - class Migration(models.ClickhouseModel): - app = models.StringField(max_length=255) - name = models.StringField(max_length=255) - applied = models.DateTime64Field(default=now) - deleted = models.BoolField(default=False) + # Only create a distributed migration model if the connection + # has distributed migrations enabled and a migration cluster is set. + # otherwise, create a regular merge tree. 
+ if _should_distribute_migrations(self.connection): + has_replicas = _check_replicas(self.connection) - class Meta: - apps = Apps() - app_label = "migrations" - db_table = "django_migrations" - engine = models.MergeTree(order_by=("app", "name")) - cluster = getattr(self.connection, "migration_cluster", None) + Engine = models.MergeTree + if has_replicas: + Engine = models.ReplicatedMergeTree - def __str__(self): - return "Migration %s for %s" % (self.name, self.app) + self.connection.has_replicas = has_replicas + + class _Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = "django_migrations" + engine = Engine(order_by=("app", "name")) + cluster = self.connection.migration_cluster + + def __str__(self): + return "Migration %s for %s" % (self.name, self.app) + + class Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = _get_model_table_name(self.connection) + engine = models.Distributed( + self.connection.migration_cluster, + currentDatabase(), + _Migration._meta.db_table, + models.Rand(), + ) + cluster = self.connection.migration_cluster + + Migration._meta.local_model_class = _Migration + + else: + + class Migration(models.ClickhouseModel): + app = models.StringField(max_length=255) + name = models.StringField(max_length=255) + applied = models.DateTime64Field(default=now) + deleted = models.BoolField(default=False) + + class Meta: + apps = Apps() + app_label = "migrations" + db_table = _get_model_table_name(self.connection) + engine = models.MergeTree(order_by=("app", "name")) + cluster = getattr( + self.connection, "migration_cluster", None + ) + + def __str__(self): + return "Migration %s for %s" % (self.name, self.app) else: @@ -69,15 +158,45 @@ def has_table(self): # Assert migration table won't be deleted once created. if not getattr(self, "_has_table", False): with self.connection.cursor() as cursor: + table = self.Migration._meta.db_table tables = self.connection.introspection.table_names(cursor) - self._has_table = self.Migration._meta.db_table in tables + self._has_table = table in tables if self._has_table and self.connection.vendor == "clickhouse": # fix https://github.com/jayvynl/django-clickhouse-backend/issues/51 cursor.execute( - "ALTER table django_migrations ADD COLUMN IF NOT EXISTS deleted Bool" + f"ALTER table {table} ADD COLUMN IF NOT EXISTS deleted Bool" ) return self._has_table + def ensure_schema(self): + """Ensure the table exists and has the correct schema.""" + # If the table's there, that's fine - we've never changed its schema + # in the codebase. + if self.has_table(): + return + + # In case of distributed migrations, we need to ensure the local model exists first and + # then create the distributed model. 
+ try: + with self.connection.schema_editor() as editor: + if ( + editor.connection.vendor == "clickhouse" + and _should_distribute_migrations(editor.connection) + ): + with editor.connection.cursor() as cursor: + tables = editor.connection.introspection.table_names(cursor) + local_model_class = self.Migration._meta.local_model_class + local_table = local_model_class._meta.db_table + if local_table not in tables: + # Create the local model first + editor.create_model(self.Migration._meta.local_model_class) + + editor.create_model(self.Migration) + except DatabaseError as exc: + raise MigrationSchemaMissing( + "Unable to create the django_migrations table (%s)" % exc + ) + def migration_qs(self): if self.connection.vendor == "clickhouse": return self.Migration.objects.using(self.connection.alias).filter( @@ -118,6 +237,7 @@ def flush(self): MigrationRecorder.Migration = property(Migration) MigrationRecorder.has_table = has_table + MigrationRecorder.ensure_schema = ensure_schema MigrationRecorder.migration_qs = property(migration_qs) MigrationRecorder.record_applied = record_applied MigrationRecorder.record_unapplied = record_unapplied @@ -136,13 +256,15 @@ def apply(self, project_state, schema_editor, collect_sql=False): """ applied_on_remote = False if getattr(schema_editor.connection, "migration_cluster", None): + _table = _get_model_table_name(schema_editor.connection) + with schema_editor.connection.cursor() as cursor: cursor.execute( "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)" " where app=%s and name=%s and deleted=false)", [ schema_editor.connection.migration_cluster, - "django_migrations", + _table, self.app_label, self.name, ], @@ -203,13 +325,15 @@ def unapply(self, project_state, schema_editor, collect_sql=False): """ unapplied_on_remote = False if getattr(schema_editor.connection, "migration_cluster", None): + _table = _get_model_table_name(schema_editor.connection) + with schema_editor.connection.cursor() as cursor: cursor.execute( "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)" " where app=%s and name=%s and deleted=true)", [ schema_editor.connection.migration_cluster, - "django_migrations", + _table, self.app_label, self.name, ], diff --git a/compose.yaml b/compose.yaml index 0c79a8d..c2daa51 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,5 +1,7 @@ x-base-service: &base-service image: clickhouse/clickhouse-server:${CLICKHOUSE_VERSION:-23.6.2.18} + environment: + CLICKHOUSE_PASSWORD: "clickhouse_password" restart: always ulimits: nofile: @@ -19,6 +21,7 @@ services: - "node1:/var/lib/clickhouse/" - "./clickhouse-config/node1/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8123:8123" - "127.0.0.1:9000:9000" node2: <<: *base-service @@ -27,6 +30,7 @@ services: - "node2:/var/lib/clickhouse/" - "./clickhouse-config/node2/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8124:8123" - "127.0.0.1:9001:9000" node3: <<: *base-service @@ -35,6 +39,7 @@ services: - "node3:/var/lib/clickhouse/" - "./clickhouse-config/node3/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8125:8123" - "127.0.0.1:9002:9000" node4: <<: *base-service @@ -43,8 +48,25 @@ services: - "node4:/var/lib/clickhouse/" - "./clickhouse-config/node4/:/etc/clickhouse-server/config.d/" ports: + - "127.0.0.1:8126:8123" - "127.0.0.1:9003:9000" + haproxy: + image: haproxy:latest + container_name: clickhouse-haproxy + command: sh -c "haproxy -f /usr/local/etc/haproxy/haproxy.cfg" + restart: always + volumes: + - 
"./proxy-config/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg" + ports: + - "127.0.0.1:8127:8123" + - "127.0.0.1:9004:9000" + depends_on: + node1: + condition: service_healthy + node3: + condition: service_healthy + volumes: node1: name: clickhouse-node1 diff --git a/docs/Configurations.md b/docs/Configurations.md index 185a6ce..7ee7845 100644 --- a/docs/Configurations.md +++ b/docs/Configurations.md @@ -40,10 +40,13 @@ Migration table will be created on this cluster with [Distributed Engine](https: - `connections_min` is maximum number of connections can be kept in connection pool, default 10. Set this value to 0 will disable connection pool. - `connections_max` is maximum number of connections can be used, default 100. In fact, `connections_max` is maximum numbers of queries one can execute concurrently. Because [source code of DBAPI Connection](https://github.com/mymarilyn/clickhouse-driver/blob/0.2.5/clickhouse_driver/dbapi/connection.py#L46) shows that every cursor creates a new connection. +- `max_block_size` is used for [streaming results](https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results). - `dsn` provide connection url, for example `clickhouse://localhost/test?param1=value1&...`. If dsn is provided, all other connection parameters are ignored. - All other [clickhouse_driver.connection.Connection](https://clickhouse-driver.readthedocs.io/en/latest/api.html#connection) parameters. - `settings` can contain [clickhouse_driver.Client](https://clickhouse-driver.readthedocs.io/en/latest/api.html?highlight=client#clickhouse_driver.Client) settings and [clickhouse settings](https://clickhouse.com/docs/en/operations/settings/settings). +> *Changed in version 1.3.2:* Add `max_block_size`, refer [#108](https://github.com/jayvynl/django-clickhouse-backend/issues/108). + Valid `TEST` keys: - `managed`: whether create(`True`) test database or not(`False`), default `True`. 
diff --git a/proxy-config/haproxy.cfg b/proxy-config/haproxy.cfg new file mode 100644 index 0000000..983aef8 --- /dev/null +++ b/proxy-config/haproxy.cfg @@ -0,0 +1,26 @@ +# TCP frontend for native ClickHouse protocol +frontend clickhouse_tcp + bind *:9000 + mode tcp + default_backend clickhouse_tcp_nodes + +backend clickhouse_tcp_nodes + mode tcp + balance roundrobin + option tcp-check + tcp-check connect + server ch1 clickhouse-node1:9000 check + server ch2 clickhouse-node3:9000 check + +# HTTP frontend for ClickHouse's HTTP interface +frontend clickhouse_http + bind *:8123 + mode http + default_backend clickhouse_http_nodes + +backend clickhouse_http_nodes + mode http + balance roundrobin + option httpchk GET /ping + server ch1 clickhouse-node1:8123 check + server ch2 clickhouse-node3:8123 check diff --git a/pyproject.toml b/pyproject.toml index 60fc426..6bda6cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,8 +19,10 @@ classifiers = [ "Framework :: Django :: 4.0", "Framework :: Django :: 4.1", "Framework :: Django :: 4.2", + "Framework :: Django :: 5", "Framework :: Django :: 5.0", "Framework :: Django :: 5.1", + "Framework :: Django :: 5.2", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python", @@ -32,6 +34,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", ] dependencies = [ "django>=3.2", diff --git a/tests/aggregation/tests.py b/tests/aggregation/tests.py index f20674f..245cf56 100644 --- a/tests/aggregation/tests.py +++ b/tests/aggregation/tests.py @@ -1097,6 +1097,8 @@ def test_annotated_aggregate_over_annotated_aggregate(self): Book.objects.annotate(Max("id")).annotate(Sum("id__max")) class MyMax(Max): + arity = None + def as_sql(self, compiler, connection): self.set_source_expressions(self.get_source_expressions()[0:1]) return super().as_sql(compiler, connection) diff --git a/tests/backends/clickhouse/test_driver.py b/tests/backends/clickhouse/test_driver.py index cc29e71..215294d 100644 --- a/tests/backends/clickhouse/test_driver.py +++ b/tests/backends/clickhouse/test_driver.py @@ -1,7 +1,10 @@ +from django.db import connection from django.test import TestCase from clickhouse_backend.driver import connect +from .. import models + class Tests(TestCase): def test_pool_size(self): @@ -9,3 +12,41 @@ def test_pool_size(self): assert conn.pool.connections_min == 2 assert conn.pool.connections_max == 4 assert len(conn.pool._pool) == 2 + + +class IterationTests(TestCase): + """ + Testing connection behaviour when iterating over queryset is interrupted. + """ + + @classmethod + def setUpTestData(cls): + cls.a1, cls.a2, cls.a3 = models.Author.objects.bulk_create( + [ + models.Author(name="a1"), + models.Author(name="a2"), + models.Author(name="a3"), + ] + ) + + def test_connection_not_reused_when_iteration_interrupted(self): + """ + This test demonstrates that if a queryset is iterated over and the + iteration is interrupted (e.g. via a break statement), the connection + used for that iteration is disconnected and not returned to the pool. 
+ """ + pool = connection.connection.pool + + connection_count_before = len(pool._pool) + assert connection_count_before == 1 + + authors = models.Author.objects.all() + for author in authors.iterator(1): + author = author.name + break + + connection_count_after_iterator = len(pool._pool) + # Connection was closed and not returned to pool + assert connection_count_after_iterator == 0 + + author = authors.get(id=self.a1.id) diff --git a/tests/backends/tests.py b/tests/backends/tests.py index 5a32a76..2bd399a 100644 --- a/tests/backends/tests.py +++ b/tests/backends/tests.py @@ -1,5 +1,6 @@ """Tests related to django.db.backends that haven't been organized.""" import datetime +import importlib import threading import unittest import warnings @@ -560,6 +561,208 @@ def test_timezone_none_use_tz_false(self): connection.init_connection_state() +def check_numpy(): + """Check if numpy is installed.""" + spec = importlib.util.find_spec("numpy") + return spec is not None + + +class ColumnarTestCase(TransactionTestCase): + available_apps = ["backends"] + databases = {"default", "s2r1"} + + def test_columnar_query(self): + sql = """ + SELECT number, number*2, number*3, number*4, number*5 + FROM system.numbers + LIMIT 10 + """ + with connections["s2r1"].cursor() as cursorWrapper: + with cursorWrapper.cursor.set_query_execution_args(columnar=True) as cursor: + cursor.execute(sql) + self.assertEqual( + cursor.fetchall(), + [ + (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + (0, 2, 4, 6, 8, 10, 12, 14, 16, 18), + (0, 3, 6, 9, 12, 15, 18, 21, 24, 27), + (0, 4, 8, 12, 16, 20, 24, 28, 32, 36), + (0, 5, 10, 15, 20, 25, 30, 35, 40, 45), + ], + ) + + cursor.execute(sql) + self.assertEqual( + cursor.fetchmany(2), + [ + (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + (0, 2, 4, 6, 8, 10, 12, 14, 16, 18), + ], + ) + + actual_results = [ + r + for results in iter(lambda: cursor.fetchmany(2), []) + for r in results + ] + self.assertEqual( + actual_results, + [ + (0, 3, 6, 9, 12, 15, 18, 21, 24, 27), + (0, 4, 8, 12, 16, 20, 24, 28, 32, 36), + (0, 5, 10, 15, 20, 25, 30, 35, 40, 45), + ], + ) + + cursor.execute(sql) + self.assertEqual( + cursor.fetchone(), + (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), + ) + + @unittest.skipUnless(check_numpy(), "numpy is not installed") + def test_use_numpy_query(self): + sql = """ + SELECT toDateTime32('2022-01-01 01:00:05', 'UTC'), number, number*2.5 + FROM system.numbers + LIMIT 3 + """ + import numpy as np + + with connections["s2r1"].cursor() as cursorWrapper: + with cursorWrapper.cursor.set_query_execution_args( + columnar=True, use_numpy=True + ) as cursor: + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchall(), + [ + np.array( + [ + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + ], + dtype="datetime64[s]", + ), + np.array([0, 1, 2], dtype=np.uint64), + np.array([0, 2.5, 5.0], dtype=np.float64), + ], + ) + + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchmany(2), + [ + np.array( + [ + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + ], + dtype="datetime64[s]", + ), + np.array([0, 1, 2], dtype=np.uint64), + ], + ) + + actual_results = [ + r + for results in iter(lambda: cursor.fetchmany(2), []) + for r in results + ] + np.testing.assert_equal( + actual_results, + [ + np.array([0, 2.5, 5], dtype=np.float64), + ], + ) + + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchone(), + np.array( + [ + np.datetime64("2022-01-01T01:00:05"), + 
np.datetime64("2022-01-01T01:00:05"), + np.datetime64("2022-01-01T01:00:05"), + ], + dtype="datetime64[s]", + ), + ) + + @unittest.skipUnless(check_numpy(), "numpy is not installed") + def test_use_numpy_but_not_columnar_format(self): + sql = """ + SELECT toDateTime32('2022-01-01 01:00:05', 'UTC'), number, number*2.5 + FROM system.numbers + LIMIT 3 + """ + import numpy as np + + with connections["s2r1"].cursor() as cursorWrapper: + with cursorWrapper.cursor.set_query_execution_args( + columnar=False, use_numpy=True + ) as cursor: + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchall(), + [ + np.array( + [datetime.datetime(2022, 1, 1, 1, 0, 5), 0, 0.0], + dtype=object, + ), + np.array( + [datetime.datetime(2022, 1, 1, 1, 0, 5), 1, 2.5], + dtype=object, + ), + np.array( + [datetime.datetime(2022, 1, 1, 1, 0, 5), 2, 5.0], + dtype=object, + ), + ], + ) + + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchmany(2), + [ + np.array( + [datetime.datetime(2022, 1, 1, 1, 0, 5), 0, 0.0], + dtype="object", + ), + np.array( + [datetime.datetime(2022, 1, 1, 1, 0, 5), 1, 2.5], + dtype="object", + ), + ], + ) + + actual_results = [ + r + for results in iter(lambda: cursor.fetchmany(2), []) + for r in results + ] + np.testing.assert_equal( + actual_results, + [ + np.array( + [datetime.datetime(2022, 1, 1, 1, 0, 5), 2, 5.0], + dtype="object", + ), + ], + ) + + cursor.execute(sql) + np.testing.assert_equal( + cursor.fetchone(), + np.array( + [datetime.datetime(2022, 1, 1, 1, 0, 5), 0, 0.0], + dtype="object", + ), + ) + + # These tests aren't conditional because it would require differentiating # between MySQL+InnoDB and MySQL+MYISAM (something we currently can't do). class FkConstraintsTests(TransactionTestCase): diff --git a/tests/clickhouse_fields/tests.py b/tests/clickhouse_fields/tests.py index 9202c7b..e6ebe81 100644 --- a/tests/clickhouse_fields/tests.py +++ b/tests/clickhouse_fields/tests.py @@ -1,3 +1,4 @@ +import ipaddress from decimal import Decimal from uuid import uuid4 @@ -611,10 +612,10 @@ def test_deconstruct(self): self.assertNotIn("max_length", kwargs) def test_value(self): - v = "::ffff:304:506" + v = "::ffff:3.4.5.6" o = IPv6Model.objects.create(ipv6=v) o.refresh_from_db() - self.assertEqual(o.ipv6, v) + self.assertEqual(o.ipv6, str(ipaddress.ip_address(v))) def test_filter(self): v = "::ffff:3.4.5.6" diff --git a/tests/clickhouse_functions/test_datetime.py b/tests/clickhouse_functions/test_datetime.py index 6e598b0..047028b 100644 --- a/tests/clickhouse_functions/test_datetime.py +++ b/tests/clickhouse_functions/test_datetime.py @@ -17,13 +17,22 @@ def setUpTestData(cls): alias="smithj", # https://stackoverflow.com/a/18862958 birthday=pytz.timezone(get_timezone()).localize( - datetime(2023, 11, 30, 16), is_dst=False + datetime(2023, 11, 30, hour=16, minute=12, second=15), is_dst=False ), ) cls.elena = Author.objects.create( name="Élena Jordan", alias="elena", - birthday=pytz.utc.localize(datetime(2023, 11, 30, 16), is_dst=False), + birthday=pytz.utc.localize( + datetime(2023, 11, 30, hour=16, minute=59, second=59), is_dst=False + ), + ) + cls.sarah = Author.objects.create( + name="Sarah Connor", + alias="sconnor", + birthday=pytz.utc.localize( + datetime(2023, 12, 31, hour=23, minute=30, second=00), is_dst=False + ), ) def test_yyyymm(self): @@ -50,8 +59,144 @@ def test_yyyymmddhhmmss(self): john = Author.objects.annotate(v=models.toYYYYMMDDhhmmss("birthday")).get( id=self.john.id ) - self.assertEqual(john.v, 20231130160000) + self.assertEqual(john.v, 
20231130161215) elena = Author.objects.annotate( v=models.toYYYYMMDDhhmmss("birthday", "Asia/Shanghai") ).get(id=self.elena.id) - self.assertEqual(elena.v, 20231201000000) + self.assertEqual(elena.v, 20231201005959) + + def test_tostartofminute(self): + john = Author.objects.annotate(v=models.toStartOfMinute("birthday")).get( + id=self.john.id + ) + self.assertEqual( + john.v, + datetime( + 2023, + 11, + 30, + hour=16, + minute=12, + second=00, + ), + ) + + elena = Author.objects.annotate(v=models.toStartOfMinute("birthday")).get( + id=self.elena.id + ) + self.assertEqual( + elena.v, + datetime(2023, 11, 30, hour=10, minute=59, second=00), + ) + + def test_tostartoffiveminutes(self): + john = Author.objects.annotate(v=models.toStartOfFiveMinutes("birthday")).get( + id=self.john.id + ) + self.assertEqual( + john.v, + datetime( + 2023, + 11, + 30, + hour=16, + minute=10, + second=00, + ), + ) + + elena = Author.objects.annotate(v=models.toStartOfFiveMinutes("birthday")).get( + id=self.elena.id + ) + self.assertEqual( + elena.v, + datetime(2023, 11, 30, hour=10, minute=55, second=00), + ) + + def test_tostartoftenminutes(self): + john = Author.objects.annotate(v=models.toStartOfTenMinutes("birthday")).get( + id=self.john.id + ) + self.assertEqual( + john.v, + datetime( + 2023, + 11, + 30, + hour=16, + minute=10, + second=00, + ), + ) + + elena = Author.objects.annotate(v=models.toStartOfTenMinutes("birthday")).get( + id=self.elena.id + ) + self.assertEqual( + elena.v, + datetime(2023, 11, 30, hour=10, minute=50, second=00), + ) + + def test_tostartoffifteenminutes(self): + john = Author.objects.annotate( + v=models.toStartOfFifteenMinutes("birthday") + ).get(id=self.john.id) + self.assertEqual( + john.v, + datetime( + 2023, + 11, + 30, + hour=16, + minute=00, + second=00, + ), + ) + + elena = Author.objects.annotate( + v=models.toStartOfFifteenMinutes("birthday") + ).get(id=self.elena.id) + self.assertEqual( + elena.v, + datetime(2023, 11, 30, hour=10, minute=45, second=00), + ) + + def test_tostartofhour(self): + john = Author.objects.annotate(v=models.toStartOfHour("birthday")).get( + id=self.john.id + ) + self.assertEqual( + john.v, + datetime( + 2023, + 11, + 30, + hour=16, + minute=00, + second=00, + ), + ) + + elena = Author.objects.annotate(v=models.toStartOfHour("birthday")).get( + id=self.elena.id + ) + self.assertEqual( + elena.v, + datetime(2023, 11, 30, hour=10, minute=00, second=00), + ) + + def test_toyearweek(self): + sarah = Author.objects.annotate(v=models.toYearWeek("birthday")).get( + id=self.sarah.id + ) + self.assertEqual(sarah.v, 202353) + + sarah = Author.objects.annotate(v=models.toYearWeek("birthday", 1)).get( + id=self.sarah.id + ) + self.assertEqual(sarah.v, 202352) + + sarah = Author.objects.annotate( + v=models.toYearWeek("birthday", 1, "Pacific/Kiritimati") + ).get(id=self.sarah.id) + self.assertEqual(sarah.v, 202401) diff --git a/tests/migrations/test_loader.py b/tests/migrations/test_loader.py index dd448e7..1110a0e 100644 --- a/tests/migrations/test_loader.py +++ b/tests/migrations/test_loader.py @@ -1,5 +1,6 @@ import compileall import os +from copy import deepcopy from importlib import import_module from django.db import connection, connections @@ -13,6 +14,7 @@ from django.test import TestCase, modify_settings, override_settings from clickhouse_backend import compat +from clickhouse_backend.backend.base import DatabaseWrapper from .test_base import MigrationTestBase @@ -51,6 +53,155 @@ def test_apply(self): ) +class 
DistributedMigrationTests(MigrationTestBase): + databases = ("default", "s2r1", "s1r2") + + lb = { + "ENGINE": "clickhouse_backend.backend", + "HOST": "localhost", + "USER": "default", + "PASSWORD": "clickhouse_password", + "PORT": 9004, + "NAME": "test_default", + "OPTIONS": { + "distributed_migrations": True, + "migration_cluster": "cluster", + "connections_min": 1, + "settings": { + "mutations_sync": 2, + "insert_distributed_sync": 1, + "insert_quorum": 2, + "alter_sync": 2, + "allow_suspicious_low_cardinality_types": 1, + "allow_experimental_object_type": 1, + }, + }, + "TEST": {"cluster": "cluster", "managed": False}, + "TIME_ZONE": None, + "AUTOCOMMIT": True, + "CONN_MAX_AGE": 0, + "CONN_HEALTH_CHECKS": True, + } + + def tearDown(self): + if "load_balancer" in connections: + # Remove the load balancer connection to avoid conflicts with other tests + connections["load_balancer"].close() + del connections["load_balancer"] + + def assertMigrationTablesExists(self): + for db in self.databases: + conn = connections[db] + tables = conn.introspection.table_names() + self.assertIn( + "django_migrations", + tables, + f"django_migrations table not found in {conn.alias}", + ) + self.assertIn( + "distributed_django_migrations", + tables, + f"distributed_django_migrations table not found in {conn.alias}", + ) + + def assertMigrationExists(self, conn, name, app, deleted=False): + with conn.cursor() as cursor: + cursor.execute( + f"SELECT * FROM distributed_django_migrations where name = '{name}'" + ) + res = cursor.fetchall() + + self.assertEqual(len(res), 1, f"Migration {name} not found in {conn.alias}") + + self.assertEqual(res[0][1], app) + self.assertEqual(res[0][2], name) + self.assertEqual(res[0][-1], deleted) + + def _drop_migrations(self): + for conn in self.databases: + recorder = MigrationRecorder(connections[conn]) + self.assertEqual(recorder.migration_qs.db, conn) + self.assertEqual( + recorder.migration_qs.model._meta.db_table, "django_migrations" + ) + with recorder.connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS django_migrations") + cursor.execute("DROP TABLE IF EXISTS distributed_django_migrations") + tables = recorder.connection.introspection.table_names(cursor) + self.assertNotIn("django_migrations", tables) + self.assertNotIn("distributed_django_migrations", tables) + + # node4 can throw error when trying to create django_migrations table, even if other nodes are ok + db = deepcopy(self.lb) + db["PORT"] = 9003 + del db["OPTIONS"]["distributed_migrations"] + recorder = MigrationRecorder(DatabaseWrapper(db, alias="node4")) + with recorder.connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS django_migrations") + cursor.execute("DROP TABLE IF EXISTS distributed_django_migrations") + tables = recorder.connection.introspection.table_names(cursor) + self.assertNotIn("django_migrations", tables) + self.assertNotIn("distributed_django_migrations", tables) + + def test_distributed_migration_schema_with_existing_migrations(self): + """ + Tests that migration tables are created in all nodes even if the django_migrations table already exists + """ + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + recorder = MigrationRecorder(db) + + recorder.ensure_schema() + + self.assertEqual(recorder.migration_qs.db, "load_balancer") + self.assertEqual( + recorder.migration_qs.model._meta.db_table, "distributed_django_migrations" + ) + + self.assertMigrationTablesExists() + + def 
test_distributed_migration_schema_without_migrations(self): + """ + Tests that migration tables are created in all nodes when migration tables do not exist + """ + + self._drop_migrations() + + db = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = db + recorder = MigrationRecorder(db) + + recorder.ensure_schema() + + self.assertMigrationTablesExists() + + def test_apply_unapply_distributed(self): + """ + Tests marking migrations as applied/unapplied in a distributed setup. + """ + databases = [x for x in self.databases] + + self._drop_migrations() + + lb = DatabaseWrapper(deepcopy(self.lb), alias="load_balancer") + connections["load_balancer"] = lb + databases.append("load_balancer") + + recorder = MigrationRecorder(lb) + + recorder.record_applied("myapp", "0432_ponies") + + for db in databases: + conn = connections[db] + self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=False) + + recorder.record_unapplied("myapp", "0432_ponies") + + for db in databases: + conn = connections[db] + self.assertMigrationExists(conn, "0432_ponies", "myapp", deleted=True) + + class LoaderTests(TestCase): """ Tests the disk and database loader, and running through migrations diff --git a/tests/settings.py b/tests/settings.py index d8b60c1..ae5fde7 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -19,6 +19,7 @@ DATABASES = { "default": { "ENGINE": "clickhouse_backend.backend", + "PASSWORD": "clickhouse_password", "OPTIONS": { "migration_cluster": "cluster", "connections_min": 1, @@ -35,6 +36,7 @@ }, "s1r2": { "ENGINE": "clickhouse_backend.backend", + "PASSWORD": "clickhouse_password", "PORT": 9001, "OPTIONS": { "migration_cluster": "cluster", @@ -52,6 +54,7 @@ }, "s2r1": { "ENGINE": "clickhouse_backend.backend", + "PASSWORD": "clickhouse_password", "PORT": 9002, "OPTIONS": { "migration_cluster": "cluster", diff --git a/tox.ini b/tox.ini index f94251f..b783624 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,11 @@ [tox] requires = tox>=4 -env_list = py3.7-django3.2, py{3.8,3.9,3.10,3.11,3.12}-django{3.2,4.0,4.1,4.2}, py{3.10,3.11,3.12}-django{5.0,5.1} +env_list = + py3.7-django3.2 + py{3.8,3.9}-django{3.2,4.0,4.1,4.2} + py{3.10,3.11,3.12}-django{3.2,4.0,4.1,4.2,5.0,5.1,5.2} + py3.13-django{4.1,4.2,5.0,5.1,5.2} [variables] code = clickhouse_backend example tests @@ -14,8 +18,10 @@ deps = django4.2: Django>=4.2,<5.0 django5.0: Django>=5.0,<5.1 django5.1: Django>=5.1,<5.2 + django5.2: Django>=5.2,<5.3 coverage commands = + pip install pandas # Use local clickhouse_backend package so that coverage works properly. pip install -e . coverage run tests/runtests.py --debug-sql {posargs} @@ -25,7 +31,9 @@ description = lint code skip_install = true deps = flake8 + isort commands = + isort -c {[variables]code} flake8 --max-line-length=88 --extend-ignore=E203,E501 {[variables]code} [testenv:format] @@ -33,7 +41,7 @@ description = format code skip_install = true deps = black==23.7.0 - isort==5.12.0 + isort commands = isort {[variables]code} black -t py37 -t py38 -t py39 -t py310 -t py311 -t py312 {[variables]code}
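For the `max_block_size` option documented in docs/Configurations.md above, a minimal configuration sketch follows. The host, alias, option values and the commented-out loop are illustrative assumptions; the option name, its 65409 default and its use by `chunked_cursor()` come from `clickhouse_backend/backend/base.py` in this diff, and per the 1.3.2 changelog entry `QuerySet.iterator` streams through `clickhouse_driver.Client.execute_iter`, so the setting only affects streaming reads.

```python
# settings.py (sketch): max_block_size is popped from OPTIONS in
# DatabaseWrapper.__init__ and passed to Cursor.set_stream_results()
# by chunked_cursor(), so it controls the block size of streamed results.
DATABASES = {
    "default": {
        "ENGINE": "clickhouse_backend.backend",
        "HOST": "localhost",
        "PASSWORD": "clickhouse_password",
        "OPTIONS": {
            "max_block_size": 10000,  # default is 65409 when omitted
            "connections_min": 1,
        },
    }
}

# Application code (sketch): iterator() streams rows in blocks instead of
# materializing the whole result set; Event is a hypothetical model.
# for event in Event.objects.all().iterator(chunk_size=2000):
#     handle(event)
```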