Skip to content

Commit 8f94044

Browse files
Merge pull request ClickHouse#92306 from ClickHouse/backport/25.8/84611
Backport ClickHouse#84611 to 25.8: Distributed INSERT SELECT with WHERE
2 parents 293f098 + 6d5def6 commit 8f94044

File tree

3 files changed

+94
-2
lines changed

3 files changed

+94
-2
lines changed

src/Interpreters/InterpreterInsertQuery.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,8 +633,6 @@ static bool isInsertSelectTrivialEnoughForDistributedExecution(const ASTInsertQu
633633
/// TODO: replace with QueryTree analysis after switching to analyzer completely
634634
return (!select_query->distinct
635635
&& !select_query->limit_with_ties
636-
&& !select_query->prewhere()
637-
&& !select_query->where()
638636
&& !select_query->groupBy()
639637
&& !select_query->having()
640638
&& !select_query->orderBy()

tests/integration/test_parallel_replicas_insert_select/test.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,57 @@ def test_insert_select_with_constant(start_cluster, max_parallel_replicas, paral
274274
)
275275
== f"{populate_count}\n"
276276
)
277+
278+
279+
@pytest.mark.parametrize(
280+
"max_parallel_replicas",
281+
[
282+
pytest.param(2),
283+
pytest.param(3),
284+
],
285+
)
286+
@pytest.mark.parametrize(
287+
"parallel_replicas_local_plan",
288+
[
289+
pytest.param(False),
290+
pytest.param(True),
291+
]
292+
)
293+
def test_insert_select_where(start_cluster, max_parallel_replicas, parallel_replicas_local_plan):
294+
populate_count = 1_000_000
295+
count = int(populate_count / 10)
296+
cluster_name = "test_1_shard_3_replicas"
297+
298+
source_table = "t_source"
299+
create_tables(source_table, populate_count=populate_count, skip_last_replica=False)
300+
target_table = "t_target"
301+
create_tables(target_table, populate_count=0, skip_last_replica=False)
302+
303+
query_id = str(uuid.uuid4())
304+
node1.query(
305+
f"INSERT INTO {target_table} SELECT * FROM {source_table} WHERE key % 10 = 0",
306+
settings={
307+
"parallel_distributed_insert_select": 2,
308+
"enable_parallel_replicas": 2,
309+
"max_parallel_replicas": max_parallel_replicas,
310+
"cluster_for_parallel_replicas": cluster_name,
311+
"parallel_replicas_local_plan": parallel_replicas_local_plan,
312+
"enable_analyzer": 1,
313+
},
314+
query_id=query_id
315+
)
316+
node1.query(f"SYSTEM SYNC REPLICA {target_table} LIGHTWEIGHT")
317+
assert (
318+
node1.query(
319+
f"select count() from {target_table}"
320+
)
321+
== f"{count}\n"
322+
)
323+
324+
# check that query executed in distributed way
325+
execute_on_cluster(f"SYSTEM FLUSH LOGS query_log")
326+
number_of_queries = node1.query(
327+
f"""SELECT count() FROM clusterAllReplicas({cluster_name}, system.query_log) WHERE current_database = currentDatabase() AND initial_query_id = '{query_id}' AND type = 'QueryFinish' AND query_kind = 'Insert'""",
328+
settings={"skip_unavailable_shards": 1},
329+
)
330+
assert (int(number_of_queries) > 1)

tests/integration/test_s3_cluster_insert_select/test.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,46 @@ def test_distributed_insert_select_to_rmt_limit(started_cluster):
187187
)
188188

189189

190+
def test_distributed_insert_select_to_rmt_where(started_cluster):
191+
table = "t_rmt_target"
192+
cluster_name = "cluster_1_shard_3_replicas"
193+
194+
node1.query(
195+
f"""DROP TABLE IF EXISTS {table} ON CLUSTER '{cluster_name}' SYNC;"""
196+
)
197+
198+
node1.query(
199+
f"""
200+
CREATE TABLE {table} ON CLUSTER {cluster_name} (a String, b UInt64)
201+
ENGINE=ReplicatedMergeTree('/clickhouse/tables/32c614a9-13af-43c5-848c-a3f62a78e390/{table}', '{{replica}}')
202+
ORDER BY (a, b);
203+
"""
204+
)
205+
206+
node1.query(
207+
f"""
208+
INSERT INTO {table} SELECT * FROM s3Cluster(
209+
'{cluster_name}',
210+
'http://minio1:9001/root/data/generated/*.csv', 'minio', '{minio_secret_key}', 'CSV','a String, b UInt64'
211+
) WHERE b = 100 SETTINGS parallel_distributed_insert_select=2;
212+
"""
213+
)
214+
215+
node1.query(f"SYSTEM SYNC REPLICA {table}")
216+
217+
assert (
218+
int(
219+
node1.query(
220+
f"SELECT count(*) FROM {table};"
221+
).strip()
222+
) == 99
223+
)
224+
225+
node1.query(
226+
f"""DROP TABLE IF EXISTS {table} ON CLUSTER '{cluster_name}' SYNC;"""
227+
)
228+
229+
190230
def test_distributed_insert_select_to_rmt_cte_const(started_cluster):
191231
table = "t_rmt_target"
192232
cluster_name = "cluster_1_shard_3_replicas"

0 commit comments

Comments
 (0)