Skip to content

Commit 4fff84d

Browse files
Merge pull request ClickHouse#86880 from ClickHouse/backport/25.8/86357
Backport ClickHouse#86357 to 25.8: Add rows/bytes limit for inserted data files in delta lake
2 parents 2a17103 + 2e19ad3 commit 4fff84d

File tree

9 files changed, with 247 additions and 71 deletions.

src/Core/Settings.cpp

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -6471,12 +6471,12 @@ Query Iceberg table using the snapshot that was current at a specific timestamp.
64716471
)", 0) \
64726472
DECLARE(Int64, iceberg_snapshot_id, 0, R"(
64736473
Query Iceberg table using the specific snapshot id.
6474-
)", 0) \
6475-
DECLARE(Bool, delta_lake_enable_expression_visitor_logging, false, R"(
6476-
Enables Test level logs of DeltaLake expression visitor. These logs can be too verbose even for test logging.
64776474
)", 0) \
64786475
DECLARE(Bool, show_data_lake_catalogs_in_system_tables, true, R"(
64796476
Enables showing data lake catalogs in system tables.
6477+
)", 0) \
6478+
DECLARE(Bool, delta_lake_enable_expression_visitor_logging, false, R"(
6479+
Enables Test level logs of DeltaLake expression visitor. These logs can be too verbose even for test logging.
64806480
)", 0) \
64816481
DECLARE(Int64, delta_lake_snapshot_version, -1, R"(
64826482
Version of delta lake snapshot to read. Value -1 means to read latest version (value 0 is a valid snapshot version).
@@ -6486,6 +6486,12 @@ Enables throwing an exception if there was an error when analyzing scan predicat
64866486
)", 0) \
64876487
DECLARE(Bool, delta_lake_enable_engine_predicate, true, R"(
64886488
Enables delta-kernel internal data pruning.
6489+
)", 0) \
6490+
DECLARE(NonZeroUInt64, delta_lake_insert_max_rows_in_data_file, 100000, R"(
6491+
Defines a rows limit for a single inserted data file in delta lake.
6492+
)", 0) \
6493+
DECLARE(NonZeroUInt64, delta_lake_insert_max_bytes_in_data_file, 1_GiB, R"(
6494+
Defines a bytes limit for a single inserted data file in delta lake.
64896495
)", 0) \
64906496
DECLARE(Bool, allow_experimental_delta_lake_writes, false, R"(
64916497
Enables delta-kernel writes feature.

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
9696
{"per_part_index_stats", false, false, "New setting."},
9797
{"allow_experimental_iceberg_compaction", 0, 0, "New setting "},
9898
{"delta_lake_snapshot_version", -1, -1, "New setting"},
99+
{"delta_lake_insert_max_bytes_in_data_file", 1_GiB, 1_GiB, "New setting."},
100+
{"delta_lake_insert_max_rows_in_data_file", 100000, 100000, "New setting."},
99101
{"use_roaring_bitmap_iceberg_positional_deletes", false, false, "New setting"},
100102
{"iceberg_metadata_compression_method", "", "", "New setting"},
101103
{"allow_experimental_correlated_subqueries", false, true, "Mark correlated subqueries support as Beta."},

src/Storages/ObjectStorage/DataLakes/DeltaLake/DeltaLakePartitionedSink.cpp

Lines changed: 68 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
#include <Common/Arena.h>
77
#include <Common/PODArray.h>
88
#include <Core/UUID.h>
9+
#include <Core/Settings.h>
910

1011
#include <Formats/FormatFactory.h>
1112
#include <Processors/Formats/IOutputFormat.h>
@@ -30,6 +31,12 @@ namespace ErrorCodes
3031
extern const int INCORRECT_DATA;
3132
}
3233

34+
namespace Setting
35+
{
36+
extern const SettingsNonZeroUInt64 delta_lake_insert_max_rows_in_data_file;
37+
extern const SettingsNonZeroUInt64 delta_lake_insert_max_bytes_in_data_file;
38+
}
39+
3340
namespace
3441
{
3542
/// Given partition columns list,
@@ -74,6 +81,8 @@ DeltaLakePartitionedSink::DeltaLakePartitionedSink(
7481
, object_storage(object_storage_)
7582
, format_settings(format_settings_)
7683
, configuration(configuration_)
84+
, data_file_max_rows(context_->getSettingsRef()[Setting::delta_lake_insert_max_rows_in_data_file])
85+
, data_file_max_bytes(context_->getSettingsRef()[Setting::delta_lake_insert_max_bytes_in_data_file])
7786
, partition_strategy(createPartitionStrategy(partition_columns, getHeader(), context_))
7887
, delta_transaction(delta_transaction_)
7988
{
@@ -131,7 +140,7 @@ void DeltaLakePartitionedSink::consume(Chunk & chunk)
131140
partition_index_to_chunk.emplace_back(Columns(), partition_column->size());
132141
}
133142

134-
for (size_t partition_index = 0; partition_index < partitions_size; ++partition_index)
143+
for (size_t partition_index = 0; partition_index < partitions_size; ++partition_index)
135144
{
136145
auto & partition_chunk = partition_index_to_chunk[partition_index];
137146
partition_chunk.addColumn(std::move(partition_index_to_column_split[partition_index]));
@@ -140,68 +149,86 @@ void DeltaLakePartitionedSink::consume(Chunk & chunk)
140149

141150
for (const auto & [partition_key, partition_index] : partition_id_to_chunk_index)
142151
{
143-
auto partition_data = getPartitionDataForPartitionKey(partition_key);
152+
auto & data_files = getPartitionDataForPartitionKey(partition_key)->data_files;
144153
auto & partition_chunk = partition_index_to_chunk[partition_index];
145-
partition_data->sink->consume(partition_chunk);
146-
partition_data->size += partition_chunk.bytes();
154+
155+
if (data_files.empty()
156+
|| data_files.back().written_rows >= data_file_max_rows
157+
|| data_files.back().written_bytes >= data_file_max_bytes)
158+
{
159+
data_files.emplace_back(createSinkForPartition(partition_key));
160+
total_data_files_count += 1;
161+
}
162+
auto & data_file = data_files.back();
163+
data_file.written_bytes += partition_chunk.bytes();
164+
data_file.written_rows += partition_chunk.getNumRows();
165+
data_file.sink->consume(partition_chunk);
147166
}
148167
}
149168

150-
DeltaLakePartitionedSink::PartitionDataPtr
169+
DeltaLakePartitionedSink::PartitionInfoPtr
151170
DeltaLakePartitionedSink::getPartitionDataForPartitionKey(StringRef partition_key)
152171
{
153-
auto it = partition_id_to_sink.find(partition_key);
154-
if (it == partition_id_to_sink.end())
155-
{
156-
auto data = std::make_shared<PartitionData>();
157-
auto data_prefix = std::filesystem::path(delta_transaction->getDataPath()) / partition_key.toString();
158-
data->path = DeltaLake::generateWritePath(std::move(data_prefix), configuration->format);
159-
160-
data->sink = std::make_shared<StorageObjectStorageSink>(
161-
data->path,
162-
object_storage,
163-
configuration,
164-
format_settings,
165-
std::make_shared<Block>(partition_strategy->getFormatHeader()),
166-
getContext()
167-
);
168-
std::tie(it, std::ignore) = partition_id_to_sink.emplace(partition_key, std::move(data));
169-
}
172+
auto it = partitions_data.find(partition_key);
173+
if (it == partitions_data.end())
174+
std::tie(it, std::ignore) = partitions_data.emplace(partition_key, std::make_shared<PartitionInfo>(partition_key));
170175
return it->second;
171176
}
172177

178+
DeltaLakePartitionedSink::StorageSinkPtr
179+
DeltaLakePartitionedSink::createSinkForPartition(StringRef partition_key)
180+
{
181+
auto data_prefix = std::filesystem::path(delta_transaction->getDataPath()) / partition_key.toString();
182+
return std::make_unique<StorageObjectStorageSink>(
183+
DeltaLake::generateWritePath(std::move(data_prefix), configuration->format),
184+
object_storage,
185+
configuration,
186+
format_settings,
187+
std::make_shared<Block>(partition_strategy->getFormatHeader()),
188+
getContext());
189+
}
190+
173191
void DeltaLakePartitionedSink::onFinish()
174192
{
175-
if (isCancelled() || partition_id_to_sink.empty())
193+
if (isCancelled() || partitions_data.empty())
176194
return;
177195

178-
for (auto & [_, data] : partition_id_to_sink)
179-
data->sink->onFinish();
180-
181-
LOG_TEST(log, "Written to {} sinks", partition_id_to_sink.size());
196+
std::vector<DeltaLake::WriteTransaction::CommitFile> files;
197+
files.reserve(total_data_files_count);
198+
const auto data_prefix = delta_transaction->getDataPath();
182199

183-
try
200+
for (auto & [_, partition_info] : partitions_data)
184201
{
185-
std::vector<DeltaLake::WriteTransaction::CommitFile> files;
186-
files.reserve(partition_id_to_sink.size());
187-
const auto data_prefix = delta_transaction->getDataPath();
188-
for (auto & [_, data] : partition_id_to_sink)
202+
auto & [partition_key, data_files] = *partition_info;
203+
auto partition_key_str = partition_key.toString();
204+
auto keys_and_values = HivePartitioningUtils::parseHivePartitioningKeysAndValues(partition_key_str);
205+
Map partition_values;
206+
partition_values.reserve(keys_and_values.size());
207+
for (const auto & [key, value] : keys_and_values)
208+
partition_values.emplace_back(DB::Tuple({key, value}));
209+
210+
for (const auto & [sink, written_bytes, written_rows] : data_files)
189211
{
190-
auto keys_and_values = HivePartitioningUtils::parseHivePartitioningKeysAndValues(data->path);
191-
Map partition_values;
192-
partition_values.reserve(keys_and_values.size());
193-
for (const auto & [key, value] : keys_and_values)
194-
partition_values.emplace_back(DB::Tuple({key, value}));
195-
196-
files.emplace_back(data->path.substr(data_prefix.size()), data->size, partition_values);
212+
sink->onFinish();
213+
files.emplace_back(
214+
sink->getPath().substr(data_prefix.size()),
215+
sink->getFileSize(),
216+
partition_values);
197217
}
218+
}
219+
220+
LOG_TEST(log, "Written {} data files", total_data_files_count);
221+
222+
try
223+
{
198224
delta_transaction->commit(files);
199225
}
200226
catch (...)
201227
{
202-
for (auto & [_, data] : partition_id_to_sink)
228+
for (auto & [_, partition_info] : partitions_data)
203229
{
204-
object_storage->removeObjectIfExists(StoredObject(data->path));
230+
for (const auto & [sink, written_bytes, written_rows] : partition_info->data_files)
231+
object_storage->removeObjectIfExists(StoredObject(sink->getPath()));
205232
}
206233
throw;
207234
}

src/Storages/ObjectStorage/DataLakes/DeltaLake/DeltaLakePartitionedSink.h

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99
#include <absl/container/flat_hash_map.h>
1010
#include <Processors/Sinks/SinkToStorage.h>
1111
#include <Storages/ObjectStorage/IObjectIterator.h>
12+
#include <Storages/ObjectStorage/StorageObjectStorageSink.h>
1213
#include <Storages/IPartitionStrategy.h>
1314

1415

@@ -49,26 +50,42 @@ class DeltaLakePartitionedSink : public SinkToStorage, private WithContext
4950
void onFinish() override;
5051

5152
private:
52-
struct PartitionData
53+
using StorageSinkPtr = std::unique_ptr<StorageObjectStorageSink>;
54+
55+
struct DataFileInfo
56+
{
57+
explicit DataFileInfo(StorageSinkPtr sink_) : sink(std::move(sink_)) {}
58+
59+
StorageSinkPtr sink;
60+
size_t written_bytes = 0;
61+
size_t written_rows = 0;
62+
};
63+
struct PartitionInfo
5364
{
54-
SinkPtr sink;
55-
std::string path;
56-
size_t size = 0;
65+
explicit PartitionInfo(StringRef partition_key_) : partition_key(partition_key_) {}
66+
67+
const StringRef partition_key;
68+
std::vector<DataFileInfo> data_files;
5769
};
58-
using PartitionDataPtr = std::shared_ptr<PartitionData>;
59-
PartitionDataPtr getPartitionDataForPartitionKey(StringRef partition_key);
70+
using PartitionInfoPtr = std::shared_ptr<PartitionInfo>;
6071

6172
const LoggerPtr log;
6273
const Names partition_columns;
6374
const ObjectStoragePtr object_storage;
6475
const std::optional<FormatSettings> format_settings;
6576
const StorageObjectStorageConfigurationPtr configuration;
77+
const size_t data_file_max_rows;
78+
const size_t data_file_max_bytes;
6679
const std::unique_ptr<IPartitionStrategy> partition_strategy;
6780
const DeltaLake::WriteTransactionPtr delta_transaction;
6881

69-
absl::flat_hash_map<StringRef, PartitionDataPtr> partition_id_to_sink;
82+
absl::flat_hash_map<StringRef, PartitionInfoPtr> partitions_data;
83+
size_t total_data_files_count = 0;
7084
IColumn::Selector chunk_row_index_to_partition_index;
7185
Arena partition_keys_arena;
86+
87+
StorageSinkPtr createSinkForPartition(StringRef partition_key);
88+
PartitionInfoPtr getPartitionDataForPartitionKey(StringRef partition_key);
7289
};
7390

7491
}

src/Storages/ObjectStorage/DataLakes/DeltaLake/DeltaLakeSink.cpp

Lines changed: 54 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,21 @@
1+
#include <Core/Settings.h>
12
#include <Storages/ObjectStorage/DataLakes/DeltaLake/DeltaLakeSink.h>
23

34
#if USE_DELTA_KERNEL_RS
45
#include <Common/logger_useful.h>
6+
#include <Interpreters/Context.h>
57
#include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h>
68
#include <Storages/ObjectStorage/DataLakes/DeltaLake/WriteTransaction.h>
79
#include <Storages/ObjectStorage/DataLakes/DeltaLake/KernelUtils.h>
810

911

1012
namespace DB
1113
{
14+
namespace Setting
15+
{
16+
extern const SettingsNonZeroUInt64 delta_lake_insert_max_rows_in_data_file;
17+
extern const SettingsNonZeroUInt64 delta_lake_insert_max_bytes_in_data_file;
18+
}
1219

1320
DeltaLakeSink::DeltaLakeSink(
1421
DeltaLake::WriteTransactionPtr delta_transaction_,
@@ -17,49 +24,78 @@ DeltaLakeSink::DeltaLakeSink(
1724
ContextPtr context_,
1825
SharedHeader sample_block_,
1926
const std::optional<FormatSettings> & format_settings_)
20-
: StorageObjectStorageSink(
21-
DeltaLake::generateWritePath(
22-
delta_transaction_->getDataPath(),
23-
configuration_->format),
24-
object_storage_,
25-
configuration_,
26-
format_settings_,
27-
sample_block_,
28-
context_)
27+
: SinkToStorage(sample_block_)
28+
, WithContext(context_)
2929
, delta_transaction(delta_transaction_)
3030
, object_storage(object_storage_)
31+
, configuration(configuration_)
32+
, format_settings(format_settings_)
33+
, sample_block(sample_block_)
34+
, data_file_max_rows(context_->getSettingsRef()[Setting::delta_lake_insert_max_rows_in_data_file])
35+
, data_file_max_bytes(context_->getSettingsRef()[Setting::delta_lake_insert_max_bytes_in_data_file])
3136
{
3237
delta_transaction->validateSchema(getHeader());
3338
}
3439

40+
DeltaLakeSink::StorageSinkPtr DeltaLakeSink::createStorageSink() const
41+
{
42+
return std::make_unique<StorageObjectStorageSink>(
43+
DeltaLake::generateWritePath(
44+
delta_transaction->getDataPath(),
45+
configuration->format),
46+
object_storage,
47+
configuration,
48+
format_settings,
49+
sample_block,
50+
getContext());
51+
}
52+
3553
void DeltaLakeSink::consume(Chunk & chunk)
3654
{
3755
if (isCancelled())
3856
return;
3957

40-
written_bytes += chunk.bytes();
41-
StorageObjectStorageSink::consume(chunk);
58+
if (data_files.empty()
59+
|| data_files.back().written_bytes >= data_file_max_bytes
60+
|| data_files.back().written_rows >= data_file_max_rows)
61+
{
62+
data_files.emplace_back(createStorageSink());
63+
}
64+
65+
auto & data_file = data_files.back();
66+
data_file.written_bytes += chunk.bytes();
67+
data_file.written_rows += chunk.getNumRows();
68+
data_file.sink->consume(chunk);
4269
}
4370

4471
void DeltaLakeSink::onFinish()
4572
{
4673
if (isCancelled())
4774
return;
4875

49-
StorageObjectStorageSink::onFinish();
76+
std::vector<DeltaLake::WriteTransaction::CommitFile> files;
77+
files.reserve(data_files.size());
78+
for (const auto & [sink, written_bytes, written_rows] : data_files)
79+
{
80+
sink->onFinish();
81+
auto file_location = sink->getPath().substr(delta_transaction->getDataPath().size());
82+
auto file_size = sink->getFileSize();
83+
files.emplace_back(std::move(file_location), file_size, Map{});
84+
}
5085

5186
try
5287
{
53-
std::vector<DeltaLake::WriteTransaction::CommitFile> files;
54-
auto file_location = getPath().substr(delta_transaction->getDataPath().size());
55-
files.emplace_back(std::move(file_location), written_bytes, Map{});
5688
delta_transaction->commit(files);
5789
}
5890
catch (...)
5991
{
60-
/// FIXME: this should be just removeObject,
61-
/// but IObjectStorage does not have such method.
62-
object_storage->removeObjectIfExists(StoredObject(getPath()));
92+
for (const auto & [sink, written_bytes, written_rows] : data_files)
93+
{
94+
/// FIXME: this should be just removeObject,
95+
/// but IObjectStorage does not have such method.
96+
object_storage->removeObjectIfExists(StoredObject(sink->getPath()));
97+
98+
}
6399
throw;
64100
}
65101
}

0 commit comments

Comments
 (0)