Antalya 26.1 Backport of #99548 - Parallelize object storage output#1580

Open
mkmkme wants to merge 2 commits into antalya-26.1 from backports/antalya-26.1/99548

Conversation

Collaborator

@mkmkme mkmkme commented Mar 26, 2026

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Improve the performance of data lakes. In previous versions, reading from object storage didn't resize the pipeline to the number of processing threads. (ClickHouse#99548 by @alexey-milovidov)

Documentation entry for user-facing changes

...

CI/CD Options

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

Regression jobs to run:

  • Fast suites (mostly <1h)
  • Aggregate Functions (2h)
  • Alter (1.5h)
  • Benchmark (30m)
  • ClickHouse Keeper (1h)
  • Iceberg (2h)
  • LDAP (1h)
  • Parquet (1.5h)
  • RBAC (1.5h)
  • SSL Server (1h)
  • S3 (2h)
  • S3 Export (2h)
  • Swarms (30m)
  • Tiered Storage (2h)

alexey-milovidov and others added 2 commits March 26, 2026 08:24
@github-actions

Workflow [PR], commit [dab5942]

Collaborator Author

mkmkme commented Mar 26, 2026

AI audit note: This review comment was generated by AI (claude-4.6-sonnet-medium-thinking).

Audit update for PR #1580 (Parallelize object storage output — backport of ClickHouse#99548):

Confirmed defects:

Low: NullSource guard order inverted vs. canonical pattern
    Note: identical to upstream ClickHouse PR #99548 — not introduced by this backport.
    Impact: If pipe.empty() is ever true, a NullSource (1 port) is added before
            the resize check, allowing an unnecessary resize of the NullSource pipe
            to max_num_streams processors; produces idle/redundant processors.
    Anchor: src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp / initializePipeline
    Trigger: pipe.empty() == true (empty object storage table with no matching files)
    Why defect: StorageFile.cpp and StorageURL.cpp (the canonical reference) evaluate
            output_ports *before* the NullSource fallback, so output_ports == 0 when
            the pipe is empty and the resize condition (output_ports > 0) is false —
            NullSource is never resized. The PR reverses this order: NullSource is
            inserted first, then output_ports == 1, causing a spurious resize.
            Code path difference:
              // StorageFile.cpp (correct):
              output_ports = pipe.numOutputPorts(); // 0 for empty pipe
              if (… && output_ports > 0 …) pipe.resize(max_num_streams); // skipped
              if (pipe.empty()) pipe = NullSource;
              // PR (incorrect order):
              if (pipe.empty()) pipe = NullSource;  // output_ports becomes 1
              output_ports = pipe.numOutputPorts(); // 1
              if (… && output_ports > 0 …) pipe.resize(max_num_streams); // fires
    Practical severity note: In practice, num_streams >= 1 is maintained (via the
            else { num_streams = 1; } branch), ensuring at least one source is always
            in pipes, so pipe.empty() is currently dead code — impact is theoretical
            for this codebase state. Nonetheless the ordering is demonstrably wrong
            by comparison with the reference implementations.
    Fix direction (short): Move the output_ports computation and resize block to
            *before* the pipe.empty() / NullSource fallback, mirroring StorageFile.cpp.
    Regression test direction (short): Add a stateless test with an empty S3 glob
            (no matching files) and verify EXPLAIN PIPELINE does not show a Resize node.

Low: Test uses non-standard S3 mock port 19999
    Note: identical to upstream ClickHouse PR #99548 — not introduced by this backport.
    Impact: Test 04040_object_storage_parallelize_output.sh may silently fail or
            be skipped in CI environments where mock S3 is not available on port 19999;
            all existing stateless S3/Iceberg tests use the standard mock port 11111.
    Anchor: tests/queries/0_stateless/04040_object_storage_parallelize_output.sh
    Trigger: Any CI runner that provides mock S3 only on port 11111 (the repo-wide
            standard per tests/config/config.d/storage_conf.xml and all other *.sh tests).
    Why defect: EXPLAIN PIPELINE calls initializePipeline → createIterator →
            StorageObjectStorageSource::createFileIterator, which may attempt a
            HeadObject/ListObjects against localhost:19999; a connection-refused error
            at that point causes EXPLAIN to emit an error instead of the pipeline plan,
            grep finds no match, output is empty, and the test fails vs. reference.
    Fix direction (short): Change the test URL to http://localhost:11111/… (the
            repo-standard mock endpoint) or add a conditional check for S3 mock availability.
    Regression test direction (short): Run the test in the same CI environment as
            other S3 stateless tests and verify it outputs "Resize 1 → 4".
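The fix direction is a mechanical port rewrite in the test file. The sketch below demonstrates it on a temporary copy (the URL content is illustrative, not copied from the actual test; the real target is the test file anchored above):

```shell
# Rewrite the non-standard mock S3 port to the repo-standard one (11111),
# demonstrated on a throwaway file rather than the real test script.
tmp=$(mktemp)
printf "url='http://localhost:19999/test/data.native'\n" > "$tmp"
sed -i 's|localhost:19999|localhost:11111|g' "$tmp"
grep -c 'localhost:11111' "$tmp"   # prints 1: the port was rewritten
rm -f "$tmp"
```

Note `sed -i` without a suffix argument assumes GNU sed, which matches the Linux CI runners in question.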

Coverage summary:

Scope reviewed: ReadFromObjectStorageStep.cpp / .h constructor + initializePipeline
                changes; FormatFactory resize guard; new stateless test; upstream
                reference implementations (StorageFile.cpp, StorageURL.cpp, IStorage.cpp).
Categories failed: NullSource guard ordering; test environment port assumption.
Categories passed: max_num_streams initialization correctness; Setting removal
            (max_threads was unused); parallelize_output_from_storages setting existence
            and default; format guard via FormatFactory; C++ memory safety (no new
            heap ownership, no raw pointers, no use-after-move); thread safety
            (resize is local to initializePipeline, no shared-state mutation);
            distributed_aggregation_memory_efficient guard (not applicable for
            local object storage reads); integer overflow/underflow (output_ports,
            max_num_streams are size_t, comparisons are safe); rollback/partial-update
            (pipe is local, moved into pipeline.init last).
Assumptions/limits: Static analysis only; EXPLAIN PIPELINE S3 iterator behavior at
            runtime not verified; pipe.empty() dead-code status relies on num_streams>=1
            invariant holding in all callers.


@ianton-ru ianton-ru left a comment


LGTM

Collaborator

Selfeer commented Mar 27, 2026

PR #1580 — CI verification

New vs base (upstream "Checks New Fails") — NOT PR-related:

  • test_storage_delta/test_cdf.py::test_cdf[] (Integration amd_binary 2/5)
    Error: Code: 742. DELTA_KERNEL_ERROR — "Expected the first commit to have version 2, got None".
Same Delta Lake CDF flake seen on PR #1579 (Antalya 26.1 Backport of #96620 - Iceberg partitioning fix; same base branch, same shard). This PR only touches object storage read pipeline resizing — zero overlap with Delta Lake kernel code. Not caused by this PR.

  • 01079_parallel_alter_modify_zookeeper_long (Stateless amd_debug / s3 sequential)
    Error: Timeout at 600s (both the original run and the retry). The test performs parallel ALTER MODIFY on ReplicatedMergeTree with ZooKeeper under stress.
    The sibling test 01079_parallel_alter_detach_table_zookeeper has known flakiness upstream (#97358) on the same s3 storage + sequential CI config. This PR adds read pipeline parallelization, which could theoretically add contention on an already-slow debug+s3+sequential path, but the test is dominated by replication/DDL sync, not object storage scans, and it fails on only one of 17 stateless shards. Treat as a pre-existing flake / CI load sensitivity, not a PR regression.

Regression New Fails — NOT PR-related:

Known Fails (45 BROKEN — all pre-existing):

Same catalogue as PR #1579: NETLINK_ERROR, KNOWN #1369, unstable upstream tests, S3 unreachable, etc.

Infrastructure:

  • Grype (Alpine image): 1 high/critical CVE on Alpine tag; non-Alpine Grype passed.

Passing suites of note:

  • All Iceberg regression (iceberg_1 + iceberg_2, x86_64 + aarch64): green — relevant given data lake performance context
  • All stateless test suites except one shard: green (16/17)
  • All integration suites except amd_binary 2/5: green (14/15)
  • S3 export partition, parquet minio, parquet AWS S3: green
  • All builds, fuzzers, stress tests, unit tests: green
  • arm_asan targeted rerun of the new test 04040: passed

Conclusion

No failures are caused by this PR. The two "new fails" are a recurring Delta Lake CDF flake (unrelated code path) and a timeout in a parallel-ALTER stress test on a slow CI config (pre-existing fragility in the 01079 test family). Regression failures (swarms, parquet unsupportednull) are the same recurring flakes across the antalya-26.1 branch.

@Selfeer Selfeer added the verified Verified by QA label Mar 27, 2026