Skip to content

Commit d2f938f

Browse files
committed
Fix starving caused by GIL
1 parent 59075e8 commit d2f938f

File tree

3 files changed

+8
-3
lines changed

3 files changed

+8
-3
lines changed

src/Processors/Sources/PythonSource.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ extern const int PY_EXCEPTION_OCCURED;
4848

4949
PythonSource::PythonSource(
5050
py::object & data_source_,
51+
bool isInheritsFromPyReader_,
5152
const Block & sample_block_,
5253
PyColumnVecPtr column_cache,
5354
size_t data_source_row_count,
@@ -56,6 +57,7 @@ PythonSource::PythonSource(
5657
size_t num_streams)
5758
: ISource(sample_block_.cloneEmpty())
5859
, data_source(data_source_)
60+
, isInheritsFromPyReader(isInheritsFromPyReader_)
5961
, sample_block(sample_block_)
6062
, column_cache(column_cache)
6163
, data_source_row_count(data_source_row_count)
@@ -544,7 +546,7 @@ Chunk PythonSource::generate()
544546

545547
try
546548
{
547-
if (isInheritsFromPyReader(data_source))
549+
if (isInheritsFromPyReader)
548550
{
549551
PyObjectVecPtr data;
550552
py::gil_scoped_acquire acquire;

src/Processors/Sources/PythonSource.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class PythonSource : public ISource
2626
public:
2727
PythonSource(
2828
py::object & data_source_,
29+
bool isInheritsFromPyReader_,
2930
const Block & sample_block_,
3031
PyColumnVecPtr column_cache,
3132
size_t data_source_row_count,
@@ -42,6 +43,7 @@ class PythonSource : public ISource
4243

4344
private:
4445
py::object & data_source; // Do not own the reference
46+
bool isInheritsFromPyReader; // If the data_source is a PyReader object
4547

4648
Block sample_block;
4749
PyColumnVecPtr column_cache;

src/Storages/StoragePython.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ Pipe StoragePython::read(
7070

7171
if (isInheritsFromPyReader(data_source))
7272
{
73-
return Pipe(std::make_shared<PythonSource>(data_source, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
73+
return Pipe(
74+
std::make_shared<PythonSource>(data_source, true, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
7475
}
7576

7677
prepareColumnCache(column_names, sample_block.getColumns(), sample_block);
@@ -79,7 +80,7 @@ Pipe StoragePython::read(
7980
// num_streams = 32; // for chdb testing
8081
for (size_t stream = 0; stream < num_streams; ++stream)
8182
pipes.emplace_back(std::make_shared<PythonSource>(
82-
data_source, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
83+
data_source, false, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
8384
return Pipe::unitePipes(std::move(pipes));
8485
}
8586

0 commit comments

Comments
 (0)