diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000000..16f1e73ac065 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index 0b90a78577e5..50804aee6a80 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -152,150 +152,150 @@ jobs: - name: Build wasm32-wasip1 run: cargo build -p arrow --no-default-features --features=json,csv,ipc,ffi --target wasm32-wasip1 - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Clippy arrow-buffer - run: | - mod=arrow-buffer - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-data - run: | - mod=arrow-data - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-schema - run: | - mod=arrow-schema - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-array - run: | - mod=arrow-array - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-select - run: | - mod=arrow-select - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-cast - run: | - mod=arrow-cast - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-ipc - run: | - mod=arrow-ipc - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-csv - run: | - mod=arrow-csv - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-json - run: | - mod=arrow-json - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-avro - run: | - mod=arrow-avro - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-string - run: | - mod=arrow-string - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-ord - run: | - mod=arrow-ord - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-arith - run: | - mod=arrow-arith - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-row - run: | - mod=arrow-row - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow - run: | - mod=arrow - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-integration-test - run: | - mod=arrow-integration-test - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies - - name: Clippy arrow-integration-testing - run: | - mod=arrow-integration-testing - cargo clippy -p "$mod" --all-targets --all-features -- -D warnings - # Dependency checks excluding tests & benches. - cargo clippy -p "$mod" -- -D unused_crate_dependencies - cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies - cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# clippy: +# name: Clippy +# runs-on: ubuntu-latest +# container: +# image: amd64/rust +# steps: +# - uses: actions/checkout@v4 +# - name: Setup Rust toolchain +# uses: ./.github/actions/setup-builder +# - name: Setup Clippy +# run: rustup component add clippy +# - name: Clippy arrow-buffer +# run: | +# mod=arrow-buffer +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-data +# run: | +# mod=arrow-data +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-schema +# run: | +# mod=arrow-schema +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-array +# run: | +# mod=arrow-array +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-select +# run: | +# mod=arrow-select +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-cast +# run: | +# mod=arrow-cast +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-ipc +# run: | +# mod=arrow-ipc +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-csv +# run: | +# mod=arrow-csv +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-json +# run: | +# mod=arrow-json +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-avro +# run: | +# mod=arrow-avro +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-string +# run: | +# mod=arrow-string +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-ord +# run: | +# mod=arrow-ord +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-arith +# run: | +# mod=arrow-arith +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-row +# run: | +# mod=arrow-row +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow +# run: | +# mod=arrow +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-integration-test +# run: | +# mod=arrow-integration-test +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies +# - name: Clippy arrow-integration-testing +# run: | +# mod=arrow-integration-testing +# cargo clippy -p "$mod" --all-targets --all-features -- -D warnings +# # Dependency checks excluding tests & benches. +# cargo clippy -p "$mod" -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --all-features -- -D unused_crate_dependencies +# cargo clippy -p "$mod" --no-default-features -- -D unused_crate_dependencies diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml index 79627448ca40..be563acaa71c 100644 --- a/.github/workflows/arrow_flight.yml +++ b/.github/workflows/arrow_flight.yml @@ -58,9 +58,9 @@ jobs: - name: Test --all-features run: | cargo test -p arrow-flight --all-features - - name: Test --examples - run: | - cargo test -p arrow-flight --features=flight-sql-experimental,tls --examples +# - name: Test --examples +# run: | +# cargo test -p arrow-flight --features=flight-sql-experimental,tls --examples vendor: name: Verify Vendored Code @@ -76,16 +76,16 @@ jobs: - name: Verify workspace clean (if this fails, run ./arrow-flight/regen.sh and check in results) run: git diff --exit-code - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Run clippy - run: cargo clippy -p arrow-flight --all-targets --all-features -- -D warnings +# clippy: +# name: Clippy +# runs-on: ubuntu-latest +# container: +# image: amd64/rust +# steps: +# - uses: actions/checkout@v4 +# - name: Setup Rust toolchain +# uses: ./.github/actions/setup-builder +# - name: Setup Clippy +# run: rustup component add clippy +# - name: Run clippy +# run: cargo clippy -p arrow-flight --all-targets --all-features -- -D warnings diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml deleted file mode 100644 index e6254ea24a58..000000000000 --- a/.github/workflows/audit.yml +++ /dev/null @@ -1,43 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: audit - -concurrency: - group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} - cancel-in-progress: true - -# trigger for all PRs that touch certain files and changes to main -on: - push: - branches: - - main - pull_request: - paths: - - '**/Cargo.toml' - - '**/Cargo.lock' - -jobs: - cargo-audit: - name: Audit - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Install cargo-audit - run: cargo install cargo-audit - - name: Run audit check - run: cargo audit diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml deleted file mode 100644 index d6ec0622f6ed..000000000000 --- a/.github/workflows/docs.yml +++ /dev/null @@ -1,99 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: docs - -concurrency: - group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} - cancel-in-progress: true - -# trigger for all PRs and changes to main -on: - push: - branches: - - main - pull_request: - -jobs: - - # test doc links still work - docs: - name: Rustdocs are clean - runs-on: ubuntu-latest - strategy: - matrix: - arch: [ amd64 ] - rust: [ nightly ] - container: - image: ${{ matrix.arch }}/rust - env: - RUSTDOCFLAGS: "-Dwarnings --enable-index-page -Zunstable-options" - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Install python dev - run: | - apt update - apt install -y libpython3.11-dev - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - with: - rust-version: ${{ matrix.rust }} - - name: Run cargo doc - run: cargo doc --document-private-items --no-deps --workspace --all-features - - name: Fix file permissions - shell: sh - run: | - chmod -c -R +rX "target/doc" | - while read line; do - echo "::warning title=Invalid file permissions automatically fixed::$line" - done - - name: Upload artifacts - uses: actions/upload-pages-artifact@v3 - with: - name: crate-docs - path: target/doc - - deploy: - # Only deploy if a push to main - if: github.ref_name == 'main' && github.event_name == 'push' - needs: docs - permissions: - contents: write - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Download crate docs - uses: actions/download-artifact@v4 - with: - name: crate-docs - path: website/build - - name: Prepare website - run: | - tar -xf website/build/artifact.tar -C website/build - rm website/build/artifact.tar - cp .asf.yaml ./website/build/.asf.yaml - - name: Deploy to gh-pages - uses: peaceiris/actions-gh-pages@v4.0.0 - if: github.event_name == 'push' && github.ref_name == 'main' - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - publish_dir: website/build - publish_branch: asf-site - # Avoid accumulating history of in progress API jobs: https://github.com/apache/arrow-rs/issues/5908 - force_orphan: true diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml deleted file mode 100644 index a47195d1becf..000000000000 --- a/.github/workflows/integration.yml +++ /dev/null @@ -1,163 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: integration - -concurrency: - group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} - cancel-in-progress: true - -# trigger for all PRs that touch certain files and changes to main -on: - push: - branches: - - main - pull_request: - paths: - - .github/** - - arrow-array/** - - arrow-buffer/** - - arrow-cast/** - - arrow-csv/** - - arrow-data/** - - arrow-integration-test/** - - arrow-integration-testing/** - - arrow-ipc/** - - arrow-json/** - - arrow-avro/** - - arrow-ord/** - - arrow-pyarrow-integration-testing/** - - arrow-schema/** - - arrow-select/** - - arrow-sort/** - - arrow-string/** - - arrow/** - -jobs: - integration: - name: Archery test With other arrows - runs-on: ubuntu-latest - container: - image: apache/arrow-dev:amd64-conda-integration - env: - ARROW_USE_CCACHE: OFF - ARROW_CPP_EXE_PATH: /build/cpp/debug - ARROW_NANOARROW_PATH: /build/nanoarrow - ARROW_RUST_EXE_PATH: /build/rust/debug - BUILD_DOCS_CPP: OFF - ARROW_INTEGRATION_CPP: ON - ARROW_INTEGRATION_CSHARP: ON - ARROW_INTEGRATION_GO: ON - ARROW_INTEGRATION_JAVA: ON - ARROW_INTEGRATION_JS: ON - ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS: "rust" - ARCHERY_INTEGRATION_WITH_NANOARROW: "1" - # https://github.com/apache/arrow/pull/38403/files#r1371281630 - ARCHERY_INTEGRATION_WITH_RUST: "1" - # These are necessary because the github runner overrides $HOME - # https://github.com/actions/runner/issues/863 - RUSTUP_HOME: /root/.rustup - CARGO_HOME: /root/.cargo - defaults: - run: - shell: bash - steps: - # This is necessary so that actions/checkout can find git - - name: Export conda path - run: echo "/opt/conda/envs/arrow/bin" >> $GITHUB_PATH - # This is necessary so that Rust can find cargo - - name: Export cargo path - run: echo "/root/.cargo/bin" >> $GITHUB_PATH - - name: Check rustup - run: which rustup - - name: Check cmake - run: which cmake - - name: Checkout Arrow - uses: actions/checkout@v4 - with: - repository: apache/arrow - submodules: true - fetch-depth: 0 - - name: Checkout Arrow Rust - uses: actions/checkout@v4 - with: - path: rust - fetch-depth: 0 - - name: Checkout Arrow nanoarrow - uses: actions/checkout@v4 - with: - repository: apache/arrow-nanoarrow - path: nanoarrow - fetch-depth: 0 - # Workaround https://github.com/rust-lang/rust/issues/125067 - - name: Downgrade rust - working-directory: rust - run: rustup override set 1.77 - - name: Build - run: conda run --no-capture-output ci/scripts/integration_arrow_build.sh $PWD /build - - name: Run - run: conda run --no-capture-output ci/scripts/integration_arrow.sh $PWD /build - - # test FFI against the C-Data interface exposed by pyarrow - pyarrow-integration-test: - name: Pyarrow C Data Interface - runs-on: ubuntu-latest - strategy: - matrix: - rust: [stable] - # PyArrow 15 was the first version to introduce StringView/BinaryView support - pyarrow: ["15", "16", "17"] - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt clippy - - name: Cache Cargo - uses: actions/cache@v4 - with: - path: /home/runner/.cargo - key: cargo-maturin-cache- - - name: Cache Rust dependencies - uses: actions/cache@v4 - with: - path: /home/runner/target - # this key is not equal because maturin uses different compilation flags. - key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}- - - uses: actions/setup-python@v5 - with: - python-version: '3.8' - - name: Upgrade pip and setuptools - run: pip install --upgrade pip setuptools wheel virtualenv - - name: Create virtualenv and install dependencies - run: | - virtualenv venv - source venv/bin/activate - pip install maturin toml pytest pytz pyarrow==${{ matrix.pyarrow }} - - name: Run Rust tests - run: | - source venv/bin/activate - cargo test -p arrow --test pyarrow --features pyarrow - - name: Run tests - run: | - source venv/bin/activate - cd arrow-pyarrow-integration-testing - maturin develop - pytest -v . diff --git a/.github/workflows/miri.sh b/.github/workflows/miri.sh deleted file mode 100755 index 86be2100ee67..000000000000 --- a/.github/workflows/miri.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash -# -# Script -# -# Must be run with nightly rust for example -# rustup default nightly - -set -e - -export MIRIFLAGS="-Zmiri-disable-isolation" -cargo miri setup -cargo clean - -echo "Starting Arrow MIRI run..." -cargo miri test -p arrow-buffer -cargo miri test -p arrow-data --features ffi -cargo miri test -p arrow-schema --features ffi -cargo miri test -p arrow-ord -cargo miri test -p arrow-array -cargo miri test -p arrow-arith \ No newline at end of file diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml deleted file mode 100644 index ce67546a104b..000000000000 --- a/.github/workflows/miri.yaml +++ /dev/null @@ -1,62 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: miri - -concurrency: - group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} - cancel-in-progress: true - -# trigger for all PRs that touch certain files and changes to main -on: - push: - branches: - - main - pull_request: - paths: - - .github/** - - arrow-array/** - - arrow-buffer/** - - arrow-cast/** - - arrow-csv/** - - arrow-data/** - - arrow-ipc/** - - arrow-json/** - - arrow-avro/** - - arrow-schema/** - - arrow-select/** - - arrow-string/** - - arrow/** - -jobs: - miri-checks: - name: MIRI - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Setup Rust toolchain - run: | - rustup toolchain install nightly --component miri - rustup override set nightly - cargo miri setup - - name: Run Miri Checks - env: - RUST_BACKTRACE: full - RUST_LOG: "trace" - run: bash .github/workflows/miri.sh diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml deleted file mode 100644 index 1639b031ebfc..000000000000 --- a/.github/workflows/object_store.yml +++ /dev/null @@ -1,224 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - ---- -# tests for `object_store` crate -name: object_store - -concurrency: - group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} - cancel-in-progress: true - -# trigger for all PRs that touch certain files and changes to main -on: - push: - branches: - - main - pull_request: - paths: - - object_store/** - - .github/** - -jobs: - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - defaults: - run: - working-directory: object_store - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - # Run different tests for the library on its own as well as - # all targets to ensure that it still works in the absence of - # features that might be enabled by dev-dependencies of other - # targets. - - name: Run clippy with default features - run: cargo clippy -- -D warnings - - name: Run clippy without default features - run: cargo clippy --no-default-features -- -D warnings - - name: Run clippy with fs features - run: cargo clippy --no-default-features --features fs -- -D warnings - - name: Run clippy with aws feature - run: cargo clippy --features aws -- -D warnings - - name: Run clippy with gcp feature - run: cargo clippy --features gcp -- -D warnings - - name: Run clippy with azure feature - run: cargo clippy --features azure -- -D warnings - - name: Run clippy with http feature - run: cargo clippy --features http -- -D warnings - - name: Run clippy with all features - run: cargo clippy --all-features -- -D warnings - - name: Run clippy with all features and all targets - run: cargo clippy --all-features --all-targets -- -D warnings - - # test doc links still work - # - # Note that since object_store is not part of the main workspace, - # this needs a separate docs job as it is not covered by - # `cargo doc --workspace` - docs: - name: Rustdocs - runs-on: ubuntu-latest - defaults: - run: - working-directory: object_store - env: - RUSTDOCFLAGS: "-Dwarnings" - steps: - - uses: actions/checkout@v4 - - name: Run cargo doc - run: cargo doc --document-private-items --no-deps --workspace --all-features - - # test the crate - # This runs outside a container to workaround lack of support for passing arguments - # to service containers - https://github.com/orgs/community/discussions/26688 - linux-test: - name: Emulator Tests - runs-on: ubuntu-latest - defaults: - run: - working-directory: object_store - env: - # Disable full debug symbol generation to speed up CI build and keep memory down - # "1" means line tables only, which is useful for panic tracebacks. - RUSTFLAGS: "-C debuginfo=1" - RUST_BACKTRACE: "1" - # Run integration tests - TEST_INTEGRATION: 1 - EC2_METADATA_ENDPOINT: http://localhost:1338 - AZURE_CONTAINER_NAME: test-bucket - AZURE_STORAGE_USE_EMULATOR: "1" - AZURITE_BLOB_STORAGE_URL: "http://localhost:10000" - AZURITE_QUEUE_STORAGE_URL: "http://localhost:10001" - AWS_BUCKET: test-bucket - AWS_DEFAULT_REGION: "us-east-1" - AWS_ACCESS_KEY_ID: test - AWS_SECRET_ACCESS_KEY: test - AWS_ENDPOINT: http://localhost:4566 - AWS_ALLOW_HTTP: true - AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000 - AWS_CONDITIONAL_PUT: dynamo:test-table:2000 - AWS_SERVER_SIDE_ENCRYPTION: aws:kms - HTTP_URL: "http://localhost:8080" - GOOGLE_BUCKET: test-bucket - GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" - - steps: - - uses: actions/checkout@v4 - - # We are forced to use docker commands instead of service containers as we need to override the entrypoints - # which is currently not supported - https://github.com/actions/runner/discussions/1872 - - name: Configure Fake GCS Server (GCP emulation) - # Custom image - see fsouza/fake-gcs-server#1164 - run: | - echo "GCS_CONTAINER=$(docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host localhost:4443)" >> $GITHUB_ENV - # Give the container a moment to start up prior to configuring it - sleep 1 - curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" - echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""}' > "$GOOGLE_SERVICE_ACCOUNT" - - - name: Setup WebDav - run: docker run -d -p 8080:80 rclone/rclone serve webdav /data --addr :80 - - - name: Setup LocalStack (AWS emulation) - run: | - echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV - echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV - aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket - aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket - aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 - - KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key") - echo "AWS_SSE_KMS_KEY_ID=$(echo $KMS_KEY | jq -r .KeyMetadata.KeyId)" >> $GITHUB_ENV - - - name: Configure Azurite (Azure emulation) - # the magical connection string is from - # https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings - run: | - echo "AZURITE_CONTAINER=$(docker run -d -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite)" >> $GITHUB_ENV - az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;' - - - name: Setup Rust toolchain - run: | - rustup toolchain install stable - rustup default stable - - - name: Run object_store tests - run: cargo test --features=aws,azure,gcp,http - - - name: Run object_store tests (AWS native conditional put) - run: cargo test --features=aws - env: - AWS_CONDITIONAL_PUT: etag - AWS_COPY_IF_NOT_EXISTS: multipart - - - name: GCS Output - if: ${{ !cancelled() }} - run: docker logs $GCS_CONTAINER - - - name: LocalStack Output - if: ${{ !cancelled() }} - run: docker logs $LOCALSTACK_CONTAINER - - - name: EC2 Metadata Output - if: ${{ !cancelled() }} - run: docker logs $EC2_METADATA_CONTAINER - - - name: Azurite Output - if: ${{ !cancelled() }} - run: docker logs $AZURITE_CONTAINER - - # test the object_store crate builds against wasm32 in stable rust - wasm32-build: - name: Build wasm32 - runs-on: ubuntu-latest - container: - image: amd64/rust - defaults: - run: - working-directory: object_store - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - with: - target: wasm32-unknown-unknown,wasm32-wasip1 - - name: Build wasm32-unknown-unknown - run: cargo build --target wasm32-unknown-unknown - - name: Build wasm32-wasip1 - run: cargo build --target wasm32-wasip1 - - windows: - name: cargo test LocalFileSystem (win64) - runs-on: windows-latest - defaults: - run: - working-directory: object_store - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Run LocalFileSystem tests - run: cargo test local::tests diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml index 4c46fde198bd..1f7d56ce2b18 100644 --- a/.github/workflows/parquet.yml +++ b/.github/workflows/parquet.yml @@ -167,16 +167,16 @@ jobs: cd parquet/pytest pytest -v - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Run clippy - run: cargo clippy -p parquet --all-targets --all-features -- -D warnings +# clippy: +# name: Clippy +# runs-on: ubuntu-latest +# container: +# image: amd64/rust +# steps: +# - uses: actions/checkout@v4 +# - name: Setup Rust toolchain +# uses: ./.github/actions/setup-builder +# - name: Setup Clippy +# run: rustup component add clippy +# - name: Run clippy +# run: cargo clippy -p parquet --all-targets --all-features -- -D warnings diff --git a/.github/workflows/parquet_derive.yml b/.github/workflows/parquet_derive.yml index 17aec724a820..134ca9c42d41 100644 --- a/.github/workflows/parquet_derive.yml +++ b/.github/workflows/parquet_derive.yml @@ -51,16 +51,16 @@ jobs: - name: Test run: cargo test -p parquet_derive - clippy: - name: Clippy - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Setup Clippy - run: rustup component add clippy - - name: Run clippy - run: cargo clippy -p parquet_derive --all-features -- -D warnings +# clippy: +# name: Clippy +# runs-on: ubuntu-latest +# container: +# image: amd64/rust +# steps: +# - uses: actions/checkout@v4 +# - name: Setup Rust toolchain +# uses: ./.github/actions/setup-builder +# - name: Setup Clippy +# run: rustup component add clippy +# - name: Run clippy +# run: cargo clippy -p parquet_derive --all-features -- -D warnings diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ca0d2441ceae..f73827db9b97 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -56,34 +56,34 @@ jobs: # Check workspace wide compile and test with default features for # windows - windows: - name: Test on Windows - runs-on: windows-latest - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - name: Install protobuf compiler in /d/protoc - shell: bash - run: | - mkdir /d/protoc - cd /d/protoc - curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/protoc-21.4-win64.zip - unzip protoc-21.4-win64.zip - export PATH=$PATH:/d/protoc/bin - protoc --version - - - name: Setup Rust toolchain - run: | - rustup toolchain install stable --no-self-update - rustup default stable - - name: Run tests - shell: bash - run: | - # do not produce debug symbols to keep memory usage down - export RUSTFLAGS="-C debuginfo=0" - export PATH=$PATH:/d/protoc/bin - cargo test +# windows: +# name: Test on Windows +# runs-on: windows-latest +# steps: +# - uses: actions/checkout@v4 +# with: +# submodules: true +# - name: Install protobuf compiler in /d/protoc +# shell: bash +# run: | +# mkdir /d/protoc +# cd /d/protoc +# curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/protoc-21.4-win64.zip +# unzip protoc-21.4-win64.zip +# export PATH=$PATH:/d/protoc/bin +# protoc --version +# +# - name: Setup Rust toolchain +# run: | +# rustup toolchain install stable --no-self-update +# rustup default stable +# - name: Run tests +# shell: bash +# run: | +# # do not produce debug symbols to keep memory usage down +# export RUSTFLAGS="-C debuginfo=0" +# export PATH=$PATH:/d/protoc/bin +# cargo test # Run cargo fmt for all crates @@ -112,29 +112,29 @@ jobs: working-directory: object_store run: cargo fmt --all -- --check - msrv: - name: Verify MSRV (Minimum Supported Rust Version) - runs-on: ubuntu-latest - container: - image: amd64/rust - steps: - - uses: actions/checkout@v4 - - name: Setup Rust toolchain - uses: ./.github/actions/setup-builder - - name: Install cargo-msrv - run: cargo install cargo-msrv - - name: Downgrade object_store dependencies - working-directory: object_store - # Necessary because tokio 1.30.0 updates MSRV to 1.63 - # and url 2.5.1, updates to 1.67 - run: | - cargo update -p tokio --precise 1.29.1 - cargo update -p url --precise 2.5.0 - - name: Check all packages - run: | - # run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies - find . -mindepth 2 -name Cargo.toml | while read -r dir - do - echo "Checking package '$dir'" - cargo msrv verify --manifest-path "$dir" --output-format=json || exit 1 - done +# msrv: +# name: Verify MSRV (Minimum Supported Rust Version) +# runs-on: ubuntu-latest +# container: +# image: amd64/rust +# steps: +# - uses: actions/checkout@v4 +# - name: Setup Rust toolchain +# uses: ./.github/actions/setup-builder +# - name: Install cargo-msrv +# run: cargo install cargo-msrv +# - name: Downgrade object_store dependencies +# working-directory: object_store +# # Necessary because tokio 1.30.0 updates MSRV to 1.63 +# # and url 2.5.1, updates to 1.67 +# run: | +# cargo update -p tokio --precise 1.29.1 +# cargo update -p url --precise 2.5.0 +# - name: Check all packages +# run: | +# # run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies +# find . -mindepth 2 -name Cargo.toml | while read -r dir +# do +# echo "Checking package '$dir'" +# cargo msrv verify --manifest-path "$dir" --output-format=json || exit 1 +# done diff --git a/Cargo.toml b/Cargo.toml index c5a3cddffef9..0f27a355cb0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,4 +93,4 @@ arrow-select = { version = "54.2.1", path = "./arrow-select" } arrow-string = { version = "54.2.1", path = "./arrow-string" } parquet = { version = "54.2.1", path = "./parquet", default-features = false } -chrono = { version = ">= 0.4.34, < 0.4.40", default-features = false, features = ["clock"] } +chrono = { version = "0.4.41", default-features = false, features = ["clock"] } diff --git a/arrow-arith/src/temporal.rs b/arrow-arith/src/temporal.rs index 3458669a6fd1..cb541b49dfdd 100644 --- a/arrow-arith/src/temporal.rs +++ b/arrow-arith/src/temporal.rs @@ -634,12 +634,6 @@ pub(crate) use return_compute_error_with; // Internal trait, which is used for mapping values from DateLike structures trait ChronoDateExt { - /// Returns a value in range `1..=4` indicating the quarter this date falls into - fn quarter(&self) -> u32; - - /// Returns a value in range `0..=3` indicating the quarter (zero-based) this date falls into - fn quarter0(&self) -> u32; - /// Returns the day of week; Monday is encoded as `0`, Tuesday as `1`, etc. fn num_days_from_monday(&self) -> i32; @@ -648,14 +642,6 @@ trait ChronoDateExt { } impl ChronoDateExt for T { - fn quarter(&self) -> u32 { - self.quarter0() + 1 - } - - fn quarter0(&self) -> u32 { - self.month0() / 3 - } - fn num_days_from_monday(&self) -> i32 { self.weekday().num_days_from_monday() as i32 } diff --git a/arrow-array/Cargo.toml b/arrow-array/Cargo.toml index 6eae8e24677d..149a6a5a431a 100644 --- a/arrow-array/Cargo.toml +++ b/arrow-array/Cargo.toml @@ -47,7 +47,7 @@ arrow-data = { workspace = true } chrono = { workspace = true } chrono-tz = { version = "0.10", optional = true } num = { version = "0.4.1", default-features = false, features = ["std"] } -half = { version = "2.1", default-features = false, features = ["num-traits"] } +half = { version = ">=2.1.0, <=2.7.1", default-features = false, features = ["num-traits"] } hashbrown = { version = "0.15.1", default-features = false } [features] diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index c103c2ecc0f3..bfa81174e10e 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -47,7 +47,7 @@ serde = { version = "1.0.188", features = ["derive"] } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true } snap = { version = "1.0", default-features = false, optional = true } zstd = { version = "0.13", default-features = false, optional = true } -crc = { version = "3.0", optional = true } +crc = { version = "=3.0.1", optional = true } [dev-dependencies] diff --git a/arrow-buffer/Cargo.toml b/arrow-buffer/Cargo.toml index 68bfe8ddf732..b81ee926cb27 100644 --- a/arrow-buffer/Cargo.toml +++ b/arrow-buffer/Cargo.toml @@ -36,7 +36,7 @@ bench = false [dependencies] bytes = { version = "1.4" } num = { version = "0.4", default-features = false, features = ["std"] } -half = { version = "2.1", default-features = false } +half = { version = ">=2.1.0, <=2.7.1", default-features = false } [dev-dependencies] criterion = { version = "0.5", default-features = false } diff --git a/arrow-cast/Cargo.toml b/arrow-cast/Cargo.toml index 4046f5226094..727679923b2c 100644 --- a/arrow-cast/Cargo.toml +++ b/arrow-cast/Cargo.toml @@ -47,17 +47,17 @@ arrow-data = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } chrono = { workspace = true } -half = { version = "2.1", default-features = false } +half = { version = ">=2.1.0, <=2.7.1", default-features = false } num = { version = "0.4", default-features = false, features = ["std"] } lexical-core = { version = "1.0", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] } atoi = "2.0.0" -comfy-table = { version = "7.0", optional = true, default-features = false } +comfy-table = { version = "=7.1.0", optional = true, default-features = false } base64 = "0.22" ryu = "1.0.16" [dev-dependencies] criterion = { version = "0.5", default-features = false } -half = { version = "2.1", default-features = false } +half = { version = ">=2.1.0, <=2.7.1", default-features = false } rand = "0.8" [build-dependencies] diff --git a/arrow-data/Cargo.toml b/arrow-data/Cargo.toml index c83f867523d5..d0181089f4d7 100644 --- a/arrow-data/Cargo.toml +++ b/arrow-data/Cargo.toml @@ -50,7 +50,7 @@ arrow-buffer = { workspace = true } arrow-schema = { workspace = true } num = { version = "0.4", default-features = false, features = ["std"] } -half = { version = "2.1", default-features = false } +half = { version = ">=2.1.0, <=2.7.1", default-features = false } [dev-dependencies] diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index fbb295036a9b..8eef9d58fcd6 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -48,7 +48,7 @@ prost = { version = "0.13.1", default-features = false, features = ["prost-deriv # For Timestamp type prost-types = { version = "0.13.1", default-features = false } tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"], optional = true } -tonic = { version = "0.12.3", default-features = false, features = ["transport", "codegen", "prost"] } +tonic = { version = "=0.13", default-features = false, features = ["transport", "codegen", "prost", "router"] } # CLI-related dependencies anyhow = { version = "1.0", optional = true } @@ -62,7 +62,7 @@ all-features = true [features] default = [] flight-sql-experimental = ["dep:arrow-arith", "dep:arrow-data", "dep:arrow-ord", "dep:arrow-row", "dep:arrow-select", "dep:arrow-string", "dep:once_cell", "dep:paste"] -tls = ["tonic/tls"] +tls = ["tonic/tls-ring", "tonic/tls-webpki-roots"] # Enable CLI tools cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint", "tonic/tls-webpki-roots", "dep:anyhow", "dep:clap", "dep:tracing-log", "dep:tracing-subscriber"] @@ -71,7 +71,7 @@ arrow-cast = { workspace = true, features = ["prettyprint"] } assert_cmd = "2.0.8" http = "1.1.0" http-body = "1.0.0" -hyper-util = "0.1" +hyper-util = "=0.1.4" pin-project-lite = "0.2" tempfile = "3.3" tracing-log = { version = "0.2" } diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml index e52efbf67e21..18fa4f2aa67f 100644 --- a/arrow-flight/gen/Cargo.toml +++ b/arrow-flight/gen/Cargo.toml @@ -32,5 +32,6 @@ publish = false [dependencies] # Pin specific version of the tonic-build dependencies to avoid auto-generated # (and checked in) arrow.flight.protocol.rs from changing -prost-build = { version = "=0.13.4", default-features = false } -tonic-build = { version = "=0.12.3", default-features = false, features = ["transport", "prost"] } +proc-macro2 = { version = "=1.0.95", default-features = false } +prost-build = { version = "=0.13.5", default-features = false } +tonic-build = { version = "=0.13.0", default-features = false, features = ["transport", "prost"] } diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index 0cd4f6948b77..a08ea01105e5 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -448,7 +448,7 @@ pub mod flight_service_client { } impl FlightServiceClient where - T: tonic::client::GrpcService, + T: tonic::client::GrpcService, T::Error: Into, T::ResponseBody: Body + std::marker::Send + 'static, ::Error: Into + std::marker::Send, @@ -469,13 +469,13 @@ pub mod flight_service_client { F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< - http::Request, + http::Request, Response = http::Response< - >::ResponseBody, + >::ResponseBody, >, >, , + http::Request, >>::Error: Into + std::marker::Send + std::marker::Sync, { FlightServiceClient::new(InterceptedService::new(inner, interceptor)) @@ -1098,7 +1098,7 @@ pub mod flight_service_server { B: Body + std::marker::Send + 'static, B::Error: Into + std::marker::Send + 'static, { - type Response = http::Response; + type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; fn poll_ready( @@ -1571,7 +1571,9 @@ pub mod flight_service_server { } _ => { Box::pin(async move { - let mut response = http::Response::new(empty_body()); + let mut response = http::Response::new( + tonic::body::Body::default(), + ); let headers = response.headers_mut(); headers .insert( diff --git a/arrow-integration-testing/Cargo.toml b/arrow-integration-testing/Cargo.toml index 26cb05fae1c2..c5104c72a301 100644 --- a/arrow-integration-testing/Cargo.toml +++ b/arrow-integration-testing/Cargo.toml @@ -43,7 +43,7 @@ prost = { version = "0.13", default-features = false } serde = { version = "1.0", default-features = false, features = ["rc", "derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio = { version = "1.0", default-features = false, features = [ "rt-multi-thread"] } -tonic = { version = "0.12", default-features = false } +tonic = { version = "0.13", default-features = false, features = ["router"]} tracing-subscriber = { version = "0.3.1", default-features = false, features = ["fmt"], optional = true } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } diff --git a/arrow-ipc/Cargo.toml b/arrow-ipc/Cargo.toml index 735f7a14a2f8..d61402aae12b 100644 --- a/arrow-ipc/Cargo.toml +++ b/arrow-ipc/Cargo.toml @@ -39,7 +39,7 @@ arrow-buffer = { workspace = true } arrow-data = { workspace = true } arrow-schema = { workspace = true } flatbuffers = { version = "24.12.23", default-features = false } -lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true } +lz4_flex = { version = ">=0.11.0, <=0.11.5", default-features = false, features = ["std", "frame"], optional = true } zstd = { version = "0.13.0", default-features = false, optional = true } [features] diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 564cb9433b3d..75d8001c6bbb 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -39,8 +39,8 @@ arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-data = { workspace = true } arrow-schema = { workspace = true } -half = { version = "2.1", default-features = false } -indexmap = { version = "2.0", default-features = false, features = ["std"] } +half = { version = ">=2.1.0, <=2.7.1", default-features = false } +indexmap = { version = ">=2.0.2, <=2.12.1", default-features = false, features = ["std"] } num = { version = "0.4", default-features = false, features = ["std"] } serde = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["std"] } diff --git a/arrow-ord/Cargo.toml b/arrow-ord/Cargo.toml index 8d74d2f97d72..0cc28d87a20a 100644 --- a/arrow-ord/Cargo.toml +++ b/arrow-ord/Cargo.toml @@ -41,5 +41,5 @@ arrow-schema = { workspace = true } arrow-select = { workspace = true } [dev-dependencies] -half = { version = "2.1", default-features = false, features = ["num-traits"] } +half = { version = ">=2.1.0, <=2.7.1", default-features = false, features = ["num-traits"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } diff --git a/arrow-row/Cargo.toml b/arrow-row/Cargo.toml index 90d99684d265..e1afba4199ac 100644 --- a/arrow-row/Cargo.toml +++ b/arrow-row/Cargo.toml @@ -39,7 +39,7 @@ arrow-buffer = { workspace = true } arrow-data = { workspace = true } arrow-schema = { workspace = true } -half = { version = "2.1", default-features = false } +half = { version = ">=2.1.0, <=2.7.1", default-features = false } [dev-dependencies] arrow-cast = { workspace = true } diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs index dbd671a62a3a..077e0b6ac91f 100644 --- a/arrow-schema/src/field.rs +++ b/arrow-schema/src/field.rs @@ -640,13 +640,12 @@ impl Field { /// assert!(field.is_nullable()); /// ``` pub fn try_merge(&mut self, from: &Field) -> Result<(), ArrowError> { - #[allow(deprecated)] - if from.dict_id != self.dict_id { - return Err(ArrowError::SchemaError(format!( - "Fail to merge schema field '{}' because from dict_id = {} does not match {}", - self.name, from.dict_id, self.dict_id - ))); - } + // if from.dict_id != self.dict_id { + // return Err(ArrowError::SchemaError(format!( + // "Fail to merge schema field '{}' because from dict_id = {} does not match {}", + // self.name, from.dict_id, self.dict_id + // ))); + // } if from.dict_is_ordered != self.dict_is_ordered { return Err(ArrowError::SchemaError(format!( "Fail to merge schema field '{}' because from dict_is_ordered = {} does not match {}", diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 88231b7f6160..1ab9b385ce5d 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -55,7 +55,7 @@ arrow-string = { workspace = true } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } pyo3 = { version = "0.23", default-features = false, optional = true } -half = { version = "2.1", default-features = false, optional = true } +half = { version = ">=2.1.0, <=2.7.1", default-features = false, optional = true } [package.metadata.docs.rs] features = ["prettyprint", "ipc_compression", "ffi", "pyarrow"] @@ -85,7 +85,7 @@ canonical_extension_types = ["arrow-schema/canonical_extension_types"] [dev-dependencies] chrono = { workspace = true } criterion = { version = "0.5", default-features = false } -half = { version = "2.1", default-features = false } +half = { version = ">=2.1.0, <=2.7.1", default-features = false } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } serde = { version = "1.0", default-features = false, features = ["derive"] } # used in examples diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 992ae6662cdb..1630ca1626c7 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -40,17 +40,19 @@ parking_lot = { version = "0.12" } percent-encoding = "2.1" thiserror = "2.0.2" tracing = { version = "0.1" } -url = "2.2" +url = "=2.2.2" walkdir = { version = "2", optional = true } # Cloud storage support base64 = { version = "0.22", default-features = false, features = ["std"], optional = true } -hyper = { version = "1.2", default-features = false, optional = true } +hyper = { version = "=1.3.0", default-features = false, optional = true } +indexmap = ">=2.0.2, <=2.12.1" quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true } serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } serde_json = { version = "1.0", default-features = false, optional = true } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true } +reqwest = { version = "=0.12.1", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true } +compression-codecs = "=0.4.30" ring = { version = "0.17", default-features = false, features = ["std"], optional = true } rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] } @@ -73,14 +75,14 @@ integration = [] [dev-dependencies] # In alphabetical order futures-test = "0.3" -hyper = { version = "1.2", features = ["server"] } -hyper-util = "0.1" +hyper = { version = "=1.3.0", features = ["server"] } +hyper-util = "=0.1.4" http-body-util = "0.1" rand = "0.8" tempfile = "3.1.0" regex = "1.11.1" # The "gzip" feature for reqwest is enabled for an integration test. -reqwest = { version = "0.12", features = ["gzip"] } +reqwest = { version = "=0.12.1", features = ["gzip"] } http = "1.1.0" [[test]] diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 00d4c5b750f8..0303debd681f 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -51,7 +51,7 @@ thrift = { version = "0.17", default-features = false } snap = { version = "1.0", default-features = false, optional = true } brotli = { version = "7.0", default-features = false, features = ["std"], optional = true } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true } -lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true } +lz4_flex = { version = ">=0.11.0, <=0.11.5", default-features = false, features = ["std", "frame"], optional = true } zstd = { version = "0.13", optional = true, default-features = false } chrono = { workspace = true } num = { version = "0.4", default-features = false } @@ -66,7 +66,7 @@ tokio = { version = "1.0", optional = true, default-features = false, features = hashbrown = { version = "0.15", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } -half = { version = "2.1", default-features = false, features = ["num-traits"] } +half = { version = ">=2.1.0, <=2.7.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.33.0", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } simdutf8 = { version = "0.1.5", optional = true, default-features = false } @@ -78,7 +78,7 @@ snap = { version = "1.0", default-features = false } tempfile = { version = "3.0", default-features = false } brotli = { version = "7.0", default-features = false, features = ["std"] } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } -lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] } +lz4_flex = { version = ">=0.11.0, <=0.11.5", default-features = false, features = ["std", "frame"] } zstd = { version = "0.13", default-features = false } serde_json = { version = "1.0", features = ["std"], default-features = false } arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 6eba04c86f91..caf116c68389 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -17,16 +17,16 @@ //! Contains reader which reads parquet data into arrow [`RecordBatch`] -use std::collections::VecDeque; -use std::sync::Arc; - +use arrow_array::builder::UInt64Builder; use arrow_array::cast::AsArray; -use arrow_array::Array; +use arrow_array::{Array, ArrayRef}; use arrow_array::{RecordBatch, RecordBatchReader}; -use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_schema::{ArrowError, DataType as ArrowType, Field, FieldRef, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; +use std::collections::VecDeque; +use std::sync::Arc; pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; @@ -72,6 +72,12 @@ pub struct ArrowReaderBuilder { pub(crate) limit: Option, pub(crate) offset: Option, + + #[allow(unused)] + pub(crate) row_id: Option, + + #[allow(unused)] + pub(crate) prefetch: Option, } impl ArrowReaderBuilder { @@ -88,6 +94,8 @@ impl ArrowReaderBuilder { selection: None, limit: None, offset: None, + row_id: None, + prefetch: None, } } @@ -114,6 +122,15 @@ impl ArrowReaderBuilder { Self { batch_size, ..self } } + /// Project a column into the result with name `field_name` that will contain the row ID + /// for each row. The row ID will be the row offset of the row in the underlying file + pub fn with_row_id(self, field_name: impl Into) -> Self { + Self { + row_id: Some(RowId::field_ref(field_name)), + ..self + } + } + /// Only read data from the provided row group indexes /// /// This is also called row group filtering @@ -132,6 +149,15 @@ impl ArrowReaderBuilder { } } + /// If evaluating a `RowFilter` also prefetch the columns in `mask` + /// while fetching row filter columns + pub fn with_prefetch(self, mask: Option) -> Self { + Self { + prefetch: mask, + ..self + } + } + /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their /// data into memory. /// @@ -623,6 +649,8 @@ impl ParquetRecordBatchReaderBuilder { batch_size, array_reader, apply_range(selection, reader.num_rows(), self.offset, self.limit), + // TODO what do we do here? + None, )) } } @@ -684,6 +712,47 @@ impl Iterator for ReaderPageIterator { impl PageIterator for ReaderPageIterator {} +pub(crate) struct RowId { + offset: u64, + field: FieldRef, + buffer: UInt64Builder, +} + +impl RowId { + pub fn new(offset: u64, field: FieldRef, batch_size: usize) -> Self { + Self { + offset, + field, + buffer: UInt64Builder::with_capacity(batch_size), + } + } + + pub fn field_ref(name: impl Into) -> FieldRef { + Arc::new(Field::new(name, ArrowType::UInt64, false)) + } + + pub fn skip(&mut self, n: usize) { + self.offset += n as u64; + } + + pub fn field(&self) -> FieldRef { + self.field.clone() + } + + fn read(&mut self, n: usize) { + // SAFETY: We are appending a `Range` which has a trusted length + unsafe { + self.buffer + .append_trusted_len_iter(self.offset..self.offset + n as u64) + } + self.offset += n as u64; + } + + fn consume(&mut self) -> ArrayRef { + Arc::new(self.buffer.finish()) + } +} + /// An `Iterator>` that yields [`RecordBatch`] /// read from a parquet data source pub struct ParquetRecordBatchReader { @@ -691,6 +760,7 @@ pub struct ParquetRecordBatchReader { array_reader: Box, schema: SchemaRef, selection: Option>, + row_id: Option, } impl Iterator for ParquetRecordBatchReader { @@ -708,6 +778,10 @@ impl Iterator for ParquetRecordBatchReader { Err(e) => return Some(Err(e.into())), }; + if let Some(row_id) = self.row_id.as_mut() { + row_id.skip(skipped); + } + if skipped != front.row_count { return Some(Err(general_err!( "failed to skip rows, expected {}, got {}", @@ -738,16 +812,24 @@ impl Iterator for ParquetRecordBatchReader { }; match self.array_reader.read_records(to_read) { Ok(0) => break, - Ok(rec) => read_records += rec, + Ok(rec) => { + if let Some(rowid) = self.row_id.as_mut() { + rowid.read(rec); + } + read_records += rec + } Err(error) => return Some(Err(error.into())), } } } - None => { - if let Err(error) = self.array_reader.read_records(self.batch_size) { - return Some(Err(error.into())); + None => match self.array_reader.read_records(self.batch_size) { + Ok(n) => { + if let Some(rowid) = self.row_id.as_mut() { + rowid.read(n); + } } - } + Err(error) => return Some(Err(error.into())), + }, }; match self.array_reader.consume_batch() { @@ -761,7 +843,23 @@ impl Iterator for ParquetRecordBatchReader { match struct_array { Err(err) => Some(Err(err)), - Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))), + Ok(e) => { + if e.len() > 0 { + Some(Ok(match self.row_id.as_mut() { + Some(rowid) => { + let columns = std::iter::once(rowid.consume()) + .chain(e.columns().iter().cloned()) + .collect(); + + RecordBatch::try_new(self.schema.clone(), columns) + .expect("invalid schema") + } + None => RecordBatch::from(e), + })) + } else { + None + } + } } } } @@ -806,6 +904,7 @@ impl ParquetRecordBatchReader { array_reader, schema: Arc::new(Schema::new(levels.fields.clone())), selection: selection.map(|s| s.trim().into()), + row_id: None, }) } @@ -816,17 +915,29 @@ impl ParquetRecordBatchReader { batch_size: usize, array_reader: Box, selection: Option, + rowid: Option, ) -> Self { - let schema = match array_reader.get_data_type() { - ArrowType::Struct(ref fields) => Schema::new(fields.clone()), + let struct_fields = match array_reader.get_data_type() { + ArrowType::Struct(ref fields) => fields.clone(), _ => unreachable!("Struct array reader's data type is not struct!"), }; + let schema = match rowid.as_ref() { + Some(rowid) => { + let fields: Vec<_> = std::iter::once(rowid.field()) + .chain(struct_fields.iter().cloned()) + .collect(); + Schema::new(fields) + } + None => Schema::new(struct_fields), + }; + Self { batch_size, array_reader, schema: Arc::new(schema), selection: selection.map(|s| s.trim().into()), + row_id: rowid, } } } @@ -887,7 +998,8 @@ pub(crate) fn evaluate_predicate( input_selection: Option, predicate: &mut dyn ArrowPredicate, ) -> Result { - let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone()); + let reader = + ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone(), None); let mut filters = vec![]; for maybe_batch in reader { let maybe_batch = maybe_batch?; @@ -913,6 +1025,66 @@ pub(crate) fn evaluate_predicate( }) } +/// Maximum number of bytes that can be evaluated in a row filter +/// before yielding back to the scheduler +const DECODE_BUDGET: usize = 2 * 1024 * 1024; + +/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating +/// which rows to return. +/// +/// `input_selection`: Optional pre-existing selection. If `Some`, then the +/// final [`RowSelection`] will be the conjunction of it and the rows selected +/// by `predicate`. +/// +/// Note: A pre-existing selection may come from evaluating a previous predicate +/// or if the [`ParquetRecordBatchReader`] specified an explicit +/// [`RowSelection`] in addition to one or more predicates. +#[allow(dead_code)] +pub(crate) async fn evaluate_predicate_coop( + batch_size: usize, + array_reader: Box, + input_selection: Option, + predicate: &mut dyn ArrowPredicate, +) -> Result { + let mut budget = DECODE_BUDGET; + + let reader = + ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone(), None); + let mut filters = vec![]; + for maybe_batch in reader { + let maybe_batch = maybe_batch?; + budget = budget.saturating_sub(maybe_batch.get_array_memory_size()); + + let input_rows = maybe_batch.num_rows(); + let filter = predicate.evaluate(maybe_batch)?; + // Since user supplied predicate, check error here to catch bugs quickly + if filter.len() != input_rows { + return Err(arrow_err!( + "ArrowPredicate predicate returned {} rows, expected {input_rows}", + filter.len() + )); + } + match filter.null_count() { + 0 => filters.push(filter), + _ => filters.push(prep_null_mask_filter(&filter)), + }; + + if budget == 0 { + // If we have consumed our decode budget, reset the budget and yield + // back to the scheduler + budget = DECODE_BUDGET; + #[cfg(feature = "async")] + tokio::task::yield_now().await; + } + } + + let raw = RowSelection::from_filters(&filters); + Ok(match input_selection { + Some(selection) => selection.and_then(&raw), + None => raw, + }) +} + #[cfg(test)] mod tests { use std::cmp::min; diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index e4662b8f316c..9b5aba2d3af4 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -566,6 +566,8 @@ pub(crate) struct ArrayLevels { /// The arrow array array: ArrayRef, + + def_levels_runs: Option>, } impl PartialEq for ArrayLevels { @@ -595,6 +597,7 @@ impl ArrayLevels { max_def_level, max_rep_level, array, + def_levels_runs: (max_def_level != 0).then(Vec::new), } } @@ -668,6 +671,7 @@ mod tests { max_def_level: 2, max_rep_level: 2, array: Arc::new(primitives), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected); } @@ -688,6 +692,7 @@ mod tests { max_def_level: 0, max_rep_level: 0, array, + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); } @@ -714,6 +719,7 @@ mod tests { max_def_level: 1, max_rep_level: 0, array, + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); } @@ -748,6 +754,7 @@ mod tests { max_def_level: 1, max_rep_level: 1, array: Arc::new(leaf_array), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); @@ -781,6 +788,7 @@ mod tests { max_def_level: 2, max_rep_level: 1, array: Arc::new(leaf_array), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); } @@ -830,6 +838,7 @@ mod tests { max_def_level: 3, max_rep_level: 1, array: Arc::new(leaf), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); @@ -880,6 +889,7 @@ mod tests { max_def_level: 5, max_rep_level: 2, array: Arc::new(leaf), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); @@ -917,6 +927,7 @@ mod tests { max_def_level: 1, max_rep_level: 1, array: Arc::new(leaf), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); @@ -949,6 +960,7 @@ mod tests { max_def_level: 3, max_rep_level: 1, array: Arc::new(leaf), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); @@ -997,6 +1009,7 @@ mod tests { max_def_level: 5, max_rep_level: 2, array: Arc::new(leaf), + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); } @@ -1036,6 +1049,7 @@ mod tests { max_def_level: 3, max_rep_level: 0, array: leaf, + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_levels); } @@ -1075,6 +1089,7 @@ mod tests { max_def_level: 3, max_rep_level: 1, array: Arc::new(a_values), + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); } @@ -1167,6 +1182,7 @@ mod tests { max_def_level: 0, max_rep_level: 0, array: Arc::new(a), + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); @@ -1180,6 +1196,7 @@ mod tests { max_def_level: 1, max_rep_level: 0, array: Arc::new(b), + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); @@ -1193,6 +1210,7 @@ mod tests { max_def_level: 2, max_rep_level: 0, array: Arc::new(d), + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); @@ -1206,6 +1224,7 @@ mod tests { max_def_level: 3, max_rep_level: 0, array: Arc::new(f), + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); } @@ -1312,6 +1331,7 @@ mod tests { max_def_level: 1, max_rep_level: 1, array: map.keys().clone(), + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); @@ -1325,6 +1345,7 @@ mod tests { max_def_level: 2, max_rep_level: 1, array: map.values().clone(), + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); } @@ -1410,6 +1431,7 @@ mod tests { max_def_level: 4, max_rep_level: 1, array: values, + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); @@ -1450,6 +1472,7 @@ mod tests { max_def_level: 4, max_rep_level: 1, array: values, + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_level); @@ -1535,6 +1558,7 @@ mod tests { max_def_level: 6, max_rep_level: 2, array: a1_values, + def_levels_runs: None, }; assert_eq!(&levels[0], &expected_level); @@ -1546,6 +1570,7 @@ mod tests { max_def_level: 4, max_rep_level: 1, array: a2_values, + def_levels_runs: None, }; assert_eq!(&levels[1], &expected_level); @@ -1584,6 +1609,7 @@ mod tests { max_def_level: 3, max_rep_level: 1, array: values, + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); } @@ -1734,6 +1760,7 @@ mod tests { max_def_level: 4, max_rep_level: 1, array: values_a, + def_levels_runs: None, }; // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]] let expected_b = ArrayLevels { @@ -1743,6 +1770,7 @@ mod tests { max_def_level: 3, max_rep_level: 1, array: values_b, + def_levels_runs: None, }; assert_eq!(a_levels, &expected_a); @@ -1774,6 +1802,7 @@ mod tests { max_def_level: 3, max_rep_level: 1, array: values, + def_levels_runs: None, }; assert_eq!(list_level, &expected_level); } @@ -1809,6 +1838,7 @@ mod tests { max_def_level: 5, max_rep_level: 2, array: values, + def_levels_runs: None, }; assert_eq!(levels[0], expected_level); @@ -1839,6 +1869,7 @@ mod tests { max_def_level: 1, max_rep_level: 0, array: Arc::new(dict), + def_levels_runs: None, }; assert_eq!(levels[0], expected_level); } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 2c8a59399de1..3d670d76acef 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -38,12 +38,12 @@ use futures::stream::Stream; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Fields, Schema, SchemaRef}; +use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef}; use crate::arrow::array_reader::{build_array_reader, RowGroups}; use crate::arrow::arrow_reader::{ - apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, - ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection, + apply_range, evaluate_predicate_coop, selects_any, ArrowReaderBuilder, ArrowReaderMetadata, + ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowId, RowSelection, }; use crate::arrow::ProjectionMask; @@ -487,6 +487,7 @@ impl ParquetRecordBatchStreamBuilder { let batch_size = self .batch_size .min(self.metadata.file_metadata().num_rows() as usize); + let reader = ReaderFactory { input: self.input.0, filter: self.filter, @@ -494,17 +495,28 @@ impl ParquetRecordBatchStreamBuilder { fields: self.fields, limit: self.limit, offset: self.offset, + rowid: self.row_id.clone(), + prefetch: self.prefetch, }; // Ensure schema of ParquetRecordBatchStream respects projection, and does // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches) - let projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) { + let mut projected_fields = match reader.fields.as_deref().map(|pf| &pf.arrow_type) { Some(DataType::Struct(fields)) => { fields.filter_leaves(|idx, _| self.projection.leaf_included(idx)) } None => Fields::empty(), _ => unreachable!("Must be Struct for root type"), }; + + if let Some(field) = &self.row_id { + projected_fields = Fields::from( + std::iter::once(field.clone()) + .chain(projected_fields.iter().cloned()) + .collect::>(), + ); + } + let schema = Arc::new(Schema::new(projected_fields)); Ok(ParquetRecordBatchStream { @@ -536,6 +548,10 @@ struct ReaderFactory { limit: Option, offset: Option, + + rowid: Option, + + prefetch: Option, } impl ReaderFactory @@ -578,18 +594,26 @@ where let predicate_projection = predicate.projection(); row_group - .fetch(&mut self.input, predicate_projection, selection.as_ref()) + .fetch( + &mut self.input, + predicate_projection, + self.prefetch.as_ref(), + selection.as_ref(), + ) .await?; let array_reader = build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?; - selection = Some(evaluate_predicate( - batch_size, - array_reader, - selection, - predicate.as_mut(), - )?); + selection = Some( + evaluate_predicate_coop( + batch_size, + array_reader, + selection, + predicate.as_mut(), + ) + .await?, + ); } } @@ -627,13 +651,22 @@ where } row_group - .fetch(&mut self.input, &projection, selection.as_ref()) + .fetch(&mut self.input, &projection, None, selection.as_ref()) .await?; + let rowid = self.rowid.clone().map(|field| { + let offset = self.metadata.row_groups()[..row_group_idx] + .iter() + .map(|rg| rg.num_rows() as u64) + .sum::(); + RowId::new(offset, field, batch_size) + }); + let reader = ParquetRecordBatchReader::new( batch_size, build_array_reader(self.fields.as_deref(), &projection, &row_group)?, selection, + rowid, ); Ok((self, Some(reader))) @@ -861,6 +894,7 @@ impl InMemoryRowGroup<'_> { &mut self, input: &mut T, projection: &ProjectionMask, + prefetch: Option<&ProjectionMask>, selection: Option<&RowSelection>, ) -> Result<()> { if let Some((selection, offset_index)) = selection.zip(self.offset_index) { @@ -874,7 +908,9 @@ impl InMemoryRowGroup<'_> { .zip(self.metadata.columns()) .enumerate() .filter(|&(idx, (chunk, _chunk_meta))| { - chunk.is_none() && projection.leaf_included(idx) + chunk.is_none() + && (projection.leaf_included(idx) + || prefetch.is_some_and(|p| p.leaf_included(idx))) }) .flat_map(|(idx, (_chunk, chunk_meta))| { // If the first page does not start at the beginning of the column, @@ -899,7 +935,10 @@ impl InMemoryRowGroup<'_> { let mut page_start_offsets = page_start_offsets.into_iter(); for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { + if chunk.is_some() + || !(projection.leaf_included(idx) + || prefetch.is_some_and(|p| p.leaf_included(idx))) + { continue; } @@ -920,7 +959,11 @@ impl InMemoryRowGroup<'_> { .column_chunks .iter() .enumerate() - .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx)) + .filter(|&(idx, chunk)| { + chunk.is_none() + && (projection.leaf_included(idx) + || prefetch.is_some_and(|p| p.leaf_included(idx))) + }) .map(|(idx, _chunk)| { let column = self.metadata.column(idx); let (start, length) = column.byte_range(); @@ -931,7 +974,10 @@ impl InMemoryRowGroup<'_> { let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { + if chunk.is_some() + || !(projection.leaf_included(idx) + || prefetch.is_some_and(|p| p.leaf_included(idx))) + { continue; } @@ -1061,14 +1107,15 @@ mod tests { use crate::file::properties::WriterProperties; use arrow::compute::kernels::cmp::eq; use arrow::error::Result as ArrowResult; - use arrow_array::builder::{ListBuilder, StringBuilder}; + use arrow_array::builder::{ListBuilder, StringBuilder, UInt64Builder}; use arrow_array::cast::AsArray; - use arrow_array::types::Int32Type; + use arrow_array::types::{Int32Type, UInt64Type}; use arrow_array::{ Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray, StructArray, UInt64Array, }; use arrow_schema::{DataType, Field, Schema}; + use arrow_select::concat::concat; use futures::{StreamExt, TryStreamExt}; use rand::{thread_rng, Rng}; use std::collections::HashMap; @@ -1080,6 +1127,7 @@ mod tests { data: Bytes, metadata: Arc, requests: Arc>>>, + max_concurrent_requests: Arc>, } impl AsyncFileReader for TestReader { @@ -1088,6 +1136,23 @@ mod tests { futures::future::ready(Ok(self.data.slice(range))).boxed() } + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, Result>> { + self.requests.lock().unwrap().extend(ranges.clone()); + let mut max = self.max_concurrent_requests.lock().unwrap(); + if ranges.len() > *max { + *max = ranges.len(); + } + + let mut results = Vec::with_capacity(ranges.len()); + for range in ranges { + results.push(self.data.slice(range)); + } + futures::future::ready(Ok(results)).boxed() + } + fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { futures::future::ready(Ok(self.metadata.clone())).boxed() } @@ -1110,6 +1175,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let requests = async_reader.requests.clone(); @@ -1167,6 +1233,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let requests = async_reader.requests.clone(); @@ -1215,6 +1282,84 @@ mod tests { ); } + #[tokio::test] + async fn test_async_reader_with_rowid() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/alltypes_plain.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + let metadata = Arc::new(metadata); + + assert_eq!(metadata.num_row_groups(), 1); + + let async_reader = TestReader { + data: data.clone(), + metadata: metadata.clone(), + requests: Default::default(), + max_concurrent_requests: Default::default(), + }; + + let requests = async_reader.requests.clone(); + let builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]); + let stream = builder + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_row_id("_rowid") + .build() + .unwrap(); + + assert_eq!( + stream + .schema() + .fields() + .first() + .expect("no fields in schema") + .name(), + "_rowid" + ); + + let async_batches: Vec<_> = stream.try_collect().await.unwrap(); + + assert!(async_batches.iter().all(|batch| { + batch + .schema() + .fields() + .first() + .expect("no fields in schema") + .name() + == "_rowid" + })); + + let rowid_arrays = async_batches + .iter() + .map(|batch| batch.column(0).as_ref()) + .collect::>(); + let rowids = concat(&rowid_arrays).expect("concat rowids"); + + let expected_rowids = UInt64Array::from_iter_values(0..rowids.len() as u64); + + assert_eq!(rowids.as_primitive::(), &expected_rowids); + + let requests = requests.lock().unwrap(); + let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range(); + let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range(); + + assert_eq!( + &requests[..], + &[ + offset_1 as usize..(offset_1 + length_1) as usize, + offset_2 as usize..(offset_2 + length_2) as usize + ] + ); + } + #[tokio::test] async fn test_async_reader_with_index() { let testdata = arrow::util::test_util::parquet_test_data(); @@ -1232,6 +1377,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let options = ArrowReaderOptions::new().with_page_index(true); @@ -1283,6 +1429,61 @@ mod tests { assert_eq!(async_batches, sync_batches); } + #[tokio::test] + async fn test_async_reader_with_rowid_offset() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + let metadata = Arc::new(metadata); + + assert_eq!(metadata.num_row_groups(), 1); + + let async_reader = TestReader { + data: data.clone(), + metadata: metadata.clone(), + requests: Default::default(), + max_concurrent_requests: Default::default(), + }; + + let builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]); + let stream = builder + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_offset(3) + .with_row_id("_rowid") + .build() + .unwrap(); + + let async_batches: Vec<_> = stream.try_collect().await.unwrap(); + + assert!(async_batches.iter().all(|batch| { + batch + .schema() + .fields() + .first() + .expect("no fields in schema") + .name() + == "_rowid" + })); + + let rowid_arrays = async_batches + .iter() + .map(|batch| batch.column(0).as_ref()) + .collect::>(); + let rowids = concat(&rowid_arrays).expect("concat rowids"); + + let expected_rowids = UInt64Array::from_iter_values(3..rowids.len() as u64 + 3); + assert_eq!(rowids.as_primitive::(), &expected_rowids); + } + #[tokio::test] async fn test_async_reader_with_limit() { let testdata = arrow::util::test_util::parquet_test_data(); @@ -1300,6 +1501,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let builder = ParquetRecordBatchStreamBuilder::new(async_reader) @@ -1346,6 +1548,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let options = ArrowReaderOptions::new().with_page_index(true); @@ -1429,6 +1632,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let options = ArrowReaderOptions::new().with_page_index(true); @@ -1453,6 +1657,104 @@ mod tests { } } + #[tokio::test] + async fn test_fuzz_async_reader_with_rowid_and_selection() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + let metadata = Arc::new(metadata); + + assert_eq!(metadata.num_row_groups(), 1); + + let mut rand = thread_rng(); + + for _ in 0..100 { + let mut expected_rowids_builder = UInt64Builder::new(); + let mut offset = 0; + + let mut expected_rows = 0; + let mut total_rows = 0; + let mut skip = false; + let mut selectors = vec![]; + + while total_rows < 7300 { + let row_count: usize = rand.gen_range(1..100); + + let row_count = row_count.min(7300 - total_rows); + + selectors.push(RowSelector { row_count, skip }); + + total_rows += row_count; + if !skip { + expected_rowids_builder.append_slice( + (offset..offset + row_count as u64) + .collect::>() + .as_slice(), + ); + expected_rows += row_count; + } + + offset += row_count as u64; + + skip = !skip; + } + + let selection = RowSelection::from(selectors); + + let async_reader = TestReader { + data: data.clone(), + metadata: metadata.clone(), + requests: Default::default(), + max_concurrent_requests: Default::default(), + }; + + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap(); + + let col_idx: usize = rand.gen_range(0..13); + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]); + + let stream = builder + .with_projection(mask.clone()) + .with_row_selection(selection.clone()) + .with_row_id("_rowid") + .build() + .expect("building stream"); + + let async_batches: Vec<_> = stream.try_collect().await.unwrap(); + + let expected_rowids = expected_rowids_builder.finish(); + + assert!(async_batches.iter().all(|batch| { + batch + .schema() + .fields() + .first() + .expect("no fields in schema") + .name() + == "_rowid" + })); + + let rowid_arrays = async_batches + .iter() + .map(|batch| batch.column(0).as_ref()) + .collect::>(); + let rowids = concat(&rowid_arrays).expect("concat rowids"); + + assert_eq!(rowids.as_primitive::(), &expected_rowids); + + let actual_rows: usize = async_batches.into_iter().map(|b| b.num_rows()).sum(); + + assert_eq!(actual_rows, expected_rows); + } + } + #[tokio::test] async fn test_async_reader_zero_row_selector() { //See https://github.com/apache/arrow-rs/issues/2669 @@ -1500,6 +1802,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let options = ArrowReaderOptions::new().with_page_index(true); @@ -1550,8 +1853,10 @@ mod tests { data, metadata: Arc::new(metadata), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let requests = test.requests.clone(); + let max_concurrent_requests = test.max_concurrent_requests.clone(); let a_scalar = StringArray::from_iter_values(["b"]); let a_filter = ArrowPredicateFn::new( @@ -1592,6 +1897,164 @@ mod tests { let val = col.as_any().downcast_ref::().unwrap().value(0); assert_eq!(val, 3); + // Should only have made 3 requests + assert_eq!(requests.lock().unwrap().len(), 3); + assert_eq!(*max_concurrent_requests.lock().unwrap(), 1); + } + + #[tokio::test] + async fn test_row_filter_with_prefetch() { + let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); + let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]); + let c = Int32Array::from_iter(0..6); + let data = RecordBatch::try_from_iter([ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ("c", Arc::new(c) as ArrayRef), + ]) + .unwrap(); + + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap(); + writer.write(&data).unwrap(); + writer.close().unwrap(); + + let data: Bytes = buf.into(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + let parquet_schema = metadata.file_metadata().schema_descr_ptr(); + + let test = TestReader { + data, + metadata: Arc::new(metadata), + requests: Default::default(), + max_concurrent_requests: Default::default(), + }; + let requests = test.requests.clone(); + let max_concurrent_requests = test.max_concurrent_requests.clone(); + + let a_scalar = StringArray::from_iter_values(["b"]); + let a_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![0]), + move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)), + ); + + let b_scalar = StringArray::from_iter_values(["4"]); + let b_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![1]), + move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)), + ); + + let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]); + + let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]); + let prefetch = ProjectionMask::leaves(&parquet_schema, vec![0, 2]); + let stream = ParquetRecordBatchStreamBuilder::new(test) + .await + .unwrap() + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_row_filter(filter) + .with_prefetch(Some(prefetch)) + .build() + .unwrap(); + + let batches: Vec<_> = stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 2); + + let col = batch.column(0); + let val = col.as_any().downcast_ref::().unwrap().value(0); + assert_eq!(val, "b"); + + let col = batch.column(1); + let val = col.as_any().downcast_ref::().unwrap().value(0); + assert_eq!(val, 3); + + // Should only have made 3 requests + assert_eq!(requests.lock().unwrap().len(), 3); + assert_eq!(*max_concurrent_requests.lock().unwrap(), 2); + } + + #[tokio::test] + async fn test_async_reader_with_row_id_and_row_filter() { + let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); + let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]); + let c = Int32Array::from_iter(0..6); + let data = RecordBatch::try_from_iter([ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ("c", Arc::new(c) as ArrayRef), + ]) + .unwrap(); + + let mut buf = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap(); + writer.write(&data).unwrap(); + writer.close().unwrap(); + + let data: Bytes = buf.into(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + let parquet_schema = metadata.file_metadata().schema_descr_ptr(); + + let test = TestReader { + data, + metadata: Arc::new(metadata), + requests: Default::default(), + max_concurrent_requests: Default::default(), + }; + let requests = test.requests.clone(); + + let a_scalar = StringArray::from_iter_values(["b"]); + let a_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![0]), + move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)), + ); + + let b_scalar = StringArray::from_iter_values(["4"]); + let b_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![1]), + move |batch| eq(batch.column(0), &Scalar::new(&b_scalar)), + ); + + let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]); + + let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]); + let stream = ParquetRecordBatchStreamBuilder::new(test) + .await + .unwrap() + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_row_filter(filter) + .with_row_id("_rowid") + .build() + .unwrap(); + + let batches: Vec<_> = stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 3); + + let col = batch.column(0); + let val = col.as_any().downcast_ref::().unwrap().value(0); + assert_eq!(val, 3); + + let col = batch.column(1); + let val = col.as_any().downcast_ref::().unwrap().value(0); + assert_eq!(val, "b"); + + let col = batch.column(2); + let val = col.as_any().downcast_ref::().unwrap().value(0); + assert_eq!(val, 3); + // Should only have made 3 requests assert_eq!(requests.lock().unwrap().len(), 3); } @@ -1627,6 +2090,7 @@ mod tests { data, metadata: Arc::new(metadata), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let stream = ParquetRecordBatchStreamBuilder::new(test.clone()) @@ -1701,6 +2165,125 @@ mod tests { assert_eq!(col2.values(), &[4, 5]); } + #[tokio::test] + async fn test_async_reader_with_rowid_limit_multiple_row_groups() { + let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); + let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]); + let c = Int32Array::from_iter(0..6); + let data = RecordBatch::try_from_iter([ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ("c", Arc::new(c) as ArrayRef), + ]) + .unwrap(); + + let mut buf = Vec::with_capacity(1024); + let props = WriterProperties::builder() + .set_max_row_group_size(3) + .build(); + let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap(); + writer.write(&data).unwrap(); + writer.close().unwrap(); + + let data: Bytes = buf.into(); + let metadata = ParquetMetaDataReader::new() + .parse_and_finish(&data) + .unwrap(); + + assert_eq!(metadata.num_row_groups(), 2); + + let test = TestReader { + data, + metadata: Arc::new(metadata), + requests: Default::default(), + max_concurrent_requests: Default::default(), + }; + + let stream = ParquetRecordBatchStreamBuilder::new(test.clone()) + .await + .unwrap() + .with_batch_size(1024) + .with_limit(4) + .with_row_id("_rowid") + .build() + .unwrap(); + + let batches: Vec<_> = stream.try_collect().await.unwrap(); + // Expect one batch for each row group + assert_eq!(batches.len(), 2); + + let batch = &batches[0]; + // First batch should contain all rows + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 4); + let rowids = batch.column(0).as_primitive::(); + assert_eq!(rowids.values(), &[0, 1, 2]); + let col3 = batch.column(3).as_primitive::(); + assert_eq!(col3.values(), &[0, 1, 2]); + + let batch = &batches[1]; + // Second batch should trigger the limit and only have one row + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 4); + let rowids = batch.column(0).as_primitive::(); + assert_eq!(rowids.values(), &[3]); + let col3 = batch.column(3).as_primitive::(); + assert_eq!(col3.values(), &[3]); + + let stream = ParquetRecordBatchStreamBuilder::new(test.clone()) + .await + .unwrap() + .with_offset(2) + .with_limit(3) + .with_row_id("_rowid") + .build() + .unwrap(); + + let batches: Vec<_> = stream.try_collect().await.unwrap(); + // Expect one batch for each row group + assert_eq!(batches.len(), 2); + + let batch = &batches[0]; + // First batch should contain one row + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 4); + let rowids = batch.column(0).as_primitive::(); + assert_eq!(rowids.values(), &[2]); + let col3 = batch.column(3).as_primitive::(); + assert_eq!(col3.values(), &[2]); + + let batch = &batches[1]; + // Second batch should contain two rows + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 4); + let rowids = batch.column(0).as_primitive::(); + assert_eq!(rowids.values(), &[3, 4]); + let col3 = batch.column(3).as_primitive::(); + assert_eq!(col3.values(), &[3, 4]); + + let stream = ParquetRecordBatchStreamBuilder::new(test.clone()) + .await + .unwrap() + .with_offset(4) + .with_limit(20) + .with_row_id("_rowid") + .build() + .unwrap(); + + let batches: Vec<_> = stream.try_collect().await.unwrap(); + // Should skip first row group + assert_eq!(batches.len(), 1); + + let batch = &batches[0]; + // First batch should contain two rows + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 4); + let rowids = batch.column(0).as_primitive::(); + assert_eq!(rowids.values(), &[4, 5]); + let col3 = batch.column(3).as_primitive::(); + assert_eq!(col3.values(), &[4, 5]); + } + #[tokio::test] async fn test_row_filter_with_index() { let testdata = arrow::util::test_util::parquet_test_data(); @@ -1719,6 +2302,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let a_filter = @@ -1787,6 +2371,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let requests = async_reader.requests.clone(); @@ -1808,6 +2393,8 @@ mod tests { filter: None, limit: None, offset: None, + rowid: None, + prefetch: None, }; let mut skip = true; @@ -1863,6 +2450,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let builder = ParquetRecordBatchStreamBuilder::new(async_reader) @@ -2008,6 +2596,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let builder = ParquetRecordBatchStreamBuilder::new(async_reader) .await @@ -2045,6 +2634,7 @@ mod tests { data: data.clone(), metadata: metadata.clone(), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) @@ -2182,6 +2772,7 @@ mod tests { data, metadata: Arc::new(metadata), requests: Default::default(), + max_concurrent_requests: Default::default(), }; let requests = test.requests.clone(); diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 5f34f34cbb7a..08ffa1767ed1 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -355,6 +355,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> { data_page_boundary_descending: bool, /// (min, max) last_non_null_data_page_min_max: Option<(E::T, E::T)>, + + def_levels_runs_sink: Vec<(i16, usize)>, + num_levels: usize, } impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { @@ -419,6 +422,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { data_page_boundary_ascending: true, data_page_boundary_descending: true, last_non_null_data_page_min_max: None, + def_levels_runs_sink: vec![], + num_levels: 0, } } @@ -433,6 +438,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { max: Option<&E::T>, distinct_count: Option, ) -> Result { + if !self.def_levels_runs_sink.is_empty() { + self.add_data_page()?; + } // Check if number of definition levels is the same as number of repetition levels. if let (Some(def), Some(rep)) = (def_levels, rep_levels) { if def.len() != rep.len() { @@ -549,6 +557,111 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { ) } + pub fn write_def_level_range_batch( + &mut self, + values: &E::Values, + def_level: i16, + num_levels: usize, + min: Option<&E::T>, + max: Option<&E::T>, + distinct_count: Option, + ) -> Result { + if self.descr.max_rep_level() > 0 { + return Err(general_err!( + "Cannot write def level range when rep level > 0", + )); + } + if self.statistics_enabled == EnabledStatistics::Chunk { + match (min, max) { + (Some(min), Some(max)) => { + update_min(&self.descr, min, &mut self.column_metrics.min_column_value); + update_max(&self.descr, max, &mut self.column_metrics.max_column_value); + } + (None, Some(_)) | (Some(_), None) => { + panic!("min/max should be both set or both None") + } + (None, None) => {} + }; + } + + // We can only set the distinct count if there are no other writes + if self.encoder.num_values() == 0 { + self.column_metrics.column_distinct_count = distinct_count; + } else { + self.column_metrics.column_distinct_count = None; + } + + let mut values_offset = 0; + let mut levels_offset = 0; + let base_batch_size = self.props.write_batch_size(); + while levels_offset < num_levels { + let end_offset = num_levels.min(levels_offset + base_batch_size); + + values_offset += self.write_mini_batch_with_def_level( + values, + values_offset, + None, + end_offset - levels_offset, + def_level, + )?; + levels_offset = end_offset; + } + + // Return total number of values processed. + Ok(values_offset) + } + + fn write_mini_batch_with_def_level( + &mut self, + values: &E::Values, + values_offset: usize, + value_indices: Option<&[usize]>, + num_levels: usize, + def_level: i16, + ) -> Result { + if !self.def_levels_sink.is_empty() { + self.add_data_page()?; + } + if let Some((last_def_level, last_count)) = self.def_levels_runs_sink.last_mut() { + if *last_def_level == def_level { + *last_count += num_levels; + } else { + self.def_levels_runs_sink.push((def_level, num_levels)); + } + } else { + self.def_levels_runs_sink.push((def_level, num_levels)); + } + self.num_levels += num_levels; + let values_to_write = if def_level == self.descr.max_def_level() { + num_levels + } else { + self.page_metrics.num_page_nulls += num_levels as u64; + 0 + }; + + self.page_metrics.num_buffered_rows += num_levels as u32; + + match value_indices { + Some(indices) => { + let indices = &indices[values_offset..values_offset + values_to_write]; + self.encoder.write_gather(values, indices)?; + } + None => self.encoder.write(values, values_offset, values_to_write)?, + } + + self.page_metrics.num_buffered_values += num_levels as u32; + + if self.should_add_data_page() { + self.add_data_page()?; + } + + if self.should_dict_fallback() { + self.dict_fallback()?; + } + + Ok(values_to_write) + } + /// Returns the estimated total memory usage. /// /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate @@ -1002,13 +1115,21 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } if max_def_level > 0 { - buffer.extend_from_slice( + let encoded_levels = if self.def_levels_runs_sink.is_empty() { &self.encode_levels_v1( Encoding::RLE, &self.def_levels_sink[..], max_def_level, - )[..], - ); + )[..] + } else { + &self.encode_levels_v1_bulk( + Encoding::RLE, + &self.def_levels_runs_sink[..], + self.num_levels, + max_def_level, + )[..] + }; + buffer.extend_from_slice(encoded_levels); } buffer.extend_from_slice(&values_data.buf); @@ -1089,6 +1210,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.rep_levels_sink.clear(); self.def_levels_sink.clear(); self.page_metrics.new_page(); + self.def_levels_runs_sink.clear(); + self.num_levels = 0; Ok(()) } @@ -1211,6 +1334,20 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { encoder.consume() } + /// Encodes definition or repetition levels for Data Page v1. + #[inline] + fn encode_levels_v1_bulk( + &self, + encoding: Encoding, + level_runs: &[(i16, usize)], + num_levels: usize, + max_level: i16, + ) -> Vec { + let mut encoder = LevelEncoder::v1(encoding, max_level, num_levels); + encoder.put_bulk(level_runs); + encoder.consume() + } + /// Encodes definition or repetition levels for Data Page v2. /// Encoding is always RLE. #[inline] diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs index 6f662b614fca..5b77fac272f1 100644 --- a/parquet/src/encodings/levels.rs +++ b/parquet/src/encodings/levels.rs @@ -108,6 +108,27 @@ impl LevelEncoder { num_encoded } + /// Put/encode levels vector into this level encoder. + /// Returns number of encoded values that are less than or equal to length of the + /// input buffer. + #[inline] + pub fn put_bulk(&mut self, runs: &[(i16, usize)]) -> usize { + let mut num_encoded = 0; + match *self { + LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => { + for &(value, count) in runs { + encoder.put_bulk(value as u64, count); + num_encoded += count; + } + encoder.flush(); + } + LevelEncoder::BitPacked(_, _) => { + unimplemented!() + } + } + num_encoded + } + /// Finalizes level encoder, flush all intermediate buffers and return resulting /// encoded buffer. Returned buffer is already truncated to encoded bytes only. #[inline] diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index d089ba7836e1..450a78206b67 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -151,6 +151,95 @@ impl RleEncoder { } } + #[inline] + pub fn put_bulk(&mut self, value: u64, count: usize) { + assert!(count > 0, "Count must be positive"); + let remaining = 8 - self.num_buffered_values; + if self.current_value == value { + if self.repeat_count >= 8 { + self.repeat_count += count; + return; + } + let n = remaining.min(count); + for _ in 0..n { + self.buffered_values[self.num_buffered_values] = value; + self.num_buffered_values += 1; + self.repeat_count += 1; + } + if self.num_buffered_values == 8 { + assert_eq!(self.bit_packed_count % 8, 0); + self.flush_buffered_values(); + if count > n { + let mut remaining_run = count - n; + // Fill buffer + for _ in 0..remaining_run.min(8) { + self.repeat_count += 1; + remaining_run -= 1; + if self.repeat_count > 8 { + break; + } + self.buffered_values[self.num_buffered_values] = value; + self.num_buffered_values += 1; + } + if self.num_buffered_values == 8 && self.repeat_count <= 8 { + // Buffered values are full. Flush them. + assert_eq!(self.bit_packed_count % 8, 0); + self.flush_buffered_values(); + } + + self.repeat_count += remaining_run; + } + } + } else { + if self.repeat_count >= 8 { + // The current RLE run has ended and we've gathered enough. Flush first. + assert_eq!( + self.bit_packed_count, 0, + "rc = {}, value = {}, num_buffered = {}, c = {count}, {:?}", + self.repeat_count, value, self.num_buffered_values, self.buffered_values + ); + self.flush_rle_run(); + } + self.current_value = value; + let n = remaining.min(count); + self.repeat_count = 0; + for _ in 0..n { + self.buffered_values[self.num_buffered_values] = value; + self.num_buffered_values += 1; + self.repeat_count += 1; + } + if self.num_buffered_values == 8 { + let mut new_count = count; + if self.repeat_count < 8 { + new_count = count - self.repeat_count; + } + // Buffered values are full. Flush them. + assert_eq!(self.bit_packed_count % 8, 0); + self.flush_buffered_values(); + if count > n { + if self.repeat_count <= 8 { + for _ in 0..(count - n).min(8) { + self.repeat_count += 1; + if self.repeat_count > 8 { + break; + } + self.buffered_values[self.num_buffered_values] = value; + self.num_buffered_values += 1; + } + if self.num_buffered_values == 8 && self.repeat_count <= 8 { + // Buffered values are full. Flush them. + assert_eq!(self.bit_packed_count % 8, 0); + self.flush_buffered_values(); + } + } + self.repeat_count = new_count; + } + } else if count > 8 { + self.repeat_count = count; + } + } + } + #[inline] #[allow(unused)] pub fn buffer(&self) -> &[u8] { diff --git a/pre-commit.sh b/pre-commit.sh index f82390e229a9..3f2d8390384a 100755 --- a/pre-commit.sh +++ b/pre-commit.sh @@ -56,12 +56,12 @@ fi # 1. cargo clippy -echo -e "$(GREEN INFO): cargo clippy ..." +#echo -e "$(GREEN INFO): cargo clippy ..." # Cargo clippy always return exit code 0, and `tee` doesn't work. # So let's just run cargo clippy. -cargo clippy -echo -e "$(GREEN INFO): cargo clippy done" +#cargo clippy +#echo -e "$(GREEN INFO): cargo clippy done" # 2. cargo fmt: format with nightly and stable.