From 95b4f8bf765493959c290519dfdbdcda0b385c87 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 20 Nov 2025 17:07:51 +0800 Subject: [PATCH 1/4] fix: fix concurrent query python dataframe hang --- programs/local/TableFunctionPython.cpp | 2 +- programs/local/chdb.cpp | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/programs/local/TableFunctionPython.cpp b/programs/local/TableFunctionPython.cpp index afa068b0ded..3dbec303e25 100644 --- a/programs/local/TableFunctionPython.cpp +++ b/programs/local/TableFunctionPython.cpp @@ -66,13 +66,13 @@ void TableFunctionPython::parseArguments(const ASTPtr & ast_function, ContextPtr std::remove_if(py_reader_arg_str.begin(), py_reader_arg_str.end(), [](char c) { return c == '\'' || c == '\"' || c == '`'; }), py_reader_arg_str.end()); + py::gil_scoped_acquire acquire; auto instance = PythonTableCache::getQueryableObj(py_reader_arg_str); if (instance == nullptr || instance.is_none()) throw Exception(ErrorCodes::PY_OBJECT_NOT_FOUND, "Python object not found in the Python environment\n" "Ensure that the object is type of PyReader, pandas DataFrame, or PyArrow Table and is in the global or local scope"); - py::gil_scoped_acquire acquire; LOG_DEBUG( logger, "Python object found in Python environment with name: {} type: {}", diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp index c964fe6b2f9..34c54b40307 100644 --- a/programs/local/chdb.cpp +++ b/programs/local/chdb.cpp @@ -468,6 +468,8 @@ const std::string & chdb_streaming_result_error_string(chdb_streaming_result * r chdb_connection * connect_chdb_with_exception(int argc, char ** argv) { + /// Release GIL during connection mutex acquisition to prevent deadlock + py::gil_scoped_release release; std::lock_guard global_lock(global_connection_mutex); std::string path = ":memory:"; // Default path From 92f8bf6dc48b744bef0b17bfb0d4e2f7545f0a6b Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 20 Nov 2025 17:54:17 +0800 Subject: [PATCH 2/4] fix: fix build issues --- programs/local/chdb.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp index 34c54b40307..e4b42c74f29 100644 --- a/programs/local/chdb.cpp +++ b/programs/local/chdb.cpp @@ -469,7 +469,9 @@ const std::string & chdb_streaming_result_error_string(chdb_streaming_result * r chdb_connection * connect_chdb_with_exception(int argc, char ** argv) { /// Release GIL during connection mutex acquisition to prevent deadlock +#if USE_PYTHON py::gil_scoped_release release; +#endif std::lock_guard global_lock(global_connection_mutex); std::string path = ":memory:"; // Default path From 98b707415979cbfc95643284f836c141217d15cd Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Thu, 20 Nov 2025 19:08:48 +0800 Subject: [PATCH 3/4] chore: update workflow --- .github/workflows/build_macos_x86_wheels.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/build_macos_x86_wheels.yml b/.github/workflows/build_macos_x86_wheels.yml index 5f9cf520d80..f58e2230c86 100644 --- a/.github/workflows/build_macos_x86_wheels.yml +++ b/.github/workflows/build_macos_x86_wheels.yml @@ -133,7 +133,6 @@ jobs: - uses: actions/checkout@v3 with: fetch-depth: 0 - token: ${{ secrets.GH_TOKEN }} - name: Update submodules run: | git submodule update --init --recursive --jobs 4 From 037c5149a356819cd12988f7ee00936ee8081d41 Mon Sep 17 00:00:00 2001 From: wudidapaopao Date: Fri, 21 Nov 2025 10:22:38 +0800 Subject: [PATCH 4/4] test: add parallel query tests --- tests/test_parallel_query.py | 80 ++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100755 tests/test_parallel_query.py diff --git a/tests/test_parallel_query.py b/tests/test_parallel_query.py new file mode 100755 index 00000000000..565725f6916 --- /dev/null +++ b/tests/test_parallel_query.py @@ -0,0 +1,80 @@ +#!python3 +import unittest +import traceback +import concurrent.futures +import sys +import uuid +import chdb +import pandas as pd + + +def worker(worker_id): + try: + # Create a DataFrame + df = pd.DataFrame({ + 'id': list(range(10)), + 'value': [i * worker_id for i in range(10)], + 'category': [f'cat_{i % 3}' for i in range(10)] + }) + + # Generate unique variable name + var_name = f"__test_df_{uuid.uuid4().hex}__" + + # Register DataFrame in global namespace + globals()[var_name] = df + + try: + # Execute SQL query using Python() table function + sql = f"SELECT * FROM Python({var_name}) WHERE value > 3" + conn = chdb.connect() + result = conn.query(sql, 'DataFrame') + conn.close() + return result + finally: + # Clean up global namespace + if var_name in globals(): + del globals()[var_name] + + except Exception as e: + # print(f"[Worker {worker_id}] EXCEPTION: {e}") + # traceback.print_exc() + return None + +def test_connection_per_thread(): + """Test using separate connection per thread. + Note: This test may report errors about not finding Python objects, + which is expected. The main purpose is to test that queries don't + deadlock in multi-threaded scenarios. + """ + num_workers = 5 + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(worker, i) for i in range(num_workers)] + + completed = [] + for future in concurrent.futures.as_completed(futures, timeout=15): + try: + result = future.result() + completed.append(result) + except Exception as e: + print(f"Worker raised exception: {e}") + return False + if len(completed) != num_workers: + print(f"ERROR: Expected {num_workers} results, got {len(completed)}") + return False + return True + + except concurrent.futures.TimeoutError: + print("TIMEOUT!") + return False + except Exception as e: + print(f"ERROR: {e}") + return False + +class TestParallelQuery(unittest.TestCase): + def test_parallel_query(self): + self.assertTrue(test_connection_per_thread()) + + +if __name__ == '__main__': + unittest.main(verbosity=2)