Skip to content

Commit 0f631a7

Browse files
committed
refactor: renamed query trigger start event to query execution start event
1 parent bf30414 commit 0f631a7

26 files changed

Lines changed: 109 additions & 109 deletions

File tree

python/pyspark/sql/connect/streaming/query.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
QueryProgressEvent,
3434
QueryIdleEvent,
3535
QueryTerminatedEvent,
36-
QueryTriggerStartEvent,
36+
QueryExecutionStartEvent,
3737
StreamingQueryProgress,
3838
)
3939
from pyspark.sql.streaming.query import (
@@ -394,15 +394,15 @@ def _query_event_handler(self, iter: Iterator[Dict[str, Any]]) -> None:
394394
@staticmethod
395395
def deserialize(
396396
event: pb2.StreamingQueryListenerEvent,
397-
) -> Union["QueryProgressEvent", "QueryIdleEvent", "QueryTerminatedEvent", "QueryTriggerStartEvent"]:
397+
) -> Union["QueryProgressEvent", "QueryIdleEvent", "QueryTerminatedEvent", "QueryExecutionStartEvent"]:
398398
if event.event_type == proto.StreamingQueryEventType.QUERY_PROGRESS_EVENT:
399399
return QueryProgressEvent.fromJson(json.loads(event.event_json))
400400
elif event.event_type == proto.StreamingQueryEventType.QUERY_TERMINATED_EVENT:
401401
return QueryTerminatedEvent.fromJson(json.loads(event.event_json))
402402
elif event.event_type == proto.StreamingQueryEventType.QUERY_IDLE_EVENT:
403403
return QueryIdleEvent.fromJson(json.loads(event.event_json))
404-
elif event.event_type == proto.StreamingQueryEventType.QUERY_TRIGGER_START_EVENT:
405-
return QueryTriggerStartEvent.fromJson(json.loads(event.event_json))
404+
elif event.event_type == proto.StreamingQueryEventType.QUERY_EXECUTION_START_EVENT:
405+
return QueryExecutionStartEvent.fromJson(json.loads(event.event_json))
406406
else:
407407
raise PySparkValueError(
408408
errorClass="UNKNOWN_VALUE_FOR",
@@ -412,7 +412,7 @@ def deserialize(
412412
def post_to_all(
413413
self,
414414
event: Union[
415-
"QueryStartedEvent", "QueryProgressEvent", "QueryIdleEvent", "QueryTerminatedEvent", "QueryTriggerStartEvent"
415+
"QueryStartedEvent", "QueryProgressEvent", "QueryIdleEvent", "QueryTerminatedEvent", "QueryExecutionStartEvent"
416416
],
417417
) -> None:
418418
"""
@@ -430,8 +430,8 @@ def post_to_all(
430430
listener.onQueryIdle(event)
431431
elif isinstance(event, QueryTerminatedEvent):
432432
listener.onQueryTerminated(event)
433-
elif isinstance(event, QueryTriggerStartEvent):
434-
listener.onQueryTriggerStart(event)
433+
elif isinstance(event, QueryExecutionStartEvent):
434+
listener.onQueryExecutionStart(event)
435435
else:
436436
warnings.warn(f"Unknown StreamingQueryListener event: {event}")
437437
except Exception as e:

python/pyspark/sql/connect/streaming/worker/listener_worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
QueryProgressEvent,
4040
QueryTerminatedEvent,
4141
QueryIdleEvent,
42-
QueryTriggerStartEvent,
42+
QueryExecutionStartEvent,
4343
)
4444
from pyspark.worker_util import check_python_version
4545

@@ -91,7 +91,7 @@ def process(listener_event_str, listener_event_type): # type: ignore[no-untyped
9191
elif listener_event_type == 3:
9292
listener.onQueryTerminated(QueryTerminatedEvent.fromJson(listener_event))
9393
elif listener_event_type == 4:
94-
listener.onQueryTriggerStart(QueryTriggerStartEvent.fromJson(listener_event))
94+
listener.onQueryExecutionStart(QueryExecutionStartEvent.fromJson(listener_event))
9595

9696
while True:
9797
event = utf8_deserializer.loads(infile)

python/pyspark/sql/dataframe.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4373,7 +4373,7 @@ def observe(
43734373
... def onQueryTerminated(self, event):
43744374
... pass
43754375
...
4376-
... def onQueryTriggerStart(self, event):
4376+
... def onQueryExecutionStart(self, event):
43774377
... pass
43784378
...
43794379
>>> error_listener = MyErrorListener()

python/pyspark/sql/streaming/listener.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class StreamingQueryListener(ABC):
6060
... # Do something with event.
6161
... pass
6262
...
63-
... def onQueryTriggerStart(self, event: QueryTriggerStartEvent) -> None:
63+
... def onQueryExecutionStart(self, event: QueryExecutionStartEvent) -> None:
6464
... # Do something with event.
6565
... pass
6666
...
@@ -133,7 +133,7 @@ def onQueryTerminated(self, event: "QueryTerminatedEvent") -> None:
133133

134134
# NOTE: Do not mark this as abstract method, we are following the same pattern as
135135
# onQueryIdle to avoid breaking existing implementations.
136-
def onQueryTriggerStart(self, event: "QueryTriggerStartEvent") -> None:
136+
def onQueryExecutionStart(self, event: "QueryExecutionStartEvent") -> None:
137137
"""
138138
Called when a query execution (micro-batch) is started.
139139
"""
@@ -174,8 +174,8 @@ def onQueryIdle(self, jevent: "JavaObject") -> None:
174174
def onQueryTerminated(self, jevent: "JavaObject") -> None:
175175
self.pylistener.onQueryTerminated(QueryTerminatedEvent.fromJObject(jevent))
176176

177-
def onQueryTriggerStart(self, jevent: "JavaObject") -> None:
178-
self.pylistener.onQueryTriggerStart(QueryTriggerStartEvent.fromJObject(jevent))
177+
def onQueryExecutionStart(self, jevent: "JavaObject") -> None:
178+
self.pylistener.onQueryExecutionStart(QueryExecutionStartEvent.fromJObject(jevent))
179179

180180
class Java:
181181
implements = ["org.apache.spark.sql.streaming.PythonStreamingQueryListener"]
@@ -411,7 +411,7 @@ def errorClassOnException(self) -> Optional[str]:
411411
"""
412412
return self._errorClassOnException
413413

414-
class QueryTriggerStartEvent:
414+
class QueryExecutionStartEvent:
415415
"""
416416
Event representing the start of a query execution.
417417
@@ -434,7 +434,7 @@ def __init__(
434434
self._timestamp: str = timestamp
435435

436436
@classmethod
437-
def fromJObject(cls, jevent: "JavaObject") -> "QueryTriggerStartEvent":
437+
def fromJObject(cls, jevent: "JavaObject") -> "QueryExecutionStartEvent":
438438
return cls(
439439
id=uuid.UUID(jevent.id().toString()),
440440
runId=uuid.UUID(jevent.runId().toString()),
@@ -443,7 +443,7 @@ def fromJObject(cls, jevent: "JavaObject") -> "QueryTriggerStartEvent":
443443
)
444444

445445
@classmethod
446-
def fromJson(cls, j: Dict[str, Any]) -> "QueryTriggerStartEvent":
446+
def fromJson(cls, j: Dict[str, Any]) -> "QueryExecutionStartEvent":
447447
return cls(
448448
id=uuid.UUID(j["id"]),
449449
runId=uuid.UUID(j["runId"]),

python/pyspark/sql/streaming/query.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ def addListener(self, listener: StreamingQueryListener) -> None:
656656
... def onQueryTerminated(self, event):
657657
... pass
658658
...
659-
... def onQueryTriggerStart(self, event):
659+
... def onQueryExecutionStart(self, event):
660660
... pass
661661
...
662662
>>> test_listener = TestListener()
@@ -708,7 +708,7 @@ def removeListener(self, listener: StreamingQueryListener) -> None:
708708
... def onQueryTerminated(self, event):
709709
... pass
710710
...
711-
... def onQueryTriggerStart(self, event):
711+
... def onQueryExecutionStart(self, event):
712712
... pass
713713
...
714714
>>> test_listener = TestListener()

python/pyspark/sql/tests/connect/streaming/test_parity_listener.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def onQueryTerminated(self, event):
4646
df = self.spark.createDataFrame(data=[(e,)])
4747
df.write.mode("append").saveAsTable("listener_terminated_events")
4848

49-
def onQueryTriggerStart(self, event) -> None:
49+
def onQueryExecutionStart(self, event) -> None:
5050
pass
5151

5252

@@ -86,7 +86,7 @@ def onQueryIdle(self, event):
8686
def onQueryTerminated(self, event):
8787
self.terminated.append(event)
8888

89-
def onQueryTriggerStart(self, event) -> None:
89+
def onQueryExecutionStart(self, event) -> None:
9090
pass
9191

9292

python/pyspark/sql/tests/streaming/test_streaming_listener.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ def onQueryIdle(self, event):
216216
def onQueryTerminated(self, event):
217217
pass
218218

219-
def onQueryTriggerStart(self, event):
219+
def onQueryExecutionStart(self, event):
220220
pass
221221

222222
try:
@@ -361,7 +361,7 @@ def onQueryTerminated(self, event):
361361
nonlocal terminated_event
362362
terminated_event = event
363363

364-
# V3: The interface after the method `onQueryTriggerStart` is added. It is Spark 4.1+.
364+
# V3: The interface after the method `onQueryExecutionStart` is added. It is Spark 4.1+.
365365
class TestListenerV3(StreamingQueryListener):
366366
def onQueryStarted(self, event):
367367
nonlocal start_event
@@ -378,7 +378,7 @@ def onQueryTerminated(self, event):
378378
nonlocal terminated_event
379379
terminated_event = event
380380

381-
def onQueryTriggerStart(self, event):
381+
def onQueryExecutionStart(self, event):
382382
pass
383383

384384
def verify(test_listener):
@@ -464,7 +464,7 @@ def onQueryIdle(self, event):
464464
def onQueryTerminated(self, event):
465465
pass
466466

467-
# V3: The interface after the method `onQueryTriggerStart` is added. It is Spark 4.1+.
467+
# V3: The interface after the method `onQueryExecutionStart` is added. It is Spark 4.1+.
468468
class TestListenerV3(StreamingQueryListener):
469469
def onQueryStarted(self, event):
470470
pass
@@ -478,7 +478,7 @@ def onQueryIdle(self, event):
478478
def onQueryTerminated(self, event):
479479
pass
480480

481-
def onQueryTriggerStart(self, event):
481+
def onQueryExecutionStart(self, event):
482482
pass
483483

484484
def verify(test_listener):

python/pyspark/sql/tests/test_observation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def onQueryIdle(self, event):
137137
def onQueryTerminated(self, event):
138138
pass
139139

140-
def onQueryTriggerStart(self, event):
140+
def onQueryExecutionStart(self, event):
141141
pass
142142

143143
self.spark.streams.addListener(TestListener())

sql/api/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ abstract class StreamingQueryListener extends Serializable {
8181
* Called when a query's micro-batch execution is started.
8282
* @since 4.2.0
8383
*/
84-
def onQueryTriggerStart(event: QueryTriggerStartEvent): Unit = {}
84+
def onQueryExecutionStart(event: QueryExecutionStartEvent): Unit = {}
8585
}
8686

8787
/**
@@ -98,7 +98,7 @@ private[spark] trait PythonStreamingQueryListener {
9898

9999
def onQueryTerminated(event: QueryTerminatedEvent): Unit
100100

101-
def onQueryTriggerStart(event: QueryTriggerStartEvent): Unit
101+
def onQueryExecutionStart(event: QueryExecutionStartEvent): Unit
102102
}
103103

104104
private[spark] class PythonStreamingQueryListenerWrapper(listener: PythonStreamingQueryListener)
@@ -113,8 +113,8 @@ private[spark] class PythonStreamingQueryListenerWrapper(listener: PythonStreami
113113

114114
def onQueryTerminated(event: QueryTerminatedEvent): Unit = listener.onQueryTerminated(event)
115115

116-
override def onQueryTriggerStart(event: QueryTriggerStartEvent): Unit =
117-
listener.onQueryTriggerStart(event)
116+
override def onQueryExecutionStart(event: QueryExecutionStartEvent): Unit =
117+
listener.onQueryExecutionStart(event)
118118
}
119119

120120
/**
@@ -313,7 +313,7 @@ object StreamingQueryListener extends Serializable {
313313
* The timestamp of the start of a query execution
314314
*/
315315
@Evolving
316-
class QueryTriggerStartEvent private[sql] (
316+
class QueryExecutionStartEvent private[sql] (
317317
val id: UUID,
318318
val runId: UUID,
319319
val name: String,
@@ -330,10 +330,10 @@ object StreamingQueryListener extends Serializable {
330330
}
331331
}
332332

333-
private[spark] object QueryTriggerStartEvent {
334-
private[spark] def fromJson(json: String): QueryTriggerStartEvent = {
333+
private[spark] object QueryExecutionStartEvent {
334+
private[spark] def fromJson(json: String): QueryExecutionStartEvent = {
335335
val parser = EventParser(json)
336-
new QueryTriggerStartEvent(
336+
new QueryExecutionStartEvent(
337337
parser.getUUID("id"),
338338
parser.getUUID("runId"),
339339
parser.getString("name"),

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5683,7 +5683,7 @@ object SQLConf {
56835683
.booleanConf
56845684
.createWithDefault(true)
56855685

5686-
val STREAMING_QUERY_TRIGGER_START_EVENT_ENABLED =
5686+
val STREAMING_QUERY_EXECUTION_START_EVENT_ENABLED =
56875687
buildConf("spark.sql.streaming.query.trigger.start.event.enabled")
56885688
.internal()
56895689
.doc("When set to true, spark will emit events for when streaming queries micro-batches " +
@@ -5692,10 +5692,10 @@ object SQLConf {
56925692
.booleanConf
56935693
.createWithDefault(false)
56945694

5695-
val STREAMING_QUERY_TRIGGER_START_EVENT_MIN_INTERVAL =
5695+
val STREAMING_QUERY_EXECUTION_START_EVENT_MIN_INTERVAL =
56965696
buildConf("spark.sql.streaming.query.trigger.start.event.minInterval")
56975697
.internal()
5698-
.doc("The minimum interval in milliseconds between QueryTriggerStart events emissions for " +
5698+
.doc("The minimum interval in milliseconds between QueryExecutionStart events emissions for " +
56995699
"streaming queries. This is to avoid flooding the event queue with events for " +
57005700
"high-frequency queries.")
57015701
.version("4.2.0")
@@ -6496,11 +6496,11 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
64966496
def disabledV2StreamingMicroBatchReaders: String =
64976497
getConf(DISABLED_V2_STREAMING_MICROBATCH_READERS)
64986498

6499-
def streamingQueryTriggerStartEventEnabled: Boolean =
6500-
getConf(STREAMING_QUERY_TRIGGER_START_EVENT_ENABLED)
6499+
def streamingQueryExecutionStartEventEnabled: Boolean =
6500+
getConf(STREAMING_QUERY_EXECUTION_START_EVENT_ENABLED)
65016501

6502-
def streamingQueryTriggerStartEventMinInterval: Long =
6503-
getConf(STREAMING_QUERY_TRIGGER_START_EVENT_MIN_INTERVAL)
6502+
def streamingQueryExecutionStartEventMinInterval: Long =
6503+
getConf(STREAMING_QUERY_EXECUTION_START_EVENT_MIN_INTERVAL)
65046504

65056505
def fastFailFileFormatOutput: Boolean = getConf(FASTFAIL_ON_FILEFORMAT_OUTPUT)
65066506

0 commit comments

Comments
 (0)