Conversation

@hantangwangd
Member

Proposes a design to expand the current procedure architecture in Presto, supporting the definition, registration, and invocation of procedures that need to be executed in a distributed way.

In addition, to serve as a demonstrating example and to figure out the functional boundaries among the different architecture levels, it also describes the design for Iceberg to support distributed procedures, as well as the design of a specific distributed procedure, rewrite_data_files.

@hantangwangd hantangwangd marked this pull request as draft June 11, 2024 03:44
@hantangwangd hantangwangd force-pushed the main branch 2 times, most recently from 949d74e to a01783a on June 13, 2024 03:51
@hantangwangd hantangwangd marked this pull request as ready for review June 13, 2024 03:54
@hantangwangd hantangwangd changed the title from "[WIP] Add RFC for supporting distributed procedure" to "Add RFC for supporting distributed procedure" on Jun 13, 2024
Contributor

@rschlussel rschlussel left a comment


This is an interesting feature. Do you have thoughts on integrating this for C++ workers as well?

@hantangwangd
Member Author

@rschlussel Thanks for the comment. Yes, of course we should integrate this for C++ workers if it proves to be useful and needed.

I think we can take a two-step approach here. First, we support it on Java workers, confirm the feasibility of the entire architecture, and figure out the best division of functional boundaries. After that, we can support it on C++ workers following the design and implementation path established on Java workers. What's your opinion?

Contributor

@aditi-pandit aditi-pandit left a comment


Thanks @hantangwangd for this RFC. I had a couple of comments, mainly related to the impact on native execution.

5. Similar to statements such as `create table as`/`insert`/`delete`/`refresh materialized view` that involve distributed processing, two related SPI methods are defined for the `call distributed procedure` statement in the metadata and connector metadata interfaces. `beginCallDistributedProcedure` is used for preparation work before distributed scheduling starts, and `finishCallDistributedProcedure` is used for transaction commitment after distributed writing completes.


6. As for a specific connector (such as Iceberg), the implementations of `beginCallDistributedProcedure` and `finishCallDistributedProcedure` in `ConnectorMetadata`, in addition to accomplishing the common logic for that connector (such as starting a transaction, building a transaction context, committing a transaction, etc.), should also resolve the specified distributed procedure and call its relevant methods to execute the procedure-customized logic.
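
To make the shape of these SPI additions concrete, here is a minimal sketch modeled on the existing `beginInsert`/`finishInsert` pattern; all names and signatures below are illustrative assumptions, not the final API:

```java
import java.util.Collection;
import java.util.Optional;

// Hypothetical SPI additions to ConnectorMetadata, modeled on the existing
// beginInsert/finishInsert pair; names and signatures are assumptions.
// (Presto SPI types such as ConnectorSession, Slice, and ComputedStatistics
// are assumed imported from the Presto SPI.)
public interface ConnectorMetadata
{
    // Invoked on the coordinator before distributed scheduling starts:
    // resolve the procedure and build the handle shipped to workers.
    ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
            ConnectorSession session,
            QualifiedObjectName procedureName,
            ConnectorTableLayoutHandle tableLayoutHandle,
            Object[] procedureArguments);

    // Invoked on the coordinator after distributed writing completes:
    // apply the metadata-level changes and commit the transaction.
    Optional<ConnectorOutputMetadata> finishCallDistributedProcedure(
            ConnectorSession session,
            ConnectorDistributedProcedureHandle procedureHandle,
            Collection<Slice> fragments,
            Collection<ComputedStatistics> computedStatistics);
}
```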
Contributor


Native vs. Java runtimes begin to diverge significantly at this point. For native runtimes, it's best if the distributed procedure call method is in C++. But that would mean older Java procedures would need a rewrite. Do you have any particular ideas on how you will handle this for the native engine?

Member Author


Thanks for the comment. Actually, beginCallDistributedProcedure and finishCallDistributedProcedure in ConnectorMetadata are both invoked on the coordinator, so native workers do not need to handle them; that is, there is no need on the native worker end to resolve the distributed procedures and invoke them.

The non-coordinator worker's responsibility in call distributed procedure is similar to that of insert into or create table as: locally planning the CallDistributedProcedureNode to a TableWriterOperator with an ExecutionWriterTarget, getting the corresponding ConnectorPageSink based on this writer target, executing the data writing, and finally returning the fragments page to the table finish stage.

So I think there is no need to consider this issue. What do you think? If I've misunderstood anything, please let me know.

```java
public static class CallDistributedProcedureTarget
        extends WriterTarget
{
    private final QualifiedObjectName procedureName;
    private final Object[] procedureArguments;
    // ...
}
```
Contributor


Presumably the procedureArguments have types. There could be a difference in the types supported by native vs. Java execution. It would be great to clarify that.

Member Author


As I understand it, CallDistributedProcedureTarget is a subclass of WriterTarget that is used only on the coordinator. That is, it's always used in a Java execution environment.

ExecuteProcedureHandle, which is a subclass of ExecutionWriterTarget, will be sent to the workers. Its handling is similar to InsertHandle or CreateHandle.


Among them, `TableScanNode -> FilterNode` defines the data to be processed. It is based on the target table determined by the `schema` and `table_name` parameters, as well as any filter conditions that may be present.

The `CallDistributedProcedureNode -> TableFinishNode` structure is similar to the `TableWriterNode -> TableFinishNode` structure, and is used to perform the distributed data manipulation and the final unified commit.
Contributor


It would be great to give more details on what row/control messages are exchanged between the CallDistributedProcedureNode and TableFinishNode, e.g., TableWriterNode provides a protocol of PartitionUpdates and CommitContext that is shared between the two.

Member Author


Sure, it's a good suggestion. I will supplement this part of the content.


#### 6. Iceberg connector's support for distributed procedure

In Iceberg, we often need to record the original data files that were scanned and rewritten during the execution of a table data operation (including delete files that have been fully applied), and at final commit, combine them with the data files newly generated by the rewrite to make the corresponding metadata-level changes and commit the transaction.
Contributor


Iceberg in the native engine uses the HiveConnector itself. The HiveConnector in the native engine handles both Hive and Iceberg splits. Does HiveConnector support distributed procedures? How will it be reused or enhanced for Iceberg?

Member Author


As beginCallDistributedProcedure and finishCallDistributedProcedure in ConnectorMetadata are both invoked on the coordinator, I think the main change for Hive to support distributed procedures is in the Java implementation.

Hive should implement beginCallDistributedProcedure and finishCallDistributedProcedure in ConnectorMetadata, customize its own preparation and commit logic, and then implement and register its own distributed procedures. All of this work could be done in Java only.

So I think the main change in worker-end logic needed to support native C++ is that it should generate and provide a ConnectorPageSink based on the newly added ConnectorDistributedProcedureHandle. Please let me know if there are any omissions.
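
For illustration, that worker-end change might amount to one more `createPageSink` overload on the page sink provider; this sketch assumes the new handle follows the existing insert/create overloads:

```java
// Hypothetical addition to ConnectorPageSinkProvider, modeled on its existing
// createPageSink overloads for insert/create; illustrative only.
public interface ConnectorPageSinkProvider
{
    ConnectorPageSink createPageSink(
            ConnectorTransactionHandle transactionHandle,
            ConnectorSession session,
            ConnectorDistributedProcedureHandle procedureHandle,
            PageSinkContext pageSinkContext);
}
```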

3. Add a new plan node type: `CallDistributedProcedureNode`. During the analysis and logical planning phases, construct a logical plan with the following shape for the `call distributed procedure` statement:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode
```
Member

@jaystarshot jaystarshot Jun 26, 2024


Not sure I understand correctly, but if we are going to support a general distributed procedure, then why does it have to be just above TableScanNode or the table finish?
If it's not for the general case but specific to table layouts, then why not just a custom operator implementation?

Member Author


Thanks for the comment. As described in this RFC, compared to the existing coordinator-only procedures, this newly expanded kind of procedure, which needs to be executed in a distributed way, involves operations on a table's data rather than just its metadata. For example, these procedures can rewrite table data, merge small data files, sort table data, repartition table data, etc. So their common action process includes reading data from the target table, doing some filtering and transformation, writing the transformed data to files, and committing the changes at the metadata level. Therefore I think a general distributed procedure can be planned into a plan tree like this (or with other plan nodes such as JoinNode/AggregationNode added for functional expansion in the future).

Member Author


why not just a custom operator implementation?

After optimization and plan fragmentation, we should be able to shuffle data between stages to utilize the capabilities of the entire cluster.


## Summary

Proposes a design to expand the current procedure architecture in Presto, supporting the definition, registration, and invocation of procedures that need to be executed in a distributed way.
Member


Can you also define what you mean by procedure?

Member Author


Thanks for the suggestion. Strictly speaking, maybe they should be called connector-specific system procedures. I'll add some explanation here.

@hantangwangd
Member Author

This RFC is ready for review, the comments are all handled, and the relevant implementation can be viewed here: prestodb/presto#22659. Please take a look when convenient, thanks! cc: @rschlussel @aditi-pandit @jaystarshot @tdcmeehan @ZacBlanco @yingsu00

* Acquire and use procedure registry in `presto-analyzer` and `connectors` module


2. Define a new query type `CALL_DISTRIBUTED_PROCEDURE`, and associate the call distributed procedure statement with this type during the preparer phase, so that these statements fall into the path of `SqlQueryExecutionFactory.createQueryExecution(...)`. The generated `SqlQueryExecution` object can utilize the existing distributed querying, writing, and final committing mechanisms to achieve distributed execution for procedures.
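
As a rough illustration of that preparer-phase association (the `isDistributedProcedure` helper on the procedure registry is an assumption for this sketch):

```java
// Hypothetical sketch of the preparer-phase mapping; Call is the existing
// com.facebook.presto.sql.tree.Call AST node, and isDistributedProcedure
// is an assumed helper on the procedure registry.
private static QueryType getQueryType(Statement statement, ProcedureRegistry procedureRegistry)
{
    if (statement instanceof Call) {
        Call call = (Call) statement;
        if (procedureRegistry.isDistributedProcedure(call.getName())) {
            // Routed through SqlQueryExecutionFactory.createQueryExecution(...)
            return QueryType.CALL_DISTRIBUTED_PROCEDURE;
        }
        // Coordinator-only procedures keep the existing data-definition path
        return QueryType.DATA_DEFINITION;
    }
    throw new IllegalArgumentException("unhandled statement type: " + statement);
}
```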
Contributor


It's not clear what query syntax is proposed. Are you proposing a CALL statement or ALTER TABLE? Can you give some sample statements here? It would be great to specify the statement syntax, including what the parameters will be and their behavior/limitations.

In the later sections a table and filter are used. How are they specified in the statement used by the end user?

Member Author


Thanks for your suggestion. I will supplement this part of the content. The following is a brief explanation.

We propose the CALL statement syntax for the reasons described in lines 24 to 28 (https://github.com/prestodb/rfcs/pull/12/files#diff-7655a74167c417d09fba4cc48ec39232f65d7056a449e1cb09d51664f4fe1b7eR24-R28), for example:

```sql
CALL iceberg.system.rewrite_data_files('db', 'sample');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');
CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');
```

For all distributed procedures, the base class implementation includes the parameters schema, table_name, and filter by default. schema and table_name are required parameters that specify the target table to be processed by the distributed procedure, while filter is an optional parameter that indicates the filtering conditions for the data to be processed.

The specific procedure implementations can expand the parameter list, but they must include the required parameters schema and table_name.
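
For illustration, a declaration of such a procedure might look like the following; the `DistributedProcedure` constructor shape is an assumption modeled on the existing `Procedure`/`Procedure.Argument` pattern:

```java
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static io.airlift.slice.Slices.utf8Slice;

import com.facebook.presto.spi.procedure.Procedure.Argument;
import com.google.common.collect.ImmutableList;

// Hypothetical declaration of rewrite_data_files; the DistributedProcedure
// constructor shown here is an assumption, not the final API.
public final class RewriteDataFilesProcedureSketch
{
    public DistributedProcedure get()
    {
        return new DistributedProcedure(
                "system",
                "rewrite_data_files",
                ImmutableList.of(
                        new Argument("schema", VARCHAR),                             // required
                        new Argument("table_name", VARCHAR),                         // required
                        new Argument("filter", VARCHAR, false, utf8Slice("TRUE")))); // optional
    }
}
```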

Member Author


3. Add a new plan node type: `CallDistributedProcedureNode`. During the analysis and logical planning phases, construct a logical plan with the following shape for the `call distributed procedure` statement:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode
```
Contributor


It's interesting that this is rooted in a TableScanNode. Is this a regular table or something else?

Member Author


It's a regular table representing the target table. As mentioned above, the target table is specified through the parameters schema and table_name.

![Distributed_procedure_architecture](RFC-0006/distributed_procedure.png)


4. The optimizing, segmenting, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to `TableWriterNode`, and it is ultimately local-planned to a `TableWriterOperator` (which holds a specific `ExecutionWriterTarget` subclass related to the `call distributed procedure` statement). When creating a `PageSink` to execute data writing, a corresponding `ConnectorPageSink` is generated based on the specific subclass and property values of the `ExecutionWriterTarget` actually held by the `TableWriterOperator`.
Contributor


ConnectorPageSink will need a Prestissimo implementation as well.

Member Author


I'm not so familiar with the Prestissimo implementation. Are we currently reusing the Hive ConnectorPageSink implementation in native workers? If so, then yes, maybe it's better to implement an Iceberg-specific ConnectorPageSink.

![Distributed_procedure_architecture](RFC-0006/distributed_procedure.png)


4. The optimizing, segmenting, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to `TableWriterNode`, and it is ultimately local-planned to a `TableWriterOperator` (which holds a specific `ExecutionWriterTarget` subclass related to the `call distributed procedure` statement). When creating a `PageSink` to execute data writing, a corresponding `ConnectorPageSink` is generated based on the specific subclass and property values of the `ExecutionWriterTarget` actually held by the `TableWriterOperator`.
Contributor


What will the fields and members of ExecutionWriterTarget be?

TableWriter partitions and bucketizes input rows to files. This is a very specific action, whereas ExecutionWriterTarget is very generic. Are there specific actions, say a rewrite action or an optimize action, that we will implement in code and that could be specified by ExecutionWriterTarget? If yes, then we will need to implement the same logic in the native engine as well.

Member Author

@hantangwangd hantangwangd Aug 14, 2024


The detailed description of ExecuteProcedureHandle (an implementation class of the interface ExecutionWriterTarget) and its members is in lines 252 to 273 (https://github.com/prestodb/rfcs/pull/12/files#diff-7655a74167c417d09fba4cc48ec39232f65d7056a449e1cb09d51664f4fe1b7eR252-R273) and lines 357 to 372 (https://github.com/prestodb/rfcs/pull/12/files#diff-7655a74167c417d09fba4cc48ec39232f65d7056a449e1cb09d51664f4fe1b7eR357-R372). The definitions of its fields and members are similar to InsertHandle.

Currently there is no specific logic that needs to be implemented through code in ExecuteProcedureHandle, so to support ExecuteProcedureHandle natively, we can basically follow the implementation of InsertHandle. The only difference is an additional field in ExecuteProcedureHandle: QualifiedObjectName procedureName.
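
A sketch of what that could look like, modeled on `InsertHandle`; apart from `procedureName`, the field set is an assumption:

```java
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Illustrative sketch modeled on InsertHandle (nested inside TableWriterNode,
// like the other handles); only the extra procedureName field is confirmed
// above, the remaining fields are assumptions.
public static class ExecuteProcedureHandle
        extends ExecutionWriterTarget
{
    private final ConnectorDistributedProcedureHandle handle;
    private final SchemaTableName schemaTableName;
    private final QualifiedObjectName procedureName;

    @JsonCreator
    public ExecuteProcedureHandle(
            @JsonProperty("handle") ConnectorDistributedProcedureHandle handle,
            @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
            @JsonProperty("procedureName") QualifiedObjectName procedureName)
    {
        this.handle = requireNonNull(handle, "handle is null");
        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
        this.procedureName = requireNonNull(procedureName, "procedureName is null");
    }

    @JsonProperty
    public ConnectorDistributedProcedureHandle getHandle()
    {
        return handle;
    }

    @JsonProperty
    public SchemaTableName getSchemaTableName()
    {
        return schemaTableName;
    }

    @JsonProperty
    public QualifiedObjectName getProcedureName()
    {
        return procedureName;
    }
}
```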

4. The optimizing, segmenting, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to `TableWriterNode`, and it is ultimately local-planned to a `TableWriterOperator` (which holds a specific `ExecutionWriterTarget` subclass related to the `call distributed procedure` statement). When creating a `PageSink` to execute data writing, a corresponding `ConnectorPageSink` is generated based on the specific subclass and property values of the `ExecutionWriterTarget` actually held by the `TableWriterOperator`.


5. Similar to statements such as `create table as`/`insert`/`delete`/`refresh materialized view` that involve distributed processing, two related SPI methods are defined for the `call distributed procedure` statement in the metadata and connector metadata interfaces. `beginCallDistributedProcedure` is used for preparation work before distributed scheduling starts, and `finishCallDistributedProcedure` is used for transaction commitment after distributed writing completes.
Contributor


It's good to clarify whether beginCallDistributedProcedure and finishCallDistributedProcedure will happen on the coordinator.

Member Author


OK, will add this.

Member Author


```java
public static class CallDistributedProcedureTarget
        extends WriterTarget
{
    private final QualifiedObjectName procedureName;
    // ...
}
```
Contributor


Does this procedure need to be dynamically loaded? It might be worth thinking about how you would do that in C++. Are you planning to use a dlopen call? That can come with lots of security implications.

Member Author


First, in the current design, distributed procedures serve as connector-specific built-in system procedures, so they do not need to be dynamically loaded; they only need to be statically pre-defined and loaded. Dynamic loading is more like what needs to be done to support UDFs or UDPs, which is beyond the scope of this proposal; we can consider how to support it in the future if necessary.

Second, as discussed above, the methods of a DistributedProcedure will always be called in a Java environment. Even when executing the final commit, FinishCallDistributedProcedure.finish(...) will be executed in the TableFinishOperator on the coordinator's built-in Java worker. So it seems there is currently no need to consider dynamic loading in a C++ environment. Please let me know if there are any omissions or misunderstandings.


@yingsu00 yingsu00 left a comment


@hantangwangd Is there a working prototype or a PR now?

3. Add a new plan node type: `CallDistributedProcedureNode`. During the analysis and logical planning phases, construct a logical plan with the following shape for the `call distributed procedure` statement:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode
```


The operation may not necessarily update the real data files. For example, we may want to run CALL system.clean_data_cache('ALL_WORKERS'); to just call AsyncDataCache::shutdown() on all Presto C++ workers. What would the plan look like?


Member Author


The operation may not necessarily update the real data files. For example, we may want to run CALL system.clean_data_cache('ALL_WORKERS'); to just call AsyncDataCache::shutdown() on all Presto C++ workers. What would the plan look like?

Thanks for the question, @yingsu00. To realize the functionality you mentioned, I think we can extend DistributedProcedure with subtypes, for example DistributedDataProcedure and DistributedMetadataProcedure. For the DistributedMetadataProcedure type, we can analyze and plan it into a logical plan with the following shape:

OutputNode <- ProcedureFinishNode <- CallDistributedProcedureNode

Here, ProcedureFinishNode is a newly added plan node, and after segmentation the plan only needs to include two stages. Do you think that's reasonable?

@hantangwangd
Member Author

Is there a working prototype or a PR now?

Yes, there is already a PR, #22659. Will rebase it to the newest commit list as soon as possible.

Contributor

@tdcmeehan tdcmeehan left a comment


@hantangwangd thanks for your perseverance on this RFC.

It would be nice to sketch out the work required to execute distributed procedures in C++ clusters. It would seem like it's the following:

  1. Add CallDistributedProcedureNode to the protocol
  2. Add a new method PrestoToVeloxConnector.toVeloxInsertTableHandle that interprets the handle and creates a Velox insert handle

@hantangwangd
Member Author

@tdcmeehan Thanks for the review and suggestion. Sure, I'll add this part soon!

@hantangwangd
Member Author

Hi @tdcmeehan, I have updated the RFC document to include the work required for native workers to support distributed procedures. Furthermore, I have extended the current distributed procedure types to position the currently designed table-data-rewrite feature as a subtype. This ensures the extensibility required for other types of distributed procedures in the future. Please take a look when you get a chance. Any feedback would be greatly appreciated!

@hantangwangd
Member Author

hantangwangd commented Oct 14, 2025

@rschlussel @aditi-pandit @yingsu00 @jaystarshot thanks for your reviews and comments.

Currently, we have expanded the type system for distributed procedures. The table-data-rewrite functionality we mainly focus on in this RFC is now one specific subtype of DistributedProcedure. Under this architecture, we can add another subtype (e.g., DistributedMaintenanceProcedure) to support the system.clean_data_cache('ALL_WORKERS') case mentioned above. During the analysis and logical planning phases, different subtypes of DistributedProcedure will follow different branching logic based on their subtype, ultimately being planned into plan trees with different shapes (possibly even including newly defined plan nodes).

For example, the logical plan ultimately compiled for the DistributedMaintenanceProcedure type may look like the following:

DistributedMaintenanceProcedureNode -> ProcedureFinishNode -> OutputNode

After segmentation, the plan only needs to include two stages, and the leaf stage's partitioning handle should be set to ALL_WORKERS_DISTRIBUTION (a newly defined system partitioning handle).

In this way, we retain the flexibility to extend distributed procedures in the future in a structured way.

Besides, a description has been added of the work required to support distributed procedures (of the table-data-rewrite type) on native workers. Please take another look when you have a chance. Any feedback would be greatly appreciated!

Contributor

@tdcmeehan tdcmeehan left a comment


LGTM. Can you update the RFC number in the file name and also add it to the title, similar to other RFCs?

@hantangwangd
Member Author

@tdcmeehan Sure, I have updated the RFC number of the file name to RFC-0021 and added it to the title. Please take a look when you have a chance. Thanks a lot!

### Detailed description

#### 1. Re-factor Procedure/ProcedureRegistry

Contributor


I don't think it's clear how we discover these procedures on server startup... Are these like plugins?

Member Author


@aditi-pandit thanks for your feedback. In summary, it extends and reuses the existing procedure registration and discovery mechanism, so it follows the same pattern as existing Procedures. See the registration of the newly added RewriteDataFilesProcedure here.
