Skip to content

Commit df4ad83

Browse files
authored
Merge pull request ClickHouse#92687 from ClickHouse/backport/25.8/92006
Backport ClickHouse#92006 to 25.8: Cache schema only for 1 file in globs by default in schema inference
2 parents ee90b1a + 3b5d46b commit df4ad83

File tree

9 files changed

+73
-77
lines changed

9 files changed

+73
-77
lines changed

src/Formats/ReadSchemaUtils.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,7 @@ try
166166
throw Exception(ErrorCodes::LOGICAL_ERROR, "Schema from cache was returned, but format name is unknown");
167167

168168
if (mode == SchemaInferenceMode::DEFAULT)
169-
{
170-
read_buffer_iterator.setResultingSchema(*iterator_data.cached_columns);
171169
return {*iterator_data.cached_columns, *format_name};
172-
}
173170

174171
schemas_for_union_mode.emplace_back(iterator_data.cached_columns->getAll(), read_buffer_iterator.getLastFilePath());
175172
continue;
@@ -258,12 +255,13 @@ try
258255
if (num_rows)
259256
read_buffer_iterator.setNumRowsToLastFile(*num_rows);
260257

258+
if (!names_and_types.empty())
259+
read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
260+
261261
/// In default mode, we finish when schema is inferred successfully from any file.
262262
if (mode == SchemaInferenceMode::DEFAULT)
263263
break;
264264

265-
if (!names_and_types.empty())
266-
read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
267265
schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
268266
}
269267
catch (...)
@@ -416,6 +414,9 @@ try
416414
read_buffer_iterator.setFormatName(*format_name);
417415
}
418416

417+
if (format_name)
418+
read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
419+
419420
if (mode == SchemaInferenceMode::UNION)
420421
{
421422
/// For UNION mode we need to know the schema of each file,
@@ -424,7 +425,6 @@ try
424425
if (!format_name)
425426
throw Exception(ErrorCodes::CANNOT_DETECT_FORMAT, "The data format cannot be detected by the contents of the files. You can specify the format manually");
426427

427-
read_buffer_iterator.setSchemaToLastFile(ColumnsDescription(names_and_types));
428428
schemas_for_union_mode.emplace_back(names_and_types, read_buffer_iterator.getLastFilePath());
429429
}
430430

@@ -527,10 +527,7 @@ try
527527
std::remove_if(names_and_types.begin(), names_and_types.end(), [](const NameAndTypePair & pair) { return pair.name.empty(); }),
528528
names_and_types.end());
529529

530-
auto columns = ColumnsDescription(names_and_types);
531-
if (mode == SchemaInferenceMode::DEFAULT)
532-
read_buffer_iterator.setResultingSchema(columns);
533-
return {columns, *format_name};
530+
return {ColumnsDescription(names_and_types), *format_name};
534531
}
535532

536533
throw Exception(

src/Formats/ReadSchemaUtils.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,9 @@ struct IReadBufferIterator
4646
/// Used for caching number of rows from files metadata during schema inference.
4747
virtual void setNumRowsToLastFile(size_t /*num_rows*/) {}
4848

49-
/// Set schema inferred from last file. Used for UNION mode to cache schema
50-
/// per file.
49+
/// Set schema inferred from last file.
5150
virtual void setSchemaToLastFile(const ColumnsDescription & /*columns*/) {}
5251

53-
/// Set resulting inferred schema. Used for DEFAULT mode to cache schema
54-
/// for all files.
55-
virtual void setResultingSchema(const ColumnsDescription & /*columns*/) {}
56-
5752
/// Set auto detected format name.
5853
virtual void setFormatName(const String & /*format_name*/) {}
5954

src/Storages/ObjectStorage/ReadBufferIterator.cpp

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,20 +119,8 @@ void ReadBufferIterator::setNumRowsToLastFile(size_t num_rows)
119119

120120
void ReadBufferIterator::setSchemaToLastFile(const ColumnsDescription & columns)
121121
{
122-
if (query_settings.schema_inference_use_cache
123-
&& query_settings.schema_inference_mode == SchemaInferenceMode::UNION)
124-
{
122+
if (query_settings.schema_inference_use_cache)
125123
schema_cache.addColumns(getKeyForSchemaCache(*current_object_info, *format), columns);
126-
}
127-
}
128-
129-
void ReadBufferIterator::setResultingSchema(const ColumnsDescription & columns)
130-
{
131-
if (query_settings.schema_inference_use_cache
132-
&& query_settings.schema_inference_mode == SchemaInferenceMode::DEFAULT)
133-
{
134-
schema_cache.addManyColumns(getKeysForSchemaCache(), columns);
135-
}
136124
}
137125

138126
void ReadBufferIterator::setFormatName(const String & format_name)

src/Storages/ObjectStorage/ReadBufferIterator.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ class ReadBufferIterator : public IReadBufferIterator, WithContext
2727

2828
void setSchemaToLastFile(const ColumnsDescription & columns) override;
2929

30-
void setResultingSchema(const ColumnsDescription & columns) override;
31-
3230
String getLastFilePath() const override;
3331

3432
void setFormatName(const String & format_name) override;

src/Storages/StorageFile.cpp

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,7 @@ namespace
527527

528528
void setSchemaToLastFile(const ColumnsDescription & columns) override
529529
{
530-
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_file]
531-
|| getContext()->getSettingsRef()[Setting::schema_inference_mode] != SchemaInferenceMode::UNION)
530+
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_file])
532531
return;
533532

534533
/// Schema can be different for different files, so we need to
@@ -537,17 +536,6 @@ namespace
537536
StorageFile::getSchemaCache(getContext()).addColumns(cache_key, columns);
538537
}
539538

540-
void setResultingSchema(const ColumnsDescription & columns) override
541-
{
542-
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_file]
543-
|| getContext()->getSettingsRef()[Setting::schema_inference_mode] != SchemaInferenceMode::DEFAULT)
544-
return;
545-
546-
/// For default mode we cache resulting schema for all paths.
547-
auto cache_keys = getKeysForSchemaCache(paths, *format, format_settings, getContext());
548-
StorageFile::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
549-
}
550-
551539
String getLastFilePath() const override
552540
{
553541
if (current_index != 0)
@@ -793,8 +781,7 @@ namespace
793781

794782
void setSchemaToLastFile(const ColumnsDescription & columns) override
795783
{
796-
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_file]
797-
|| getContext()->getSettingsRef()[Setting::schema_inference_mode] != SchemaInferenceMode::UNION)
784+
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_file])
798785
return;
799786

800787
/// Schema can be different for different files in archive, so we need to
@@ -804,22 +791,6 @@ namespace
804791
schema_cache.addColumns(cache_key, columns);
805792
}
806793

807-
void setResultingSchema(const ColumnsDescription & columns) override
808-
{
809-
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_file]
810-
|| getContext()->getSettingsRef()[Setting::schema_inference_mode] != SchemaInferenceMode::DEFAULT)
811-
return;
812-
813-
/// For default mode we cache resulting schema for all paths.
814-
/// Also add schema for initial paths (maybe with globes) in cache,
815-
/// so next time we won't iterate through files (that can be expensive).
816-
for (const auto & archive : archive_info.paths_to_archives)
817-
paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive, archive_info.path_in_archive));
818-
auto & schema_cache = StorageFile::getSchemaCache(getContext());
819-
auto cache_keys = getKeysForSchemaCache(paths_for_schema_cache, *format, format_settings, getContext());
820-
schema_cache.addManyColumns(cache_keys, columns);
821-
}
822-
823794
void setFormatName(const String & format_name) override
824795
{
825796
format = format_name;

src/Storages/StorageURL.cpp

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -925,27 +925,13 @@ namespace
925925

926926
void setSchemaToLastFile(const ColumnsDescription & columns) override
927927
{
928-
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_url]
929-
|| getContext()->getSettingsRef()[Setting::schema_inference_mode] != SchemaInferenceMode::UNION)
928+
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_url])
930929
return;
931930

932931
auto key = getKeyForSchemaCache(current_url_option, *format, format_settings, getContext());
933932
StorageURL::getSchemaCache(getContext()).addColumns(key, columns);
934933
}
935934

936-
void setResultingSchema(const ColumnsDescription & columns) override
937-
{
938-
if (!getContext()->getSettingsRef()[Setting::schema_inference_use_cache_for_url]
939-
|| getContext()->getSettingsRef()[Setting::schema_inference_mode] != SchemaInferenceMode::DEFAULT)
940-
return;
941-
942-
for (const auto & options : url_options_to_check)
943-
{
944-
auto keys = getKeysForSchemaCache(options, *format, format_settings, getContext());
945-
StorageURL::getSchemaCache(getContext()).addManyColumns(keys, columns);
946-
}
947-
}
948-
949935
void setFormatName(const String & format_name) override
950936
{
951937
format = format_name;

tests/integration/test_storage_s3/test.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2918,3 +2918,44 @@ def test_partition_by_without_wildcard(started_cluster):
29182918
PARTITION BY (b, c)
29192919
"""
29202920
)
2921+
2922+
def test_schema_inference_cache_multi_path(started_cluster):
2923+
bucket = started_cluster.minio_bucket
2924+
instance = started_cluster.instances["dummy"]
2925+
s3_path_prefix = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_schema_infer_cache"
2926+
query1 = "insert into table function s3('{}/{}', 'Parquet', '{}') settings s3_truncate_on_insert=1 values {}".format(
2927+
s3_path_prefix,
2928+
"test1.parquet",
2929+
"column1 UInt32, column2 String",
2930+
"(1, 'a'), (2, 'b')",
2931+
)
2932+
query2 = "insert into table function s3('{}/{}', 'Parquet', '{}') settings s3_truncate_on_insert=1 values {}".format(
2933+
s3_path_prefix,
2934+
"test2.parquet",
2935+
"column1 String, column2 UInt32",
2936+
"('a', 1), ('b', 2)",
2937+
)
2938+
2939+
run_query(instance, query1)
2940+
run_query(instance, query2)
2941+
2942+
# Sleep so that the files' last modification time is in the past
2943+
time.sleep(2)
2944+
2945+
instance.query(f"DESCRIBE TABLE s3('{s3_path_prefix}/*')")
2946+
2947+
assert "a\t1\nb\t2\n" == instance.query(
2948+
f"SELECT * FROM s3('{s3_path_prefix}/test2.parquet')"
2949+
)
2950+
assert "1\ta\n2\tb\n" == instance.query(
2951+
f"SELECT * FROM s3('{s3_path_prefix}/test1.parquet')"
2952+
)
2953+
2954+
instance.query(f"DESCRIBE TABLE url('{s3_path_prefix}/{{test1.parquet,test2.parquet}}')")
2955+
2956+
assert "a\t1\nb\t2\n" == instance.query(
2957+
f"SELECT * FROM url('{s3_path_prefix}/test2.parquet')"
2958+
)
2959+
assert "1\ta\n2\tb\n" == instance.query(
2960+
f"SELECT * FROM url('{s3_path_prefix}/test1.parquet')"
2961+
)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
0
2+
Hello 42
3+
42 Hello
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/usr/bin/env bash
2+
# Tags: no-fasttest
3+
4+
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
5+
# shellcheck source=../shell_config.sh
6+
. "$CUR_DIR"/../shell_config.sh
7+
8+
$CLICKHOUSE_LOCAL -q "select 'Hello' as c1, 42 as c2 format Parquet" > $CLICKHOUSE_TEST_UNIQUE_NAME.1.parquet
9+
$CLICKHOUSE_LOCAL -q "select 42 as c1, 'Hello' as c2 format Parquet" > $CLICKHOUSE_TEST_UNIQUE_NAME.2.parquet
10+
$CLICKHOUSE_LOCAL -m -q "
11+
select sleepEachRow(2);
12+
desc file('$CLICKHOUSE_TEST_UNIQUE_NAME.*.parquet') format Null;
13+
select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.1.parquet');
14+
select * from file('$CLICKHOUSE_TEST_UNIQUE_NAME.2.parquet');
15+
"
16+
17+
rm $CLICKHOUSE_TEST_UNIQUE_NAME.*

0 commit comments

Comments
 (0)