Skip to content

Commit 4e68ebe

Browse files
committed
fix
1 parent 09a4266 commit 4e68ebe

File tree

1 file changed

+14
-11
lines changed

1 file changed

+14
-11
lines changed

src/Storages/ObjectStorage/IObjectIterator.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,24 @@ ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
101101
if (!last_object_info)
102102
return {};
103103

104-
auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log);
105-
106104
auto splitter = FormatFactory::instance().getSplitter(format);
107-
if (splitter)
105+
/// If there's no splitter for this format (e.g., CSV in archives), return the object as-is
106+
if (!splitter)
107+
return last_object_info;
108+
109+
auto buffer = createReadBuffer(*last_object_info, object_storage, getContext(), log);
110+
size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size];
111+
auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
112+
for (const auto & file_bucket : file_bucket_info)
108113
{
109-
size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size];
110-
auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
111-
for (const auto & file_bucket : file_bucket_info)
112-
{
113-
auto copy_object_info = *last_object_info;
114-
copy_object_info.file_bucket_info = file_bucket;
115-
pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info));
116-
}
114+
auto copy_object_info = *last_object_info;
115+
copy_object_info.file_bucket_info = file_bucket;
116+
pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info));
117117
}
118118

119+
if (pending_objects_info.empty())
120+
return last_object_info;
121+
119122
auto result = pending_objects_info.front();
120123
pending_objects_info.pop();
121124
return result;

0 commit comments

Comments
 (0)