diff --git a/.github/workflows/bindings.nodejs.yml b/.github/workflows/bindings.nodejs.yml index 9acc76c14..e01d85653 100644 --- a/.github/workflows/bindings.nodejs.yml +++ b/.github/workflows/bindings.nodejs.yml @@ -86,6 +86,7 @@ jobs: shell: bash env: NAPI_TARGET: ${{ matrix.target }} + MACOSX_DEPLOYMENT_TARGET: 10.12 run: | if [[ "${{ github.event_name }}" == "pull_request" ]]; then pnpm napi build --platform --target=$NAPI_TARGET --js generated.js diff --git a/.github/workflows/bindings.python.yml b/.github/workflows/bindings.python.yml index c09959c1a..716c8e154 100644 --- a/.github/workflows/bindings.python.yml +++ b/.github/workflows/bindings.python.yml @@ -41,7 +41,7 @@ jobs: run: ruff format --check . build: - name: build-${{ matrix.os }}-${{ matrix.arch }} + name: build-${{ matrix.os }}-${{ matrix.arch }}-${{ matrix.wheel }} needs: check runs-on: ${{ matrix.runner }} strategy: @@ -51,10 +51,22 @@ jobs: arch: x86_64 target: x86_64-unknown-linux-gnu runner: ubuntu-latest + wheel: py39-abi + - os: linux + arch: x86_64 + target: x86_64-unknown-linux-gnu + runner: ubuntu-latest + wheel: cp38 + - os: linux + arch: aarch64 + target: aarch64-unknown-linux-gnu + runner: ubuntu-latest + wheel: py39-abi - os: linux arch: aarch64 target: aarch64-unknown-linux-gnu runner: ubuntu-latest + wheel: cp38 # FIXME: Windows build is broken # - os: windows # arch: x86_64 @@ -64,10 +76,22 @@ jobs: arch: x86_64 target: x86_64-apple-darwin runner: macos-latest + wheel: py39-abi + - os: macos + arch: x86_64 + target: x86_64-apple-darwin + runner: macos-latest + wheel: cp38 - os: macos arch: aarch64 target: aarch64-apple-darwin runner: macos-latest + wheel: py39-abi + - os: macos + arch: aarch64 + target: aarch64-apple-darwin + runner: macos-latest + wheel: cp38 steps: - uses: actions/checkout@v4 - name: Get opts @@ -75,10 +99,16 @@ jobs: shell: bash run: | if [[ "${{ github.event_name }}" == "pull_request" ]]; then - echo "BUILD_ARGS=--strip --out dist" >> $GITHUB_OUTPUT + base_args="--strip --out dist" + else + base_args="--release --strip --out dist" + fi + if [[ "${{ matrix.wheel }}" == "py39-abi" ]]; then + wheel_args="--features py39-abi" else - echo "BUILD_ARGS=--release --strip --out dist" >> $GITHUB_OUTPUT + wheel_args="--no-default-features --features cp38 --interpreter python3.8" fi + echo "BUILD_ARGS=${base_args} ${wheel_args}" >> $GITHUB_OUTPUT if [[ "${{ matrix.target }}" == "aarch64-unknown-linux-gnu" ]]; then echo "MANYLINUX=2_28" >> $GITHUB_OUTPUT else @@ -89,6 +119,11 @@ jobs: with: cache-key: bindings-python-${{ matrix.os }}-${{ matrix.arch }} target: ${{ matrix.target }} + - name: Setup Python 3.8 (cp38 wheels) + if: matrix.wheel == 'cp38' && matrix.os != 'linux' + uses: actions/setup-python@v5 + with: + python-version: "3.8" - name: Build wheels uses: PyO3/maturin-action@v1 with: @@ -100,16 +135,17 @@ jobs: - name: Upload artifact uses: actions/upload-artifact@v4 with: - name: bindings-python-${{ matrix.os }}-${{ matrix.arch }} + name: bindings-python-${{ matrix.os }}-${{ matrix.arch }}-${{ matrix.wheel }} path: bindings/python/dist/*.whl integration: - name: integration-${{ matrix.pyver }} + name: integration-${{ matrix.pyver }}-${{matrix.body}} needs: build runs-on: ubuntu-latest strategy: matrix: pyver: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] + body: ["arrow", "json"] steps: - uses: actions/checkout@v4 - name: Setup Python @@ -122,22 +158,33 @@ jobs: - name: Download artifact uses: actions/download-artifact@v4 with: - name: bindings-python-linux-x86_64 + pattern: 
bindings-python-linux-x86_64-* path: bindings/python/artifacts + merge-multiple: true - name: Install dependencies working-directory: bindings/python run: | pip install behave - pip install artifacts/*.whl + if [[ "${{ matrix.pyver }}" == "3.8" ]]; then + pip install artifacts/*cp38*.whl + else + pip install artifacts/*abi3*.whl + fi - name: Test AsyncIO working-directory: bindings/python run: behave tests/asyncio + env: + BODY_FORMAT: ${{ matrix.body }} - name: Test Blocking working-directory: bindings/python run: behave tests/blocking + env: + BODY_FORMAT: ${{ matrix.body }} - name: Test Cursor working-directory: bindings/python run: behave tests/cursor + env: + BODY_FORMAT: ${{ matrix.body }} compat: name: Compat Test @@ -154,7 +201,7 @@ jobs: - name: Download artifact uses: actions/download-artifact@v4 with: - name: bindings-python-linux-x86_64 + name: bindings-python-linux-x86_64-py39-abi path: bindings/python/dist - name: Run Nox run: nox -f tests/nox/noxfile.py diff --git a/.github/workflows/cron.integration.yml b/.github/workflows/cron.integration.yml index a27001d74..5e77e1dc1 100644 --- a/.github/workflows/cron.integration.yml +++ b/.github/workflows/cron.integration.yml @@ -46,8 +46,11 @@ jobs: run: make -C tests test-bendsql DATABEND_QUERY_VERSION=${{ needs.version.outputs.version }} build-python: - name: build-python + name: build-python-${{ matrix.wheel }} runs-on: ubuntu-latest + strategy: + matrix: + wheel: [py39-abi, cp38] steps: - uses: actions/checkout@v4 - name: Setup Rust toolchain @@ -55,6 +58,17 @@ jobs: with: cache-key: bindings-python-linux-x64 target: x86_64-unknown-linux-gnu + - name: Get opts + id: opts + shell: bash + run: | + base_args="--release --strip --out dist" + if [[ "${{ matrix.wheel }}" == "py39-abi" ]]; then + wheel_args="--features py39-abi" + else + wheel_args="--no-default-features --features cp38 --interpreter python3.8" + fi + echo "BUILD_ARGS=${base_args} ${wheel_args}" >> $GITHUB_OUTPUT - name: Build wheels uses: PyO3/maturin-action@v1 with: @@ -62,11 +76,11 @@ jobs: target: x86_64-unknown-linux-gnu manylinux: auto sccache: "true" - args: --release --strip --out dist + args: ${{ steps.opts.outputs.BUILD_ARGS }} - name: Upload artifact uses: actions/upload-artifact@v4 with: - name: bindings-python + name: bindings-python-${{ matrix.wheel }} path: bindings/python/dist/*.whl build-nodejs: @@ -120,13 +134,18 @@ jobs: - name: Download artifact uses: actions/download-artifact@v4 with: - name: bindings-python + pattern: bindings-python-* path: bindings/python/artifacts + merge-multiple: true - name: Install dependencies working-directory: bindings/python run: | pip install behave - pip install artifacts/*.whl + if [[ "${{ matrix.pyver }}" == "3.8" ]]; then + pip install artifacts/*cp38*.whl + else + pip install artifacts/*abi3*.whl + fi - name: Test AsyncIO working-directory: bindings/python run: behave tests/asyncio diff --git a/Cargo.toml b/Cargo.toml index 2656db06e..636468039 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,11 +30,13 @@ databend-driver-macros = { path = "macros", version = "0.30.3" } jsonb = { version = "0.5.3" } tokio-stream = "0.1" chrono = { version = "0.4.40", default-features = false, features = ["clock"] } +chrono-tz = { version = "0.10.4" } arrow = { version = "55.0" } arrow-array = { version = "55.0" } arrow-buffer = { version = "55.0" } arrow-schema = { version = "55.0" } arrow-flight = { version = "55.0", features = ["flight-sql-experimental"] } +arrow-ipc = { version = "55.0", features = ["lz4", "zstd"]} tonic = { 
version = "0.12", default-features = false, features = [
     "transport",
     "codegen",
diff --git a/bindings/nodejs/Cargo.toml b/bindings/nodejs/Cargo.toml
index c4bdb0897..4457824a5 100644
--- a/bindings/nodejs/Cargo.toml
+++ b/bindings/nodejs/Cargo.toml
@@ -14,6 +14,7 @@ doc = false
 
 [dependencies]
 chrono = { workspace = true }
+chrono-tz = { workspace = true }
 databend-driver = { workspace = true, features = ["rustls", "flight-sql"] }
 env_logger = "0.11.8"
 tokio-stream = { workspace = true }
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index 6132ed1a0..49bda7195 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -15,7 +15,8 @@
 #[macro_use]
 extern crate napi_derive;
 
-use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
+use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
+use chrono_tz::Tz;
 use databend_driver::LoadMethod;
 use napi::{bindgen_prelude::*, Env};
 use once_cell::sync::Lazy;
@@ -314,10 +315,10 @@ impl ToNapiValue for Value<'_> {
             databend_driver::Value::Number(n) => {
                 NumberValue::to_napi_value(env, NumberValue(n.clone()))
             }
-            databend_driver::Value::Timestamp(_) => {
+            databend_driver::Value::Timestamp(_, _tz) => {
                 let inner = val.inner.clone();
-                let v = NaiveDateTime::try_from(inner).map_err(format_napi_error)?;
-                NaiveDateTime::to_napi_value(env, v)
+                let v = DateTime::<Tz>::try_from(inner).map_err(format_napi_error)?;
+                DateTime::to_napi_value(env, v)
             }
             databend_driver::Value::Date(_) => {
                 let inner = val.inner.clone();
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 331ef7711..97a316f51 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -14,6 +14,7 @@ doc = false
 
 [dependencies]
 chrono = { workspace = true }
+chrono-tz = { workspace = true }
 databend-client = { workspace = true }
 databend-driver = { workspace = true, features = ["rustls", "flight-sql"] }
 databend-driver-core = { workspace = true }
@@ -23,6 +24,11 @@ ctor = "0.2"
 env_logger = "0.11.8"
 http = "1.0"
 once_cell = "1.21"
-pyo3 = { version = "0.24.2", features = ["abi3-py37", "chrono"] }
+pyo3 = { version = "0.24.2", features = ["extension-module", "chrono", "chrono-tz"] }
 pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
 tokio = "1.44"
+
+[features]
+default = ["py39-abi"]
+py39-abi = ["pyo3/abi3-py39"]
+cp38 = []
diff --git a/bindings/python/src/types.rs b/bindings/python/src/types.rs
index 303eb1a24..47b2174b0 100644
--- a/bindings/python/src/types.rs
+++ b/bindings/python/src/types.rs
@@ -14,7 +14,12 @@
 
 use std::sync::Arc;
 
-use chrono::{Duration, NaiveDate, NaiveDateTime};
+#[cfg(feature = "cp38")]
+use chrono::offset::Offset;
+#[cfg(feature = "cp38")]
+use chrono::FixedOffset;
+use chrono::{DateTime, Duration, NaiveDate};
+use chrono_tz::Tz;
 use once_cell::sync::Lazy;
 use pyo3::exceptions::{PyAttributeError, PyException, PyStopAsyncIteration, PyStopIteration};
 use pyo3::sync::GILOnceCell;
@@ -73,11 +78,20 @@ impl<'py> IntoPyObject<'py> for Value {
                 let v = NumberValue(n);
                 v.into_bound_py_any(py)?
             }
-            databend_driver::Value::Timestamp(_) => {
-                let t = NaiveDateTime::try_from(self.0).map_err(|e| {
+            databend_driver::Value::Timestamp(_, _) => {
+                let t = DateTime::<Tz>::try_from(self.0).map_err(|e| {
                     PyException::new_err(format!("failed to convert timestamp: {e}"))
                 })?;
-                t.into_bound_py_any(py)?
+                #[cfg(feature = "cp38")]
+                {
+                    // chrono_tz -> PyDateTime isn't implemented for Python < 3.9 (no zoneinfo).
+                    let t: DateTime<FixedOffset> = t.with_timezone(&t.offset().fix());
+                    t.into_bound_py_any(py)?
+ } + #[cfg(not(feature = "cp38"))] + { + t.into_bound_py_any(py)? + } } databend_driver::Value::Date(_) => { let d = NaiveDate::try_from(self.0) diff --git a/bindings/python/tests/asyncio/steps/binding.py b/bindings/python/tests/asyncio/steps/binding.py index e3f9a1ad9..10a4a94ff 100644 --- a/bindings/python/tests/asyncio/steps/binding.py +++ b/bindings/python/tests/asyncio/steps/binding.py @@ -15,7 +15,7 @@ import os import gc import time -from datetime import datetime, date, timedelta +from datetime import datetime, date, timedelta, timezone from decimal import Decimal from behave import given, when, then @@ -37,6 +37,11 @@ else: DRIVER_VERSION = (100, 0, 0) +if DRIVER_VERSION > (0, 30, 3): + default_tzinfo = timezone.utc +else: + default_tzinfo = None + @given("A new Databend Driver Client") async def _(context): @@ -44,6 +49,8 @@ async def _(context): "TEST_DATABEND_DSN", "databend://root:root@localhost:8000/?sslmode=disable", ) + if os.getenv("BODY_FORMAT") == "arrow": + dsn += "&body_format=arrow" client = databend_driver.AsyncDatabendClient(dsn) context.conn = await client.get_conn() context.client = client @@ -130,9 +137,9 @@ async def _(context): row = await context.conn.query_row( "select (10, '20', to_datetime('2024-04-16 12:34:56.789'))" ) - assert row.values() == ((10, "20", datetime(2024, 4, 16, 12, 34, 56, 789000)),), ( - f"Tuple: {row.values()}" - ) + assert row.values() == ( + (10, "20", datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=default_tzinfo)), + ), f"Tuple: {row.values()}" @then("Select numbers should iterate all rows") @@ -156,9 +163,33 @@ async def _(context): rows = await context.conn.query_iter("SELECT * FROM test") ret = [row.values() for row in rows] expected = [ - (-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)), - (-2, 2, 2.0, '"', "", date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)), - (-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)), + ( + -1, + 1, + 1.0, + "'", + None, + date(2011, 3, 6), + datetime(2011, 3, 6, 6, 20, tzinfo=default_tzinfo), + ), + ( + -2, + 2, + 2.0, + '"', + "", + date(2012, 5, 31), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), + ), + ( + -3, + 3, + 3.0, + "\\", + "NULL", + date(2016, 4, 4), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), + ), ] assert ret == expected, f"ret: {ret}" @@ -176,9 +207,33 @@ async def _(context): rows = await context.conn.query_iter("SELECT * FROM test") ret = [row.values() for row in rows] expected = [ - (-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)), - (-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)), - (-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)), + ( + -1, + 1, + 1.0, + "'", + None, + date(2011, 3, 6), + datetime(2011, 3, 6, 6, 20, tzinfo=default_tzinfo), + ), + ( + -2, + 2, + 2.0, + '"', + None, + date(2012, 5, 31), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), + ), + ( + -3, + 3, + 3.0, + "\\", + "NULL", + date(2016, 4, 4), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), + ), ] assert ret == expected, f"ret: {ret}" @@ -213,9 +268,33 @@ async def test_load_file(context, load_method): rows = await context.conn.query_iter("SELECT * FROM test1") ret = [row.values() for row in rows] expected = [ - (-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)), - (-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)), - (-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)), + ( + -1, + 1, + 1.0, + 
"'", + None, + date(2011, 3, 6), + datetime(2011, 3, 6, 6, 20, tzinfo=default_tzinfo), + ), + ( + -2, + 2, + 2.0, + '"', + None, + date(2012, 5, 31), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), + ), + ( + -3, + 3, + 3.0, + "\\", + "NULL", + date(2016, 4, 4), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), + ), ] assert ret == expected, f"{load_method} ret: {ret}" diff --git a/bindings/python/tests/blocking/steps/binding.py b/bindings/python/tests/blocking/steps/binding.py index ee57fe6b1..e0f30d8a2 100644 --- a/bindings/python/tests/blocking/steps/binding.py +++ b/bindings/python/tests/blocking/steps/binding.py @@ -14,7 +14,7 @@ import os import gc -from datetime import datetime, date, timedelta +from datetime import datetime, date, timedelta, timezone from decimal import Decimal import time from time import sleep @@ -38,6 +38,11 @@ else: DRIVER_VERSION = (100, 0, 0) +if DRIVER_VERSION > (0, 30, 3): + default_tzinfo = timezone.utc +else: + default_tzinfo = None + @given("A new Databend Driver Client") def _(context): @@ -45,6 +50,8 @@ def _(context): "TEST_DATABEND_DSN", "databend://root:root@localhost:8000/?sslmode=disable", ) + if os.getenv("BODY_FORMAT") == "arrow": + dsn += "&body_format=arrow" client = databend_driver.BlockingDatabendClient(dsn) context.conn = client.get_conn() context.client = client @@ -126,9 +133,27 @@ def _(context): row = context.conn.query_row( "select (10, '20', to_datetime('2024-04-16 12:34:56.789'))" ) - assert row.values() == ((10, "20", datetime(2024, 4, 16, 12, 34, 56, 789000)),), ( - f"Tuple: {row.values()}" - ) + assert row.values() == ( + (10, "20", datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=default_tzinfo)), + ), f"Tuple: {row.values()}" + + import sys + + if ( + DRIVER_VERSION > (0, 30, 3) + and DB_VERSION > (1, 2, 836) + and sys.version_info.minor >= 8 + ): + if sys.version_info.minor >= 9: + from zoneinfo import ZoneInfo + + tz_expected = ZoneInfo("Asia/Shanghai") + else: + tz_expected = timezone(timedelta(hours=8)) + context.conn.exec("set timezone='Asia/Shanghai'") + row = context.conn.query_row("select to_datetime('2024-04-16 12:34:56.789')") + exp = datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=tz_expected) + assert row.values()[0] == exp, f"Tuple: {row.values()}" @then("Select numbers should iterate all rows") @@ -152,9 +177,33 @@ def _(context): rows = context.conn.query_iter("SELECT * FROM test") ret = [row.values() for row in rows] expected = [ - (-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)), - (-2, 2, 2.0, '"', "", date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)), - (-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)), + ( + -1, + 1, + 1.0, + "'", + None, + date(2011, 3, 6), + datetime(2011, 3, 6, 6, 20, tzinfo=default_tzinfo), + ), + ( + -2, + 2, + 2.0, + '"', + "", + date(2012, 5, 31), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), + ), + ( + -3, + 3, + 3.0, + "\\", + "NULL", + date(2016, 4, 4), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), + ), ] assert ret == expected, f"ret: {ret}" @@ -172,9 +221,33 @@ def _(context): rows = context.conn.query_iter("SELECT * FROM test") ret = [row.values() for row in rows] expected = [ - (-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)), - (-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)), - (-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)), + ( + -1, + 1, + 1.0, + "'", + None, + date(2011, 3, 6), + datetime(2011, 3, 6, 6, 20, 
tzinfo=default_tzinfo), + ), + ( + -2, + 2, + 2.0, + '"', + None, + date(2012, 5, 31), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), + ), + ( + -3, + 3, + 3.0, + "\\", + "NULL", + date(2016, 4, 4), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), + ), ] assert ret == expected, f"ret: {ret}" @@ -207,9 +280,33 @@ def test_load_file(context, load_method): rows = context.conn.query_iter("SELECT * FROM test1") ret = [row.values() for row in rows] expected = [ - (-1, 1, 1.0, "'", None, date(2011, 3, 6), datetime(2011, 3, 6, 6, 20)), - (-2, 2, 2.0, '"', None, date(2012, 5, 31), datetime(2012, 5, 31, 11, 20)), - (-3, 3, 3.0, "\\", "NULL", date(2016, 4, 4), datetime(2016, 4, 4, 11, 30)), + ( + -1, + 1, + 1.0, + "'", + None, + date(2011, 3, 6), + datetime(2011, 3, 6, 6, 20, tzinfo=default_tzinfo), + ), + ( + -2, + 2, + 2.0, + '"', + None, + date(2012, 5, 31), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), + ), + ( + -3, + 3, + 3.0, + "\\", + "NULL", + date(2016, 4, 4), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), + ), ] assert ret == expected, f"{load_method} ret: {ret}" diff --git a/bindings/python/tests/cursor/steps/binding.py b/bindings/python/tests/cursor/steps/binding.py index 97d13aed5..6c6c66380 100644 --- a/bindings/python/tests/cursor/steps/binding.py +++ b/bindings/python/tests/cursor/steps/binding.py @@ -14,7 +14,7 @@ import os import gc -from datetime import datetime, date, timedelta +from datetime import datetime, date, timedelta, timezone from decimal import Decimal import time @@ -38,6 +38,11 @@ else: DRIVER_VERSION = (100, 0, 0) +if DRIVER_VERSION > (0, 30, 3): + default_tzinfo = timezone.utc +else: + default_tzinfo = None + @given("A new Databend Driver Client") def _(context): @@ -45,6 +50,8 @@ def _(context): "TEST_DATABEND_DSN", "databend://root:root@localhost:8000/?sslmode=disable", ) + if os.getenv("BODY_FORMAT") == "arrow": + dsn += "&body_format=arrow" client = databend_driver.BlockingDatabendClient(dsn) context.client = client context.cursor = client.cursor() @@ -145,7 +152,7 @@ def _(context): ( 10, "20", - datetime(2024, 4, 16, 12, 34, 56, 789000), + datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=default_tzinfo), ), ) assert row.values() == expected, f"Tuple: {row.values()}" @@ -188,7 +195,7 @@ def _(context): "'", None, date(2011, 3, 6), - datetime(2011, 3, 6, 6, 20), + datetime(2011, 3, 6, 6, 20, tzinfo=default_tzinfo), '{"a":1}', ), ( @@ -198,7 +205,7 @@ def _(context): '"', "", date(2012, 5, 31), - datetime(2012, 5, 31, 11, 20), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), '{"a":2}', ), ( @@ -208,7 +215,7 @@ def _(context): "\\", "NULL", date(2016, 4, 4), - datetime(2016, 4, 4, 11, 30), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), '{"a":3}', ), ] @@ -271,7 +278,7 @@ def _(context): "'", None, date(2011, 3, 6), - datetime(2011, 3, 6, 6, 20), + datetime(2011, 3, 6, 6, 20, tzinfo=default_tzinfo), '{"a":1}', ), ( @@ -281,7 +288,7 @@ def _(context): '"', None, date(2012, 5, 31), - datetime(2012, 5, 31, 11, 20), + datetime(2012, 5, 31, 11, 20, tzinfo=default_tzinfo), '{"a":2}', ), ( @@ -291,7 +298,7 @@ def _(context): "\\", "NULL", date(2016, 4, 4), - datetime(2016, 4, 4, 11, 30), + datetime(2016, 4, 4, 11, 30, tzinfo=default_tzinfo), '{"a":3}', ), ] diff --git a/cli/src/display.rs b/cli/src/display.rs index a5dba0812..0c90c8bc3 100644 --- a/cli/src/display.rs +++ b/cli/src/display.rs @@ -839,7 +839,7 @@ fn value_display_width(value: &Value, quote_string: bool) -> usize { Value::EmptyArray => EMPTY_WIDTH, 
Value::EmptyMap => EMPTY_WIDTH,
         Value::Date(_) => DATE_WIDTH,
-        Value::Timestamp(_) => TIMESTAMP_WIDTH,
+        Value::Timestamp(_, _) => TIMESTAMP_WIDTH,
         Value::String(_) => {
             let value_str = value.to_string();
             if quote_string {
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 207c8f6a8..3dafe6d6a 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -21,7 +21,11 @@ native-tls = ["reqwest/native-tls"]
 
 [dependencies]
 tokio-stream = { workspace = true }
+arrow-array = { workspace = true }
+arrow-ipc = { workspace = true }
+bytes = "1.10.1"
 base64 = "0.22.1"
+chrono-tz = { workspace = true }
 cookie = "0.18.1"
 log = { version = "0.4", features = ["kv"] }
 once_cell = "1.21"
diff --git a/core/src/capability.rs b/core/src/capability.rs
index 4b6756d78..140da08f8 100644
--- a/core/src/capability.rs
+++ b/core/src/capability.rs
@@ -17,12 +17,14 @@ use semver::Version;
 #[derive(Default, Debug)]
 pub struct Capability {
     pub streaming_load: bool,
+    pub arrow_data: bool,
 }
 
 impl Capability {
     pub fn from_server_version(ver: &Version) -> Capability {
         Capability {
             streaming_load: ver > &Version::new(1, 2, 781),
+            arrow_data: ver > &Version::new(1, 2, 835),
         }
     }
 }
diff --git a/core/src/client.rs b/core/src/client.rs
index 995ef328a..76f992441 100644
--- a/core/src/client.rs
+++ b/core/src/client.rs
@@ -32,14 +32,16 @@ use crate::{
     QueryStats,
 };
 use crate::{Page, Pages};
+use arrow_array::RecordBatch;
+use arrow_ipc::reader::StreamReader;
 use base64::engine::general_purpose::URL_SAFE;
 use base64::Engine;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
 use once_cell::sync::Lazy;
 use parking_lot::Mutex;
 use percent_encoding::percent_decode_str;
 use reqwest::cookie::CookieStore;
-use reqwest::header::{HeaderMap, HeaderValue};
+use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE};
 use reqwest::multipart::{Form, Part};
 use reqwest::{Body, Client as HttpClient, Request, RequestBuilder, Response, StatusCode};
 use semver::Version;
@@ -65,6 +67,8 @@ const TXN_STATE_ACTIVE: &str = "Active";
 const HEADER_SQL: &str = "X-DATABEND-SQL";
 const HEADER_QUERY_CONTEXT: &str = "X-DATABEND-QUERY-CONTEXT";
 const HEADER_SESSION_ID: &str = "X-DATABEND-SESSION-ID";
+const CONTENT_TYPE_ARROW: &str = "application/vnd.apache.arrow.stream";
+const CONTENT_TYPE_ARROW_OR_JSON: &str = "application/vnd.apache.arrow.stream";
 
 static VERSION: Lazy<String> = Lazy::new(|| {
     let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
@@ -102,6 +106,7 @@ pub struct APIClient {
     route_hint: RouteHintGenerator,
 
     disable_login: bool,
+    body_format: String,
     disable_session_token: bool,
     session_token_info: Option<Arc<Mutex<SessionTokenInfo>>>,
 
@@ -256,6 +261,17 @@ impl APIClient {
                         }
                     }
                 }
+                "body_format" => {
+                    let v = v.to_string().to_lowercase();
+                    match v.as_str() {
+                        "json" | "arrow" => client.body_format = v.to_string(),
+                        _ => {
+                            return Err(Error::BadArgument(format!(
+                                "Invalid value for body_format: {v}"
+                            )))
+                        }
+                    }
+                }
                 _ => {
                     session_settings.insert(k.to_string(), v.to_string());
                 }
@@ -425,9 +441,8 @@ impl APIClient {
 
     pub async fn start_query(self: &Arc<Self>, sql: &str, need_progress: bool) -> Result<Pages> {
         info!("start query: {sql}");
-        let resp = self.start_query_inner(sql, None).await?;
-        let pages = Pages::new(self.clone(), resp, need_progress);
-        Ok(pages)
+        let (resp, batches) = self.start_query_inner(sql, None).await?;
+        Pages::new(self.clone(), resp, batches, need_progress)
     }
 
     pub fn finalize_query(self: &Arc<Self>, query_id: &str) {
@@ -459,7 +474,7 @@ impl APIClient {
         &self,
         sql: &str,
         stage_attachment_config: Option<StageAttachmentConfig>,
-    ) -> Result<QueryResponse> {
+    ) -> Result<(QueryResponse, Vec<RecordBatch>)> {
         if !self.in_active_transaction() {
             self.route_hint.next();
         }
@@ -476,6 +491,11 @@ impl APIClient {
         // headers
         let query_id = self.gen_query_id();
         let mut headers = self.make_headers(Some(&query_id))?;
+        if self.capability.arrow_data && self.body_format == "arrow" {
+            debug!("accept arrow data");
+            headers.insert(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_ARROW_OR_JSON));
+        }
+
         if need_sticky {
             if let Some(node_id) = self.last_node_id() {
                 headers.insert(HEADER_STICKY_NODE, node_id.parse()?);
@@ -485,22 +505,77 @@ impl APIClient {
         builder = self.wrap_auth_or_session_token(builder)?;
         let request = builder.headers(headers.clone()).build()?;
         let response = self.query_request_helper(request, true, true).await?;
-        if let Some(route_hint) = response.headers().get(HEADER_ROUTE_HINT) {
-            self.route_hint.set(route_hint.to_str().unwrap_or_default());
-        }
-        let body = response.bytes().await?;
-        let result: QueryResponse = json_from_slice(&body)?;
-        self.handle_session(&result.session).await;
-        if let Some(err) = result.error {
-            return Err(Error::QueryFailed(err));
+        self.handle_page(response, true).await
+    }
+
+    fn is_arrow_data(response: &Response) -> bool {
+        if let Some(typ) = response.headers().get(CONTENT_TYPE) {
+            if let Ok(t) = typ.to_str() {
+                return t == CONTENT_TYPE_ARROW;
+            }
         }
+        false
+    }
 
-        self.set_last_query_id(Some(query_id));
-        self.handle_warnings(&result);
-        if let Some(node_id) = &result.node_id {
-            self.set_last_node_id(node_id.clone());
+    async fn handle_page(
+        &self,
+        response: Response,
+        is_first: bool,
+    ) -> Result<(QueryResponse, Vec<RecordBatch>)> {
+        let status = response.status();
+        if status != 200 {
+            return Err(Error::response_error(status, &response.bytes().await?));
         }
-        Ok(result)
+        let is_arrow_data = Self::is_arrow_data(&response);
+        if is_first {
+            if let Some(route_hint) = response.headers().get(HEADER_ROUTE_HINT) {
+                self.route_hint.set(route_hint.to_str().unwrap_or_default());
+            }
+        }
+        let mut body = response.bytes().await?;
+        let mut batches = vec![];
+        if is_arrow_data {
+            if is_first {
+                debug!("received arrow data");
+            }
+            let cursor = std::io::Cursor::new(body.as_ref());
+            let reader = StreamReader::try_new(cursor, None)
+                .map_err(|e| Error::Decode(format!("failed to decode arrow stream: {e}")))?;
+            let schema = reader.schema();
+            let json_body = if let Some(json_resp) = schema.metadata.get("response_header") {
+                bytes::Bytes::copy_from_slice(json_resp.as_bytes())
+            } else {
+                return Err(Error::Decode(
+                    "missing response_header metadata in arrow payload".to_string(),
+                ));
+            };
+            for batch in reader {
+                let batch = batch
+                    .map_err(|e| Error::Decode(format!("failed to decode arrow batch: {e}")))?;
+                batches.push(batch);
+            }
+            body = json_body
+        };
+        let resp: QueryResponse = json_from_slice(&body).map_err(|e| {
+            if let Error::Logic(status, ec) = &e {
+                if *status == 404 {
+                    return Error::QueryNotFound(ec.message.clone());
+                }
+            }
+            e
+        })?;
+        self.handle_session(&resp.session).await;
+        if let Some(err) = &resp.error {
+            return Err(Error::QueryFailed(err.clone()));
+        }
+        if is_first {
+            self.handle_warnings(&resp);
+            self.set_last_query_id(Some(resp.id.clone()));
+            if let Some(node_id) = &resp.node_id {
+                self.set_last_node_id(node_id.clone());
+            }
+        }
+        Ok((resp, batches))
     }
 
     pub async fn query_page(
@@ -508,10 +583,13 @@ impl APIClient {
         &self,
         query_id: &str,
         next_uri: &str,
         node_id: &Option<String>,
-    ) -> Result<QueryResponse> {
+    ) -> Result<(QueryResponse, Vec<RecordBatch>)> {
         info!("query page: {next_uri}");
         let endpoint = self.endpoint.join(next_uri)?;
-        let headers = self.make_headers(Some(query_id))?;
+        let mut headers = self.make_headers(Some(query_id))?;
+        if self.capability.arrow_data && self.body_format == "arrow" {
+            headers.insert(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_ARROW_OR_JSON));
+        }
         let mut builder = self.cli.get(endpoint.clone());
         builder = self
             .wrap_auth_or_session_token(builder)?
@@ -523,24 +601,7 @@ impl APIClient {
 
         let request = builder.build()?;
         let response = self.query_request_helper(request, false, true).await?;
-        let status = response.status();
-        if status != 200 {
-            return Err(Error::response_error(status, &response.bytes().await?));
-        }
-        let body = response.bytes().await?;
-        let resp: QueryResponse = json_from_slice(&body).map_err(|e| {
-            if let Error::Logic(status, ec) = &e {
-                if *status == 404 {
-                    return Error::QueryNotFound(ec.message.clone());
-                }
-            }
-            e
-        })?;
-        self.handle_session(&resp.session).await;
-        match resp.error {
-            Some(err) => Err(Error::QueryFailed(err)),
-            None => Ok(resp),
-        }
+        self.handle_page(response, false).await
     }
 
     pub async fn kill_query(&self, query_id: &str) -> Result<()> {
@@ -643,8 +704,8 @@ impl APIClient {
             file_format_options: Some(file_format_options),
             copy_options: Some(copy_options),
         });
-        let resp = self.start_query_inner(sql, stage_attachment).await?;
-        let mut pages = Pages::new(self.clone(), resp, false);
+        let (resp, batches) = self.start_query_inner(sql, stage_attachment).await?;
+        let mut pages = Pages::new(self.clone(), resp, batches, false)?;
         let mut all = Page::default();
         while let Some(page) = pages.next().await {
             all.update(page?);
@@ -1206,6 +1267,7 @@ impl Default for APIClient {
             last_node_id: Default::default(),
             disable_session_token: true,
             disable_login: false,
+            body_format: "json".to_string(),
             session_token_info: None,
             closed: AtomicBool::new(false),
             last_query_id: Default::default(),
diff --git a/core/src/pages.rs b/core/src/pages.rs
index 7932dcaee..df621711f 100644
--- a/core/src/pages.rs
+++ b/core/src/pages.rs
@@ -15,12 +15,16 @@
 use crate::client::QueryState;
 use crate::error::Result;
 use crate::response::QueryResponse;
-use crate::{APIClient, QueryStats, SchemaField};
+use crate::{APIClient, Error, QueryStats, SchemaField};
+use arrow_array::RecordBatch;
+use chrono_tz::Tz;
 use log::debug;
 use parking_lot::Mutex;
+use std::collections::BTreeMap;
 use std::future::Future;
 use std::mem;
 use std::pin::Pin;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Instant;
@@ -28,22 +32,26 @@ use tokio_stream::{Stream, StreamExt};
 
 #[derive(Default)]
 pub struct Page {
-    pub schema: Vec<SchemaField>,
+    pub raw_schema: Vec<SchemaField>,
     pub data: Vec<Vec<Option<String>>>,
+    pub batches: Vec<RecordBatch>,
     pub stats: QueryStats,
+    pub settings: Option<BTreeMap<String, String>>,
 }
 
 impl Page {
-    pub fn from_response(response: QueryResponse) -> Self {
+    pub fn from_response(response: QueryResponse, batches: Vec<RecordBatch>) -> Self {
         Self {
-            schema: response.schema,
+            raw_schema: response.schema,
             data: response.data,
             stats: response.stats,
+            batches,
+            settings: response.settings,
         }
     }
 
     pub fn update(&mut self, p: Page) {
-        self.schema = p.schema;
+        self.raw_schema = p.raw_schema;
         if self.data.is_empty() {
             self.data = p.data
         } else {
@@ -53,7 +61,7 @@ impl Page {
     }
 }
 
-type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
+type PageFut = Pin<Box<dyn Future<Output = Result<(QueryResponse, Vec<RecordBatch>)>> + Send>>;
 
 pub struct Pages {
     query_id: String,
@@ -70,7 +78,12 @@ pub struct Pages {
 }
 
 impl Pages {
-    pub fn new(client: Arc<APIClient>, first_response: QueryResponse, need_progress: bool) -> Self {
+    pub fn new(
+        client: Arc<APIClient>,
+        first_response: QueryResponse,
+        record_batches: Vec<RecordBatch>,
+        need_progress: bool,
+    ) -> Result<Self> {
         let mut s = Self {
             query_id: first_response.id.clone(),
             need_progress,
@@ -82,9 +95,9 @@ impl Pages {
             result_timeout_secs: first_response.result_timeout_secs,
             last_access_time: Arc::new(Mutex::new(Instant::now())),
         };
-        let first_page = Page::from_response(first_response);
+        let first_page = Page::from_response(first_response, record_batches);
         s.first_page = Some(first_page);
-        s
+        Ok(s)
     }
 
     pub fn add_back(&mut self, page: Page) {
@@ -94,14 +107,22 @@ impl Pages {
     pub async fn wait_for_schema(
         mut self,
         need_progress: bool,
-    ) -> Result<(Self, Vec<SchemaField>)> {
+    ) -> Result<(Self, Vec<SchemaField>, Tz)> {
         while let Some(page) = self.next().await {
             let page = page?;
-            if !page.schema.is_empty()
+            if !page.raw_schema.is_empty()
                 || !page.data.is_empty()
+                || !page.batches.is_empty()
                 || (need_progress && page.stats.progresses.has_progress())
             {
-                let schema = page.schema.clone();
+                let schema = page.raw_schema.clone();
+                let utc = "UTC".to_owned();
+                let timezone = page
+                    .settings
+                    .as_ref()
+                    .and_then(|m| m.get("timezone"))
+                    .unwrap_or(&utc);
+                let timezone = Tz::from_str(timezone).map_err(|e| Error::Decode(e.to_string()))?;
                 self.add_back(page);
                 let last_access_time = self.last_access_time.clone();
                 if let Some(node_id) = &self.node_id {
@@ -113,10 +134,10 @@ impl Pages {
                     self.client
                         .register_query_for_heartbeat(&self.query_id, state)
                 }
-                return Ok((self, schema));
+                return Ok((self, schema, timezone));
             }
         }
-        Ok((self, vec![]))
+        Ok((self, vec![], Tz::UTC))
     }
 }
 
@@ -129,7 +150,7 @@ impl Stream for Pages {
         };
         match self.next_page_future {
             Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
-                Poll::Ready(Ok(resp)) => {
+                Poll::Ready(Ok((resp, batches))) => {
                     self.next_uri = resp.next_uri.clone();
                     self.next_page_future = None;
                     if resp.data.is_empty() && !self.need_progress {
@@ -137,7 +158,7 @@ impl Stream for Pages {
                     } else {
                         let now = Instant::now();
                         *self.last_access_time.lock() = now;
-                        Poll::Ready(Some(Ok(Page::from_response(resp))))
+                        Poll::Ready(Some(Ok(Page::from_response(resp, batches))))
                     }
                 }
                 Poll::Ready(Err(e)) => {
diff --git a/core/src/response.rs b/core/src/response.rs
index 2f6d3bb9b..ea3dd4ee1 100644
--- a/core/src/response.rs
+++ b/core/src/response.rs
@@ -15,6 +15,7 @@
 use crate::error_code::ErrorCode;
 use crate::session::SessionState;
 use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
 
 #[derive(Deserialize, Debug, Default)]
 pub struct QueryStats {
@@ -77,6 +78,7 @@ pub struct QueryResponse {
     pub schema: Vec<SchemaField>,
     pub data: Vec<Vec<Option<String>>>,
     pub state: String,
+    pub settings: Option<BTreeMap<String, String>>,
     pub error: Option<ErrorCode>,
     // make it optional for backward compatibility
     pub warnings: Option<Vec<String>>,
diff --git a/driver/Cargo.toml b/driver/Cargo.toml
index 6f29c156b..46ded17f2 100644
--- a/driver/Cargo.toml
+++ b/driver/Cargo.toml
@@ -30,6 +30,7 @@ arrow = { workspace = true }
 arrow-flight = { workspace = true, optional = true }
 arrow-schema = { workspace = true, optional = true }
 chrono = { workspace = true, features = ["serde"] }
+chrono-tz = { workspace = true }
 databend-client = { workspace = true }
 databend-driver-core = { workspace = true }
 databend-driver-macros = { workspace = true }
@@ -49,3 +50,4 @@ tokio = { version = "1.34", features = ["macros"] }
 url = { version = "2.5", default-features = false }
 
 [dev-dependencies]
+env_logger = "0.11.8"
diff --git a/driver/src/flight_sql.rs b/driver/src/flight_sql.rs
index e17cd3073..e30738613 100644
--- a/driver/src/flight_sql.rs
+++ b/driver/src/flight_sql.rs
@@ -25,6 +25,7 @@
 use arrow_flight::sql::client::FlightSqlServiceClient;
 use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_schema::SchemaRef as ArrowSchemaRef;
 use async_trait::async_trait;
+use chrono_tz::Tz;
 use percent_encoding::percent_decode_str;
 use tokio::sync::Mutex;
 use tokio_stream::{Stream, StreamExt};
@@ -372,7 +373,7 @@ impl Stream for FlightSQLRows {
                     self.schema.clone(),
                     &dicitionaries_by_id,
                 )?;
-                let rows = Rows::try_from(batch)?;
+                let rows = Rows::try_from((batch, Tz::UTC))?;
                 self.rows.extend(rows);
                 self.poll_next(cx)
             }
diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs
index f78feb7ff..8596a090e 100644
--- a/driver/src/rest_api.rs
+++ b/driver/src/rest_api.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use async_trait::async_trait;
+use chrono_tz::Tz;
 use log::info;
 use std::collections::{BTreeMap, VecDeque};
 use std::marker::PhantomData;
@@ -31,7 +32,9 @@ use databend_client::APIClient;
 use databend_client::Pages;
 use databend_driver_core::error::{Error, Result};
 use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats};
-use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats};
+use databend_driver_core::rows::{
+    Row, RowIterator, RowStatsIterator, RowWithStats, Rows, ServerStats,
+};
 use databend_driver_core::schema::{Schema, SchemaRef};
 
 const LOAD_PLACEHOLDER: &str = "@_databend_load";
@@ -293,7 +296,11 @@ pub struct RestAPIRows<T> {
     pages: Pages,
     schema: SchemaRef,
+    timezone: Tz,
+
     data: VecDeque<Vec<Option<String>>>,
+    rows: VecDeque<Row>,
+
     stats: Option<ServerStats>,
     _phantom: std::marker::PhantomData<T>,
 }
 
 impl<T: FromRowStats> RestAPIRows<T> {
     async fn from_pages(pages: Pages) -> Result<(Schema, Self)> {
-        let (pages, schema) = pages.wait_for_schema(true).await?;
+        let (pages, schema, timezone) = pages.wait_for_schema(true).await?;
         let schema: Schema = schema.try_into()?;
         let rows = Self {
             pages,
             schema: Arc::new(schema.clone()),
+            timezone,
             data: Default::default(),
+            rows: Default::default(),
             stats: None,
             _phantom: PhantomData,
         };
@@ -325,7 +334,12 @@ impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
         // Therefore, we could guarantee the `/final` called before the last row.
         if self.data.len() > 1 {
             if let Some(row) = self.data.pop_front() {
-                let row = T::try_from_row(row, self.schema.clone())?;
+                let row = T::try_from_raw_row(row, self.schema.clone(), self.timezone)?;
+                return Poll::Ready(Some(Ok(row)));
+            }
+        } else if self.rows.len() > 1 {
+            if let Some(row) = self.rows.pop_front() {
+                let row = T::from_row(row);
                 return Poll::Ready(Some(Ok(row)));
             }
         }
@@ -333,20 +347,31 @@ impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
         match Pin::new(&mut self.pages).poll_next(cx) {
             Poll::Ready(Some(Ok(page))) => {
                 if self.schema.fields().is_empty() {
-                    self.schema = Arc::new(page.schema.try_into()?);
+                    self.schema = Arc::new(page.raw_schema.try_into()?);
+                }
+                if page.batches.is_empty() {
+                    let mut new_data = page.data.into();
+                    self.data.append(&mut new_data);
+                } else {
+                    for batch in page.batches.into_iter() {
+                        let rows = Rows::try_from((batch, self.timezone))?;
+                        self.rows.extend(rows);
+                    }
                 }
-                let mut new_data = page.data.into();
-                self.data.append(&mut new_data);
                 Poll::Ready(Some(Ok(T::from_stats(page.stats.into()))))
             }
             Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
-            Poll::Ready(None) => match self.data.pop_front() {
-                Some(row) => {
-                    let row = T::try_from_row(row, self.schema.clone())?;
+            Poll::Ready(None) => {
+                if let Some(row) = self.rows.pop_front() {
+                    let row = T::from_row(row);
                     Poll::Ready(Some(Ok(row)))
+                } else if let Some(row) = self.data.pop_front() {
+                    let row = T::try_from_raw_row(row, self.schema.clone(), self.timezone)?;
+                    Poll::Ready(Some(Ok(row)))
+                } else {
+                    Poll::Ready(None)
                 }
-                None => Poll::Ready(None),
-            },
+            }
             Poll::Pending => Poll::Pending,
         }
     }
@@ -354,7 +379,8 @@ impl<T: FromRowStats + std::marker::Unpin> Stream for RestAPIRows<T> {
 
 trait FromRowStats: Send + Sync + Clone {
     fn from_stats(stats: ServerStats) -> Self;
-    fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self>;
+    fn try_from_raw_row(row: Vec<Option<String>>, schema: SchemaRef, tz: Tz) -> Result<Self>;
+    fn from_row(row: Row) -> Self;
 }
 
 impl FromRowStats for RowWithStats {
@@ -362,8 +388,11 @@ impl FromRowStats for RowWithStats {
         RowWithStats::Stats(stats)
     }
 
-    fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self> {
-        Ok(RowWithStats::Row(Row::try_from((schema, row))?))
+    fn try_from_raw_row(row: Vec<Option<String>>, schema: SchemaRef, tz: Tz) -> Result<Self> {
+        Ok(RowWithStats::Row(Row::try_from((schema, row, tz))?))
+    }
+
+    fn from_row(row: Row) -> Self {
+        RowWithStats::Row(row)
     }
 }
 
@@ -372,8 +401,12 @@ impl FromRowStats for RawRowWithStats {
         RawRowWithStats::Stats(stats)
     }
 
-    fn try_from_row(row: Vec<Option<String>>, schema: SchemaRef) -> Result<Self> {
-        let rows = Row::try_from((schema, row.clone()))?;
+    fn try_from_raw_row(row: Vec<Option<String>>, schema: SchemaRef, tz: Tz) -> Result<Self> {
+        let rows = Row::try_from((schema, row.clone(), tz))?;
         Ok(RawRowWithStats::Row(RawRow::new(rows, row)))
     }
+
+    fn from_row(row: Row) -> Self {
+        RawRowWithStats::Row(RawRow::from(row))
+    }
 }
diff --git a/driver/tests/driver/common/mod.rs b/driver/tests/driver/common/mod.rs
index 3e236be46..e51f6ed9c 100644
--- a/driver/tests/driver/common/mod.rs
+++ b/driver/tests/driver/common/mod.rs
@@ -12,4 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use env_logger::Builder;
+use log::LevelFilter;
+use once_cell::sync::Lazy;
+
 pub static DEFAULT_DSN: &str = "databend://root:@localhost:8000/default?sslmode=disable";
+
+pub static INIT_LOG: Lazy<()> = Lazy::new(|| {
+    Builder::new()
+        .filter(None, LevelFilter::Debug)
+        .try_init()
+        .ok();
+});
diff --git a/driver/tests/driver/select_iter.rs b/driver/tests/driver/select_iter.rs
index 71a8edd8a..441691c03 100644
--- a/driver/tests/driver/select_iter.rs
+++ b/driver/tests/driver/select_iter.rs
@@ -16,7 +16,7 @@ use tokio_stream::StreamExt;
 
 use databend_driver::{Client, Connection};
 
-use crate::common::DEFAULT_DSN;
+use crate::common::{DEFAULT_DSN, INIT_LOG};
 
 async fn prepare(name: &str) -> (Connection, String) {
     let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN);
@@ -202,6 +202,7 @@ async fn select_numbers() {
 
 #[tokio::test]
 async fn select_multi_page() {
+    *INIT_LOG;
     let (conn, _) = prepare("select_multi_page").await;
     // default page size is 10000
     let n = 46000;
diff --git a/sql/Cargo.toml b/sql/Cargo.toml
index bbf51c85e..6b3936bce 100644
--- a/sql/Cargo.toml
+++ b/sql/Cargo.toml
@@ -11,16 +11,17 @@ authors = { workspace = true }
 repository = { workspace = true }
 
 [features]
-flight-sql = ["dep:arrow", "dep:arrow-array", "dep:arrow-schema", "dep:tonic", "dep:jsonb"]
+flight-sql = ["dep:tonic"]
 
 [dependencies]
-arrow = { workspace = true, optional = true }
-arrow-array = { workspace = true, optional = true }
+arrow = { workspace = true }
+arrow-array = { workspace = true }
 arrow-buffer = { workspace = true }
-arrow-schema = { workspace = true, optional = true }
+arrow-schema = { workspace = true }
 chrono = { workspace = true }
+chrono-tz = { workspace = true }
 databend-client = { workspace = true }
-jsonb = { workspace = true, optional = true }
+jsonb = { workspace = true }
 tokio-stream = { workspace = true }
 tonic = { workspace = true, optional = true }
 
diff --git a/sql/src/error.rs b/sql/src/error.rs
index 01ccb281f..d23c9cc12 100644
--- a/sql/src/error.rs
+++ b/sql/src/error.rs
@@ -45,7 +45,6 @@ pub enum Error {
     BadArgument(String),
     InvalidResponse(String),
     Api(databend_client::Error),
-    #[cfg(feature = "flight-sql")]
     Arrow(arrow_schema::ArrowError),
     Convert(ConvertError),
 }
@@ -60,7 +59,6 @@ impl std::fmt::Display for Error {
 
             Error::BadArgument(msg) => write!(f, "BadArgument: {msg}"),
             Error::InvalidResponse(msg) => write!(f, "ResponseError: {msg}"),
-            #[cfg(feature = "flight-sql")]
             Error::Arrow(e) => {
                 let msg = match e {
                     arrow_schema::ArrowError::IoError(msg, _) => {
@@ -152,7 +150,6 @@
     }
 }
 
-#[cfg(feature = "flight-sql")]
 impl From<arrow_schema::ArrowError> for Error {
     fn from(e: arrow_schema::ArrowError) -> Self {
         Error::Arrow(e)
diff --git a/sql/src/raw_rows.rs b/sql/src/raw_rows.rs
index 273857020..a7a2c2fdb 100644
--- a/sql/src/raw_rows.rs
+++ b/sql/src/raw_rows.rs
@@ -12,10 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use chrono_tz::Tz;
 use std::pin::Pin;
 use std::task::Context;
 use std::task::Poll;
-
 use tokio_stream::{Stream, StreamExt};
 
 use crate::error::Error;
@@ -59,13 +59,13 @@ impl RawRow {
     }
 }
 
-impl TryFrom<(SchemaRef, Vec<Option<String>>)> for RawRow {
+impl TryFrom<(SchemaRef, Vec<Option<String>>, Tz)> for RawRow {
     type Error = Error;
 
-    fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
+    fn try_from((schema, data, tz): (SchemaRef, Vec<Option<String>>, Tz)) -> Result<Self> {
         let mut values: Vec<Value> = Vec::with_capacity(data.len());
         for (field, val) in schema.fields().iter().zip(data.clone().into_iter()) {
-            values.push(Value::try_from((&field.data_type, val))?);
+            values.push(Value::try_from((&field.data_type, val, tz))?);
         }
 
         let row = Row::new(schema, values);
@@ -73,6 +73,16 @@ impl TryFrom<(SchemaRef, Vec<Option<String>>)> for RawRow {
     }
 }
 
+impl From<Row> for RawRow {
+    fn from(row: Row) -> Self {
+        let mut raw_row: Vec<Option<String>> = Vec::with_capacity(row.values().len());
+        for val in row.values() {
+            raw_row.push(Some(val.to_string()));
+        }
+        RawRow::new(row, raw_row)
+    }
+}
+
 impl IntoIterator for RawRow {
     type Item = Option<String>;
     type IntoIter = std::vec::IntoIter<Self::Item>;
diff --git a/sql/src/rows.rs b/sql/src/rows.rs
index fd65dd938..f7b56a8fd 100644
--- a/sql/src/rows.rs
+++ b/sql/src/rows.rs
@@ -19,12 +19,11 @@ use std::task::Poll;
 use serde::Deserialize;
 use tokio_stream::{Stream, StreamExt};
 
-#[cfg(feature = "flight-sql")]
-use arrow::record_batch::RecordBatch;
-
 use crate::error::{Error, Result};
 use crate::schema::SchemaRef;
 use crate::value::Value;
+use arrow::record_batch::RecordBatch;
+use chrono_tz::Tz;
 
 #[derive(Clone, Debug)]
 pub enum RowWithStats {
@@ -135,13 +134,13 @@ impl Row {
     }
 }
 
-impl TryFrom<(SchemaRef, Vec<Option<String>>)> for Row {
+impl TryFrom<(SchemaRef, Vec<Option<String>>, Tz)> for Row {
     type Error = Error;
 
-    fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
+    fn try_from((schema, data, tz): (SchemaRef, Vec<Option<String>>, Tz)) -> Result<Self> {
         let mut values: Vec<Value> = Vec::with_capacity(data.len());
         for (field, val) in schema.fields().iter().zip(data.into_iter()) {
-            values.push(Value::try_from((&field.data_type, val))?);
+            values.push(Value::try_from((&field.data_type, val, tz))?);
         }
         Ok(Self::new(schema, values))
     }
@@ -183,10 +182,9 @@ impl Rows {
     }
 }
 
-#[cfg(feature = "flight-sql")]
-impl TryFrom<RecordBatch> for Rows {
+impl TryFrom<(RecordBatch, Tz)> for Rows {
     type Error = Error;
-    fn try_from(batch: RecordBatch) -> Result<Self> {
+    fn try_from((batch, ltz): (RecordBatch, Tz)) -> Result<Self> {
         let batch_schema = batch.schema();
         let schema = SchemaRef::new(batch_schema.clone().try_into()?);
         let mut rows: Vec<Row> = Vec::new();
@@ -195,7 +193,7 @@ impl TryFrom<RecordBatch> for Rows {
             for j in 0..batch_schema.fields().len() {
                 let v = batch.column(j);
                 let field = batch_schema.field(j);
-                let value = Value::try_from((field, v, i))?;
+                let value = Value::try_from((field, v, i, ltz))?;
                 values.push(value);
             }
             rows.push(Row::new(schema.clone(), values));
diff --git a/sql/src/schema.rs b/sql/src/schema.rs
index fef023570..96c35e63a 100644
--- a/sql/src/schema.rs
+++ b/sql/src/schema.rs
@@ -18,27 +18,17 @@ use databend_client::SchemaField as APISchemaField;
 
 use crate::error::{Error, Result};
 
-#[cfg(feature = "flight-sql")]
 use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, SchemaRef as ArrowSchemaRef};
 
 // Extension types defined by Databend
-#[cfg(feature = "flight-sql")]
 pub(crate) const EXTENSION_KEY: &str = "Extension";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_EMPTY_MAP: &str = "EmptyMap";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_VARIANT: &str = "Variant";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_BITMAP: &str = "Bitmap";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_GEOMETRY: &str = "Geometry";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_GEOGRAPHY: &str = "Geography";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_INTERVAL: &str = "Interval";
-#[cfg(feature = "flight-sql")]
 pub(crate) const ARROW_EXT_TYPE_VECTOR: &str = "Vector";
 
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -318,7 +308,6 @@ impl TryFrom<Vec<APISchemaField>> for Schema {
     }
 }
 
-#[cfg(feature = "flight-sql")]
 impl TryFrom<&Arc<ArrowField>> for Field {
     type Error = Error;
 
@@ -427,7 +416,6 @@ impl TryFrom<&Arc<ArrowField>> for Field {
     }
 }
 
-#[cfg(feature = "flight-sql")]
 impl TryFrom<ArrowSchemaRef> for Schema {
     type Error = Error;
 
diff --git a/sql/src/value.rs b/sql/src/value.rs
index 93afa9bf1..2a04202b8 100644
--- a/sql/src/value.rs
+++ b/sql/src/value.rs
@@ -12,18 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
-use std::fmt::{Display, Formatter, Write};
-use std::hash::Hash;
-use std::io::BufRead;
-use std::io::Cursor;
-
 use arrow_buffer::i256;
-use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime};
+use chrono::{DateTime, Datelike, LocalResult, NaiveDate, NaiveDateTime, TimeZone};
+use chrono_tz::Tz;
 use geozero::wkb::FromWkb;
 use geozero::wkb::WkbDialect;
 use geozero::wkt::Ewkt;
 use hex;
+use std::collections::HashMap;
+use std::fmt::{Display, Formatter, Write};
+use std::hash::Hash;
+use std::io::BufRead;
+use std::io::Cursor;
 
 use crate::cursor_ext::{
     collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
@@ -32,7 +32,6 @@ use crate::cursor_ext::{
 use crate::error::{ConvertError, Error, Result};
 use crate::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
 
-#[cfg(feature = "flight-sql")]
 use {
     crate::schema::{
         ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
@@ -84,7 +83,7 @@ pub enum Value {
     String(String),
     Number(NumberValue),
     /// Microseconds from 1970-01-01 00:00:00 UTC
-    Timestamp(i64),
+    Timestamp(i64, Tz),
     TimestampTz(String),
     Date(i32),
     Array(Vec<Value>),
@@ -121,7 +120,7 @@ impl Value {
             NumberValue::Decimal128(_, s) => DataType::Decimal(DecimalDataType::Decimal128(*s)),
             NumberValue::Decimal256(_, s) => DataType::Decimal(DecimalDataType::Decimal256(*s)),
         },
-            Self::Timestamp(_) => DataType::Timestamp,
+            Self::Timestamp(_, _) => DataType::Timestamp,
             Self::TimestampTz(_) => DataType::TimestampTz,
             Self::Date(_) => DataType::Date,
@@ -154,12 +153,12 @@ impl Value {
     }
 }
 
-impl TryFrom<(&DataType, Option<String>)> for Value {
+impl TryFrom<(&DataType, Option<String>, Tz)> for Value {
     type Error = Error;
 
-    fn try_from((t, v): (&DataType, Option<String>)) -> Result<Self> {
+    fn try_from((t, v, tz): (&DataType, Option<String>, Tz)) -> Result<Self> {
         match v {
-            Some(v) => Self::try_from((t, v)),
+            Some(v) => Self::try_from((t, v, tz)),
             None => match t {
                 DataType::Null => Ok(Self::Null),
                 DataType::Nullable(_) => Ok(Self::Null),
@@ -171,10 +170,10 @@ impl TryFrom<(&DataType, Option<String>)> for Value {
     }
 }
 
-impl TryFrom<(&DataType, String)> for Value {
+impl TryFrom<(&DataType, String, Tz)> for Value {
     type Error = Error;
 
-    fn try_from((t, v): (&DataType, String)) -> Result<Self> {
+    fn try_from((t, v, tz): (&DataType, String, Tz)) -> Result<Self> {
         match t {
             DataType::Null => Ok(Self::Null),
             DataType::EmptyArray => Ok(Self::EmptyArray),
@@ -220,11 +219,20 @@ 
impl TryFrom<(&DataType, String)> for Value {
                 let d = parse_decimal(v.as_str(), *size)?;
                 Ok(Self::Number(d))
             }
-            DataType::Timestamp => Ok(Self::Timestamp(
-                NaiveDateTime::parse_from_str(v.as_str(), "%Y-%m-%d %H:%M:%S%.6f")?
-                    .and_utc()
-                    .timestamp_micros(),
-            )),
+            DataType::Timestamp => {
+                let naive_dt = NaiveDateTime::parse_from_str(v.as_str(), "%Y-%m-%d %H:%M:%S%.6f")?;
+                let dt_with_tz = match tz.from_local_datetime(&naive_dt) {
+                    LocalResult::Single(dt) => dt,
+                    LocalResult::None => {
+                        return Err(Error::Parsing(format!(
+                            "time {v} not exists in timezone {tz}"
+                        )))
+                    }
+                    LocalResult::Ambiguous(dt1, _dt2) => dt1,
+                };
+                let ts = dt_with_tz.timestamp_micros();
+                Ok(Self::Timestamp(ts, tz))
+            }
             DataType::TimestampTz => Ok(Self::TimestampTz(v)),
             DataType::Date => Ok(Self::Date(
                 NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
@@ -248,7 +256,7 @@ impl TryFrom<(&DataType, String)> for Value {
                 if v == NULL_VALUE {
                     Ok(Self::Null)
                 } else {
-                    Self::try_from((inner.as_ref(), v))
+                    Self::try_from((inner.as_ref(), v, tz))
                 }
             }
         },
@@ -256,11 +264,10 @@ impl TryFrom<(&DataType, String)> for Value {
     }
 }
 
-#[cfg(feature = "flight-sql")]
-impl TryFrom<(&ArrowField, &Arc<dyn Array>, usize)> for Value {
+impl TryFrom<(&ArrowField, &Arc<dyn Array>, usize, Tz)> for Value {
     type Error = Error;
     fn try_from(
-        (field, array, seq): (&ArrowField, &Arc<dyn Array>, usize),
+        (field, array, seq, ltz): (&ArrowField, &Arc<dyn Array>, usize, Tz),
     ) -> std::result::Result<Self, Self::Error> {
         if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
             return match extend_type.as_str() {
@@ -497,7 +504,7 @@ impl TryFrom<(&ArrowField, &Arc<dyn Array>, usize, Tz)> for Value {
                 }
                 let ts = array.value(seq);
                 match tz {
-                    None => Ok(Value::Timestamp(ts)),
+                    None => Ok(Value::Timestamp(ts, ltz)),
                     Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
                         .with_message(format!("non-UTC timezone not supported: {tz:?}"))
                         .into()),
@@ -515,7 +522,7 @@ impl TryFrom<(&ArrowField, &Arc<dyn Array>, usize, Tz)> for Value {
                 let inner_array = unsafe { array.value_unchecked(seq) };
                 let mut values = Vec::with_capacity(inner_array.len());
                 for i in 0..inner_array.len() {
-                    let value = Value::try_from((f.as_ref(), &inner_array, i))?;
+                    let value = Value::try_from((f.as_ref(), &inner_array, i, ltz))?;
                     values.push(value);
                 }
                 Ok(Value::Array(values))
@@ -527,7 +534,7 @@ impl TryFrom<(&ArrowField, &Arc<dyn Array>, usize, Tz)> for Value {
                 let inner_array = unsafe { array.value_unchecked(seq) };
                 let mut values = Vec::with_capacity(inner_array.len());
                 for i in 0..inner_array.len() {
-                    let value = Value::try_from((f.as_ref(), &inner_array, i))?;
+                    let value = Value::try_from((f.as_ref(), &inner_array, i, ltz))?;
                     values.push(value);
                 }
                 Ok(Value::Array(values))
@@ -540,8 +547,10 @@ impl TryFrom<(&ArrowField, &Arc<dyn Array>, usize, Tz)> for Value {
                 let inner_array = unsafe { array.value_unchecked(seq) };
                 let mut values = Vec::with_capacity(inner_array.len());
                 for i in 0..inner_array.len() {
-                    let key = Value::try_from((fs[0].as_ref(), inner_array.column(0), i))?;
-                    let val = Value::try_from((fs[1].as_ref(), inner_array.column(1), i))?;
+                    let key =
+                        Value::try_from((fs[0].as_ref(), inner_array.column(0), i, ltz))?;
+                    let val =
+                        Value::try_from((fs[1].as_ref(), inner_array.column(1), i, ltz))?;
                     values.push((key, val));
                 }
                 Ok(Value::Map(values))
@@ -558,7 +567,7 @@ impl TryFrom<(&ArrowField, &Arc<dyn Array>, usize, Tz)> for Value {
                 Some(array) => {
                     let mut values = Vec::with_capacity(array.len());
                     for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
-                        let value = Value::try_from((f.as_ref(), inner_array, seq))?;
+                        let value = Value::try_from((f.as_ref(), inner_array, seq, ltz))?;
                         values.push(value);
                     }
                     Ok(Value::Tuple(values))
@@ -589,10 +598,11 @@ impl TryFrom<Value> for String {
             })?;
             Ok(date.format("%Y-%m-%d").to_string())
         }
-        Value::Timestamp(ts) => {
+        Value::Timestamp(ts, tz) => {
             let dt = DateTime::from_timestamp_micros(ts).ok_or_else(|| {
                 ConvertError::new("timestamp", format!("invalid timestamp: {}", ts))
             })?;
+            let dt = dt.with_timezone(&tz);
             Ok(dt.format(TIMESTAMP_FORMAT).to_string())
         }
         _ => Err(ConvertError::new("string", format!("{val:?}")).into()),
@@ -630,7 +640,7 @@ macro_rules! impl_try_from_number_value {
                 Value::Number(NumberValue::Float32(i)) => Ok(i as $t),
                 Value::Number(NumberValue::Float64(i)) => Ok(i as $t),
                 Value::Date(i) => Ok(i as $t),
-                Value::Timestamp(i) => Ok(i as $t),
+                Value::Timestamp(i, _) => Ok(i as $t),
                 _ => Err(ConvertError::new("number", format!("{:?}", val)).into()),
             }
         }
@@ -654,12 +664,12 @@ impl TryFrom<Value> for NaiveDateTime {
     type Error = Error;
     fn try_from(val: Value) -> Result<Self> {
         match val {
-            Value::Timestamp(i) => {
+            Value::Timestamp(i, _tz) => {
                 let secs = i / 1_000_000;
                 let nanos = ((i % 1_000_000) * 1000) as u32;
                 match DateTime::from_timestamp(secs, nanos) {
                     Some(t) => Ok(t.naive_utc()),
-                    None => Err(ConvertError::new("NaiveDateTime", "".to_string()).into()),
+                    None => Err(ConvertError::new("NaiveDateTime", format!("{val}")).into()),
                 }
             }
             _ => Err(ConvertError::new("NaiveDateTime", format!("{val}")).into()),
@@ -667,6 +677,23 @@ impl TryFrom<Value> for NaiveDateTime {
     }
 }
 
+impl TryFrom<Value> for DateTime<Tz> {
+    type Error = Error;
+    fn try_from(val: Value) -> Result<Self> {
+        match val {
+            Value::Timestamp(i, tz) => {
+                let secs = i / 1_000_000;
+                let nanos = ((i % 1_000_000) * 1000) as u32;
+                match DateTime::from_timestamp(secs, nanos) {
+                    Some(t) => Ok(tz.from_utc_datetime(&t.naive_utc())),
+                    None => Err(ConvertError::new("Datetime", format!("{val}")).into()),
+                }
+            }
+            _ => Err(ConvertError::new("DateTime", format!("{val}")).into()),
+        }
+    }
+}
+
 impl TryFrom<Value> for NaiveDate {
     type Error = Error;
     fn try_from(val: Value) -> Result<Self> {
@@ -896,7 +923,7 @@ fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result {
                 write!(f, "'{s}'")
             }
         }
-        Value::Timestamp(micros) => {
+        Value::Timestamp(micros, _tz) => {
             let (mut secs, mut nanos) = (*micros / 1_000_000, (*micros % 1_000_000) * 1_000);
             if nanos < 0 {
                 secs -= 1;
@@ -1813,7 +1840,7 @@ impl ValueDecoder {
         let ts = NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.6f")?
             .and_utc()
             .timestamp_micros();
-        Ok(Value::Timestamp(ts))
+        Ok(Value::Timestamp(ts, Tz::UTC))
     }
 
     fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
@@ -2169,14 +2196,14 @@ impl From<&NaiveDate> for Value {
 impl From<NaiveDateTime> for Value {
     fn from(dt: NaiveDateTime) -> Self {
         let timestamp_micros = dt.and_utc().timestamp_micros();
-        Value::Timestamp(timestamp_micros)
+        Value::Timestamp(timestamp_micros, Tz::UTC)
     }
 }
 
 impl From<&NaiveDateTime> for Value {
     fn from(dt: &NaiveDateTime) -> Self {
         let timestamp_micros = dt.and_utc().timestamp_micros();
-        Value::Timestamp(timestamp_micros)
+        Value::Timestamp(timestamp_micros, Tz::UTC)
     }
 }
 
@@ -2200,8 +2227,10 @@ impl Value {
             }
             Value::String(s) => format!("'{}'", s),
             Value::Number(n) => n.to_string(),
-            Value::Timestamp(ts) => {
+            Value::Timestamp(ts, tz) => {
+                // TODO: use ts directly?
                 let dt = DateTime::from_timestamp_micros(*ts).unwrap();
+                let dt = dt.with_timezone(tz);
                 format!("'{}'", dt.format(TIMESTAMP_FORMAT))
             }
             Value::TimestampTz(t) => format!("'{t}'"),
diff --git a/tests/Makefile b/tests/Makefile
index 3766717bd..fdf654efc 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -31,6 +31,11 @@ test-bindings-python: up
 	cd ../bindings/python && behave tests/blocking
 	cd ../bindings/python && behave tests/cursor
 
+test-bindings-python-arrow: up
+	cd ../bindings/python && BODY_FORMAT=arrow behave tests/asyncio
+	cd ../bindings/python && BODY_FORMAT=arrow behave tests/blocking
+	cd ../bindings/python && BODY_FORMAT=arrow behave tests/cursor
+
 test-bindings-nodejs: up
 	cd ../bindings/nodejs && pnpm run test
 
diff --git a/tests/nox/noxfile.py b/tests/nox/noxfile.py
index 646a28daa..600461549 100644
--- a/tests/nox/noxfile.py
+++ b/tests/nox/noxfile.py
@@ -15,10 +15,17 @@ import nox
 import os
 
 
+def generate_params1():
+    for db_version in ["1.2.803", "1.2.791"]:
+        for body_format in ["arrow", "json"]:
+            v = tuple(map(int, db_version.split(".")))
+            if body_format == "arrow" and v < (1, 2, 836):
+                continue
+            yield nox.param(db_version, body_format)
+
+
 @nox.session
-@nox.parametrize("db_version", ["1.2.803", "1.2.791"])
-def new_driver_with_old_servers(session, db_version):
+@nox.parametrize(["db_version", "body_format"], generate_params1())
+def new_driver_with_old_servers(session, db_version, body_format):
     query_version = f"v{db_version}-nightly"
     session.install("behave")
     # cd bindings/python
@@ -33,20 +40,30 @@ def new_driver_with_old_servers(session, db_version):
             "DATABEND_QUERY_VERSION": query_version,
            "DATABEND_META_VERSION": query_version,
             "DB_VERSION": db_version,
+            "BODY_FORMAT": body_format,
         }
         session.run("make", "test-bindings-python", env=env)
         session.run("make", "down")
 
 
-# to avoid fail the compact test in repo databend
+def generate_params2():
+    for driver_version in ["0.28.2", "0.28.1"]:
+        for body_format in ["arrow", "json"]:
+            v = tuple(map(int, driver_version.split(".")))
+            if body_format == "arrow" and v <= (0, 30, 3):
+                continue
+            yield nox.param(driver_version, body_format)
+
+
 @nox.session
-@nox.parametrize("driver_version", ["0.28.2", "0.28.1"])
-def new_test_with_old_drivers(session, driver_version):
+@nox.parametrize(["driver_version", "body_format"], generate_params2())
+def new_test_with_old_drivers(session, driver_version, body_format):
     session.install("behave")
     session.install(f"databend-driver=={driver_version}")
     with session.chdir(".."):
         env = {
             "DRIVER_VERSION": driver_version,
+            "BODY_FORMAT": body_format,
         }
         session.run("make", "test-bindings-python", env=env)
         session.run("make", "down")
diff --git a/ttc/Cargo.toml b/ttc/Cargo.toml
index a9244be47..ff7a68ccb 100644
--- a/ttc/Cargo.toml
+++ b/ttc/Cargo.toml
@@ -15,6 +15,7 @@ databend-driver = { workspace = true }
 
 bytes = "1"
 clap = { version = "4.4", features = ["derive", "env"] }
+env_logger = "0.11.8"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 tokio = { version = "1.34", features = [
diff --git a/ttc/src/server.rs b/ttc/src/server.rs
index 42bbc8974..b7a6ef971 100644
--- a/ttc/src/server.rs
+++ b/ttc/src/server.rs
@@ -42,6 +42,10 @@ struct Response {
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
+        .target(env_logger::Target::Stdout)
+        .init();
+
     let config = Config::parse();
 
     // check dsn