Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/build_macos_x86_wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ jobs:
- uses: actions/checkout@v3
with:
fetch-depth: 0
token: ${{ secrets.GH_TOKEN }}
- name: Update submodules
run: |
git submodule update --init --recursive --jobs 4
Expand Down
2 changes: 1 addition & 1 deletion programs/local/TableFunctionPython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ void TableFunctionPython::parseArguments(const ASTPtr & ast_function, ContextPtr
std::remove_if(py_reader_arg_str.begin(), py_reader_arg_str.end(), [](char c) { return c == '\'' || c == '\"' || c == '`'; }),
py_reader_arg_str.end());

py::gil_scoped_acquire acquire;
auto instance = PythonTableCache::getQueryableObj(py_reader_arg_str);
if (instance == nullptr || instance.is_none())
throw Exception(ErrorCodes::PY_OBJECT_NOT_FOUND,
"Python object not found in the Python environment\n"
"Ensure that the object is type of PyReader, pandas DataFrame, or PyArrow Table and is in the global or local scope");

py::gil_scoped_acquire acquire;
LOG_DEBUG(
logger,
"Python object found in Python environment with name: {} type: {}",
Expand Down
4 changes: 4 additions & 0 deletions programs/local/chdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ const std::string & chdb_streaming_result_error_string(chdb_streaming_result * r

chdb_connection * connect_chdb_with_exception(int argc, char ** argv)
{
/// Release GIL during connection mutex acquisition to prevent deadlock
#if USE_PYTHON
py::gil_scoped_release release;
#endif
std::lock_guard<std::shared_mutex> global_lock(global_connection_mutex);

std::string path = ":memory:"; // Default path
Expand Down
80 changes: 80 additions & 0 deletions tests/test_parallel_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!python3
import unittest
import traceback
import concurrent.futures
import sys
import uuid
import chdb
import pandas as pd


def worker(worker_id):
try:
# Create a DataFrame
df = pd.DataFrame({
'id': list(range(10)),
'value': [i * worker_id for i in range(10)],
'category': [f'cat_{i % 3}' for i in range(10)]
})

# Generate unique variable name
var_name = f"__test_df_{uuid.uuid4().hex}__"

# Register DataFrame in global namespace
globals()[var_name] = df

try:
# Execute SQL query using Python() table function
sql = f"SELECT * FROM Python({var_name}) WHERE value > 3"
conn = chdb.connect()
result = conn.query(sql, 'DataFrame')
conn.close()
return result
finally:
# Clean up global namespace
if var_name in globals():
del globals()[var_name]

except Exception as e:
# print(f"[Worker {worker_id}] EXCEPTION: {e}")
# traceback.print_exc()
return None

def test_connection_per_thread():
"""Test using separate connection per thread.
Note: This test may report errors about not finding Python objects,
which is expected. The main purpose is to test that queries don't
deadlock in multi-threaded scenarios.
"""
num_workers = 5
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(worker, i) for i in range(num_workers)]

completed = []
for future in concurrent.futures.as_completed(futures, timeout=15):
try:
result = future.result()
completed.append(result)
except Exception as e:
print(f"Worker raised exception: {e}")
return False
if len(completed) != num_workers:
print(f"ERROR: Expected {num_workers} results, got {len(completed)}")
return False
return True

except concurrent.futures.TimeoutError:
print("TIMEOUT!")
return False
except Exception as e:
print(f"ERROR: {e}")
return False

class TestParallelQuery(unittest.TestCase):
def test_parallel_query(self):
self.assertTrue(test_connection_per_thread())


if __name__ == '__main__':
unittest.main(verbosity=2)
Loading