Skip to content

Commit 132a0af

Browse files
authored
Merge pull request ClickHouse#89147 from ClickHouse/backport/25.8/89040
Backport ClickHouse#89040 to 25.8: Replace SchedulePool with a separate thread
2 parents 5e13ccc + a5d5db8 commit 132a0af

File tree

2 files changed

+38
-37
lines changed

2 files changed

+38
-37
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -278,39 +278,7 @@ IcebergIterator::IcebergIterator(
278278
data_snapshot_,
279279
persistent_components_)
280280
, blocking_queue(100)
281-
, producer_task(local_context_->getSchedulePool().createTask(
282-
"IcebergMetaReaderThread",
283-
[this]
284-
{
285-
while (!blocking_queue.isFinished())
286-
{
287-
std::optional<ManifestFileEntry> entry;
288-
try
289-
{
290-
entry = data_files_iterator.next();
291-
}
292-
catch (...)
293-
{
294-
std::lock_guard lock(exception_mutex);
295-
if (!exception)
296-
{
297-
exception = std::current_exception();
298-
}
299-
blocking_queue.finish();
300-
break;
301-
}
302-
if (!entry.has_value())
303-
break;
304-
while (!blocking_queue.push(std::move(entry.value())))
305-
{
306-
if (blocking_queue.isFinished())
307-
{
308-
break;
309-
}
310-
}
311-
}
312-
blocking_queue.finish();
313-
}))
281+
, producer_task(std::nullopt)
314282
, callback(std::move(callback_))
315283
, format(configuration_.lock()->format)
316284
, compression_method(configuration_.lock()->compression_method)
@@ -330,8 +298,38 @@ IcebergIterator::IcebergIterator(
330298
}
331299
std::sort(equality_deletes_files.begin(), equality_deletes_files.end());
332300
std::sort(position_deletes_files.begin(), position_deletes_files.end());
333-
334-
producer_task->activateAndSchedule();
301+
producer_task.emplace(
302+
[this]()
303+
{
304+
while (!blocking_queue.isFinished())
305+
{
306+
std::optional<ManifestFileEntry> entry;
307+
try
308+
{
309+
entry = data_files_iterator.next();
310+
}
311+
catch (...)
312+
{
313+
std::lock_guard lock(exception_mutex);
314+
if (!exception)
315+
{
316+
exception = std::current_exception();
317+
}
318+
blocking_queue.finish();
319+
break;
320+
}
321+
if (!entry.has_value())
322+
break;
323+
while (!blocking_queue.push(std::move(entry.value())))
324+
{
325+
if (blocking_queue.isFinished())
326+
{
327+
break;
328+
}
329+
}
330+
}
331+
blocking_queue.finish();
332+
});
335333
}
336334

337335
ObjectInfoPtr IcebergIterator::next(size_t)
@@ -372,7 +370,10 @@ size_t IcebergIterator::estimatedKeysCount()
372370
IcebergIterator::~IcebergIterator()
373371
{
374372
blocking_queue.finish();
375-
producer_task->deactivate();
373+
if (producer_task)
374+
{
375+
producer_task->join();
376+
}
376377
}
377378
}
378379

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class IcebergIterator : public IObjectIterator
103103
Iceberg::SingleThreadIcebergKeysIterator data_files_iterator;
104104
Iceberg::SingleThreadIcebergKeysIterator deletes_iterator;
105105
ConcurrentBoundedQueue<Iceberg::ManifestFileEntry> blocking_queue;
106-
BackgroundSchedulePool::TaskHolder producer_task;
106+
std::optional<ThreadFromGlobalPool> producer_task;
107107
IDataLakeMetadata::FileProgressCallback callback;
108108
const String format;
109109
const String compression_method;

0 commit comments

Comments
 (0)