12 changes: 12 additions & 0 deletions .github/actions/build_bindings_python/action.yml
@@ -55,6 +55,18 @@ runs:
echo "JEMALLOC_SYS_WITH_LG_PAGE=14" >> $GITHUB_ENV
echo "JEMALLOC_SYS_WITH_MALLOC_CONF=oversize_threshold:0,dirty_decay_ms:5000,muzzy_decay_ms:5000" >> $GITHUB_ENV

- name: Setup build dependencies for Linux
if: contains(inputs.target, '-linux') && inputs.version == ''
shell: bash
run: |
bash ./scripts/setup/dev_setup.sh -yb
if [[ "${{ inputs.target }}" == aarch64-* ]]; then
echo "JEMALLOC_SYS_WITH_LG_PAGE=16" >> $GITHUB_ENV
else
echo "JEMALLOC_SYS_WITH_LG_PAGE=14" >> $GITHUB_ENV
fi
echo "JEMALLOC_SYS_WITH_MALLOC_CONF=oversize_threshold:0,dirty_decay_ms:5000,muzzy_decay_ms:5000" >> $GITHUB_ENV

- name: Setup uv
uses: astral-sh/setup-uv@v5
with:
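A note on the two values set in this step: JEMALLOC_SYS_WITH_LG_PAGE is the base-2 log of the largest page size the bundled jemalloc will support, so 14 and 16 correspond to 16 KiB and 64 KiB pages; the larger value is needed because some aarch64 Linux kernels run with 64 KiB pages. The sketch below only checks the arithmetic and restates that reading of the flag; it is not part of the diff.

```python
# JEMALLOC_SYS_WITH_LG_PAGE = log2(page size in bytes) that jemalloc is built to support.
# A build made for a larger page size still runs on kernels with smaller pages,
# but not the other way around, hence 16 (64 KiB) for aarch64 and 14 (16 KiB) elsewhere.
for lg_page in (14, 16):
    print(f"lg_page={lg_page} -> {2 ** lg_page // 1024} KiB pages")
```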
33 changes: 32 additions & 1 deletion .github/workflows/dev.yml
@@ -19,6 +19,7 @@
runs-on: ubuntu-latest
outputs:
any_src_changed: ${{ steps.src.outputs.any_changed }}
bendpy_changed: ${{ steps.bendpy.outputs.any_changed }}
steps:
- uses: actions/checkout@v4
with:
@@ -35,6 +36,12 @@
scripts/setup/**
scripts/distribution/**
.devcontainer/**
- name: Check Bendpy File Changes
uses: tj-actions/changed-files@v46
id: bendpy
with:
files: |
src/bendpy/**
- name: Output Source File Changes
run: |
if [[ "${{ steps.src.outputs.any_changed }}" == "true" ]]; then
@@ -57,24 +64,48 @@
runner_arch: ARM64
license_type: trial

test_bendpy:
needs: changes
if: needs.changes.outputs.bendpy_changed == 'true'
runs-on:
- self-hosted
- ARM64
- Linux
- 4c
- aws
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: ./.github/actions/build_bindings_python
timeout-minutes: 30
with:
target: aarch64-unknown-linux-gnu

ready:

Check warning (Code scanning / CodeQL): Workflow does not contain permissions (Medium)
Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
if: always()
runs-on: ubuntu-latest
needs:
- changes
- linux
- test_bendpy
steps:
- name: Check Ready to Merge
uses: actions/github-script@v7
env:
SRC_CHANGED: ${{ needs.changes.outputs.any_src_changed }}
LINUX_BUILD_RESULT: ${{ needs.linux.result }}
BENDPY_RESULT: ${{ needs.test_bendpy.result }}
with:
script: |
-            if (process.env.SRC_CHANGED == 'false') {
+            if (process.env.SRC_CHANGED == 'false' && process.env.BENDPY_RESULT != 'failure') {
core.info('No source file changes detected, skipping');
return;
}
if (process.env.BENDPY_RESULT == 'failure') {
core.setFailed('Bendpy tests failed, not ready to merge');
return;
}
if (process.env.LINUX_BUILD_RESULT == 'success') {
core.info('Linux build succeeded, ready to merge');
return;
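For readability, here is a small Python sketch of the merge-gate logic that the github-script block above implements. It only mirrors the branches visible in this hunk (the trailing else is truncated in the diff); the function name and return convention are illustrative, not anything defined by the workflow.

```python
from typing import Optional


def ready_to_merge(src_changed: str, linux_result: str, bendpy_result: str) -> Optional[bool]:
    """Mirror of the visible branches of the ready gate.

    Job results are the GitHub Actions values 'success', 'failure', 'cancelled'
    or 'skipped'; a skipped test_bendpy job therefore does not block the merge.
    """
    if src_changed == "false" and bendpy_result != "failure":
        return True   # no source changes and bendpy did not fail: nothing to gate on
    if bendpy_result == "failure":
        return False  # bendpy tests failed: block the merge
    if linux_result == "success":
        return True   # linux build succeeded: ready to merge
    return None       # remaining branches are truncated in this hunk
```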
120 changes: 93 additions & 27 deletions src/bendpy/src/context.rs
@@ -15,6 +15,8 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_meta_app::principal::BUILTIN_ROLE_ACCOUNT_ADMIN;
use databend_common_version::BUILD_INFO;
use databend_query::sessions::BuildInfoRef;
@@ -28,6 +30,33 @@ use crate::dataframe::default_box_size;
use crate::utils::RUNTIME;
use crate::utils::wait_for_future;

fn resolve_file_path(path: &str) -> String {
if path.contains("://") {
return path.to_owned();
}
if path.starts_with('/') {
return format!("fs://{}", path);
}
format!(
"fs://{}/{}",
std::env::current_dir().unwrap().to_str().unwrap(),
path
)
}

fn extract_string_column(
entry: &BlockEntry,
) -> Option<&databend_common_expression::types::StringColumn> {
match entry {
BlockEntry::Column(Column::String(col)) => Some(col),
BlockEntry::Column(Column::Nullable(n)) => match &n.column {
Column::String(col) => Some(col),
_ => None,
},
_ => None,
}
}

#[pyclass(name = "SessionContext", module = "databend", subclass)]
#[derive(Clone)]
pub(crate) struct PySessionContext {
@@ -171,41 +200,78 @@ impl PySessionContext {
connection: Option<&str>,
py: Python,
) -> PyResult<()> {
-        let sql = if let Some(connection_name) = connection {
-            let pattern_clause = pattern
-                .map(|p| format!(", pattern => '{}'", p))
-                .unwrap_or_default();
-            format!(
-                "create view {} as select * from '{}' (file_format => '{}'{}, connection => '{}')",
-                name, path, file_format, pattern_clause, connection_name
-            )
-        } else {
-            let mut path = path.to_owned();
-            if path.starts_with('/') {
-                path = format!("fs://{}", path);
-            }
+        let file_path = match connection {
+            Some(_) => path.to_owned(),
+            None => resolve_file_path(path),
+        };
+        let connection_clause = connection
+            .map(|c| format!(", connection => '{}'", c))
+            .unwrap_or_default();
+        let pattern_clause = pattern
+            .map(|p| format!(", pattern => '{}'", p))
+            .unwrap_or_default();

-            if !path.contains("://") {
-                path = format!(
-                    "fs://{}/{}",
-                    std::env::current_dir().unwrap().to_str().unwrap(),
-                    path.as_str()
-                );
-            }
+        let select_clause = match file_format {
+            "csv" => {
+                self.build_column_select(&file_path, file_format, pattern, connection, py)?
+            }

-            let pattern_clause = pattern
-                .map(|p| format!(", pattern => '{}'", p))
-                .unwrap_or_default();
-            format!(
-                "create view {} as select * from '{}' (file_format => '{}'{})",
-                name, path, file_format, pattern_clause
-            )
+            _ => "*".to_string(),
        };

+        let sql = format!(
+            "create view {} as select {} from '{}' (file_format => '{}'{}{})",
+            name, select_clause, file_path, file_format, pattern_clause, connection_clause
+        );
let _ = self.sql(&sql, py)?.collect(py)?;
Ok(())
}

/// Infer column names via `infer_schema` and build `$1 AS col1, $2 AS col2, ...`.
fn build_column_select(
&mut self,
file_path: &str,
file_format: &str,
pattern: Option<&str>,
connection: Option<&str>,
py: Python,
) -> PyResult<String> {
let conn_clause = connection
.map(|c| format!(", connection_name => '{}'", c))
.unwrap_or_default();
let pattern_clause = pattern
.map(|p| format!(", pattern => '{}'", p))
.unwrap_or_default();
let sql = format!(
"SELECT column_name FROM infer_schema(location => '{}', file_format => '{}'{}{})",
file_path,
file_format.to_uppercase(),
pattern_clause,
conn_clause
);

let blocks = self.sql(&sql, py)?.collect(py)?;
let col_names: Vec<String> = blocks
.blocks
.iter()
.filter(|b| b.num_rows() > 0)
.filter_map(|b| extract_string_column(b.get_by_offset(0)))
.flat_map(|col| col.iter().map(|s| s.to_string()))
.collect();

if col_names.is_empty() {
return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Could not infer schema: no columns found",
));
}

Ok(col_names
.iter()
.enumerate()
.map(|(i, name)| format!("${} AS `{}`", i + 1, name))
.collect::<Vec<_>>()
.join(", "))
}

#[pyo3(signature = (name, access_key_id, secret_access_key, endpoint_url = None, region = None))]
fn create_s3_connection(
&mut self,
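To make the new flow concrete, here is a minimal Python sketch of the two statements register_csv now issues for a local file with no connection and no pattern. The helper name, file path, and column names are illustrative only; the real logic lives in resolve_file_path and build_column_select above.

```python
def register_csv_sql(name: str, path: str, columns: list[str]) -> tuple[str, str]:
    """Build the infer_schema probe and the positional-column view statement."""
    infer_sql = (
        "SELECT column_name FROM infer_schema("
        f"location => '{path}', file_format => 'CSV')"
    )
    # $1, $2, ... address CSV fields by position; the inferred names become aliases.
    select = ", ".join(f"${i + 1} AS `{col}`" for i, col in enumerate(columns))
    view_sql = f"create view {name} as select {select} from '{path}' (file_format => 'csv')"
    return infer_sql, view_sql


# e.g. columns inferred for a hypothetical people.csv
for stmt in register_csv_sql("people", "fs:///tmp/people.csv", ["name", "age", "city"]):
    print(stmt)
```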
18 changes: 18 additions & 0 deletions src/bendpy/tests/test_basic.py
@@ -16,6 +16,8 @@
from databend import SessionContext
import pandas as pd
import polars
import tempfile
import os


class TestBasic:
@@ -60,3 +62,19 @@ def test_create_insert_select(self):
"select sum(a) x, max(b) y, max(d) z from aa where c"
).to_polars()
assert df.to_pandas().values.tolist() == [[90.0, "9", 9.0]]

def test_register_csv(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
f.write("name,age,city\n")
f.write("Alice,30,NYC\n")
f.write("Bob,25,LA\n")
f.write("Charlie,35,Chicago\n")
csv_path = f.name

try:
self.ctx.register_csv("people", csv_path)
df = self.ctx.sql("SELECT name, age, city FROM people ORDER BY age").to_pandas()
assert df.values.tolist() == [["Bob", "25", "LA"], ["Alice", "30", "NYC"], ["Charlie", "35", "Chicago"]]
finally:
os.unlink(csv_path)

64 changes: 53 additions & 11 deletions src/bendpy/tests/test_connections.py
@@ -44,13 +44,23 @@ def register_parquet(self, name, path, pattern=None, connection=None):
self.sql(sql)

def register_csv(self, name, path, pattern=None, connection=None):
-        if connection:
-            pattern_clause = f", pattern => '{pattern}'" if pattern else ""
-            sql = f"create view {name} as select * from '{path}' (file_format => 'csv'{pattern_clause}, connection => '{connection}')"
-        else:
-            pattern_clause = f", pattern => '{pattern}'" if pattern else ""
-            sql = f"create view {name} as select * from '{path}' (file_format => 'csv'{pattern_clause})"
-        self.sql(sql)
+        self._register_delimited(name, path, "csv", pattern, connection)

def _register_delimited(self, name, path, fmt, pattern=None, connection=None):
"""CSV/TSV: infer schema first, then create view with column positions."""
file_path = path if connection else (f"fs://{path}" if path.startswith("/") else path)
conn_infer = f", connection_name => '{connection}'" if connection else ""
pattern_infer = f", pattern => '{pattern}'" if pattern else ""
self.sql(
f"SELECT column_name FROM infer_schema(location => '{file_path}', file_format => '{fmt.upper()}'{pattern_infer}{conn_infer})"
)
# Simulated: infer_schema returns 3 columns
select = "$1 AS `col1`, $2 AS `col2`, $3 AS `col3`"
pattern_clause = f", pattern => '{pattern}'" if pattern else ""
conn_clause = f", connection => '{connection}'" if connection else ""
self.sql(
f"create view {name} as select {select} from '{file_path}' (file_format => '{fmt}'{pattern_clause}{conn_clause})"
)

def create_azblob_connection(self, name, endpoint_url, account_name, account_key):
sql = f"CREATE OR REPLACE CONNECTION {name} STORAGE_TYPE = 'AZBLOB' endpoint_url = '{endpoint_url}' account_name = '{account_name}' account_key = '{account_key}'"
@@ -253,8 +263,15 @@ def test_register_csv_with_connection(self):

self.ctx.register_csv("users", "s3://bucket/users.csv", connection="my_s3")

expected_sql = "create view users as select * from 's3://bucket/users.csv' (file_format => 'csv', connection => 'my_s3')"
mock_sql.assert_called_once_with(expected_sql)
assert mock_sql.call_count == 2
# First call: infer_schema
mock_sql.assert_any_call(
"SELECT column_name FROM infer_schema(location => 's3://bucket/users.csv', file_format => 'CSV', connection_name => 'my_s3')"
)
# Second call: create view with column positions
mock_sql.assert_any_call(
"create view users as select $1 AS `col1`, $2 AS `col2`, $3 AS `col3` from 's3://bucket/users.csv' (file_format => 'csv', connection => 'my_s3')"
)

def test_register_parquet_legacy_mode(self):
with unittest.mock.patch.object(self.ctx, "sql") as mock_sql:
@@ -271,8 +288,33 @@ def test_register_csv_with_pattern_no_connection(self):

self.ctx.register_csv("logs", "/data/logs/", pattern="*.csv")

expected_sql = "create view logs as select * from '/data/logs/' (file_format => 'csv', pattern => '*.csv')"
mock_sql.assert_called_once_with(expected_sql)
assert mock_sql.call_count == 2
# First call: infer_schema with pattern passed through
mock_sql.assert_any_call(
"SELECT column_name FROM infer_schema(location => 'fs:///data/logs/', file_format => 'CSV', pattern => '*.csv')"
)
# Second call: create view with column positions
mock_sql.assert_any_call(
"create view logs as select $1 AS `col1`, $2 AS `col2`, $3 AS `col3` from 'fs:///data/logs/' (file_format => 'csv', pattern => '*.csv')"
)

def test_register_csv_with_pattern_and_connection(self):
with unittest.mock.patch.object(self.ctx, "sql") as mock_sql:
mock_sql.return_value.collect.return_value = None

self.ctx.register_csv(
"logs", "s3://bucket/logs/", pattern="*.csv", connection="my_s3"
)

assert mock_sql.call_count == 2
# First call: infer_schema with both pattern and connection
mock_sql.assert_any_call(
"SELECT column_name FROM infer_schema(location => 's3://bucket/logs/', file_format => 'CSV', pattern => '*.csv', connection_name => 'my_s3')"
)
# Second call: create view with column positions
mock_sql.assert_any_call(
"create view logs as select $1 AS `col1`, $2 AS `col2`, $3 AS `col3` from 's3://bucket/logs/' (file_format => 'csv', pattern => '*.csv', connection => 'my_s3')"
)


class TestStages: