Skip to content

Commit ecace1c

Browse files
authored
Merge pull request ClickHouse#79677 from ClickHouse/fix-delta-kernel-crash
Fix potential crash in delta-kernel implementation
2 parents 11d5900 + 7d487a1 commit ecace1c

File tree

2 files changed

+47
-42
lines changed

2 files changed

+47
-42
lines changed

src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -227,60 +227,63 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator
227227
/// but instead in data files paths directory names.
228228
/// So we extract these values here and put into `partitions_info`.
229229
DB::ObjectInfoWithPartitionColumns::PartitionColumnsInfo partitions_info;
230-
for (const auto & partition_column : context->partition_columns)
230+
if (partition_map)
231231
{
232-
std::string * value;
233-
/// This map is empty if columnMappingMode = ''.
234-
/// (E.g. empty string, which is the default mode).
235-
if (context->physical_names_map.empty())
232+
for (const auto & partition_column : context->partition_columns)
236233
{
237-
value = static_cast<std::string *>(ffi::get_from_string_map(
238-
partition_map,
239-
KernelUtils::toDeltaString(partition_column),
240-
KernelUtils::allocateString));
241-
}
242-
else
243-
{
244-
/// DeltaKernel has inconsistency, getPartitionColumns returns logical column names,
245-
/// while here in partition_map we would have physical columns as map keys.
246-
/// This will be fixed after switching to "transform"'s.
247-
auto it = context->physical_names_map.find(partition_column);
248-
if (it == context->physical_names_map.end())
234+
std::string * value;
235+
/// This map is empty if columnMappingMode = ''.
236+
/// (E.g. empty string, which is the default mode).
237+
if (context->physical_names_map.empty())
249238
{
250-
throw DB::Exception(
251-
DB::ErrorCodes::LOGICAL_ERROR,
252-
"Cannot find parititon column {} in physical columns map",
253-
partition_column);
239+
value = static_cast<std::string *>(ffi::get_from_string_map(
240+
partition_map,
241+
KernelUtils::toDeltaString(partition_column),
242+
KernelUtils::allocateString));
243+
}
244+
else
245+
{
246+
/// DeltaKernel has inconsistency, getPartitionColumns returns logical column names,
247+
/// while here in partition_map we would have physical columns as map keys.
248+
/// This will be fixed after switching to "transform"'s.
249+
auto it = context->physical_names_map.find(partition_column);
250+
if (it == context->physical_names_map.end())
251+
{
252+
throw DB::Exception(
253+
DB::ErrorCodes::LOGICAL_ERROR,
254+
"Cannot find parititon column {} in physical columns map",
255+
partition_column);
256+
}
257+
258+
value = static_cast<std::string *>(ffi::get_from_string_map(
259+
partition_map,
260+
KernelUtils::toDeltaString(it->second),
261+
KernelUtils::allocateString));
254262
}
255263

256-
value = static_cast<std::string *>(ffi::get_from_string_map(
257-
partition_map,
258-
KernelUtils::toDeltaString(it->second),
259-
KernelUtils::allocateString));
260-
}
261-
262-
SCOPE_EXIT({ delete value; });
264+
SCOPE_EXIT({ delete value; });
263265

264-
if (value)
265-
{
266-
auto name_and_type = context->schema.tryGetByName(partition_column);
267-
if (!name_and_type)
266+
if (value)
268267
{
269-
throw DB::Exception(
270-
DB::ErrorCodes::LOGICAL_ERROR,
271-
"Cannot find column `{}` in schema, there are only columns: `{}`",
272-
partition_column, fmt::join(context->schema.getNames(), ", "));
268+
auto name_and_type = context->schema.tryGetByName(partition_column);
269+
if (!name_and_type)
270+
{
271+
throw DB::Exception(
272+
DB::ErrorCodes::LOGICAL_ERROR,
273+
"Cannot find column `{}` in schema, there are only columns: `{}`",
274+
partition_column, fmt::join(context->schema.getNames(), ", "));
275+
}
276+
partitions_info.emplace_back(
277+
name_and_type.value(),
278+
DB::parseFieldFromString(*value, name_and_type->type));
273279
}
274-
partitions_info.emplace_back(
275-
name_and_type.value(),
276-
DB::parseFieldFromString(*value, name_and_type->type));
277280
}
278281
}
279282

280283
LOG_TEST(
281284
context->log,
282285
"Scanned file: {}, size: {}, num records: {}, partition columns: {}",
283-
full_path, size, stats->num_records, partitions_info.size());
286+
full_path, size, stats ? DB::toString(stats->num_records) : "Unknown", partitions_info.size());
284287

285288
DB::ObjectInfoPtr object;
286289
if (partitions_info.empty())

tests/integration/test_database_delta/test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,12 @@ def execute_multiple_spark_queries(node, queries_list, ignore_exit_code=False):
7777
return execute_spark_query(node, ";".join(queries_list), ignore_exit_code)
7878

7979

80-
def test_embedded_database_and_tables(started_cluster):
80+
@pytest.mark.parametrize("use_delta_kernel", ["1", "0"])
81+
def test_embedded_database_and_tables(started_cluster, use_delta_kernel):
8182
node1 = started_cluster.instances["node1"]
83+
node1.query("drop database if exists unity_test")
8284
node1.query(
83-
"create database unity_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false",
85+
f"create database unity_test engine DataLakeCatalog('http://localhost:8080/api/2.1/unity-catalog') settings warehouse = 'unity', catalog_type='unity', vended_credentials=false, allow_experimental_delta_kernel_rs={use_delta_kernel}",
8486
settings={"allow_experimental_database_unity_catalog": "1"},
8587
)
8688
default_tables = list(

0 commit comments

Comments
 (0)