Skip to content

Commit e56ab2f

Browse files
cookiedough77sryza
authored andcommitted
[SPARK-53593][SDP] Add response field for DefineDataset and DefineFlow RPC
### What changes were proposed in this pull request? This PR updates the Spark Connect server to return resolved dataset and flow names in the responses of DefineDataset and DefineFlow RPCs. Changes include: 1. Adding resolved_data_name and resolved_flow_name to the respective proto response messages. 2. Updating the RPC handlers to return resolved identifiers as response. 3. Adding unit tests in SparkDeclarativePipelinesServerSuite to validate the resolved names ### Why are the changes needed? The SC client requires the resolved names for datasets and flows to support graph resolution in the LDP frontend. Returning this info from the server ensures consistent naming and proper registration. ### Does this PR introduce _any_ user-facing change? Yes. The DefineDataset and DefineFlow RPC responses now include fully qualified names like `catalog`.`db`.`mv`. Implicit flows to temp views return unqualified names like `mv`. ### How was this patch tested? Added targeted unit tests in SparkDeclarativePipelinesServerSuite. Verified both default and custom catalog/database cases. Run test using ``` ./build/sbt > project connect > testOnly *SparkDeclarativePipelinesServerSuite ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52328 from cookiedough77/jessie.luo_data/spark-add-response. Lead-authored-by: Jessie Luo <[email protected]> Co-authored-by: cookiedough77 <[email protected]> Signed-off-by: Sandy Ryza <[email protected]>
1 parent 65b9da5 commit e56ab2f

File tree

11 files changed

+635
-196
lines changed

11 files changed

+635
-196
lines changed

python/pyspark/sql/connect/proto/common_pb2.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636

3737
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
38-
b'\n\x1aspark/connect/common.proto\x12\rspark.connect"\xb0\x01\n\x0cStorageLevel\x12\x19\n\x08use_disk\x18\x01 \x01(\x08R\x07useDisk\x12\x1d\n\nuse_memory\x18\x02 \x01(\x08R\tuseMemory\x12 \n\x0cuse_off_heap\x18\x03 \x01(\x08R\nuseOffHeap\x12"\n\x0c\x64\x65serialized\x18\x04 \x01(\x08R\x0c\x64\x65serialized\x12 \n\x0breplication\x18\x05 \x01(\x05R\x0breplication"G\n\x13ResourceInformation\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n\taddresses\x18\x02 \x03(\tR\taddresses"\xc3\x01\n\x17\x45xecutorResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x03R\x06\x61mount\x12.\n\x10\x64iscovery_script\x18\x03 \x01(\tH\x00R\x0f\x64iscoveryScript\x88\x01\x01\x12\x1b\n\x06vendor\x18\x04 \x01(\tH\x01R\x06vendor\x88\x01\x01\x42\x13\n\x11_discovery_scriptB\t\n\x07_vendor"R\n\x13TaskResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x01R\x06\x61mount"\xa5\x03\n\x0fResourceProfile\x12\x64\n\x12\x65xecutor_resources\x18\x01 \x03(\x0b\x32\x35.spark.connect.ResourceProfile.ExecutorResourcesEntryR\x11\x65xecutorResources\x12X\n\x0etask_resources\x18\x02 \x03(\x0b\x32\x31.spark.connect.ResourceProfile.TaskResourcesEntryR\rtaskResources\x1al\n\x16\x45xecutorResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12<\n\x05value\x18\x02 \x01(\x0b\x32&.spark.connect.ExecutorResourceRequestR\x05value:\x02\x38\x01\x1a\x64\n\x12TaskResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.TaskResourceRequestR\x05value:\x02\x38\x01"\x93\x01\n\x06Origin\x12\x42\n\rpython_origin\x18\x01 \x01(\x0b\x32\x1b.spark.connect.PythonOriginH\x00R\x0cpythonOrigin\x12\x39\n\njvm_origin\x18\x02 \x01(\x0b\x32\x18.spark.connect.JvmOriginH\x00R\tjvmOriginB\n\n\x08\x66unction"G\n\x0cPythonOrigin\x12\x1a\n\x08\x66ragment\x18\x01 \x01(\tR\x08\x66ragment\x12\x1b\n\tcall_site\x18\x02 \x01(\tR\x08\x63\x61llSite"\xb1\x03\n\tJvmOrigin\x12\x17\n\x04line\x18\x01 \x01(\x05H\x00R\x04line\x88\x01\x01\x12*\n\x0estart_position\x18\x02 \x01(\x05H\x01R\rstartPosition\x88\x01\x01\x12$\n\x0bstart_index\x18\x03 \x01(\x05H\x02R\nstartIndex\x88\x01\x01\x12"\n\nstop_index\x18\x04 \x01(\x05H\x03R\tstopIndex\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x05 \x01(\tH\x04R\x07sqlText\x88\x01\x01\x12$\n\x0bobject_type\x18\x06 \x01(\tH\x05R\nobjectType\x88\x01\x01\x12$\n\x0bobject_name\x18\x07 \x01(\tH\x06R\nobjectName\x88\x01\x01\x12\x41\n\x0bstack_trace\x18\x08 \x03(\x0b\x32 .spark.connect.StackTraceElementR\nstackTraceB\x07\n\x05_lineB\x11\n\x0f_start_positionB\x0e\n\x0c_start_indexB\r\n\x0b_stop_indexB\x0b\n\t_sql_textB\x0e\n\x0c_object_typeB\x0e\n\x0c_object_name"\xea\x02\n\x11StackTraceElement\x12/\n\x11\x63lass_loader_name\x18\x01 \x01(\tH\x00R\x0f\x63lassLoaderName\x88\x01\x01\x12$\n\x0bmodule_name\x18\x02 \x01(\tH\x01R\nmoduleName\x88\x01\x01\x12*\n\x0emodule_version\x18\x03 \x01(\tH\x02R\rmoduleVersion\x88\x01\x01\x12\'\n\x0f\x64\x65\x63laring_class\x18\x04 \x01(\tR\x0e\x64\x65\x63laringClass\x12\x1f\n\x0bmethod_name\x18\x05 \x01(\tR\nmethodName\x12 \n\tfile_name\x18\x06 \x01(\tH\x03R\x08\x66ileName\x88\x01\x01\x12\x1f\n\x0bline_number\x18\x07 \x01(\x05R\nlineNumberB\x14\n\x12_class_loader_nameB\x0e\n\x0c_module_nameB\x11\n\x0f_module_versionB\x0c\n\n_file_name"\x1f\n\x05\x42ools\x12\x16\n\x06values\x18\x01 \x03(\x08R\x06values"\x1e\n\x04Ints\x12\x16\n\x06values\x18\x01 \x03(\x05R\x06values"\x1f\n\x05Longs\x12\x16\n\x06values\x18\x01 \x03(\x03R\x06values" \n\x06\x46loats\x12\x16\n\x06values\x18\x01 \x03(\x02R\x06values"!\n\x07\x44oubles\x12\x16\n\x06values\x18\x01 \x03(\x01R\x06values"!\n\x07Strings\x12\x16\n\x06values\x18\x01 \x03(\tR\x06valuesB6\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3'
38+
b'\n\x1aspark/connect/common.proto\x12\rspark.connect"\xb0\x01\n\x0cStorageLevel\x12\x19\n\x08use_disk\x18\x01 \x01(\x08R\x07useDisk\x12\x1d\n\nuse_memory\x18\x02 \x01(\x08R\tuseMemory\x12 \n\x0cuse_off_heap\x18\x03 \x01(\x08R\nuseOffHeap\x12"\n\x0c\x64\x65serialized\x18\x04 \x01(\x08R\x0c\x64\x65serialized\x12 \n\x0breplication\x18\x05 \x01(\x05R\x0breplication"G\n\x13ResourceInformation\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n\taddresses\x18\x02 \x03(\tR\taddresses"\xc3\x01\n\x17\x45xecutorResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x03R\x06\x61mount\x12.\n\x10\x64iscovery_script\x18\x03 \x01(\tH\x00R\x0f\x64iscoveryScript\x88\x01\x01\x12\x1b\n\x06vendor\x18\x04 \x01(\tH\x01R\x06vendor\x88\x01\x01\x42\x13\n\x11_discovery_scriptB\t\n\x07_vendor"R\n\x13TaskResourceRequest\x12#\n\rresource_name\x18\x01 \x01(\tR\x0cresourceName\x12\x16\n\x06\x61mount\x18\x02 \x01(\x01R\x06\x61mount"\xa5\x03\n\x0fResourceProfile\x12\x64\n\x12\x65xecutor_resources\x18\x01 \x03(\x0b\x32\x35.spark.connect.ResourceProfile.ExecutorResourcesEntryR\x11\x65xecutorResources\x12X\n\x0etask_resources\x18\x02 \x03(\x0b\x32\x31.spark.connect.ResourceProfile.TaskResourcesEntryR\rtaskResources\x1al\n\x16\x45xecutorResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12<\n\x05value\x18\x02 \x01(\x0b\x32&.spark.connect.ExecutorResourceRequestR\x05value:\x02\x38\x01\x1a\x64\n\x12TaskResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.TaskResourceRequestR\x05value:\x02\x38\x01"\x93\x01\n\x06Origin\x12\x42\n\rpython_origin\x18\x01 \x01(\x0b\x32\x1b.spark.connect.PythonOriginH\x00R\x0cpythonOrigin\x12\x39\n\njvm_origin\x18\x02 \x01(\x0b\x32\x18.spark.connect.JvmOriginH\x00R\tjvmOriginB\n\n\x08\x66unction"G\n\x0cPythonOrigin\x12\x1a\n\x08\x66ragment\x18\x01 \x01(\tR\x08\x66ragment\x12\x1b\n\tcall_site\x18\x02 \x01(\tR\x08\x63\x61llSite"\xb1\x03\n\tJvmOrigin\x12\x17\n\x04line\x18\x01 \x01(\x05H\x00R\x04line\x88\x01\x01\x12*\n\x0estart_position\x18\x02 \x01(\x05H\x01R\rstartPosition\x88\x01\x01\x12$\n\x0bstart_index\x18\x03 \x01(\x05H\x02R\nstartIndex\x88\x01\x01\x12"\n\nstop_index\x18\x04 \x01(\x05H\x03R\tstopIndex\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x05 \x01(\tH\x04R\x07sqlText\x88\x01\x01\x12$\n\x0bobject_type\x18\x06 \x01(\tH\x05R\nobjectType\x88\x01\x01\x12$\n\x0bobject_name\x18\x07 \x01(\tH\x06R\nobjectName\x88\x01\x01\x12\x41\n\x0bstack_trace\x18\x08 \x03(\x0b\x32 .spark.connect.StackTraceElementR\nstackTraceB\x07\n\x05_lineB\x11\n\x0f_start_positionB\x0e\n\x0c_start_indexB\r\n\x0b_stop_indexB\x0b\n\t_sql_textB\x0e\n\x0c_object_typeB\x0e\n\x0c_object_name"\xea\x02\n\x11StackTraceElement\x12/\n\x11\x63lass_loader_name\x18\x01 \x01(\tH\x00R\x0f\x63lassLoaderName\x88\x01\x01\x12$\n\x0bmodule_name\x18\x02 \x01(\tH\x01R\nmoduleName\x88\x01\x01\x12*\n\x0emodule_version\x18\x03 \x01(\tH\x02R\rmoduleVersion\x88\x01\x01\x12\'\n\x0f\x64\x65\x63laring_class\x18\x04 \x01(\tR\x0e\x64\x65\x63laringClass\x12\x1f\n\x0bmethod_name\x18\x05 \x01(\tR\nmethodName\x12 \n\tfile_name\x18\x06 \x01(\tH\x03R\x08\x66ileName\x88\x01\x01\x12\x1f\n\x0bline_number\x18\x07 \x01(\x05R\nlineNumberB\x14\n\x12_class_loader_nameB\x0e\n\x0c_module_nameB\x11\n\x0f_module_versionB\x0c\n\n_file_name"t\n\x12ResolvedIdentifier\x12!\n\x0c\x63\x61talog_name\x18\x01 \x01(\tR\x0b\x63\x61talogName\x12\x1c\n\tnamespace\x18\x02 \x03(\tR\tnamespace\x12\x1d\n\ntable_name\x18\x03 \x01(\tR\ttableName"\x1f\n\x05\x42ools\x12\x16\n\x06values\x18\x01 \x03(\x08R\x06values"\x1e\n\x04Ints\x12\x16\n\x06values\x18\x01 \x03(\x05R\x06values"\x1f\n\x05Longs\x12\x16\n\x06values\x18\x01 \x03(\x03R\x06values" \n\x06\x46loats\x12\x16\n\x06values\x18\x01 \x03(\x02R\x06values"!\n\x07\x44oubles\x12\x16\n\x06values\x18\x01 \x03(\x01R\x06values"!\n\x07Strings\x12\x16\n\x06values\x18\x01 \x03(\tR\x06valuesB6\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3'
3939
)
4040

4141
_globals = globals()
@@ -74,16 +74,18 @@
7474
_globals["_JVMORIGIN"]._serialized_end = 1660
7575
_globals["_STACKTRACEELEMENT"]._serialized_start = 1663
7676
_globals["_STACKTRACEELEMENT"]._serialized_end = 2025
77-
_globals["_BOOLS"]._serialized_start = 2027
78-
_globals["_BOOLS"]._serialized_end = 2058
79-
_globals["_INTS"]._serialized_start = 2060
80-
_globals["_INTS"]._serialized_end = 2090
81-
_globals["_LONGS"]._serialized_start = 2092
82-
_globals["_LONGS"]._serialized_end = 2123
83-
_globals["_FLOATS"]._serialized_start = 2125
84-
_globals["_FLOATS"]._serialized_end = 2157
85-
_globals["_DOUBLES"]._serialized_start = 2159
86-
_globals["_DOUBLES"]._serialized_end = 2192
87-
_globals["_STRINGS"]._serialized_start = 2194
88-
_globals["_STRINGS"]._serialized_end = 2227
77+
_globals["_RESOLVEDIDENTIFIER"]._serialized_start = 2027
78+
_globals["_RESOLVEDIDENTIFIER"]._serialized_end = 2143
79+
_globals["_BOOLS"]._serialized_start = 2145
80+
_globals["_BOOLS"]._serialized_end = 2176
81+
_globals["_INTS"]._serialized_start = 2178
82+
_globals["_INTS"]._serialized_end = 2208
83+
_globals["_LONGS"]._serialized_start = 2210
84+
_globals["_LONGS"]._serialized_end = 2241
85+
_globals["_FLOATS"]._serialized_start = 2243
86+
_globals["_FLOATS"]._serialized_end = 2275
87+
_globals["_DOUBLES"]._serialized_start = 2277
88+
_globals["_DOUBLES"]._serialized_end = 2310
89+
_globals["_STRINGS"]._serialized_start = 2312
90+
_globals["_STRINGS"]._serialized_end = 2345
8991
# @@protoc_insertion_point(module_scope)

python/pyspark/sql/connect/proto/common_pb2.pyi

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,34 @@ class StackTraceElement(google.protobuf.message.Message):
599599

600600
global___StackTraceElement = StackTraceElement
601601

602+
class ResolvedIdentifier(google.protobuf.message.Message):
603+
DESCRIPTOR: google.protobuf.descriptor.Descriptor
604+
605+
CATALOG_NAME_FIELD_NUMBER: builtins.int
606+
NAMESPACE_FIELD_NUMBER: builtins.int
607+
TABLE_NAME_FIELD_NUMBER: builtins.int
608+
catalog_name: builtins.str
609+
@property
610+
def namespace(
611+
self,
612+
) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
613+
table_name: builtins.str
614+
def __init__(
615+
self,
616+
*,
617+
catalog_name: builtins.str = ...,
618+
namespace: collections.abc.Iterable[builtins.str] | None = ...,
619+
table_name: builtins.str = ...,
620+
) -> None: ...
621+
def ClearField(
622+
self,
623+
field_name: typing_extensions.Literal[
624+
"catalog_name", b"catalog_name", "namespace", b"namespace", "table_name", b"table_name"
625+
],
626+
) -> None: ...
627+
628+
global___ResolvedIdentifier = ResolvedIdentifier
629+
602630
class Bools(google.protobuf.message.Message):
603631
DESCRIPTOR: google.protobuf.descriptor.Descriptor
604632

0 commit comments

Comments
 (0)