Skip to content

Commit 629858a

Browse files
sqalchemy lightweight delete and statement compilation support (#551)
* sqalchemy lightweight delete support * linter fix * explicitly order select * add sqa support for select and joins * add explicit delete test * deal with cloud test latency * change engine type * simplify join
1 parent 4d40b1f commit 629858a

File tree

5 files changed

+541
-0
lines changed

5 files changed

+541
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ instead of being passed as ClickHouse server settings. This is in conjunction wi
2222
The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`.
2323

2424
## UNRELEASED
25+
- Add support for lightweight `DELETE` in sqlalchemy. Closes [#382](https://github.com/ClickHouse/clickhouse-connect/issues/382)
2526
- Added client connection option `rename_response_column` (default `None`) that allows the user to define how response columns are automatically renamed according to a predefined scheme. Helpful for stripping alias prefixes, etc. in potentially complex queries. Closes [#228](https://github.com/ClickHouse/clickhouse-connect/issues/228)
2627
- Add third-party library identifiers (name/version) in the User-Agent, e.g. pandas/2.2.5. Users can opt out by changing the common setting `send_integration_tags` to `False`.
2728
- Added support for form encoding query parameters when using HTTP interface. This addresses [#342](https://github.com/ClickHouse/clickhouse-connect/issues/342). Query parameters can now be sent as form-encoded data in the request body by setting `form_encode_query_params=True` when creating the client. This is particularly useful for queries with large parameter payloads that might exceed URL length limits.

clickhouse_connect/cc_sqlalchemy/dialect.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from clickhouse_connect.cc_sqlalchemy.inspector import ChInspector
77
from clickhouse_connect.cc_sqlalchemy.sql import full_table
88
from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler
9+
from clickhouse_connect.cc_sqlalchemy.sql.compiler import ChStatementCompiler
910
from clickhouse_connect.cc_sqlalchemy import ischema_names, dialect_name
1011
from clickhouse_connect.cc_sqlalchemy.sql.preparer import ChIdentifierPreparer
1112
from clickhouse_connect.driver.binding import quote_identifier, format_str
@@ -26,6 +27,7 @@ class ClickHouseDialect(DefaultDialect):
2627
returns_unicode_strings = True
2728
postfetch_lastrowid = False
2829
ddl_compiler = ChDDLCompiler
30+
statement_compiler = ChStatementCompiler
2931
preparer = ChIdentifierPreparer
3032
description_encoding = None
3133
max_identifier_length = 127
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from sqlalchemy.exc import CompileError
2+
from sqlalchemy.sql.compiler import SQLCompiler
3+
4+
from clickhouse_connect.cc_sqlalchemy.sql import format_table
5+
6+
7+
# pylint: disable=arguments-differ
8+
class ChStatementCompiler(SQLCompiler):
9+
10+
# pylint: disable=attribute-defined-outside-init
11+
def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
12+
table = delete_stmt.table
13+
text = f"DELETE FROM {format_table(table)}"
14+
15+
if delete_stmt.whereclause is not None:
16+
self._in_delete_where = True
17+
try:
18+
text += " WHERE " + self.process(delete_stmt.whereclause, **kw)
19+
finally:
20+
self._in_delete_where = False
21+
else:
22+
raise CompileError("ClickHouse DELETE statements require a WHERE clause. To delete all rows, use 'TRUNCATE TABLE' instead.")
23+
24+
return text
25+
26+
def visit_select(self, select_stmt, **kw):
27+
return super().visit_select(select_stmt, **kw)
28+
29+
def visit_join(self, join, **kw):
30+
left = self.process(join.left, **kw)
31+
right = self.process(join.right, **kw)
32+
onclause = join.onclause
33+
34+
if getattr(join, "full", False):
35+
join_kw = " FULL OUTER JOIN "
36+
elif onclause is None:
37+
join_kw = " CROSS JOIN "
38+
elif join.isouter:
39+
join_kw = " LEFT OUTER JOIN "
40+
else:
41+
join_kw = " INNER JOIN "
42+
43+
text = left + join_kw + right
44+
45+
if onclause is not None:
46+
text += " ON " + self.process(onclause, **kw)
47+
48+
return text
49+
50+
def visit_column(self, column, add_to_result_map=None, include_table=True, result_map_targets=(), ambiguous_table_name_map=None, **kw):
51+
if getattr(self, "_in_delete_where", False):
52+
return self.preparer.quote(column.name)
53+
54+
return super().visit_column(
55+
column,
56+
add_to_result_map=add_to_result_map,
57+
include_table=include_table,
58+
result_map_targets=result_map_targets,
59+
**kw,
60+
)
61+
62+
# Abstract methods required by SQLCompiler
63+
def delete_extra_from_clause(self, delete_stmt, from_table, extra_froms, from_hints, **kw):
64+
raise NotImplementedError("ClickHouse doesn't support DELETE with extra FROM clause")
65+
66+
def update_from_clause(self, update_stmt, from_table, extra_froms, from_hints, **kw):
67+
raise NotImplementedError("ClickHouse doesn't support UPDATE with FROM clause")
68+
69+
# pylint: disable=unused-argument
70+
def visit_empty_set_expr(self, element_types, **kw):
71+
return "SELECT 1 WHERE 1=0"
72+
73+
def visit_sequence(self, sequence, **kw):
74+
raise NotImplementedError("ClickHouse doesn't support sequences")
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import pytest
2+
import sqlalchemy as db
3+
from sqlalchemy import MetaData, delete, text
4+
from sqlalchemy.engine import Engine
5+
from sqlalchemy.exc import CompileError
6+
7+
from clickhouse_connect.cc_sqlalchemy.datatypes.sqltypes import String, UInt64
8+
from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import engine_map
9+
10+
11+
def test_delete_with_table_object(test_engine: Engine, test_db: str, test_table_engine: str):
12+
"""DELETE using SQLAlchemy Table object"""
13+
engine_cls = engine_map[test_table_engine]
14+
metadata = MetaData(schema=test_db)
15+
16+
test_table = db.Table(
17+
"delete_test",
18+
metadata,
19+
db.Column("id", UInt64),
20+
db.Column("name", String),
21+
db.Column("status", String),
22+
engine_cls("id"),
23+
)
24+
25+
with test_engine.begin() as conn:
26+
conn.execute(text("DROP TABLE IF EXISTS delete_test"))
27+
test_table.create(conn)
28+
29+
conn.execute(db.insert(test_table).values({"id": 1, "name": "hello world", "status": "active"}))
30+
conn.execute(db.insert(test_table).values({"id": 2, "name": "test data", "status": "inactive"}))
31+
conn.execute(db.insert(test_table).values({"id": 3, "name": "hello test", "status": "active"}))
32+
starting = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
33+
assert len(starting) == 3
34+
assert [row.id for row in starting] == [1, 2, 3]
35+
36+
delete_stmt = delete(test_table).where(test_table.c.name.like("%hello%"))
37+
conn.execute(delete_stmt)
38+
39+
remaining = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
40+
assert len(remaining) == 1
41+
assert remaining[0].name == "test data"
42+
43+
44+
def test_delete_with_multiple_conditions(test_engine: Engine, test_db: str, test_table_engine: str):
45+
"""DELETE with multiple WHERE conditions"""
46+
engine_cls = engine_map[test_table_engine]
47+
metadata = MetaData(schema=test_db)
48+
49+
test_table = db.Table(
50+
"delete_multi_test",
51+
metadata,
52+
db.Column("id", UInt64),
53+
db.Column("category", String),
54+
db.Column("value", UInt64),
55+
engine_cls("id"),
56+
)
57+
58+
with test_engine.begin() as conn:
59+
conn.execute(text("DROP TABLE IF EXISTS delete_multi_test"))
60+
test_table.create(conn)
61+
62+
conn.execute(db.insert(test_table).values({"id": 1, "category": "A", "value": 100}))
63+
conn.execute(db.insert(test_table).values({"id": 2, "category": "B", "value": 200}))
64+
conn.execute(db.insert(test_table).values({"id": 3, "category": "A", "value": 300}))
65+
conn.execute(db.insert(test_table).values({"id": 4, "category": "C", "value": 50}))
66+
starting = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
67+
assert len(starting) == 4
68+
assert [row.id for row in starting] == [1, 2, 3, 4]
69+
70+
delete_stmt = delete(test_table).where((test_table.c.category == "A") & (test_table.c.value > 150))
71+
conn.execute(delete_stmt)
72+
73+
remaining = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
74+
assert len(remaining) == 3
75+
assert [row.id for row in remaining] == [1, 2, 4]
76+
77+
78+
def test_delete_all_rows_error(test_engine: Engine, test_db: str, test_table_engine: str):
79+
"""DELETE without WHERE should raise"""
80+
engine_cls = engine_map[test_table_engine]
81+
metadata = MetaData(schema=test_db)
82+
83+
test_table = db.Table(
84+
"delete_all_test",
85+
metadata,
86+
db.Column("id", UInt64),
87+
db.Column("data", String),
88+
engine_cls("id"),
89+
)
90+
91+
with test_engine.begin() as conn:
92+
conn.execute(text("DROP TABLE IF EXISTS delete_all_test"))
93+
test_table.create(conn)
94+
conn.execute(db.insert(test_table).values({"id": 1, "data": "test1"}))
95+
96+
delete_stmt = delete(test_table)
97+
with pytest.raises(CompileError, match="require a WHERE clause"):
98+
conn.execute(delete_stmt)
99+
100+
101+
def test_delete_basic_functionality(test_engine: Engine, test_table_engine: str):
102+
"""Basic DELETE statement compilation and execution"""
103+
engine_cls = engine_map[test_table_engine]
104+
metadata = MetaData()
105+
106+
test_table = db.Table(
107+
"delete_simple_test",
108+
metadata,
109+
db.Column("id", UInt64),
110+
db.Column("name", String),
111+
engine_cls("id"),
112+
)
113+
114+
with test_engine.begin() as conn:
115+
conn.execute(text("DROP TABLE IF EXISTS delete_simple_test"))
116+
test_table.create(conn)
117+
118+
conn.execute(db.insert(test_table).values({"id": 1, "name": "test_row"}))
119+
starting = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
120+
assert len(starting) == 1
121+
assert starting[0].name == "test_row"
122+
123+
delete_stmt = delete(test_table).where(test_table.c.id == 1)
124+
conn.execute(delete_stmt)
125+
126+
result = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
127+
assert len(result) == 0
128+
129+
130+
def test_delete_with_text_condition(test_engine: Engine, test_table_engine: str):
131+
"""Test DELETE with text-based WHERE condition"""
132+
engine_cls = engine_map[test_table_engine]
133+
metadata = MetaData()
134+
135+
test_table = db.Table(
136+
"delete_text_test",
137+
metadata,
138+
db.Column("id", UInt64),
139+
db.Column("status", String),
140+
engine_cls("id"),
141+
)
142+
143+
with test_engine.begin() as conn:
144+
conn.execute(text("DROP TABLE IF EXISTS delete_text_test"))
145+
test_table.create(conn)
146+
147+
conn.execute(db.insert(test_table).values({"id": 1, "status": "active"}))
148+
conn.execute(db.insert(test_table).values({"id": 2, "status": "inactive"}))
149+
conn.execute(db.insert(test_table).values({"id": 3, "status": "active"}))
150+
starting = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
151+
assert len(starting) == 3
152+
assert [row.id for row in starting] == [1, 2, 3]
153+
154+
delete_stmt = delete(test_table).where(test_table.c.status == "inactive")
155+
result = conn.execute(delete_stmt)
156+
157+
result = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
158+
assert len(result) == 2
159+
assert [row.id for row in result] == [1, 3]
160+
161+
162+
def test_explicit_delete(test_engine: Engine, test_table_engine: str):
163+
"""Test explicit DELETE"""
164+
engine_cls = engine_map[test_table_engine]
165+
metadata = MetaData()
166+
167+
test_table = db.Table(
168+
"delete_explicit_test",
169+
metadata,
170+
db.Column("id", UInt64),
171+
db.Column("name", String),
172+
engine_cls("id"),
173+
)
174+
175+
with test_engine.begin() as conn:
176+
conn.execute(text("DROP TABLE IF EXISTS delete_explicit_test"))
177+
test_table.create(conn)
178+
179+
conn.execute(db.insert(test_table).values({"id": 1, "name": "hello world"}))
180+
conn.execute(db.insert(test_table).values({"id": 2, "name": "test data"}))
181+
conn.execute(db.insert(test_table).values({"id": 3, "name": "hello test"}))
182+
starting = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
183+
assert len(starting) == 3
184+
assert [row.id for row in starting] == [1, 2, 3]
185+
186+
conn.execute(text("DELETE FROM delete_explicit_test WHERE name LIKE '%hello%'"))
187+
188+
result = conn.execute(db.select(test_table).order_by(test_table.c.id)).fetchall()
189+
assert len(result) == 1
190+
assert result[0].name == "test data"

0 commit comments

Comments
 (0)