diff --git a/.github/workflows/build_macos_x86_wheels.yml b/.github/workflows/build_macos_x86_wheels.yml index 5f9cf520d80..f58e2230c86 100644 --- a/.github/workflows/build_macos_x86_wheels.yml +++ b/.github/workflows/build_macos_x86_wheels.yml @@ -133,7 +133,6 @@ jobs: - uses: actions/checkout@v3 with: fetch-depth: 0 - token: ${{ secrets.GH_TOKEN }} - name: Update submodules run: | git submodule update --init --recursive --jobs 4 diff --git a/programs/local/TableFunctionPython.cpp b/programs/local/TableFunctionPython.cpp index afa068b0ded..3dbec303e25 100644 --- a/programs/local/TableFunctionPython.cpp +++ b/programs/local/TableFunctionPython.cpp @@ -66,13 +66,13 @@ void TableFunctionPython::parseArguments(const ASTPtr & ast_function, ContextPtr std::remove_if(py_reader_arg_str.begin(), py_reader_arg_str.end(), [](char c) { return c == '\'' || c == '\"' || c == '`'; }), py_reader_arg_str.end()); + py::gil_scoped_acquire acquire; auto instance = PythonTableCache::getQueryableObj(py_reader_arg_str); if (instance == nullptr || instance.is_none()) throw Exception(ErrorCodes::PY_OBJECT_NOT_FOUND, "Python object not found in the Python environment\n" "Ensure that the object is type of PyReader, pandas DataFrame, or PyArrow Table and is in the global or local scope"); - py::gil_scoped_acquire acquire; LOG_DEBUG( logger, "Python object found in Python environment with name: {} type: {}", diff --git a/programs/local/chdb.cpp b/programs/local/chdb.cpp index c964fe6b2f9..e4b42c74f29 100644 --- a/programs/local/chdb.cpp +++ b/programs/local/chdb.cpp @@ -468,6 +468,10 @@ const std::string & chdb_streaming_result_error_string(chdb_streaming_result * r chdb_connection * connect_chdb_with_exception(int argc, char ** argv) { + /// Release GIL during connection mutex acquisition to prevent deadlock +#if USE_PYTHON + py::gil_scoped_release release; +#endif std::lock_guard global_lock(global_connection_mutex); std::string path = ":memory:"; // Default path diff --git a/tests/test_parallel_query.py b/tests/test_parallel_query.py new file mode 100755 index 00000000000..565725f6916 --- /dev/null +++ b/tests/test_parallel_query.py @@ -0,0 +1,80 @@ +#!python3 +import unittest +import traceback +import concurrent.futures +import sys +import uuid +import chdb +import pandas as pd + + +def worker(worker_id): + try: + # Create a DataFrame + df = pd.DataFrame({ + 'id': list(range(10)), + 'value': [i * worker_id for i in range(10)], + 'category': [f'cat_{i % 3}' for i in range(10)] + }) + + # Generate unique variable name + var_name = f"__test_df_{uuid.uuid4().hex}__" + + # Register DataFrame in global namespace + globals()[var_name] = df + + try: + # Execute SQL query using Python() table function + sql = f"SELECT * FROM Python({var_name}) WHERE value > 3" + conn = chdb.connect() + result = conn.query(sql, 'DataFrame') + conn.close() + return result + finally: + # Clean up global namespace + if var_name in globals(): + del globals()[var_name] + + except Exception as e: + # print(f"[Worker {worker_id}] EXCEPTION: {e}") + # traceback.print_exc() + return None + +def test_connection_per_thread(): + """Test using separate connection per thread. + Note: This test may report errors about not finding Python objects, + which is expected. The main purpose is to test that queries don't + deadlock in multi-threaded scenarios. + """ + num_workers = 5 + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(worker, i) for i in range(num_workers)] + + completed = [] + for future in concurrent.futures.as_completed(futures, timeout=15): + try: + result = future.result() + completed.append(result) + except Exception as e: + print(f"Worker raised exception: {e}") + return False + if len(completed) != num_workers: + print(f"ERROR: Expected {num_workers} results, got {len(completed)}") + return False + return True + + except concurrent.futures.TimeoutError: + print("TIMEOUT!") + return False + except Exception as e: + print(f"ERROR: {e}") + return False + +class TestParallelQuery(unittest.TestCase): + def test_parallel_query(self): + self.assertTrue(test_connection_per_thread()) + + +if __name__ == '__main__': + unittest.main(verbosity=2)