From 9cef21bb7ac681dcd7d33a67db08ccea9c58fd52 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 6 Oct 2025 16:03:55 -0700 Subject: [PATCH] Refactor DefineDataset and DefineFlow protos to group properties together --- .../spark_connect_graph_element_registry.py | 48 ++-- .../sql/connect/proto/pipelines_pb2.py | 107 ++++---- .../sql/connect/proto/pipelines_pb2.pyi | 245 ++++++++++++------ .../protobuf/spark/connect/pipelines.proto | 53 ++-- .../connect/pipelines/PipelinesHandler.scala | 17 +- ...SparkDeclarativePipelinesServerSuite.scala | 64 +++-- .../pipelines/TestPipelineDefinition.scala | 30 ++- 7 files changed, 357 insertions(+), 207 deletions(-) diff --git a/python/pyspark/pipelines/spark_connect_graph_element_registry.py b/python/pyspark/pipelines/spark_connect_graph_element_registry.py index 8faf7eb9ef589..b90a6b863c87b 100644 --- a/python/pyspark/pipelines/spark_connect_graph_element_registry.py +++ b/python/pyspark/pipelines/spark_connect_graph_element_registry.py @@ -45,10 +45,12 @@ def __init__(self, spark: SparkSession, dataflow_graph_id: str) -> None: def register_dataset(self, dataset: Dataset) -> None: if isinstance(dataset, Table): - table_properties = dataset.table_properties - partition_cols = dataset.partition_cols - schema = None # TODO - format = dataset.format + table_details = pb2.PipelineCommand.DefineDataset.TableDetails( + table_properties=dataset.table_properties, + partition_cols=dataset.partition_cols, + schema=None, # TODO + format=dataset.format, + ) if isinstance(dataset, MaterializedView): dataset_type = pb2.DatasetType.MATERIALIZED_VIEW @@ -59,29 +61,29 @@ def register_dataset(self, dataset: Dataset) -> None: errorClass="UNSUPPORTED_PIPELINES_DATASET_TYPE", messageParameters={"dataset_type": type(dataset).__name__}, ) + + inner_command = pb2.PipelineCommand.DefineDataset( + dataflow_graph_id=self._dataflow_graph_id, + dataset_name=dataset.name, + dataset_type=dataset_type, + comment=dataset.comment, + table_details=table_details, + source_code_location=source_code_location_to_proto(dataset.source_code_location), + ) elif isinstance(dataset, TemporaryView): - table_properties = None - partition_cols = None - schema = None - format = None dataset_type = pb2.DatasetType.TEMPORARY_VIEW + inner_command = pb2.PipelineCommand.DefineDataset( + dataflow_graph_id=self._dataflow_graph_id, + dataset_name=dataset.name, + dataset_type=dataset_type, + comment=dataset.comment, + source_code_location=source_code_location_to_proto(dataset.source_code_location), + ) else: raise PySparkTypeError( errorClass="UNSUPPORTED_PIPELINES_DATASET_TYPE", messageParameters={"dataset_type": type(dataset).__name__}, ) - - inner_command = pb2.PipelineCommand.DefineDataset( - dataflow_graph_id=self._dataflow_graph_id, - dataset_name=dataset.name, - dataset_type=dataset_type, - comment=dataset.comment, - table_properties=table_properties, - partition_cols=partition_cols, - schema=schema, - format=format, - source_code_location=source_code_location_to_proto(dataset.source_code_location), - ) command = pb2.Command() command.pipeline_command.define_dataset.CopyFrom(inner_command) self._client.execute_command(command) @@ -91,11 +93,15 @@ def register_flow(self, flow: Flow) -> None: df = flow.func() relation = cast(ConnectDataFrame, df)._plan.plan(self._client) + relation_flow_details = pb2.PipelineCommand.DefineFlow.WriteRelationFlowDetails( + relation=relation, + ) + inner_command = pb2.PipelineCommand.DefineFlow( dataflow_graph_id=self._dataflow_graph_id, flow_name=flow.name, 
target_dataset_name=flow.target, - relation=relation, + relation_flow_details=relation_flow_details, sql_conf=flow.spark_conf, source_code_location=source_code_location_to_proto(flow.source_code_location), ) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py b/python/pyspark/sql/connect/proto/pipelines_pb2.py index 849d141f9c498..b59b72da35c7c 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.py +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py @@ -34,6 +34,7 @@ _sym_db = _symbol_database.Default() +from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from pyspark.sql.connect.proto import common_pb2 as spark_dot_connect_dot_common__pb2 from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__pb2 @@ -41,7 +42,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xc6\x1b\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc4\x05\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 
\x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x12X\n\x14source_code_location\x18\t \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x06R\x12sourceCodeLocation\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_formatB\x17\n\x15_source_code_location\x1a\x85\x05\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x06 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x07 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\x97\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 \x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf4\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 
\x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12n\n\x15\x64\x65\x66ine_dataset_result\x18\x02 \x01(\x0b\x32\x38.spark.connect.PipelineCommandResult.DefineDatasetResultH\x00R\x13\x64\x65\x66ineDatasetResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x86\x01\n\x13\x44\x65\x66ineDatasetResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"z\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x42\x0c\n\n_file_nameB\x0e\n\x0c_line_number"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xe0\x1e\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x12\xa1\x01\n*get_query_function_execution_signal_stream\x18\x07 \x01(\x0b\x32\x44.spark.connect.PipelineCommand.GetQueryFunctionExecutionSignalStreamH\x00R%getQueryFunctionExecutionSignalStream\x12\x88\x01\n!define_flow_query_function_result\x18\x08 \x01(\x0b\x32<.spark.connect.PipelineCommand.DefineFlowQueryFunctionResultH\x00R\x1d\x64\x65\x66ineFlowQueryFunctionResult\x1a\xb4\x02\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 
\x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x86\x07\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x02R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x03R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x04R\x07\x63omment\x88\x01\x01\x12X\n\x14source_code_location\x18\x05 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12`\n\rtable_details\x18\x06 \x01(\x0b\x32\x39.spark.connect.PipelineCommand.DefineDataset.TableDetailsH\x00R\x0ctableDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a\xdd\x02\n\x0cTableDetails\x12y\n\x10table_properties\x18\x01 \x03(\x0b\x32N.spark.connect.PipelineCommand.DefineDataset.TableDetails.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x02 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x03 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x04 \x01(\tH\x01R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07_schemaB\t\n\x07_formatB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\x17\n\x15_source_code_location\x1a\xdd\x06\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x02R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x03R\x11targetDatasetName\x88\x01\x01\x12Q\n\x08sql_conf\x18\x04 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12 \n\tclient_id\x18\x05 \x01(\tH\x04R\x08\x63lientId\x88\x01\x01\x12X\n\x14source_code_location\x18\x06 \x01(\x0b\x32!.spark.connect.SourceCodeLocationH\x05R\x12sourceCodeLocation\x88\x01\x01\x12x\n\x15relation_flow_details\x18\x07 \x01(\x0b\x32\x42.spark.connect.PipelineCommand.DefineFlow.WriteRelationFlowDetailsH\x00R\x13relationFlowDetails\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textension\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x61\n\x18WriteRelationFlowDetails\x12\x38\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x08relation\x88\x01\x01\x42\x0b\n\t_relation\x1a:\n\x08Response\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x42\x0c\n\n_flow_nameB\t\n\x07\x64\x65tailsB\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0c\n\n_client_idB\x17\n\x15_source_code_location\x1a\x97\x02\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x34\n\x16\x66ull_refresh_selection\x18\x02 
\x03(\tR\x14\x66ullRefreshSelection\x12-\n\x10\x66ull_refresh_all\x18\x03 \x01(\x08H\x01R\x0e\x66ullRefreshAll\x88\x01\x01\x12+\n\x11refresh_selection\x18\x04 \x03(\tR\x10refreshSelection\x12\x15\n\x03\x64ry\x18\x05 \x01(\x08H\x02R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x13\n\x11_full_refresh_allB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_text\x1a\x9e\x01\n%GetQueryFunctionExecutionSignalStream\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tclient_id\x18\x02 \x01(\tH\x01R\x08\x63lientId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_client_id\x1a\xdd\x01\n\x1d\x44\x65\x66ineFlowQueryFunctionResult\x12 \n\tflow_name\x18\x01 \x01(\tH\x00R\x08\x66lowName\x88\x01\x01\x12/\n\x11\x64\x61taflow_graph_id\x18\x02 \x01(\tH\x01R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x38\n\x08relation\x18\x03 \x01(\x0b\x32\x17.spark.connect.RelationH\x02R\x08relation\x88\x01\x01\x42\x0c\n\n_flow_nameB\x14\n\x12_dataflow_graph_idB\x0b\n\t_relationB\x0e\n\x0c\x63ommand_type"\xf4\x05\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x12n\n\x15\x64\x65\x66ine_dataset_result\x18\x02 \x01(\x0b\x32\x38.spark.connect.PipelineCommandResult.DefineDatasetResultH\x00R\x13\x64\x65\x66ineDatasetResult\x12\x65\n\x12\x64\x65\x66ine_flow_result\x18\x03 \x01(\x0b\x32\x35.spark.connect.PipelineCommandResult.DefineFlowResultH\x00R\x10\x64\x65\x66ineFlowResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\x86\x01\n\x13\x44\x65\x66ineDatasetResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifier\x1a\x83\x01\n\x10\x44\x65\x66ineFlowResult\x12W\n\x13resolved_identifier\x18\x01 \x01(\x0b\x32!.spark.connect.ResolvedIdentifierH\x00R\x12resolvedIdentifier\x88\x01\x01\x42\x16\n\x14_resolved_identifierB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message"z\n\x12SourceCodeLocation\x12 \n\tfile_name\x18\x01 \x01(\tH\x00R\x08\x66ileName\x88\x01\x01\x12$\n\x0bline_number\x18\x02 \x01(\x05H\x01R\nlineNumber\x88\x01\x01\x42\x0c\n\n_file_nameB\x0e\n\x0c_line_number"E\n$PipelineQueryFunctionExecutionSignal\x12\x1d\n\nflow_names\x18\x01 \x03(\tR\tflowNames*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _globals = globals() @@ -56,52 +57,64 @@ ]._serialized_options = b"\n\036org.apache.spark.connect.protoP\001Z\022internal/generated" 
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._loaded_options = None - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001" + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._loaded_options = None + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._serialized_options = b"8\001" _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001" - _globals["_DATASETTYPE"]._serialized_start = 4843 - _globals["_DATASETTYPE"]._serialized_end = 4940 - _globals["_PIPELINECOMMAND"]._serialized_start = 168 - _globals["_PIPELINECOMMAND"]._serialized_end = 3694 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1050 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1358 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1259 - _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 1317 - _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1360 - _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1450 - _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1453 - _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 2161 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_start = 1980 - _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_end = 2046 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2164 - _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 2809 - _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 1259 - _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1317 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 2639 - _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 2697 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2812 - _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3091 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 3094 - _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 3293 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 3296 - _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 3454 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 3457 - _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 3678 - _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 3697 - _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 4453 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4069 - _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 4167 - _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_start = 4170 - _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_end = 4304 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 4307 - _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 4438 - _globals["_PIPELINEEVENTRESULT"]._serialized_start = 4455 - _globals["_PIPELINEEVENTRESULT"]._serialized_end 
= 4528 - _globals["_PIPELINEEVENT"]._serialized_start = 4530 - _globals["_PIPELINEEVENT"]._serialized_end = 4646 - _globals["_SOURCECODELOCATION"]._serialized_start = 4648 - _globals["_SOURCECODELOCATION"]._serialized_end = 4770 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 4772 - _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 4841 + _globals["_DATASETTYPE"]._serialized_start = 5280 + _globals["_DATASETTYPE"]._serialized_end = 5377 + _globals["_PIPELINECOMMAND"]._serialized_start = 195 + _globals["_PIPELINECOMMAND"]._serialized_end = 4131 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1077 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1385 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 1286 + _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 1344 + _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1387 + _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1477 + _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_start = 1480 + _globals["_PIPELINECOMMAND_DEFINEDATASET"]._serialized_end = 2382 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS"]._serialized_start = 1929 + _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS"]._serialized_end = 2278 + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._serialized_start = 2190 + _globals[ + "_PIPELINECOMMAND_DEFINEDATASET_TABLEDETAILS_TABLEPROPERTIESENTRY" + ]._serialized_end = 2256 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2385 + _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3246 + _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 1286 + _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1344 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start = 2979 + _globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end = 3076 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3078 + _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3136 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 3249 + _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 3528 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 3531 + _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 3730 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start = 3733 + _globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end = 3891 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 3894 + _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end = 4115 + _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 4134 + _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 4890 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 4506 + _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 4604 + _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_start = 4607 + _globals["_PIPELINECOMMANDRESULT_DEFINEDATASETRESULT"]._serialized_end = 4741 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 4744 + _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 4875 + _globals["_PIPELINEEVENTRESULT"]._serialized_start = 4892 + 
_globals["_PIPELINEEVENTRESULT"]._serialized_end = 4965 + _globals["_PIPELINEEVENT"]._serialized_start = 4967 + _globals["_PIPELINEEVENT"]._serialized_end = 5083 + _globals["_SOURCECODELOCATION"]._serialized_start = 5085 + _globals["_SOURCECODELOCATION"]._serialized_end = 5207 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 5209 + _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 5278 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi index b5ed1c216a837..860e6176391ea 100644 --- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi +++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi @@ -35,6 +35,7 @@ limitations under the License. """ import builtins import collections.abc +import google.protobuf.any_pb2 import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.internal.enum_type_wrapper @@ -208,32 +209,101 @@ class PipelineCommand(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - class TablePropertiesEntry(google.protobuf.message.Message): + class TableDetails(google.protobuf.message.Message): + """Dataset metadata that's only applicable to tables and materialized views.""" + DESCRIPTOR: google.protobuf.descriptor.Descriptor - KEY_FIELD_NUMBER: builtins.int - VALUE_FIELD_NUMBER: builtins.int - key: builtins.str - value: builtins.str + class TablePropertiesEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + ) -> None: ... + + TABLE_PROPERTIES_FIELD_NUMBER: builtins.int + PARTITION_COLS_FIELD_NUMBER: builtins.int + SCHEMA_FIELD_NUMBER: builtins.int + FORMAT_FIELD_NUMBER: builtins.int + @property + def table_properties( + self, + ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: + """Optional table properties.""" + @property + def partition_cols( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Optional partition columns for the table.""" + @property + def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType: + """Schema for the dataset. If unset, this will be inferred from incoming flows.""" + format: builtins.str + """The output table format of the dataset.""" def __init__( self, *, - key: builtins.str = ..., - value: builtins.str = ..., + table_properties: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., + partition_cols: collections.abc.Iterable[builtins.str] | None = ..., + schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., + format: builtins.str | None = ..., ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_format", + b"_format", + "_schema", + b"_schema", + "format", + b"format", + "schema", + b"schema", + ], + ) -> builtins.bool: ... 
def ClearField( - self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] + self, + field_name: typing_extensions.Literal[ + "_format", + b"_format", + "_schema", + b"_schema", + "format", + b"format", + "partition_cols", + b"partition_cols", + "schema", + b"schema", + "table_properties", + b"table_properties", + ], ) -> None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_format", b"_format"] + ) -> typing_extensions.Literal["format"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_schema", b"_schema"] + ) -> typing_extensions.Literal["schema"] | None: ... DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int DATASET_NAME_FIELD_NUMBER: builtins.int DATASET_TYPE_FIELD_NUMBER: builtins.int COMMENT_FIELD_NUMBER: builtins.int - TABLE_PROPERTIES_FIELD_NUMBER: builtins.int - PARTITION_COLS_FIELD_NUMBER: builtins.int - SCHEMA_FIELD_NUMBER: builtins.int - FORMAT_FIELD_NUMBER: builtins.int SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int + TABLE_DETAILS_FIELD_NUMBER: builtins.int + EXTENSION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this dataset to.""" dataset_name: builtins.str @@ -243,27 +313,12 @@ class PipelineCommand(google.protobuf.message.Message): comment: builtins.str """Optional comment for the dataset.""" @property - def table_properties( - self, - ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: - """Optional table properties. Only applies to dataset_type == TABLE and dataset_type == MATERIALIZED_VIEW.""" - @property - def partition_cols( - self, - ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: - """Optional partition columns for the dataset. Only applies to dataset_type == TABLE and - dataset_type == MATERIALIZED_VIEW. - """ - @property - def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType: - """Schema for the dataset. If unset, this will be inferred from incoming flows.""" - format: builtins.str - """The output table format of the dataset. Only applies to dataset_type == TABLE and - dataset_type == MATERIALIZED_VIEW. - """ - @property def source_code_location(self) -> global___SourceCodeLocation: """The location in source code that this dataset was defined.""" + @property + def table_details(self) -> global___PipelineCommand.DefineDataset.TableDetails: ... + @property + def extension(self) -> google.protobuf.any_pb2.Any: ... def __init__( self, *, @@ -271,11 +326,9 @@ class PipelineCommand(google.protobuf.message.Message): dataset_name: builtins.str | None = ..., dataset_type: global___DatasetType.ValueType | None = ..., comment: builtins.str | None = ..., - table_properties: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., - partition_cols: collections.abc.Iterable[builtins.str] | None = ..., - schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ..., - format: builtins.str | None = ..., source_code_location: global___SourceCodeLocation | None = ..., + table_details: global___PipelineCommand.DefineDataset.TableDetails | None = ..., + extension: google.protobuf.any_pb2.Any | None = ..., ) -> None: ... 
def HasField( self, @@ -288,10 +341,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataset_name", "_dataset_type", b"_dataset_type", - "_format", - b"_format", - "_schema", - b"_schema", "_source_code_location", b"_source_code_location", "comment", @@ -302,12 +351,14 @@ class PipelineCommand(google.protobuf.message.Message): b"dataset_name", "dataset_type", b"dataset_type", - "format", - b"format", - "schema", - b"schema", + "details", + b"details", + "extension", + b"extension", "source_code_location", b"source_code_location", + "table_details", + b"table_details", ], ) -> builtins.bool: ... def ClearField( @@ -321,10 +372,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataset_name", "_dataset_type", b"_dataset_type", - "_format", - b"_format", - "_schema", - b"_schema", "_source_code_location", b"_source_code_location", "comment", @@ -335,16 +382,14 @@ class PipelineCommand(google.protobuf.message.Message): b"dataset_name", "dataset_type", b"dataset_type", - "format", - b"format", - "partition_cols", - b"partition_cols", - "schema", - b"schema", + "details", + b"details", + "extension", + b"extension", "source_code_location", b"source_code_location", - "table_properties", - b"table_properties", + "table_details", + b"table_details", ], ) -> None: ... @typing.overload @@ -365,20 +410,16 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_dataset_type", b"_dataset_type"] ) -> typing_extensions.Literal["dataset_type"] | None: ... @typing.overload - def WhichOneof( - self, oneof_group: typing_extensions.Literal["_format", b"_format"] - ) -> typing_extensions.Literal["format"] | None: ... - @typing.overload - def WhichOneof( - self, oneof_group: typing_extensions.Literal["_schema", b"_schema"] - ) -> typing_extensions.Literal["schema"] | None: ... - @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal[ "_source_code_location", b"_source_code_location" ], ) -> typing_extensions.Literal["source_code_location"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["details", b"details"] + ) -> typing_extensions.Literal["table_details", "extension"] | None: ... class DefineFlow(google.protobuf.message.Message): """Request to define a flow targeting a dataset.""" @@ -402,6 +443,38 @@ class PipelineCommand(google.protobuf.message.Message): self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"] ) -> None: ... + class WriteRelationFlowDetails(google.protobuf.message.Message): + """A flow that takes the contents of a relation and writes it to the target dataset.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + RELATION_FIELD_NUMBER: builtins.int + @property + def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: + """An unresolved relation that defines the dataset's flow. Empty if the query function + that defines the flow cannot be analyzed at the time of flow definition. + """ + def __init__( + self, + *, + relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., + ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_relation", b"_relation", "relation", b"relation" + ], + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "_relation", b"_relation", "relation", b"relation" + ], + ) -> None: ...
+ def WhichOneof( + self, oneof_group: typing_extensions.Literal["_relation", b"_relation"] + ) -> typing_extensions.Literal["relation"] | None: ... + class Response(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -432,10 +505,11 @@ class PipelineCommand(google.protobuf.message.Message): DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int FLOW_NAME_FIELD_NUMBER: builtins.int TARGET_DATASET_NAME_FIELD_NUMBER: builtins.int - RELATION_FIELD_NUMBER: builtins.int SQL_CONF_FIELD_NUMBER: builtins.int CLIENT_ID_FIELD_NUMBER: builtins.int SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int + RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int + EXTENSION_FIELD_NUMBER: builtins.int dataflow_graph_id: builtins.str """The graph to attach this flow to.""" flow_name: builtins.str @@ -443,11 +517,6 @@ class PipelineCommand(google.protobuf.message.Message): target_dataset_name: builtins.str """Name of the dataset this flow writes to. Can be partially or fully qualified.""" @property - def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: - """An unresolved relation that defines the dataset's flow. Empty if the query function - that defines the flow cannot be analyzed at the time of flow definition. - """ - @property def sql_conf( self, ) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: @@ -459,16 +528,24 @@ class PipelineCommand(google.protobuf.message.Message): @property def source_code_location(self) -> global___SourceCodeLocation: """The location in source code that this flow was defined.""" + @property + def relation_flow_details( + self, + ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ... + @property + def extension(self) -> google.protobuf.any_pb2.Any: ... def __init__( self, *, dataflow_graph_id: builtins.str | None = ..., flow_name: builtins.str | None = ..., target_dataset_name: builtins.str | None = ..., - relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., sql_conf: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., client_id: builtins.str | None = ..., source_code_location: global___SourceCodeLocation | None = ..., + relation_flow_details: global___PipelineCommand.DefineFlow.WriteRelationFlowDetails + | None = ..., + extension: google.protobuf.any_pb2.Any | None = ..., ) -> None: ... 
def HasField( self, @@ -479,8 +556,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataflow_graph_id", "_flow_name", b"_flow_name", - "_relation", - b"_relation", "_source_code_location", b"_source_code_location", "_target_dataset_name", @@ -489,10 +564,14 @@ class PipelineCommand(google.protobuf.message.Message): b"client_id", "dataflow_graph_id", b"dataflow_graph_id", + "details", + b"details", + "extension", + b"extension", "flow_name", b"flow_name", - "relation", - b"relation", + "relation_flow_details", + b"relation_flow_details", "source_code_location", b"source_code_location", "target_dataset_name", @@ -508,8 +587,6 @@ class PipelineCommand(google.protobuf.message.Message): b"_dataflow_graph_id", "_flow_name", b"_flow_name", - "_relation", - b"_relation", "_source_code_location", b"_source_code_location", "_target_dataset_name", @@ -518,10 +595,14 @@ class PipelineCommand(google.protobuf.message.Message): b"client_id", "dataflow_graph_id", b"dataflow_graph_id", + "details", + b"details", + "extension", + b"extension", "flow_name", b"flow_name", - "relation", - b"relation", + "relation_flow_details", + b"relation_flow_details", "source_code_location", b"source_code_location", "sql_conf", @@ -544,10 +625,6 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_flow_name", b"_flow_name"] ) -> typing_extensions.Literal["flow_name"] | None: ... @typing.overload - def WhichOneof( - self, oneof_group: typing_extensions.Literal["_relation", b"_relation"] - ) -> typing_extensions.Literal["relation"] | None: ... - @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal[ @@ -559,6 +636,10 @@ class PipelineCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_target_dataset_name", b"_target_dataset_name"], ) -> typing_extensions.Literal["target_dataset_name"] | None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["details", b"details"] + ) -> typing_extensions.Literal["relation_flow_details", "extension"] | None: ... class StartRun(google.protobuf.message.Message): """Resolves all datasets and flows and start a pipeline update. Should be called after all diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto index 16d211f9f72d3..adf9af313d34b 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto @@ -19,6 +19,7 @@ syntax = "proto3"; package spark.connect; +import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; import "spark/connect/common.proto"; import "spark/connect/relations.proto"; @@ -73,22 +74,28 @@ message PipelineCommand { // Optional comment for the dataset. optional string comment = 4; - // Optional table properties. Only applies to dataset_type == TABLE and dataset_type == MATERIALIZED_VIEW. - map<string, string> table_properties = 5; + // The location in source code that this dataset was defined. + optional SourceCodeLocation source_code_location = 5; - // Optional partition columns for the dataset. Only applies to dataset_type == TABLE and - // dataset_type == MATERIALIZED_VIEW. - repeated string partition_cols = 6; + oneof details { + TableDetails table_details = 6; + google.protobuf.Any extension = 999; + } - // Schema for the dataset. If unset, this will be inferred from incoming flows.
- optional spark.connect.DataType schema = 7; + // Dataset metadata that's only applicable to tables and materialized views. + message TableDetails { + // Optional table properties. + map<string, string> table_properties = 1; - // The output table format of the dataset. Only applies to dataset_type == TABLE and - // dataset_type == MATERIALIZED_VIEW. - optional string format = 8; + // Optional partition columns for the table. + repeated string partition_cols = 2; - // The location in source code that this dataset was defined. - optional SourceCodeLocation source_code_location = 9; + // Schema for the dataset. If unset, this will be inferred from incoming flows. + optional spark.connect.DataType schema = 3; + + // The output table format of the dataset. + optional string format = 4; + } } // Request to define a flow targeting a dataset. @@ -102,19 +109,27 @@ message PipelineCommand { // Name of the dataset this flow writes to. Can be partially or fully qualified. optional string target_dataset_name = 3; - // An unresolved relation that defines the dataset's flow. Empty if the query function - // that defines the flow cannot be analyzed at the time of flow definition. - optional spark.connect.Relation relation = 4; - // SQL configurations set when running this flow. - map<string, string> sql_conf = 5; + map<string, string> sql_conf = 4; // Identifier for the client making the request. The server uses this to determine what flow // evaluation request stream to dispatch evaluation requests to for this flow. - optional string client_id = 6; + optional string client_id = 5; // The location in source code that this flow was defined. - optional SourceCodeLocation source_code_location = 7; + optional SourceCodeLocation source_code_location = 6; + + oneof details { + WriteRelationFlowDetails relation_flow_details = 7; + google.protobuf.Any extension = 999; + } + + // A flow that takes the contents of a relation and writes it to the target dataset. + message WriteRelationFlowDetails { + // An unresolved relation that defines the dataset's flow. Empty if the query function + // that defines the flow cannot be analyzed at the time of flow definition. + optional spark.connect.Relation relation = 1; + } message Response { // Fully qualified flow name that uniquely identify a flow in the Dataflow graph.
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 01402c64e8a2a..87ea98fd2f12c 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -184,17 +184,19 @@ private[connect] object PipelinesHandler extends Logging { currentCatalog = Some(graphElementRegistry.defaultCatalog), currentDatabase = Some(graphElementRegistry.defaultDatabase)) .identifier + + val tableDetails = dataset.getTableDetails graphElementRegistry.registerTable( Table( identifier = qualifiedIdentifier, comment = Option(dataset.getComment), - specifiedSchema = Option.when(dataset.hasSchema)( + specifiedSchema = Option.when(tableDetails.hasSchema)( DataTypeProtoConverter - .toCatalystType(dataset.getSchema) + .toCatalystType(tableDetails.getSchema) .asInstanceOf[StructType]), - partitionCols = Option(dataset.getPartitionColsList.asScala.toSeq) + partitionCols = Option(tableDetails.getPartitionColsList.asScala.toSeq) .filter(_.nonEmpty), - properties = dataset.getTablePropertiesMap.asScala.toMap, + properties = tableDetails.getTablePropertiesMap.asScala.toMap, origin = QueryOrigin( filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)( dataset.getSourceCodeLocation.getFileName), @@ -203,7 +205,7 @@ private[connect] object PipelinesHandler extends Logging { objectType = Option(QueryOriginType.Table.toString), objectName = Option(qualifiedIdentifier.unquotedString), language = Option(Python())), - format = Option.when(dataset.hasFormat)(dataset.getFormat), + format = Option.when(tableDetails.hasFormat)(tableDetails.getFormat), normalizedPath = None, isStreamingTable = dataset.getDatasetType == proto.DatasetType.TABLE)) qualifiedIdentifier @@ -279,12 +281,13 @@ private[connect] object PipelinesHandler extends Logging { } } + val relationFlowDetails = flow.getRelationFlowDetails graphElementRegistry.registerFlow( new UnresolvedFlow( identifier = flowIdentifier, destinationIdentifier = destinationIdentifier, - func = - FlowAnalysis.createFlowFunctionFromLogicalPlan(transformRelationFunc(flow.getRelation)), + func = FlowAnalysis.createFlowFunctionFromLogicalPlan( + transformRelationFunc(relationFlowDetails.getRelation)), sqlConf = flow.getSqlConfMap.asScala.toMap, once = false, queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)), diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala index 772b43656b141..0a1a74fabb495 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala @@ -97,19 +97,23 @@ class SparkDeclarativePipelinesServerSuite .setDataflowGraphId(graphId) .setFlowName("mv") .setTargetDatasetName("mv") - .setRelation( - Relation + .setRelationFlowDetails( + DefineFlow.WriteRelationFlowDetails .newBuilder() - .setUnresolvedTableValuedFunction( - UnresolvedTableValuedFunction + .setRelation( + Relation .newBuilder() - .setFunctionName("range") - .addArguments(Expression + 
.setUnresolvedTableValuedFunction(UnresolvedTableValuedFunction .newBuilder() - .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .setFunctionName("range") + .addArguments(Expression + .newBuilder() + .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .build()) .build()) .build()) - .build())) + .build()) + .build()) .build())) registerGraphElementsFromSql( graphId = graphId, @@ -698,17 +702,21 @@ class SparkDeclarativePipelinesServerSuite .setDataflowGraphId(graphId) .setFlowName(testCase.flowName) .setTargetDatasetName(testCase.flowName) - .setRelation( - Relation + .setRelationFlowDetails( + DefineFlow.WriteRelationFlowDetails .newBuilder() - .setUnresolvedTableValuedFunction( - UnresolvedTableValuedFunction + .setRelation( + Relation .newBuilder() - .setFunctionName("range") - .addArguments(Expression - .newBuilder() - .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) - .build()) + .setUnresolvedTableValuedFunction( + UnresolvedTableValuedFunction + .newBuilder() + .setFunctionName("range") + .addArguments(Expression + .newBuilder() + .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .build()) + .build()) .build()) .build()) .build() @@ -762,17 +770,21 @@ class SparkDeclarativePipelinesServerSuite .setDataflowGraphId(graphId) .setFlowName(testCase.flowName) .setTargetDatasetName(testCase.flowName) - .setRelation( - Relation + .setRelationFlowDetails( + DefineFlow.WriteRelationFlowDetails .newBuilder() - .setUnresolvedTableValuedFunction( - UnresolvedTableValuedFunction + .setRelation( + Relation .newBuilder() - .setFunctionName("range") - .addArguments(Expression - .newBuilder() - .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) - .build()) + .setUnresolvedTableValuedFunction( + UnresolvedTableValuedFunction + .newBuilder() + .setFunctionName("range") + .addArguments(Expression + .newBuilder() + .setLiteral(Expression.Literal.newBuilder().setInteger(5).build()) + .build()) + .build()) .build()) .build()) .build() diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala index c31aec0b7a5e6..36b90016a52af 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/TestPipelineDefinition.scala @@ -42,23 +42,33 @@ class TestPipelineDefinition(graphId: String) { // specifiedSchema: Option[StructType] = None, partitionCols: Option[Seq[String]] = None, properties: Map[String, String] = Map.empty): Unit = { + val tableDetails = sc.PipelineCommand.DefineDataset.TableDetails + .newBuilder() + .addAllPartitionCols(partitionCols.getOrElse(Seq()).asJava) + .putAllTableProperties(properties.asJava) + .build() + tableDefs += sc.PipelineCommand.DefineDataset .newBuilder() .setDataflowGraphId(graphId) .setDatasetName(name) .setDatasetType(datasetType) .setComment(comment.getOrElse("")) - .addAllPartitionCols(partitionCols.getOrElse(Seq()).asJava) - .putAllTableProperties(properties.asJava) + .setTableDetails(tableDetails) .build() query.foreach { q => + val relationFlowDetails = sc.PipelineCommand.DefineFlow.WriteRelationFlowDetails + .newBuilder() + .setRelation(q) + .build() + flowDefs += sc.PipelineCommand.DefineFlow .newBuilder() .setDataflowGraphId(graphId) .setFlowName(name) .setTargetDatasetName(name) - 
.setRelation(q) + .setRelationFlowDetails(relationFlowDetails) .putAllSqlConf(sparkConf.asJava) .build() } @@ -92,12 +102,17 @@ class TestPipelineDefinition(graphId: String) { .setComment(comment.getOrElse("")) .build() + val relationFlowDetails = sc.PipelineCommand.DefineFlow.WriteRelationFlowDetails + .newBuilder() + .setRelation(query) + .build() + flowDefs += sc.PipelineCommand.DefineFlow .newBuilder() .setDataflowGraphId(graphId) .setFlowName(name) .setTargetDatasetName(name) - .setRelation(query) + .setRelationFlowDetails(relationFlowDetails) .putAllSqlConf(sparkConf.asJava) .build() @@ -118,12 +133,17 @@ class TestPipelineDefinition(graphId: String) { query: sc.Relation, sparkConf: Map[String, String] = Map.empty, once: Boolean = false): Unit = { + val relationFlowDetails = sc.PipelineCommand.DefineFlow.WriteRelationFlowDetails + .newBuilder() + .setRelation(query) + .build() + flowDefs += sc.PipelineCommand.DefineFlow .newBuilder() .setDataflowGraphId(graphId) .setFlowName(name) .setTargetDatasetName(destinationName) - .setRelation(query) + .setRelationFlowDetails(relationFlowDetails) .putAllSqlConf(sparkConf.asJava) .build() }
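
For reference, a minimal Python sketch of how a client constructs the regrouped messages after this refactor, mirroring the updated registry code above. The import paths follow the modules touched in this patch; the graph id, dataset/flow names, table properties, and SQL text are placeholder values, not anything defined by the change itself.

    from pyspark.sql.connect.proto import pipelines_pb2 as pb2
    from pyspark.sql.connect.proto import relations_pb2

    # Table-specific metadata now lives in the nested TableDetails message,
    # which occupies the new `details` oneof on DefineDataset.
    define_dataset = pb2.PipelineCommand.DefineDataset(
        dataflow_graph_id="graph-id-placeholder",
        dataset_name="events",
        dataset_type=pb2.DatasetType.MATERIALIZED_VIEW,
        comment="example dataset",
        table_details=pb2.PipelineCommand.DefineDataset.TableDetails(
            table_properties={"owner": "example"},
            partition_cols=["event_date"],
            format="parquet",
        ),
    )

    # The flow's unresolved relation is wrapped in WriteRelationFlowDetails,
    # which occupies the `details` oneof on DefineFlow.
    define_flow = pb2.PipelineCommand.DefineFlow(
        dataflow_graph_id="graph-id-placeholder",
        flow_name="events",
        target_dataset_name="events",
        relation_flow_details=pb2.PipelineCommand.DefineFlow.WriteRelationFlowDetails(
            relation=relations_pb2.Relation(sql=relations_pb2.SQL(query="SELECT 1 AS id")),
        ),
        sql_conf={"spark.sql.shuffle.partitions": "1"},
    )

    # Because table_details / relation_flow_details share a oneof with the
    # Any-typed extension field, at most one member of `details` can be set.
    assert define_dataset.WhichOneof("details") == "table_details"
    assert define_flow.WhichOneof("details") == "relation_flow_details"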