
Commit 89ad25c

fix buld squashed
1 parent 2cb4829 commit 89ad25c

File tree: 9 files changed, +160, -18 lines


src/Core/Settings.h

Lines changed: 1 addition & 2 deletions

@@ -110,8 +110,7 @@ class WriteBuffer;
     M(CLASS_NAME, URI) \
     M(CLASS_NAME, VectorSearchFilterStrategy) \
     M(CLASS_NAME, GeoToH3ArgumentOrder) \
-    M(CLASS_NAME, ObjectStorageGranularityLevel) \
-    M(CLASS_NAME, DecorrelationJoinKind)
+    M(CLASS_NAME, ObjectStorageGranularityLevel)
 
 
 COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT)
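The Settings.h hunk removes the last entry of a line-continuation macro list, so the entry before it has to drop its trailing backslash; otherwise the stray backslash splices the following source line into the macro and breaks the build. A minimal sketch of that X-macro pattern, using hypothetical setting-type names rather than the real ClickHouse list:

// Hedged illustration (hypothetical names, not the real Settings.h): every
// entry except the last ends with a line-continuation backslash, so removing
// the final entry means the new last entry must lose its trailing backslash.
#include <iostream>

#define SUPPORTED_TYPES(M) \
    M(URI) \
    M(VectorSearchFilterStrategy) \
    M(ObjectStorageGranularityLevel)

#define PRINT_TYPE(NAME) std::cout << #NAME << '\n';

int main()
{
    SUPPORTED_TYPES(PRINT_TYPE) // expands to one print statement per listed type
}

A trailing backslash left on the final entry would splice the next source line into the macro definition, which is the kind of breakage a "fix build" commit like this one cleans up.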

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 1 deletion

@@ -55,7 +55,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
             {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
             {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
             {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
-            {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
+            {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
             {"cluster_table_function_split_granularity", "file", "file", "New setting."},
             {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
             {"arrow_flight_request_descriptor_type", "path", "path", "New setting. Type of descriptor to use for Arrow Flight requests: 'path' or 'command'. Dremio requires 'command'."},

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 10 additions & 0 deletions

@@ -150,6 +150,8 @@ struct PathWithMetadata
     std::optional<String> absolute_path;
     ObjectStoragePtr object_storage_to_use = nullptr;
 
+    FileBucketInfoPtr file_bucket_info;
+
     PathWithMetadata() = default;
 
     explicit PathWithMetadata(
@@ -189,6 +191,14 @@ struct PathWithMetadata
     void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true);
 
     ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; }
+
+    String getIdentifier() const
+    {
+        String result = absolute_path.value_or(relative_path);
+        if (file_bucket_info)
+            result += file_bucket_info->getIdentifier();
+        return result;
+    }
 };
 
 struct ObjectKeyWithMetadata
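The new file_bucket_info member and getIdentifier() above give each read task a key made of the object path plus its bucket, so several buckets cut from the same file remain distinguishable. A self-contained sketch of that idea follows; the FileBucketInfoSketch fields are hypothetical and only stand in for whatever the real FileBucketInfo encodes:

// Standalone sketch (hypothetical types): an object path plus an optional
// bucket suffix forms the task identifier, so two buckets of the same file
// no longer collapse into one task key.
#include <iostream>
#include <memory>
#include <optional>
#include <string>

struct FileBucketInfoSketch
{
    size_t first_row_group = 0;  // hypothetical: where the bucket starts
    size_t row_group_count = 0;  // hypothetical: how many row groups it covers

    std::string getIdentifier() const
    {
        return ":rg" + std::to_string(first_row_group) + "+" + std::to_string(row_group_count);
    }
};

struct PathWithMetadataSketch
{
    std::string relative_path;
    std::optional<std::string> absolute_path;
    std::shared_ptr<FileBucketInfoSketch> file_bucket_info;

    // Same shape as the added getIdentifier(): prefer the absolute path and
    // append the bucket suffix when the object has been split.
    std::string getIdentifier() const
    {
        std::string result = absolute_path.value_or(relative_path);
        if (file_bucket_info)
            result += file_bucket_info->getIdentifier();
        return result;
    }
};

int main()
{
    PathWithMetadataSketch a{"data/part-0.parquet", std::nullopt, std::make_shared<FileBucketInfoSketch>(FileBucketInfoSketch{0, 4})};
    PathWithMetadataSketch b{"data/part-0.parquet", std::nullopt, std::make_shared<FileBucketInfoSketch>(FileBucketInfoSketch{4, 4})};
    std::cout << a.getIdentifier() << '\n' << b.getIdentifier() << '\n';  // two distinct task keys
}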

src/Interpreters/ClusterFunctionReadTask.h

Lines changed: 0 additions & 3 deletions

@@ -1,11 +1,8 @@
 #pragma once
 #include <Core/Types.h>
-<<<<<<< HEAD
-=======
 #include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>
 #include <Processors/Formats/IInputFormat.h>
 #include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
->>>>>>> 4bed2ad0c69 (Merge pull request #87508 from scanhex12/distributed_execution_better_spread)
 #include <Storages/ObjectStorage/IObjectIterator.h>
 #include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 2 additions & 3 deletions

@@ -6,7 +6,7 @@
 #include <Core/Range.h>
 #include <Interpreters/ActionsDAG.h>
 #include <Processors/ISimpleTransform.h>
-#include <Storages/ObjectStorage/IObjectIterator.h>
+#include <Disks/ObjectStorages/IObjectStorage.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Storages/AlterCommands.h>
 #include <Storages/MutationCommands.h>
@@ -94,10 +94,9 @@ struct StorageID;
 struct IObjectIterator;
 struct RelativePathWithMetadata;
 class IObjectStorage;
-struct ObjectInfo;
-using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
 using ObjectIterator = std::shared_ptr<IObjectIterator>;
 using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
+using ObjectInfoPtr = std::shared_ptr<PathWithMetadata>;
 
 class IDataLakeMetadata : boost::noncopyable
 {

src/Storages/ObjectStorage/IObjectIterator.cpp

Lines changed: 45 additions & 0 deletions

@@ -76,4 +76,49 @@ ObjectInfoPtr ObjectIteratorWithPathAndFileFilter::next(size_t id)
     return {};
 }
 
+ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets(
+    ObjectIterator iterator_,
+    const String & format_,
+    ObjectStoragePtr object_storage_,
+    const ContextPtr & context_)
+    : WithContext(context_)
+    , iterator(iterator_)
+    , format(format_)
+    , object_storage(object_storage_)
+    , format_settings(getFormatSettings(context_))
+{
+}
+
+ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
+{
+    if (!pending_objects_info.empty())
+    {
+        auto result = pending_objects_info.front();
+        pending_objects_info.pop();
+        return result;
+    }
+    auto last_object_info = iterator->next(id);
+    if (!last_object_info)
+        return {};
+
+    auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log);
+
+    auto splitter = FormatFactory::instance().getSplitter(format);
+    if (splitter)
+    {
+        size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size];
+        auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
+        for (const auto & file_bucket : file_bucket_info)
+        {
+            auto copy_object_info = *last_object_info;
+            copy_object_info.file_bucket_info = file_bucket;
+            pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info));
+        }
+    }
+
+    auto result = pending_objects_info.front();
+    pending_objects_info.pop();
+    return result;
+}
+
 }
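ObjectIteratorSplitByBuckets::next() above follows a queue-draining pattern: serve a pending bucket if one is queued; otherwise pull the next object from the wrapped iterator, split it into buckets, enqueue them, and hand out the first. The sketch below illustrates that pattern with hypothetical types (a plain path source and a fixed bucket count in place of the format-specific splitter); it is an illustration, not the ClickHouse implementation, and it always enqueues at least one task per file so the queue is never popped while empty:

// Queue-draining split iterator: each upstream item fans out into several
// smaller tasks that are handed out one per next() call.
#include <algorithm>
#include <functional>
#include <iostream>
#include <optional>
#include <queue>
#include <string>
#include <vector>

struct TaskSketch
{
    std::string path;
    size_t bucket = 0;
};

class SplitByBucketsIteratorSketch
{
public:
    SplitByBucketsIteratorSketch(std::function<std::optional<std::string>()> source_, size_t buckets_per_file_)
        : source(std::move(source_)), buckets_per_file(buckets_per_file_) {}

    std::optional<TaskSketch> next()
    {
        if (!pending.empty())
            return pop();  // drain already-split buckets first

        auto path = source();  // pull the next file from the wrapped iterator
        if (!path)
            return std::nullopt;  // upstream exhausted

        // Split the file into buckets; always produce at least one task so a
        // file without split information is still read once.
        size_t count = std::max<size_t>(buckets_per_file, 1);
        for (size_t i = 0; i < count; ++i)
            pending.push(TaskSketch{*path, i});

        return pop();
    }

private:
    TaskSketch pop()
    {
        TaskSketch task = pending.front();
        pending.pop();
        return task;
    }

    std::function<std::optional<std::string>()> source;
    size_t buckets_per_file;
    std::queue<TaskSketch> pending;
};

int main()
{
    std::vector<std::string> files{"a.parquet", "b.parquet"};
    size_t pos = 0;
    SplitByBucketsIteratorSketch it(
        [&]() -> std::optional<std::string>
        { return pos < files.size() ? std::optional<std::string>(files[pos++]) : std::nullopt; },
        /*buckets_per_file=*/3);

    while (auto task = it.next())
        std::cout << task->path << " bucket " << task->bucket << '\n';
}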

src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h

Lines changed: 5 additions & 5 deletions

@@ -27,6 +27,7 @@ class IDataLakeMetadata;
 struct IObjectIterator;
 using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
 using ObjectIterator = std::shared_ptr<IObjectIterator>;
+using ObjectInfoPtr = std::shared_ptr<PathWithMetadata>;
 
 namespace ErrorCodes
 {
@@ -281,17 +282,16 @@ class StorageObjectStorageConfiguration
         return false;
     }
 
+    String format = "auto";
+    String compression_method = "auto";
+    String structure = "auto";
+
     PartitionStrategyFactory::StrategyType partition_strategy_type = PartitionStrategyFactory::StrategyType::NONE;
     std::shared_ptr<IPartitionStrategy> partition_strategy;
     /// Whether partition column values are contained in the actual data.
     /// And alternative is with hive partitioning, when they are contained in file path.
     bool partition_columns_in_data_file = true;
 
-private:
-    String format = "auto";
-    String compression_method = "auto";
-    String structure = "auto";
-
 protected:
     bool initialized = false;

src/Storages/StorageURL.cpp

Lines changed: 3 additions & 3 deletions

@@ -807,10 +807,10 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
 
 namespace
 {
-class ReadBufferIterator : public IReadBufferIterator, WithContext
+class URLReadBufferIterator : public IReadBufferIterator, WithContext
 {
 public:
-    ReadBufferIterator(
+    URLReadBufferIterator(
        const std::vector<String> & urls_to_check_,
        std::optional<String> format_,
        const CompressionMethod & compression_method_,
@@ -1054,7 +1054,7 @@ std::pair<ColumnsDescription, String> IStorageURLBase::getTableStructureAndForma
     else
         urls_to_check = {uri};
 
-    ReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
+    URLReadBufferIterator read_buffer_iterator(urls_to_check, format, compression_method, headers, format_settings, context);
     if (format)
         return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, context), *format};
     return detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);

tests/integration/test_storage_iceberg/test.py

Lines changed: 93 additions & 1 deletion

@@ -3923,7 +3923,7 @@ def check_validity_and_get_prunned_files(select_expression):
     )
 
 
-
+
 def test_iceberg_write_minmax(started_cluster):
     instance = started_cluster.instances["node1"]
     TABLE_NAME = "test_iceberg_write_minmax_" + get_uuid_str()
@@ -3937,3 +3937,95 @@ def test_iceberg_write_minmax(started_cluster):
 
     res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=2 ORDER BY ALL").strip()
     assert res == "1\t2"
+
+
+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure"])
+@pytest.mark.parametrize("cluster_table_function_buckets_batch_size", [0, 100, 1000])
+@pytest.mark.parametrize("input_format_parquet_use_native_reader_v3", [0, 1])
+def test_cluster_table_function_split_by_row_groups(started_cluster_iceberg_with_spark, format_version, storage_type, cluster_table_function_buckets_batch_size, input_format_parquet_use_native_reader_v3):
+    instance = started_cluster_iceberg_with_spark.instances["node1"]
+    spark = started_cluster_iceberg_with_spark.spark_session
+
+    TABLE_NAME = (
+        "test_iceberg_cluster_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )
+
+    def add_df(mode):
+        write_iceberg_from_df(
+            spark,
+            generate_data(spark, 0, 100000),
+            TABLE_NAME,
+            mode=mode,
+            format_version=format_version,
+        )
+
+        files = default_upload_directory(
+            started_cluster_iceberg_with_spark,
+            storage_type,
+            f"/iceberg_data/default/{TABLE_NAME}/",
+            f"/iceberg_data/default/{TABLE_NAME}/",
+        )
+
+        logging.info(f"Adding another dataframe. result files: {files}")
+
+        return files
+
+    files = add_df(mode="overwrite")
+    for i in range(1, 5 * len(started_cluster_iceberg_with_spark.instances)):
+        files = add_df(mode="append")
+
+    clusters = instance.query(f"SELECT * FROM system.clusters")
+    logging.info(f"Clusters setup: {clusters}")
+
+    # Regular query: node1 only
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True
+    )
+    select_regular = (
+        instance.query(f"SELECT * FROM {table_function_expr} ORDER BY ALL").strip().split()
+    )
+
+    # Cluster query with node1 as coordinator
+    table_function_expr_cluster = get_creation_expression(
+        storage_type,
+        TABLE_NAME,
+        started_cluster_iceberg_with_spark,
+        table_function=True,
+        run_on_cluster=True,
+    )
+    instance.query("SYSTEM FLUSH LOGS")
+
+    def get_buffers_count(func):
+        buffers_count_before = int(
+            instance.query(
+                f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'"
+            )
+        )
+
+        func()
+        instance.query("SYSTEM FLUSH LOGS")
+        buffers_count = int(
+            instance.query(
+                f"SELECT sum(ProfileEvents['EngineFileLikeReadFiles']) FROM system.query_log WHERE type = 'QueryFinish'"
+            )
+        )
+        return buffers_count - buffers_count_before
+
+    select_cluster = (
+        instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split()
+    )
+
+    # Simple size check
+    assert len(select_cluster) == len(select_regular)
+    # Actual check
+    assert select_cluster == select_regular
+
+    buffers_count_with_splitted_tasks = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
+    buffers_count_default = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
+    assert buffers_count_with_splitted_tasks > buffers_count_default
