Skip to content

Commit 2aa061e

Browse files
committed
feat: implement and test querying JSON columns in pyarrow Table, DataFrame, dict, and PyReader
- Implemented support for querying JSON columns across various data sources including pyarrow Table, DataFrame, dictionary, and PyReader. - Added corresponding test cases to validate the querying functionality for each data type. - Enhanced the display format of run_all.py.
1 parent 9dd6b67 commit 2aa061e

32 files changed

+1141
-172
lines changed

programs/local/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ set (CLICKHOUSE_LOCAL_SOURCES LocalServer.cpp)
22

33
if (USE_PYTHON)
44
set (CHDB_SOURCES
5+
FormatHelper.cpp
6+
ListScan.cpp
57
LocalChdb.cpp
68
LocalServer.cpp
79
NumpyType.cpp
@@ -10,6 +12,9 @@ if (USE_PYTHON)
1012
PandasScan.cpp
1113
PybindWrapper.cpp
1214
PythonConversion.cpp
15+
PythonDict.cpp
16+
PythonReader.cpp
17+
PythonTableCache.cpp
1318
PythonImportCache.cpp
1419
PythonImporter.cpp
1520
PythonObjects.cpp

programs/local/DatetimeCacheItem.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
namespace CHDB {
66

7-
struct DatetimeDatetimeCacheItem : public PythonImportCacheItem {
7+
struct DatetimeDatetimeCacheItem : public PythonImportCacheItem
8+
{
89

910
public:
1011
DatetimeDatetimeCacheItem(PythonImportCacheItem * parent)
@@ -19,7 +20,8 @@ struct DatetimeDatetimeCacheItem : public PythonImportCacheItem {
1920
PythonImportCacheItem combine;
2021
};
2122

22-
struct DatetimeDateCacheItem : public PythonImportCacheItem {
23+
struct DatetimeDateCacheItem : public PythonImportCacheItem
24+
{
2325

2426
public:
2527
DatetimeDateCacheItem(PythonImportCacheItem * parent)
@@ -33,7 +35,8 @@ struct DatetimeDateCacheItem : public PythonImportCacheItem {
3335
PythonImportCacheItem min;
3436
};
3537

36-
struct DatetimeCacheItem : public PythonImportCacheItem {
38+
struct DatetimeCacheItem : public PythonImportCacheItem
39+
{
3740

3841
public:
3942
static constexpr const char *Name = "datetime";

programs/local/DecimalCacheItem.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#pragma once
2+
3+
#include "PythonImportCacheItem.h"
4+
5+
namespace CHDB {
6+
7+
struct DecimalCacheItem : public PythonImportCacheItem
8+
{
9+
public:
10+
static constexpr const char * Name = "decimal";
11+
12+
DecimalCacheItem() : PythonImportCacheItem("decimal"), Decimal("Decimal", this)
13+
{
14+
}
15+
16+
~DecimalCacheItem() override = default;
17+
18+
PythonImportCacheItem Decimal;
19+
};
20+
21+
} // namespace CHDB

programs/local/FormatHelper.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#include "FormatHelper.h"
2+
3+
namespace CHDB {
4+
5+
static bool is_json_supported = true;
6+
7+
void SetCurrentFormat(const char * format)
8+
{
9+
if (format)
10+
{
11+
String lowerFormat = format;
12+
std::transform(lowerFormat.begin(), lowerFormat.end(), lowerFormat.begin(), ::tolower);
13+
14+
is_json_supported = !(lowerFormat == "arrow" || lowerFormat == "parquet" || lowerFormat == "arrowstream"
15+
|| lowerFormat == "protobuf" || lowerFormat == "protobuflist" || lowerFormat == "protobufsingle");
16+
17+
return;
18+
}
19+
20+
is_json_supported = true;
21+
}
22+
23+
bool isJSONSupported()
24+
{
25+
return is_json_supported;
26+
}
27+
28+
} // namespace CHDB

programs/local/FormatHelper.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
#include <base/types.h>
4+
5+
namespace CHDB {
6+
7+
void SetCurrentFormat(const char * format);
8+
9+
bool isJSONSupported();
10+
11+
} // namespace CHDB

programs/local/ListScan.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#include "ListScan.h"
2+
#include "DataTypes/IDataType.h"
3+
#include "IO/WriteHelpers.h"
4+
#include "PythonConversion.h"
5+
6+
#include <Columns/ColumnObject.h>
7+
#include <DataTypes/Serializations/SerializationJSON.h>
8+
9+
namespace DB
10+
{
11+
12+
namespace ErrorCodes
13+
{
14+
extern const int BAD_ARGUMENTS;
15+
extern const int PY_EXCEPTION_OCCURED;
16+
}
17+
18+
}
19+
20+
using namespace DB;
21+
22+
namespace CHDB {
23+
24+
ColumnPtr ListScan::scanObject(
25+
const ColumnWrapper & col_wrap,
26+
const size_t cursor,
27+
const size_t count,
28+
const FormatSettings & format_settings)
29+
{
30+
innerCheck(col_wrap);
31+
32+
auto & data_type = col_wrap.dest_type;
33+
auto column = data_type->createColumn();
34+
auto serialization = data_type->getDefaultSerialization();
35+
36+
innerScanObject(cursor, count, format_settings, serialization, col_wrap.data, column);
37+
38+
return column;
39+
}
40+
41+
void ListScan::scanObject(
42+
const size_t cursor,
43+
const size_t count,
44+
const FormatSettings & format_settings,
45+
const py::handle & obj,
46+
MutableColumnPtr & column)
47+
{
48+
auto data_type = std::make_shared<DataTypeObject>(DataTypeObject::SchemaFormat::JSON);
49+
SerializationPtr serialization = data_type->getDefaultSerialization();
50+
51+
innerScanObject(cursor, count, format_settings, serialization, obj, column);
52+
}
53+
54+
void ListScan::innerScanObject(
55+
const size_t cursor,
56+
const size_t count,
57+
const FormatSettings & format_settings,
58+
SerializationPtr & serialization,
59+
const py::handle & obj,
60+
MutableColumnPtr & column)
61+
{
62+
py::gil_scoped_acquire acquire;
63+
64+
auto list = obj.cast<py::list>();
65+
66+
for (size_t i = cursor; i < cursor + count; ++i)
67+
{
68+
auto item = list.attr("__getitem__")(i);
69+
if (!tryInsertJsonResult(item, format_settings, column, serialization))
70+
column->insertDefault();
71+
}
72+
}
73+
74+
void ListScan::innerCheck(const ColumnWrapper & col_wrap)
75+
{
76+
if (col_wrap.data.is_none())
77+
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Column data is None");
78+
79+
if (!col_wrap.buf)
80+
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Column buffer is null");
81+
}
82+
83+
} // namespace CHDB

programs/local/ListScan.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include "PybindWrapper.h"
4+
#include "PythonUtils.h"
5+
6+
namespace CHDB {
7+
8+
class ListScan {
9+
public:
10+
static DB::ColumnPtr scanObject(
11+
const DB::ColumnWrapper & col_wrap,
12+
const size_t cursor,
13+
const size_t count,
14+
const DB::FormatSettings & format_settings);
15+
16+
static void scanObject(
17+
const size_t cursor,
18+
const size_t count,
19+
const DB::FormatSettings & format_settings,
20+
const py::handle & obj,
21+
DB::MutableColumnPtr & column);
22+
23+
private:
24+
static void innerCheck(const DB::ColumnWrapper & col_wrap);
25+
26+
static void innerScanObject(
27+
const size_t cursor,
28+
const size_t count,
29+
const DB::FormatSettings & format_settings,
30+
DB::SerializationPtr & serialization,
31+
const py::handle & obj,
32+
DB::MutableColumnPtr & column);
33+
};
34+
35+
} // namespace CHDB

programs/local/LocalChdb.cpp

Lines changed: 6 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,68 +2,16 @@
22
#include "LocalServer.h"
33
#include "chdb.h"
44
#include "PythonImporter.h"
5+
#include "PythonTableCache.h"
56
#include "TableFunctionPython.h"
67

78
#include <mutex>
8-
#include "Common/logger_useful.h"
9-
#include <Common/re2.h>
10-
#include "pybind11/gil.h"
11-
#include "pybind11/pytypes.h"
9+
#include <Common/logger_useful.h>
1210

1311
namespace py = pybind11;
1412

1513
extern bool inside_main = true;
1614

17-
// Global storage for Python Table Engine queriable object
18-
extern py::handle global_query_obj;
19-
20-
// Find the queriable object in the Python environment
21-
// return nullptr if no Python obj is referenced in query string
22-
// return py::none if the obj referenced not found
23-
// return the Python object if found
24-
// The object name is extracted from the query string, must referenced by
25-
// Python(var_name) or Python('var_name') or python("var_name") or python('var_name')
26-
// such as:
27-
// - `SELECT * FROM Python('PyReader')`
28-
// - `SELECT * FROM Python(PyReader_instance)`
29-
// - `SELECT * FROM Python(some_var_with_type_pandas_DataFrame_or_pyarrow_Table)`
30-
// The object can be any thing that Python Table supported, like PyReader, pandas DataFrame, or PyArrow Table
31-
// The object should be in the global or local scope
32-
py::handle findQueryableObjFromQuery(const std::string & query_str)
33-
{
34-
// Extract the object name from the query string
35-
std::string var_name;
36-
37-
// RE2 pattern to match Python()/python() patterns with single/double quotes or no quotes
38-
static const RE2 pattern(R"([Pp]ython\s*\(\s*(?:['"]([^'"]+)['"]|([a-zA-Z_][a-zA-Z0-9_]*))\s*\))");
39-
40-
re2::StringPiece input(query_str);
41-
std::string quoted_match, unquoted_match;
42-
43-
// Try to match and extract the groups
44-
if (RE2::PartialMatch(query_str, pattern, &quoted_match, &unquoted_match))
45-
{
46-
// If quoted string was matched
47-
if (!quoted_match.empty())
48-
{
49-
var_name = quoted_match;
50-
}
51-
// If unquoted identifier was matched
52-
else if (!unquoted_match.empty())
53-
{
54-
var_name = unquoted_match;
55-
}
56-
}
57-
58-
if (var_name.empty())
59-
{
60-
return nullptr;
61-
}
62-
63-
// Find the object in the Python environment
64-
return DB::findQueryableObj(var_name);
65-
}
66-
6715
local_result_v2 * queryToBuffer(
6816
const std::string & queryStr,
6917
const std::string & output_format = "CSV",
@@ -303,7 +251,7 @@ void connection_wrapper::commit()
303251

304252
query_result * connection_wrapper::query(const std::string & query_str, const std::string & format)
305253
{
306-
global_query_obj = findQueryableObjFromQuery(query_str);
254+
CHDB::PythonTableCache::findQueryableObjFromQuery(query_str);
307255

308256
py::gil_scoped_release release;
309257
auto * result = query_conn(*conn, query_str.c_str(), format.c_str());
@@ -322,7 +270,7 @@ query_result * connection_wrapper::query(const std::string & query_str, const st
322270

323271
streaming_query_result * connection_wrapper::send_query(const std::string & query_str, const std::string & format)
324272
{
325-
global_query_obj = findQueryableObjFromQuery(query_str);
273+
CHDB::PythonTableCache::findQueryableObjFromQuery(query_str);
326274

327275
py::gil_scoped_release release;
328276
auto * result = query_conn_streaming(*conn, query_str.c_str(), format.c_str());
@@ -372,7 +320,7 @@ void connection_wrapper::streaming_cancel_query(streaming_query_result * streami
372320
void cursor_wrapper::execute(const std::string & query_str)
373321
{
374322
release_result();
375-
global_query_obj = findQueryableObjFromQuery(query_str);
323+
CHDB::PythonTableCache::findQueryableObjFromQuery(query_str);
376324

377325
// Use JSONCompactEachRowWithNamesAndTypes format for better type support
378326
py::gil_scoped_release release;
@@ -545,6 +493,7 @@ PYBIND11_MODULE(_chdb, m)
545493
auto destroy_import_cache = []()
546494
{
547495
DB::LocalServer::cleanupConnection();
496+
CHDB::PythonTableCache::clear();
548497
CHDB::PythonImporter::destroy();
549498
};
550499
m.add_object("_destroy_import_cache", py::capsule(destroy_import_cache));

programs/local/LocalServer.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
#include "chdb-internal.h"
44

55
#if USE_PYTHON
6+
#include "FormatHelper.h"
67
#include "TableFunctionPython.h"
8+
#include "PythonTableCache.h"
79
#include <TableFunctions/TableFunctionFactory.h>
810
#include <Storages/StorageFactory.h>
911
#endif
@@ -1425,6 +1427,8 @@ static CHDB::ResultData createQueryResult(DB::LocalServer * server, const CHDB::
14251427
/// Acquiring GIL during process termination leads to immediate thread termination.
14261428
local_connection->resetQueryContext();
14271429
}
1430+
1431+
CHDB::PythonTableCache::clear();
14281432
#endif
14291433
}
14301434

@@ -1704,13 +1708,19 @@ static CHDB::ResultData executeQueryRequest(
17041708
streaming_req->query = query;
17051709
streaming_req->format = format;
17061710
queue->current_query = std::move(streaming_req);
1711+
#if USE_PYTHON
1712+
CHDB::SetCurrentFormat(format);
1713+
#endif
17071714
}
17081715
else if (query_type == CHDB::QueryType::TYPE_MATERIALIZED)
17091716
{
17101717
auto materialized_req = std::make_unique<CHDB::MaterializedQueryRequest>();
17111718
materialized_req->query = query;
17121719
materialized_req->format = format;
17131720
queue->current_query = std::move(materialized_req);
1721+
#if USE_PYTHON
1722+
CHDB::SetCurrentFormat(format);
1723+
#endif
17141724
}
17151725
else
17161726
{

programs/local/PandasAnalyzer.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,9 @@ DataTypePtr PandasAnalyzer::getItemType(py::object obj, bool & can_convert)
6060

6161
switch (object_type) {
6262
case PythonObjectType::Dict:
63+
return std::make_shared<DataTypeObject>(DataTypeObject::SchemaFormat::JSON);
6364
case PythonObjectType::Tuple:
6465
case PythonObjectType::List:
65-
{
66-
// PyDictionary dict = PyDictionary(py::reinterpret_borrow<py::object>(obj));
67-
68-
return std::make_shared<DataTypeObject>(DataTypeObject::SchemaFormat::JSON);
69-
}
7066
case PythonObjectType::None:
7167
case PythonObjectType::Bool:
7268
case PythonObjectType::Integer:

0 commit comments

Comments
 (0)