@hantangwangd
Member

@hantangwangd hantangwangd commented May 3, 2024

Description

This PR expands the current procedure architecture in Presto to support defining, registering, and calling procedures that need to be executed in a distributed way. It then adds distributed procedure support to the Iceberg connector and implements a specific procedure, rewrite_data_files, on top of it.

Referring to: prestodb/rfcs#12

The whole PR is separated into 6 parts:

  1. Refactor the ProcedureRegistry/Procedure data structures to support creating and registering a DistributedProcedure. Also make ProcedureRegistry available in the presto-analyzer module, so that distributed procedures can be recognized in call statements during the prepare and analyze stages.

  2. Handle call statements on distributed procedures in the preparer stage. In this stage, we determine the type of the procedure in the call statement, and define a new query type, CALL_DISTRIBUTED_PROCEDURE, for calls to distributed procedures in BuiltInPreparedQuery. This way, a call distributed procedure statement is handled by SqlQueryExecutionFactory, then created and executed as a SqlQueryExecution.

  3. Analyze and plan the call distributed procedure statement based on the subtype of the distributed procedure. For the subtype TableDataRewriteDistributedProcedure, the logical plan ultimately generated is as follows:

OutputNode <- TableFinishNode <- CallDistributedProcedureNode <- FilterNode <- TableScanNode

  4. Handle optimization, segmentation, grouped tagging, and local planning for the logical plan generated above. The handling logic for CallDistributedProcedureNode is similar to that for TableWriterNode. In addition, a new optimizer, RewriteWriterTarget, is added and placed after all other optimization rules. Once the entire optimization is complete, it updates the TableHandle held in TableFinishNode and CallDistributedProcedureNode based on the underlying TableScanNode, to account for possible filter pushdown.

  5. Refactor the Iceberg connector to support calling distributed procedures. Introduce Iceberg's procedure context and expand IcebergSplitManager to support a split source planned by IcebergAbstractMetadata.beginCallDistributedProcedure(...). This split source is set on the procedure context, which also holds all the files to be rewritten.

  6. Support the Iceberg rewrite_data_files procedure. It builds a customized split source and sets it on the procedure context so that IcebergSplitManager can use it. It also registers a file scan task consumer that collects all scanned files and holds them in the procedure context. Finally, in the commit stage, it gets all the data files and delete files that have been rewritten and all the newly generated files, then changes and commits their metadata through the Iceberg table's RewriteFiles transaction.
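To illustrate why a pass like RewriteWriterTarget has to run after all other optimizations, here is a minimal sketch of the idea. It is not the PR's code: the `Node` class, the string-valued `tableHandle`, and the `buildPlan`, `find`, and `rewriteWriterTarget` helpers are all hypothetical stand-ins, and only the node names come from the plan shown above.

```java
public class RewriteWriterTargetSketch {
    // Hypothetical, simplified plan node; this is not Presto's PlanNode API.
    static final class Node {
        final String name;
        final Node source;   // single upstream node, or null for the leaf scan
        String tableHandle;  // only meaningful for scan, call, and finish nodes

        Node(String name, Node source, String tableHandle) {
            this.name = name;
            this.source = source;
            this.tableHandle = tableHandle;
        }
    }

    // Builds OutputNode <- TableFinishNode <- CallDistributedProcedureNode <- FilterNode <- TableScanNode.
    static Node buildPlan(String handle) {
        Node scan = new Node("TableScanNode", null, handle);
        Node filter = new Node("FilterNode", scan, null);
        Node call = new Node("CallDistributedProcedureNode", filter, handle);
        Node finish = new Node("TableFinishNode", call, handle);
        return new Node("OutputNode", finish, null);
    }

    // Returns the first node with the given name, walking from the root down to the scan.
    static Node find(Node root, String name) {
        for (Node n = root; n != null; n = n.source) {
            if (n.name.equals(name)) {
                return n;
            }
        }
        throw new IllegalArgumentException("no such node: " + name);
    }

    // Copies the scan's final handle to the two writer-side nodes.
    static void rewriteWriterTarget(Node root) {
        String finalHandle = find(root, "TableScanNode").tableHandle;
        find(root, "TableFinishNode").tableHandle = finalHandle;
        find(root, "CallDistributedProcedureNode").tableHandle = finalHandle;
    }

    public static void main(String[] args) {
        Node plan = buildPlan("orders");
        // Simulate filter pushdown changing the handle held by the scan.
        find(plan, "TableScanNode").tableHandle = "orders[ds = '2024-05-03']";
        rewriteWriterTarget(plan);
        System.out.println(find(plan, "TableFinishNode").tableHandle);
    }
}
```

In this toy model the writer-side nodes would keep a stale handle if the copy ran before pushdown, which is the ordering concern described in part 4.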

Motivation and Context

N/A

Impact

N/A

Test Plan

  • Add test cases for each phase of the procedure architecture expansion: creating and registering distributed procedures, then preparing, analyzing, logically planning, and optimizing call distributed procedure statements. Finally, add tests for the specific Iceberg distributed procedure: rewrite_data_files

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

== NO RELEASE NOTE ==

@hantangwangd hantangwangd requested a review from presto-oss May 3, 2024 11:23
@hantangwangd hantangwangd marked this pull request as draft May 3, 2024 11:24
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch 4 times, most recently from 7ec819c to 9440737 Compare May 8, 2024 08:36
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch 4 times, most recently from f89dc40 to e796fa2 Compare May 13, 2024 12:56
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch 5 times, most recently from acb0351 to c3eaa96 Compare May 24, 2024 19:59
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch 3 times, most recently from 05de3c8 to 0dc3dbb Compare June 11, 2024 11:08
@hantangwangd hantangwangd changed the title [ForTest]Expand procedure architecture for distributed execution, and support iceberg procedure rewrite_data_files [WIP]Expand procedure architecture for distributed execution, and support iceberg procedure rewrite_data_files Jun 11, 2024
Contributor

@steveburnett steveburnett left a comment

Thanks for the draft doc! Some nits about punctuation, formatting, and some suggested rephrasing for readability and conciseness, but the content looks good.

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch from 0dc3dbb to a78c41c Compare June 13, 2024 18:47
@hantangwangd
Member Author

@steveburnett Thanks a lot for your suggestions, they have all been fixed. Please take a look when convenient!

steveburnett previously approved these changes Jun 13, 2024
Contributor

@steveburnett steveburnett left a comment

LGTM! (docs)

Pull updated branch, new local doc build, looks good. Thanks!

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch from a78c41c to 2fdbab7 Compare July 13, 2024 03:17
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch from 2fdbab7 to befe9a7 Compare July 31, 2024 18:17
@hantangwangd hantangwangd marked this pull request as ready for review July 31, 2024 19:28
@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch 3 times, most recently from 3f53a40 to 0d5a811 Compare October 14, 2025 07:09
@tdcmeehan
Contributor

tdcmeehan commented Oct 17, 2025

@hantangwangd I'm reviewing this. One quick thing: could you please add documentation to our developer guide (in develop) that explains how these distributed procedures are built and registered?

@hantangwangd
Member Author

@tdcmeehan Thanks for the review. Sure, I'll add the relevant documentation as soon as possible.

Contributor

@tdcmeehan tdcmeehan left a comment

Very good work. Well done! I've left some feedback, but it's mostly minor.

I would split this PR into at least 3 parts:

  1. All of the code in core Presto to support distributed procedures
  2. The C++ counterpart for this code
  3. The Iceberg integration

private final BeginCallDistributedProcedure beginCallDistributedProcedure;
private final FinishCallDistributedProcedure finishCallDistributedProcedure;

protected DistributedProcedure(DistributedProcedureType type, String schema, String name, List<Argument> arguments, BeginCallDistributedProcedure beginCallDistributedProcedure, FinishCallDistributedProcedure finishCallDistributedProcedure)
Contributor

I think it would be easier to read and more idiomatic to make DistributedProcedure abstract, and make beginCallDistributedProcedure and finishCallDistributedProcedure abstract methods.

Member Author

Great idea, I completely agree! Initially, I didn't declare DistributedProcedure as abstract because Procedure itself wasn't declared as abstract. Now, I've made both Procedure and DistributedProcedure abstract, and introduced a LocalProcedure to represent the original coordinator-only procedures. This makes the overall procedure architecture much clearer and easier to understand.
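For readers following along, the resulting shape could look roughly like the sketch below. The class names Procedure, LocalProcedure, and DistributedProcedure come from this thread; everything else (the simplified constructors, the string-based handles, the `begin`/`finish` method signatures, and the `RewriteSketch` subclass) is hypothetical and much simpler than the real signatures.

```java
import java.util.Arrays;
import java.util.List;

public class ProcedureHierarchySketch {
    // Common base: identity only. The arguments list is omitted for brevity.
    abstract static class Procedure {
        private final String schema;
        private final String name;

        protected Procedure(String schema, String name) {
            this.schema = schema;
            this.name = name;
        }

        String qualifiedName() {
            return schema + "." + name;
        }
    }

    // Represents the original coordinator-only procedures.
    static final class LocalProcedure extends Procedure {
        private final Runnable body;

        LocalProcedure(String schema, String name, Runnable body) {
            super(schema, name);
            this.body = body;
        }

        void call() {
            body.run();
        }
    }

    // Begin/finish become abstract methods instead of injected callbacks.
    // The signatures here are hypothetical simplifications.
    abstract static class DistributedProcedure extends Procedure {
        protected DistributedProcedure(String schema, String name) {
            super(schema, name);
        }

        abstract String begin(String tableName);

        abstract String finish(String procedureHandle, List<String> fragments);
    }

    // Hypothetical concrete subclass, only to show how the abstract methods are filled in.
    static final class RewriteSketch extends DistributedProcedure {
        RewriteSketch() {
            super("system", "rewrite_sketch");
        }

        @Override
        String begin(String tableName) {
            return "rewrite:" + tableName;
        }

        @Override
        String finish(String procedureHandle, List<String> fragments) {
            return procedureHandle + " committed " + fragments.size() + " fragment(s)";
        }
    }

    public static void main(String[] args) {
        DistributedProcedure procedure = new RewriteSketch();
        String handle = procedure.begin("orders");
        System.out.println(procedure.finish(handle, Arrays.asList("f1", "f2")));
    }
}
```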

new DynamicFiltersChecker(),
new WarnOnScanWithoutPartitionPredicate(featuresConfig));
new WarnOnScanWithoutPartitionPredicate(featuresConfig),
new CallDistributedProcedureValidator());
Contributor

@hantangwangd it would be nice to have plan tests, like TestHashGenerationOptimizer, that show the type of plan that gets generated by a distributed procedure.

Member Author

Thanks for the suggestion. Since the CALL DISTRIBUTED PROCEDURE statement requires a valid distributed procedure to be invoked, and currently only one has been implemented in Iceberg connector, I've added the test case to TestIcebergLogicalPlanner. Please take a look when you have time, thanks a lot.

return source;
}

@JsonIgnore
Contributor

Is this intentionally ignored?

Member Author

Yes, this is intentionally ignored. Subclasses of WriteTarget are only used during planning -- they will not be serialized.

@hantangwangd hantangwangd force-pushed the support_call_distributed_procedure branch from 8d53e4d to 106c2aa Compare October 19, 2025 15:31
@hantangwangd
Member Author

@tdcmeehan thanks for your review and feedback. I've addressed all your comments except the one about adding documentation. Please take a look when you have time.

Very good work. Well done! I've left some feedback, but it's mostly minor.

I would split this PR into at least 3 parts:

1. All of the code in core Presto to support distributed procedures

2. The C++ counterpart for this code

3. The Iceberg integration

Are you suggesting that I split this into three separate PRs, or should I squash it into three commits within a single PR?

@tdcmeehan
Contributor

@hantangwangd since we now squash commits on merge, let's make 3 separate PRs.

@hantangwangd
Member Author

@tdcmeehan Sure, I'll do it.

Use a subclass `TableDataRewriteDistributedProcedure` for table rewrite
tasks, for example, merge small data files, sort table data,
repartition table data etc.
Accordingly rename previous ProcedureRegistry to BuiltInProcedureRegistry
… abstract classes

And introduce a new class `LocalProcedure` to represent the former
coordinator-only procedures