
Commit 3303aa4

fix build squashed
1 parent 948b085 commit 3303aa4

8 files changed: +159 / −17 lines changed


src/Core/Settings.h

Lines changed: 1 addition & 2 deletions
@@ -110,8 +110,7 @@ class WriteBuffer;
     M(CLASS_NAME, URI) \
     M(CLASS_NAME, VectorSearchFilterStrategy) \
     M(CLASS_NAME, GeoToH3ArgumentOrder) \
-    M(CLASS_NAME, ObjectStorageGranularityLevel) \
-    M(CLASS_NAME, DecorrelationJoinKind)
+    M(CLASS_NAME, ObjectStorageGranularityLevel)
 
 
 COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT)

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 10 additions & 0 deletions
@@ -150,6 +150,8 @@ struct PathWithMetadata
     std::optional<String> absolute_path;
     ObjectStoragePtr object_storage_to_use = nullptr;
 
+    FileBucketInfoPtr file_bucket_info;
+
     PathWithMetadata() = default;
 
     explicit PathWithMetadata(
@@ -189,6 +191,14 @@ struct PathWithMetadata
     void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true);
 
     ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; }
+
+    String getIdentifier() const
+    {
+        String result = absolute_path.value_or(relative_path);
+        if (file_bucket_info)
+            result += file_bucket_info->getIdentifier();
+        return result;
+    }
 };
 
 struct ObjectKeyWithMetadata
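
The new getIdentifier() concatenates the object path with the bucket's own identifier, so two tasks that cover different buckets of the same file no longer collide when compared by path alone. Below is a minimal, self-contained sketch of that idea; BucketInfo, bucketIdentifier and taskIdentifier are hypothetical stand-ins for illustration, not the actual FileBucketInfo API.

#include <optional>
#include <string>

// Hypothetical stand-in for FileBucketInfo: a contiguous range of row groups.
struct BucketInfo
{
    size_t first_row_group = 0;
    size_t row_group_count = 0;
};

// Hypothetical helper: encode the bucket range as a suffix.
std::string bucketIdentifier(const BucketInfo & bucket)
{
    return ":" + std::to_string(bucket.first_row_group) + "+" + std::to_string(bucket.row_group_count);
}

// Mirrors the shape of PathWithMetadata::getIdentifier(): the path alone for
// whole-file tasks, the path plus a bucket suffix for per-bucket tasks.
std::string taskIdentifier(const std::string & path, const std::optional<BucketInfo> & bucket)
{
    std::string result = path;
    if (bucket)
        result += bucketIdentifier(*bucket);
    return result;
}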

src/Interpreters/ClusterFunctionReadTask.h

Lines changed: 0 additions & 3 deletions
@@ -1,11 +1,8 @@
 #pragma once
 #include <Core/Types.h>
-<<<<<<< HEAD
-=======
 #include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>
 #include <Processors/Formats/IInputFormat.h>
 #include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
->>>>>>> 4bed2ad0c69 (Merge pull request #87508 from scanhex12/distributed_execution_better_spread)
 #include <Storages/ObjectStorage/IObjectIterator.h>
 #include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>
 
src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 2 additions & 3 deletions
@@ -6,7 +6,7 @@
 #include <Core/Range.h>
 #include <Interpreters/ActionsDAG.h>
 #include <Processors/ISimpleTransform.h>
-#include <Storages/ObjectStorage/IObjectIterator.h>
+#include <Disks/ObjectStorages/IObjectStorage.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Storages/AlterCommands.h>
 #include <Storages/MutationCommands.h>
@@ -94,10 +94,9 @@ struct StorageID;
 struct IObjectIterator;
 struct RelativePathWithMetadata;
 class IObjectStorage;
-struct ObjectInfo;
-using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
 using ObjectIterator = std::shared_ptr<IObjectIterator>;
 using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
+using ObjectInfoPtr = std::shared_ptr<PathWithMetadata>;
 
 class IDataLakeMetadata : boost::noncopyable
 {

src/Storages/ObjectStorage/IObjectIterator.cpp

Lines changed: 45 additions & 0 deletions
@@ -76,4 +76,49 @@ ObjectInfoPtr ObjectIteratorWithPathAndFileFilter::next(size_t id)
     return {};
 }
 
+ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets(
+    ObjectIterator iterator_,
+    const String & format_,
+    ObjectStoragePtr object_storage_,
+    const ContextPtr & context_)
+    : WithContext(context_)
+    , iterator(iterator_)
+    , format(format_)
+    , object_storage(object_storage_)
+    , format_settings(getFormatSettings(context_))
+{
+}
+
+ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
+{
+    if (!pending_objects_info.empty())
+    {
+        auto result = pending_objects_info.front();
+        pending_objects_info.pop();
+        return result;
+    }
+    auto last_object_info = iterator->next(id);
+    if (!last_object_info)
+        return {};
+
+    auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log);
+
+    auto splitter = FormatFactory::instance().getSplitter(format);
+    if (splitter)
+    {
+        size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size];
+        auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
+        for (const auto & file_bucket : file_bucket_info)
+        {
+            auto copy_object_info = *last_object_info;
+            copy_object_info.file_bucket_info = file_bucket;
+            pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info));
+        }
+    }
+
+    auto result = pending_objects_info.front();
+    pending_objects_info.pop();
+    return result;
+}
+
 }
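
ObjectIteratorSplitByBuckets is a wrapping iterator: it first drains a queue of per-bucket tasks left over from the previously fetched file, and only then pulls the next file from the underlying iterator and asks the format's splitter to break it into buckets. A simplified, self-contained sketch of that queueing pattern follows, using generic stand-in types (Task, SplitByBuckets) rather than the ClickHouse classes; unlike the committed code it falls back to a whole-file task when no splitter applies, purely to keep the sketch total.

#include <memory>
#include <queue>
#include <string>
#include <vector>

// Stand-in task: a file path plus an optional bucket index (-1 = whole file).
struct Task
{
    std::string path;
    int bucket = -1;
};
using TaskPtr = std::shared_ptr<Task>;

struct SplitByBuckets
{
    std::queue<TaskPtr> pending;        // buckets of the most recently fetched file
    std::vector<std::string> files;     // stand-in for the wrapped iterator
    size_t pos = 0;
    size_t fan_out = 3;                 // stand-in for what the format splitter decides

    TaskPtr next()
    {
        // 1. Hand out buckets queued for the previous file first.
        if (!pending.empty())
        {
            auto task = pending.front();
            pending.pop();
            return task;
        }
        // 2. Pull the next file from the wrapped source; stop when exhausted.
        if (pos == files.size())
            return nullptr;
        const auto & path = files[pos++];
        // 3. Split the file into buckets; with no splitter, keep the file whole.
        if (fan_out == 0)
            return std::make_shared<Task>(Task{path, -1});
        for (int bucket = 0; bucket < static_cast<int>(fan_out); ++bucket)
            pending.push(std::make_shared<Task>(Task{path, bucket}));
        // 4. Return the first bucket immediately, keep the rest queued.
        auto task = pending.front();
        pending.pop();
        return task;
    }
};

In the commit itself the fan-out is not fixed: it comes from FormatFactory::instance().getSplitter(format) together with the cluster_table_function_buckets_batch_size setting, and each queued copy of the object info carries its FileBucketInfo so that getIdentifier() (see IObjectStorage.h above) stays unique per task.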

src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h

Lines changed: 5 additions & 5 deletions
@@ -27,6 +27,7 @@ class IDataLakeMetadata;
 struct IObjectIterator;
 using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
 using ObjectIterator = std::shared_ptr<IObjectIterator>;
+using ObjectInfoPtr = std::shared_ptr<PathWithMetadata>;
 
 namespace ErrorCodes
 {
@@ -281,17 +282,16 @@ class StorageObjectStorageConfiguration
         return false;
     }
 
+    String format = "auto";
+    String compression_method = "auto";
+    String structure = "auto";
+
     PartitionStrategyFactory::StrategyType partition_strategy_type = PartitionStrategyFactory::StrategyType::NONE;
     std::shared_ptr<IPartitionStrategy> partition_strategy;
     /// Whether partition column values are contained in the actual data.
     /// And alternative is with hive partitioning, when they are contained in file path.
     bool partition_columns_in_data_file = true;
 
-private:
-    String format = "auto";
-    String compression_method = "auto";
-    String structure = "auto";
-
 protected:
     bool initialized = false;
 
src/Storages/StorageURL.cpp

Lines changed: 3 additions & 3 deletions
@@ -807,10 +807,10 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
 
 namespace
 {
-class ReadBufferIterator : public IReadBufferIterator, WithContext
+class URLReadBufferIterator : public IReadBufferIterator, WithContext
 {
 public:
-    ReadBufferIterator(
+    URLReadBufferIterator(
         const std::vector<String> & urls_to_check_,
         std::optional<String> format_,
         const CompressionMethod & compression_method_,
@@ -1054,7 +1054,7 @@ std::pair<ColumnsDescription, String> IStorageURLBase::getTableStructureAndForma
     else
         urls_to_check = {uri};
 
-    ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
+    URLReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
     if (format)
         return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
     return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);

tests/integration/test_storage_iceberg/test.py

Lines changed: 93 additions & 1 deletion
@@ -3925,7 +3925,7 @@ def check_validity_and_get_prunned_files(select_expression):
     )
 
 
-
+
 def test_iceberg_write_minmax(started_cluster):
     instance = started_cluster.instances["node1"]
     TABLE_NAME = "test_iceberg_write_minmax_" + get_uuid_str()
@@ -3939,3 +3939,95 @@ def test_iceberg_write_minmax(started_cluster):
 
     res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=2 ORDER BY ALL").strip()
     assert res == "1\t2"
+
+
+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure"])
+@pytest.mark.parametrize("cluster_table_function_buckets_batch_size", [0, 100, 1000])
+@pytest.mark.parametrize("input_format_parquet_use_native_reader_v3", [0, 1])
+def test_cluster_table_function_split_by_row_groups(started_cluster_iceberg_with_spark, format_version, storage_type, cluster_table_function_buckets_batch_size, input_format_parquet_use_native_reader_v3):
+    instance = started_cluster_iceberg_with_spark.instances["node1"]
+    spark = started_cluster_iceberg_with_spark.spark_session
+
+    TABLE_NAME = (
+        "test_iceberg_cluster_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )
+
+    def add_df(mode):
+        write_iceberg_from_df(
+            spark,
+            generate_data(spark, 0, 100000),
+            TABLE_NAME,
+            mode=mode,
+            format_version=format_version,
+        )
+
+        files = default_upload_directory(
+            started_cluster_iceberg_with_spark,
+            storage_type,
+            f"/iceberg_data/default/{TABLE_NAME}/",
+            f"/iceberg_data/default/{TABLE_NAME}/",
+        )
+
+        logging.info(f"Adding another dataframe. result files: {files}")
+
+        return files
+
+    files = add_df(mode="overwrite")
+    for i in range(1, 5 * len(started_cluster_iceberg_with_spark.instances)):
+        files = add_df(mode="append")
+
+    clusters = instance.query(f"SELECT * FROM system.clusters")
+    logging.info(f"Clusters setup: {clusters}")
+
+    # Regular Query only node1
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True
+    )
+    select_regular = (
+        instance.query(f"SELECT * FROM {table_function_expr} ORDER BY ALL").strip().split()
+    )
+
+    # Cluster Query with node1 as coordinator
+    table_function_expr_cluster = get_creation_expression(
+        storage_type,
+        TABLE_NAME,
+        started_cluster_iceberg_with_spark,
+        table_function=True,
+        run_on_cluster=True,
+    )
+    instance.query("SYSTEM FLUSH LOGS")
+
+    def get_buffers_count(func):
+        buffers_count_before = int(
+            instance.query(
+                f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'"
+            )
+        )
+
+        func()
+        instance.query("SYSTEM FLUSH LOGS")
+        buffers_count = int(
+            instance.query(
+                f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'"
+            )
+        )
+        return buffers_count - buffers_count_before
+
+    select_cluster = (
+        instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split()
+    )
+
+    # Simple size check
+    assert len(select_cluster) == len(select_regular)
+    # Actual check
+    assert select_cluster == select_regular
+
+    buffers_count_with_splitted_tasks = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
+    buffers_count_default = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
+    assert buffers_count_with_splitted_tasks > buffers_count_default