
Commit 79d44e0

Merge pull request ClickHouse#86879 from ClickHouse/backport/25.8/86367
Backport ClickHouse#86367 to 25.8: Fix logical error during writes to DeltaLake
2 parents: 61228a1 + b3bc426

2 files changed: +40 lines, -2 lines

src/Storages/ObjectStorage/DataLakes/DeltaLake/WriteTransaction.cpp

Lines changed: 4 additions & 2 deletions
@@ -208,12 +208,14 @@ void WriteTransaction::create()

 void WriteTransaction::validateSchema(const DB::Block & header) const
 {
     assertTransactionCreated();
-    if (write_schema.getNames() != header.getNames())
+    auto write_column_names = write_schema.getNameSet();
+    auto header_column_names = header.getNamesAndTypesList().getNameSet();
+    if (write_column_names != header_column_names)
     {
         throw DB::Exception(
             DB::ErrorCodes::LOGICAL_ERROR,
             "Header does not match write schema. Expected: {}, got: {}",
-            fmt::join(write_schema.getNames(), ", "), fmt::join(header.getNames(), ", "));
+            fmt::join(write_column_names, ", "), fmt::join(header_column_names, ", "));
     }
 }
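
Why this fixes the error: the old check compared the ordered name lists from getNames(), so a header whose column order differed from the Delta write schema (in the test below, the Delta table was created with columns (c1, c0) while the ClickHouse table declares (c0, c1)) was rejected with LOGICAL_ERROR even though the same columns were present. Comparing name sets makes the check order-insensitive while still rejecting missing or extra columns. A minimal standalone sketch of the two comparisons, using plain std::vector and std::set stand-ins rather than ClickHouse's actual Block and NamesAndTypesList types:

#include <cassert>
#include <set>
#include <string>
#include <vector>

int main()
{
    // The write schema lists columns in one order;
    // the incoming header carries the same columns, permuted.
    std::vector<std::string> schema_names = {"c0", "c1"};
    std::vector<std::string> header_names = {"c1", "c0"};

    // Old check: ordered comparison, so a mere permutation reads as a mismatch.
    assert(schema_names != header_names);

    // New check: set comparison, so the same columns in any order are accepted...
    std::set<std::string> schema_set(schema_names.begin(), schema_names.end());
    std::set<std::string> header_set(header_names.begin(), header_names.end());
    assert(schema_set == header_set);

    // ...while a missing or extra column still registers as a mismatch.
    header_set.insert("c2");
    assert(schema_set != header_set);
}

The integration test below exercises exactly this path by inserting with a reordered column list.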

tests/integration/test_storage_delta/test.py

Lines changed: 36 additions & 0 deletions
@@ -3326,3 +3326,39 @@ def test_writes_spark_compatibility(started_cluster):
         "[Row(id=10, name='10'), Row(id=11, name='11'), Row(id=12, name='12'), Row(id=13, name='13'), Row(id=14, name='14'), Row(id=15, name='15'), Row(id=16, name='16'), Row(id=17, name='17'), Row(id=18, name='18'), Row(id=19, name='19'), Row(id=0, name='0'), Row(id=1, name='1'), Row(id=2, name='2'), Row(id=3, name='3'), Row(id=4, name='4'), Row(id=5, name='5'), Row(id=6, name='6'), Row(id=7, name='7'), Row(id=8, name='8'), Row(id=9, name='9')]"
         == str(df)
     )
+
+
+def test_write_column_order(started_cluster):
+    instance = started_cluster.instances["node1"]
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    table_name = randomize_table_name("test_write_column_order")
+    result_file = f"{table_name}_data"
+    schema = pa.schema([("c1", pa.int32()), ("c0", pa.string())])
+    empty_arrays = [pa.array([], type=pa.int32()), pa.array([], type=pa.string())]
+    write_deltalake(
+        f"file:///{result_file}",
+        pa.Table.from_arrays(empty_arrays, schema=schema),
+        mode="overwrite",
+    )
+    LocalUploader(instance).upload_directory(f"/{result_file}/", f"/{result_file}/")
+
+    instance.query(
+        f"CREATE TABLE {table_name} (c0 String, c1 Int32) ENGINE = DeltaLakeLocal('/{result_file}') SETTINGS output_format_parquet_compression_method = 'none'"
+    )
+    num_rows = 10
+    instance.query(
+        f"INSERT INTO {table_name} (c1, c0) SELECT number as c1, toString(number % 2) as c0 FROM numbers(10)"
+    )
+
+    assert num_rows == int(instance.query(f"SELECT count() FROM {table_name}"))
+    assert (
+        "0\t0\n1\t1\n0\t2\n1\t3\n0\t4\n1\t5\n0\t6\n1\t7\n0\t8\n1\t9"
+        == instance.query(f"SELECT c0, c1 FROM {table_name}").strip()
+    )
+
+    instance.query(
+        f"INSERT INTO {table_name} (c1, c0) SELECT c1, c0 FROM generateRandom('c1 Int32, c0 String', 16920040705558589162, 7706, 3) LIMIT {num_rows}"
+    )
+
+    assert num_rows * 2 == int(instance.query(f"SELECT count() FROM {table_name}"))
