
Commit 65ff85a

anishm-db authored and sryza committed
[SPARK-52640][SDP] Propagate Python Source Code Location
### What changes were proposed in this pull request?

Propagate source code location details (line number and file path) end-to-end for declarative pipelines. That is, collect this information from the Python REPL that registers SDP datasets/flows, propagate it through the appropriate Spark Connect handlers, and associate it with the appropriate datasets/flows in pipeline events/exceptions.

### Why are the changes needed?

Better observability and debugging experience for users. Allows users to identify the exact lines that cause a particular exception.

### Does this PR introduce _any_ user-facing change?

Yes, we are populating source code information in the origin for pipeline events, which is user-facing. However, SDP is not yet released in any Spark version.

### How was this patch tested?

Added tests to `org.apache.spark.sql.connect.pipelines.PythonPipelineSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51344 from AnishMahto/sdp-python-query-origins.

Authored-by: anishm-db <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
1 parent 7ed0e37 commit 65ff85a

File tree

13 files changed: +359 −53 lines changed

python/pyspark/pipelines/source_code_location.py

Lines changed: 28 additions & 0 deletions
@@ -30,6 +30,34 @@ def get_caller_source_code_location(stacklevel: int) -> SourceCodeLocation:
    """
    Returns a SourceCodeLocation object representing the location of the code that invokes this function.
+
+    If this function is called from a decorator (ex. @sdp.table), note that the returned line
+    number is affected by how the decorator was triggered - i.e. whether @sdp.table or
+    @sdp.table() was called - AND what python version is being used.
+
+    Case 1:
+    |@sdp.table()
+    |def fn
+
+    @sdp.table() is executed immediately, on line 1. This is true for all python versions.
+
+    Case 2:
+    |@sdp.table
+    |def fn
+
+    In python < 3.10, @sdp.table will expand to fn = sdp.table(fn), replacing the line that `fn`
+    is defined on. This would be line 2. More interestingly, this means:
+
+    |@sdp.table
+    |
+    |
+    |def fn
+
+    will expand to fn = sdp.table(fn) on line 4, where `fn` is defined.
+
+    However, in python 3.10+, the line number in the stack trace will still be the line that the
+    decorator was defined on. In other words, case 2 will be treated the same as case 1, and the
+    line number will be 1.

    :param stacklevel: The number of stack frames to go up. 0 means the direct caller of this
        function, 1 means the caller of the caller, and so on.
    """

python/pyspark/pipelines/spark_connect_graph_element_registry.py

Lines changed: 11 additions & 0 deletions
@@ -29,6 +29,7 @@
 )
 from pyspark.pipelines.flow import Flow
 from pyspark.pipelines.graph_element_registry import GraphElementRegistry
+from pyspark.pipelines.source_code_location import SourceCodeLocation
 from typing import Any, cast
 import pyspark.sql.connect.proto as pb2

@@ -79,6 +80,7 @@ def register_dataset(self, dataset: Dataset) -> None:
             partition_cols=partition_cols,
             schema=schema,
             format=format,
+            source_code_location=source_code_location_to_proto(dataset.source_code_location),
         )
         command = pb2.Command()
         command.pipeline_command.define_dataset.CopyFrom(inner_command)

@@ -95,6 +97,7 @@ def register_flow(self, flow: Flow) -> None:
             target_dataset_name=flow.target,
             relation=relation,
             sql_conf=flow.spark_conf,
+            source_code_location=source_code_location_to_proto(flow.source_code_location),
         )
         command = pb2.Command()
         command.pipeline_command.define_flow.CopyFrom(inner_command)

@@ -109,3 +112,11 @@ def register_sql(self, sql_text: str, file_path: Path) -> None:
         command = pb2.Command()
         command.pipeline_command.define_sql_graph_elements.CopyFrom(inner_command)
         self._client.execute_command(command)
+
+
+def source_code_location_to_proto(
+    source_code_location: SourceCodeLocation,
+) -> pb2.SourceCodeLocation:
+    return pb2.SourceCodeLocation(
+        file_name=source_code_location.filename, line_number=source_code_location.line_number
+    )
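
A quick usage sketch of the new helper, assuming a pyspark build that ships these Spark Connect pipeline modules and that `SourceCodeLocation` is keyword-constructible as its field names suggest. Note the deliberate rename from the dataclass's `filename` to the proto's `file_name`:

```python
from pyspark.pipelines.source_code_location import SourceCodeLocation
from pyspark.pipelines.spark_connect_graph_element_registry import (
    source_code_location_to_proto,
)

loc = SourceCodeLocation(filename="/pipelines/definitions.py", line_number=42)
proto_loc = source_code_location_to_proto(loc)

assert proto_loc.file_name == "/pipelines/definitions.py"
assert proto_loc.line_number == 42
```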

python/pyspark/sql/connect/proto/pipelines_pb2.py

Lines changed: 35 additions & 33 deletions
Large diffs are not rendered by default.

python/pyspark/sql/connect/proto/pipelines_pb2.pyi

Lines changed: 94 additions & 0 deletions
@@ -233,6 +233,7 @@ class PipelineCommand(google.protobuf.message.Message):
     PARTITION_COLS_FIELD_NUMBER: builtins.int
     SCHEMA_FIELD_NUMBER: builtins.int
     FORMAT_FIELD_NUMBER: builtins.int
+    SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
     dataflow_graph_id: builtins.str
     """The graph to attach this dataset to."""
     dataset_name: builtins.str

@@ -260,6 +261,9 @@ class PipelineCommand(google.protobuf.message.Message):
     """The output table format of the dataset. Only applies to dataset_type == TABLE and
     dataset_type == MATERIALIZED_VIEW.
     """
+    @property
+    def source_code_location(self) -> global___SourceCodeLocation:
+        """The location in source code that this dataset was defined."""
     def __init__(
         self,
         *,

@@ -271,6 +275,7 @@ class PipelineCommand(google.protobuf.message.Message):
         partition_cols: collections.abc.Iterable[builtins.str] | None = ...,
         schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
         format: builtins.str | None = ...,
+        source_code_location: global___SourceCodeLocation | None = ...,
     ) -> None: ...
     def HasField(
         self,

@@ -287,6 +292,8 @@
         b"_format",
         "_schema",
         b"_schema",
+        "_source_code_location",
+        b"_source_code_location",
         "comment",
         b"comment",
         "dataflow_graph_id",

@@ -299,6 +306,8 @@
         b"format",
         "schema",
         b"schema",
+        "source_code_location",
+        b"source_code_location",
     ],
 ) -> builtins.bool: ...
 def ClearField(

@@ -316,6 +325,8 @@
         b"_format",
         "_schema",
         b"_schema",
+        "_source_code_location",
+        b"_source_code_location",
         "comment",
         b"comment",
         "dataflow_graph_id",

@@ -330,6 +341,8 @@
         b"partition_cols",
         "schema",
         b"schema",
+        "source_code_location",
+        b"source_code_location",
         "table_properties",
         b"table_properties",
     ],

@@ -359,6 +372,13 @@
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
     ) -> typing_extensions.Literal["schema"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_source_code_location", b"_source_code_location"
+        ],
+    ) -> typing_extensions.Literal["source_code_location"] | None: ...

 class DefineFlow(google.protobuf.message.Message):
     """Request to define a flow targeting a dataset."""

@@ -415,6 +435,7 @@
     RELATION_FIELD_NUMBER: builtins.int
     SQL_CONF_FIELD_NUMBER: builtins.int
     CLIENT_ID_FIELD_NUMBER: builtins.int
+    SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
     dataflow_graph_id: builtins.str
     """The graph to attach this flow to."""
     flow_name: builtins.str

@@ -435,6 +456,9 @@
     """Identifier for the client making the request. The server uses this to determine what flow
     evaluation request stream to dispatch evaluation requests to for this flow.
     """
+    @property
+    def source_code_location(self) -> global___SourceCodeLocation:
+        """The location in source code that this flow was defined."""
     def __init__(
         self,
         *,

@@ -444,6 +468,7 @@
         relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
         sql_conf: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
         client_id: builtins.str | None = ...,
+        source_code_location: global___SourceCodeLocation | None = ...,
     ) -> None: ...
     def HasField(
         self,

@@ -456,6 +481,8 @@
         b"_flow_name",
         "_relation",
         b"_relation",
+        "_source_code_location",
+        b"_source_code_location",
         "_target_dataset_name",
         b"_target_dataset_name",
         "client_id",

@@ -466,6 +493,8 @@
         b"flow_name",
         "relation",
         b"relation",
+        "source_code_location",
+        b"source_code_location",
         "target_dataset_name",
         b"target_dataset_name",
     ],

@@ -481,6 +510,8 @@
         b"_flow_name",
         "_relation",
         b"_relation",
+        "_source_code_location",
+        b"_source_code_location",
         "_target_dataset_name",
         b"_target_dataset_name",
         "client_id",

@@ -491,6 +522,8 @@
         b"flow_name",
         "relation",
         b"relation",
+        "source_code_location",
+        b"source_code_location",
         "sql_conf",
         b"sql_conf",
         "target_dataset_name",

@@ -515,6 +548,13 @@
         self, oneof_group: typing_extensions.Literal["_relation", b"_relation"]
     ) -> typing_extensions.Literal["relation"] | None: ...
     @typing.overload
+    def WhichOneof(
+        self,
+        oneof_group: typing_extensions.Literal[
+            "_source_code_location", b"_source_code_location"
+        ],
+    ) -> typing_extensions.Literal["source_code_location"] | None: ...
+    @typing.overload
     def WhichOneof(
         self,
         oneof_group: typing_extensions.Literal["_target_dataset_name", b"_target_dataset_name"],

@@ -1134,6 +1174,60 @@ class PipelineEvent(google.protobuf.message.Message):

 global___PipelineEvent = PipelineEvent

+class SourceCodeLocation(google.protobuf.message.Message):
+    """Source code location information associated with a particular dataset or flow."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    FILE_NAME_FIELD_NUMBER: builtins.int
+    LINE_NUMBER_FIELD_NUMBER: builtins.int
+    file_name: builtins.str
+    """The file that this pipeline source code was defined in."""
+    line_number: builtins.int
+    """The specific line number that this pipeline source code is located at, if applicable."""
+    def __init__(
+        self,
+        *,
+        file_name: builtins.str | None = ...,
+        line_number: builtins.int | None = ...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_file_name",
+            b"_file_name",
+            "_line_number",
+            b"_line_number",
+            "file_name",
+            b"file_name",
+            "line_number",
+            b"line_number",
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_file_name",
+            b"_file_name",
+            "_line_number",
+            b"_line_number",
+            "file_name",
+            b"file_name",
+            "line_number",
+            b"line_number",
+        ],
+    ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"]
+    ) -> typing_extensions.Literal["file_name"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_line_number", b"_line_number"]
+    ) -> typing_extensions.Literal["line_number"] | None: ...
+
+global___SourceCodeLocation = SourceCodeLocation

 class PipelineQueryFunctionExecutionSignal(google.protobuf.message.Message):
     """A signal from the server to the client to execute the query function for one or more flows, and
     to register their results with the server.

sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto

Lines changed: 14 additions & 0 deletions
@@ -86,6 +86,9 @@ message PipelineCommand {
     // The output table format of the dataset. Only applies to dataset_type == TABLE and
     // dataset_type == MATERIALIZED_VIEW.
     optional string format = 8;
+
+    // The location in source code that this dataset was defined.
+    optional SourceCodeLocation source_code_location = 9;
   }

   // Request to define a flow targeting a dataset.

@@ -110,6 +113,9 @@ message PipelineCommand {
     // evaluation request stream to dispatch evaluation requests to for this flow.
     optional string client_id = 6;

+    // The location in source code that this flow was defined.
+    optional SourceCodeLocation source_code_location = 7;
+
     message Response {
       // Fully qualified flow name that uniquely identify a flow in the Dataflow graph.
       optional string flow_name = 1;

@@ -217,6 +223,14 @@ message PipelineEvent {
   optional string message = 2;
 }

+// Source code location information associated with a particular dataset or flow.
+message SourceCodeLocation {
+  // The file that this pipeline source code was defined in.
+  optional string file_name = 1;
+  // The specific line number that this pipeline source code is located at, if applicable.
+  optional int32 line_number = 2;
+}
+
 // A signal from the server to the client to execute the query function for one or more flows, and
 // to register their results with the server.
 message PipelineQueryFunctionExecutionSignal {
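
Since both fields are `optional`, the generated Python message tracks presence explicitly rather than relying on zero defaults. A small sketch of the resulting behavior (assuming the regenerated `pipelines_pb2` bindings from this commit are importable):

```python
import pyspark.sql.connect.proto as pb2

# Only file_name is set; line_number is deliberately left unset.
loc = pb2.SourceCodeLocation(file_name="pipeline.py")

assert loc.HasField("file_name")
assert not loc.HasField("line_number")
assert loc.line_number == 0  # unset scalars still read as their zero value
```

This presence tracking is what the server-side handler below relies on (`hasFileName` / `hasLineNumber`) before copying the values into a `QueryOrigin`.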

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala

Lines changed: 13 additions & 1 deletion
@@ -195,7 +195,11 @@ private[connect] object PipelinesHandler extends Logging {
       partitionCols = Option(dataset.getPartitionColsList.asScala.toSeq)
         .filter(_.nonEmpty),
       properties = dataset.getTablePropertiesMap.asScala.toMap,
-      baseOrigin = QueryOrigin(
+      origin = QueryOrigin(
+        filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)(
+          dataset.getSourceCodeLocation.getFileName),
+        line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)(
+          dataset.getSourceCodeLocation.getLineNumber),
         objectType = Option(QueryOriginType.Table.toString),
         objectName = Option(qualifiedIdentifier.unquotedString),
         language = Option(Python())),

@@ -212,6 +216,10 @@ private[connect] object PipelinesHandler extends Logging {
       identifier = viewIdentifier,
       comment = Option(dataset.getComment),
       origin = QueryOrigin(
+        filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)(
+          dataset.getSourceCodeLocation.getFileName),
+        line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)(
+          dataset.getSourceCodeLocation.getLineNumber),
         objectType = Option(QueryOriginType.View.toString),
         objectName = Option(viewIdentifier.unquotedString),
         language = Option(Python())),

@@ -281,6 +289,10 @@ private[connect] object PipelinesHandler extends Logging {
       once = false,
       queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)),
       origin = QueryOrigin(
+        filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
+          flow.getSourceCodeLocation.getFileName),
+        line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
+          flow.getSourceCodeLocation.getLineNumber),
         objectType = Option(QueryOriginType.Flow.toString),
         objectName = Option(flowIdentifier.unquotedString),
         language = Option(Python()))))

0 commit comments