Defer library swap until after extension creation in test setup#8576

Closed
eaydingol wants to merge 54 commits into main from cherry-pick-defer-libswap
Conversation

@eaydingol
Contributor

Cherry-pick of cfd8241 from release-14.0 to main.

Original PR: #8572

neildsh added 30 commits May 10, 2026 23:24
Phase 1 of the sorted-merge feature. This commit adds the data structures and GUC needed by later phases, with zero behavioral changes:
- SortedMergeKey typedef in multi_physical_planner.h describing one
  sort key for the coordinator k-way merge
- useSortedMerge, sortedMergeKeys[], sortedMergeKeyCount fields on
  DistributedPlan (plan-time decision, never checked at runtime via GUC)
- sortedMergeEligible field on MultiExtendedOp (logical optimizer tag
  read by the physical planner)
- Hidden GUC citus.enable_sorted_merge (PGC_SUSET, default off,
  GUC_NO_SHOW_ALL) consulted only during planning
- Serialization in citus_outfuncs.c and deep-copy in citus_copyfuncs.c
  for all new fields

All new fields default to false/0/NULL. Existing regression tests are
unaffected.

Co-authored-by: Copilot
Phase 2 of the sorted-merge feature. Workers now sort their results
when citus.enable_sorted_merge is enabled at planning time, even for
queries without LIMIT. The plan metadata is populated so later phases
can execute the merge and set pathkeys.

Logical optimizer changes (multi_logical_optimizer.c):
- WorkerSortClauseList() gains an early-return path that pushes the
  sort clause to workers when the GUC is on and the sort is safe
  (no aggregates in ORDER BY, no non-pushable window functions,
  and either no GROUP BY or GROUP BY on partition column).
- WorkerExtendedOpNode() sets sortedMergeEligible = true when the
  worker sort clause semantically matches the original sort clause,
  using the new SortClauseListsMatch() helper.
- SortClauseListsMatch() compares tleSortGroupRef, sortop,
  nulls_first, and eqop for each pair.

Physical planner changes (multi_physical_planner.c):
- CreatePhysicalDistributedPlan() finds the worker MultiExtendedOp
  with sortedMergeEligible = true, builds SortedMergeKey metadata
  from the worker job query, and sets useSortedMerge on the plan.
- BuildSortedMergeKeys() constructs the key array from the worker
  query's SortGroupClause list and target list.

The coordinator Sort node is still present above the CustomScan
(pathkeys not set yet — that is Phase 4). Results are correct
because the redundant Sort re-sorts already-sorted data.

Co-authored-by: Copilot
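
The pairwise comparison that `SortClauseListsMatch()` performs can be sketched as follows. This is a minimal, self-contained illustration, not the Citus implementation: the struct and function names are hypothetical stand-ins, with fields named after the `SortGroupClause` fields the commit message says are compared.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical, simplified stand-in for a sort clause. Field names follow
 * the fields the helper compares: tleSortGroupRef, sortop, eqop, nulls_first.
 */
typedef struct SortKeySketch
{
	unsigned int tleSortGroupRef; /* target-list entry the clause sorts on */
	unsigned int sortop;          /* ordering operator OID */
	unsigned int eqop;            /* equality operator OID */
	bool nulls_first;             /* NULLS FIRST vs NULLS LAST */
} SortKeySketch;

/*
 * Two sort-clause lists match only if every corresponding pair of keys
 * agrees on all four fields.
 */
bool
SortClauseListsMatchSketch(const SortKeySketch *a, const SortKeySketch *b,
						   size_t count)
{
	for (size_t i = 0; i < count; i++)
	{
		if (a[i].tleSortGroupRef != b[i].tleSortGroupRef ||
			a[i].sortop != b[i].sortop ||
			a[i].eqop != b[i].eqop ||
			a[i].nulls_first != b[i].nulls_first)
		{
			return false;
		}
	}
	return true;
}
```

A mismatch on any single field (for example, a differing `nulls_first`) is enough to make the worker sort clause semantically different from the original, so eligibility is not set.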
Phase 3 of the sorted-merge feature. When distributedPlan->useSortedMerge
is true (set at planning time by Phase 2), the adaptive executor now:
1. Routes worker results into per-task tuple stores via a new
   PerTaskDispatchTupleDest that dispatches by task->taskId hash lookup.
   No Task fields are mutated — all state lives on DistributedExecution.
2. After all tasks complete, performs a k-way merge of the per-task stores
   into the final scanState->tuplestorestate using PostgreSQL's public
   binaryheap and SortSupport APIs.
3. Frees per-task stores after the merge.

The existing CitusExecScan/ReturnTupleFromTuplestore/CitusEndScan/
CitusReScan code paths are completely unchanged — they read from
the final tuplestore exactly as before.

New files:
- sorted_merge.h: CreatePerTaskDispatchDest, MergePerTaskStoresIntoFinalStore
- sorted_merge.c: PerTaskDispatchTupleDest with taskId->index hash routing,
  MergePerTaskStoresIntoFinalStore with binaryheap merge, MergeHeapComparator
  modeled after PG's heap_compare_slots in nodeMergeAppend.c
Modified:
- adaptive_executor.c: DistributedExecution gains useSortedMerge/perTaskStores/
  perTaskStoreCount fields. AdaptiveExecutor() branches on useSortedMerge to
  create per-task stores, then merges post-execution. EXPLAIN ANALYZE falls
  back to existing single-tuplestore path.
Safety:
- Shared TupleDestinationStats preserves citus.max_intermediate_result_size
- Per-task stores allocated in AdaptiveExecutor local memory context
  (auto-cleanup on error via PG memory context teardown)
- task->totalReceivedTupleData tracking preserved

The coordinator Sort node is still present above the CustomScan (pathkeys
not set until Phase 4). Results are correct because the redundant Sort
re-sorts already-sorted data.

Co-authored-by: Copilot
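
The k-way merge described in step 2 can be sketched with a plain min-heap over per-task streams. This is an illustrative, self-contained analog only: real tuples, `binaryheap`, and `SortSupport` comparators are replaced by sorted `int` arrays and a hand-rolled sift-down, and all names are hypothetical.

```c
#include <stddef.h>

/* One "per-task tuple store", reduced to a sorted int array for illustration. */
typedef struct TaskStream
{
	const int *vals;
	size_t len;
	size_t pos; /* index of the next unread value */
} TaskStream;

/* Current head value of the stream sitting in heap slot h. */
static int
HeadValue(const size_t *heap, const TaskStream *streams, size_t h)
{
	const TaskStream *s = &streams[heap[h]];
	return s->vals[s->pos];
}

/* Restore the min-heap property downward from slot i. */
static void
SiftDown(size_t *heap, size_t heapSize, size_t i, const TaskStream *streams)
{
	for (;;)
	{
		size_t smallest = i;
		size_t l = 2 * i + 1;
		size_t r = 2 * i + 2;
		if (l < heapSize &&
			HeadValue(heap, streams, l) < HeadValue(heap, streams, smallest))
			smallest = l;
		if (r < heapSize &&
			HeadValue(heap, streams, r) < HeadValue(heap, streams, smallest))
			smallest = r;
		if (smallest == i)
			return;
		size_t tmp = heap[i];
		heap[i] = heap[smallest];
		heap[smallest] = tmp;
		i = smallest;
	}
}

/*
 * Merge k sorted streams into out; returns the number of values written.
 * Each of the N total values costs one O(log K) sift, so the merge is
 * O(N log K) overall.
 */
size_t
KWayMerge(TaskStream *streams, size_t k, int *out)
{
	size_t heap[64]; /* sketch assumes k <= 64 */
	size_t heapSize = 0;
	size_t written = 0;

	for (size_t i = 0; i < k; i++)
		if (streams[i].pos < streams[i].len)
			heap[heapSize++] = i;

	for (size_t i = heapSize / 2; i-- > 0;)
		SiftDown(heap, heapSize, i, streams);

	while (heapSize > 0)
	{
		TaskStream *top = &streams[heap[0]];
		out[written++] = top->vals[top->pos++];
		if (top->pos == top->len)
			heap[0] = heap[--heapSize]; /* stream drained: drop it */
		SiftDown(heap, heapSize, 0, streams);
	}
	return written;
}
```

The pop/advance/re-sift loop mirrors the shape of PostgreSQL's `MergeAppend` executor, which the commit message cites as the model.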
…n indication of sorted merge in the EXPLAIN output
neildsh and others added 23 commits May 10, 2026 23:24
…shdown.sql.include into a single new multi_orderby_pushdown.sql (924 lines),
deleted the three orphaned test files, applied the G3 backward-scan fix (now
expects ERROR: cursor can only scan forward and rolls back), updated
multi_schedule to reference the new test name, deleted four old expected
output files (eager .out, streaming .out, and both .unmodified snapshots), and
regenerated a single 3,170-line expected/multi_orderby_pushdown.out.
pg_regress_multi.pl already lacked the enable_streaming_sorted_merge=on line.
Verification: make -C src/test/regress check-base
EXTRA_TESTS='multi_orderby_pushdown' passes all 15 tests.
…ro task code path by returning an exhausted adapter.
…Y queries (#8529)

# Streaming sorted merge for distributed `ORDER BY`

## Summary

This PR adds a streaming sorted-merge execution path for multi-shard
`SELECT ... ORDER BY` queries. When `citus.enable_sorted_merge` is
enabled at planning time, Citus pushes safe `ORDER BY` clauses to
workers, advertises sorted output from the coordinator `CustomScan`, and
streams globally sorted tuples through a binary-heap merge over per-task
tuplestores.

The main effect is that PostgreSQL no longer needs to add a
coordinator-side `Sort` node above the Citus scan. Worker shards sort
locally in parallel, and the coordinator performs an `O(N log K)` k-way
merge, where `N` is the total row count and `K` is the task count,
instead of an `O(N log N)` full-result sort.

## What changed

### Planner

- Adds hidden experimental GUC `citus.enable_sorted_merge` (currently
default `on`, `PGC_SUSET`).
- Extends worker sort pushdown beyond the existing LIMIT/DISTINCT cases
when the query is safe for sorted merge:
  - query has an `ORDER BY`,
  - window functions, if any, are pushable,
  - `ORDER BY` does not contain aggregate expressions,
  - `GROUP BY` is empty or grouped by the partitioning side.
- Records `DistributedPlan.useSortedMerge` during physical planning.
- Sets Citus CustomScan `pathkeys` to the combine query's required sort
order so PostgreSQL elides the parent Sort.
- Uses a dedicated `CustomScanMethods` entry, shown in EXPLAIN as
`Custom Scan (Citus Sorted Merge Adaptive)`.

### Executor

- Adds `sorted_merge.c` / `sorted_merge.h` with a `SortedMergeAdapter`
modeled after PostgreSQL `MergeAppend`.
- Creates one tuplestore per task and routes worker tuples directly into
the matching task store through `task->tupleDest`.
- Shares one `TupleDestinationStats` object across all per-task
destinations so `citus.max_intermediate_result_size` is enforced across
the whole result, not per task.
- Builds `SortSupportData` from the worker query sort clauses and uses a
binary heap to return the next globally sorted tuple.
- Adds a sorted-merge-specific `ExecCustomScan` callback that reads
directly from the adapter, avoiding per-row branching between adapter
and normal tuplestore paths.
- Clears cached task `tupleDest` pointers after execution so
prepared-plan reuse cannot see stale execution-local state.

### Cursor and backward-scan behavior

The streaming adapter is forward-only.

- Sorted-merge plans do not advertise
`CUSTOMPATH_SUPPORT_BACKWARD_SCAN`.
- For `SCROLL` cursors, Citus reinserts the same Material wrapper
PostgreSQL would normally add for non-backward-scannable plans. This is
necessary because Citus replaces the plan tree after
`standard_planner()`.
- Non-scroll backward fetches remain unsupported, matching cursor
semantics.
- `SortedMergeAdapterRescan()` exists as a defensive/rescan path and
rebuilds the heap from the per-task stores.

### Observability and stats

- EXPLAIN plan shape changes from `Sort -> Custom Scan (Citus Adaptive)`
to `Custom Scan (Citus Sorted Merge Adaptive)` for eligible queries.
- The executor type is tracked as `MULTI_EXECUTOR_SORTED_MERGE` so query
stats can distinguish sorted-merge executions from regular adaptive
executions.

## Query behavior

Eligible examples:

```sql
SELECT id, val
FROM distributed_table
ORDER BY id;

SELECT id, val, created_at
FROM distributed_table
WHERE created_at >= now() - interval '1 day'
ORDER BY created_at, id;

SELECT tenant_id, count(*)
FROM distributed_table
GROUP BY tenant_id
ORDER BY tenant_id;
```

Intentionally not eligible:

```sql
-- ORDER BY aggregate expression
SELECT tenant_id, sum(amount) AS total
FROM distributed_table
GROUP BY tenant_id
ORDER BY total;

-- GROUP BY non-distribution column
SELECT user_id, count(*)
FROM distributed_table
GROUP BY user_id
ORDER BY user_id;
```

## Performance summary

Numbers below are from `ssm_presentation_0505.md`, using the latest
documented run:

- PostgreSQL 18.3 release build under `$HOME/pg18-release`
- Citus built from this branch
- 1 coordinator on port 9700, 4 workers on ports 9701-9704
- 8 shards, `work_mem=64MB`, `shared_buffers=4GB`, `jit=off`
- `pgbench -n -T 60 -c $c -j $c -P 30 -p 9700 -d citus`
- 2 tables x 2 configs x 8 query shapes x 3 client counts, 3 runs per
cell
- `BASE`: `citus.enable_sorted_merge=off`
- `STREAM`: `citus.enable_sorted_merge=on`

### Query-shape rollup

| Query | Shape | STREAM faster cells | Median speedup |
|---|---|---:|---:|
| q1 | small index only scan | 1/6 | 0.99x |
| q2 | wider projection sorted by `created_at` | 6/6 | 1.03x |
| q3 | multi-key sort | 6/6 | 1.08x |
| q4 | unindexed sort key | 6/6 | 1.12x |
| q5 | full row, no LIMIT | 6/6 | 1.57x |
| q6 | full row with LIMIT | 6/6 | 1.65x |
| q7/q8 | safety counter-tests | flat | 1.00x |

### Most important performance takeaways

- The strongest wins are full-row sorted reads:
  - q5 (`SELECT * ... ORDER BY event_id`) improved about **1.50x-1.63x**.
  - q6 (`SELECT * ... ORDER BY event_id LIMIT 20000`) improved about
    **1.64x-1.94x**.
- q4, which orders by an unindexed key, improved in every cell by about
**1.10x-1.14x**, showing the benefit of parallel worker sorts plus
coordinator merge.
- q2/q3 show smaller but consistent wins when the coordinator Sort is
not the dominant cost.
- q1 is neutral to slightly negative on the indexed table; the result is
small enough that sorted-merge overhead can outweigh the coordinator
Sort it removes.
- q7/q8 counter-tests stayed flat, validating that the planner
exclusions for aggregate `ORDER BY` and non-partition-column grouping
avoid changing those query shapes.

## Tests

- Adds `src/test/regress/sql/multi_orderby_pushdown.sql` and expected
output.
- Adds coverage for:
  - ASC/DESC, NULLS FIRST/LAST, mixed direction, multi-column ORDER BY,
  - ORDER BY non-distribution columns,
  - GROUP BY distribution column,
  - ORDER BY expressions,
  - LIMIT/OFFSET, DISTINCT, DISTINCT ON, UNION ALL, CTEs, subqueries, joins,
  - prepared statements where the GUC is toggled after plan caching,
  - non-scroll cursor backward-fetch behavior,
  - SCROLL cursor behavior via Material insertion,
  - small `work_mem`,
  - `citus.max_intermediate_result_size`,
  - MX mode.
- Updates EXPLAIN expected output where eligible queries now show
`Custom Scan (Citus Sorted Merge Adaptive)` instead of a coordinator
Sort.

## Notes / limitations

- The adapter streams from per-task tuplestores after
`RunDistributedExecution()` has drained worker output. It does not yet
interleave network reads with merge output.
- The feature is intentionally limited to safe `ORDER BY` shapes.
Aggregate `ORDER BY` and non-partition-column grouping remain on
existing planning paths.
Fix test-citus-lib-n-1: defer library swap until after extension
creation
@codecov

codecov Bot commented May 14, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 88.75%. Comparing base (9036b48) to head (d29418a).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8576      +/-   ##
==========================================
- Coverage   88.75%   88.75%   -0.01%     
==========================================
  Files         288      288              
  Lines       64254    64254              
  Branches     8085     8084       -1     
==========================================
- Hits        57027    57026       -1     
+ Misses       4892     4891       -1     
- Partials     2335     2337       +2     

@ihalatci
Contributor

please redo the cherry pick on main

@ihalatci ihalatci closed this May 15, 2026