[WIP] Introduce a new callback "onQueryExecutionStart" to StreamingQueryListener#1
[WIP] Introduce a new callback "onQueryExecutionStart" to StreamingQueryListener#1JevonCowell wants to merge 2435 commits intomasterfrom
Conversation
| @@ -89,6 +90,8 @@ def process(listener_event_str, listener_event_type): # type: ignore[no-untyped | |||
| listener.onQueryIdle(QueryIdleEvent.fromJson(listener_event)) | |||
| elif listener_event_type == 3: | |||
There was a problem hiding this comment.
I'm not super familiar with spark code, but are these based on commands_pb2.pyi? If so shouldn't onQueryIdle be 3?
| handleOnQueryTerminated(event) | ||
| } | ||
|
|
||
| class EventCollectorV3 extends EventCollectorV2 { |
There was a problem hiding this comment.
Not sure if i should make this a dependency chain or just inherit from the first collection
| currentTriggerEndOffsets = null | ||
| currentTriggerLatestOffsets = null | ||
| currentDurationsMs.clear() | ||
| progressReporter.updateQueryTriggerStart(id, runId, name, currentTriggerStartTimestamp) |
There was a problem hiding this comment.
Will sleep on this but wondering if this is the best place to put this, both ContinousProcessing & MicroBatchExecution utilize this method. Based on documentation, ContinousProcessing might be executing this method in the milliseconds (need to dive into code to confirm, this is only my impression)
The
if (now - lastNoExecutionProgressEventTime >= queryTriggerStartEventMinInterval) {
postEvent(
new QueryTriggerStartEvent(id, runId, name, formatTimestamp(currentTriggerStartTimestamp)))
lastNoExecutionProgressEventTime = now
}should make it so that we are not emitting events on each "continuous batch" but still not 100% sure of the consequences of computing this every time and how it affects the "(~1 ms) end-to-end latency" SLA. I might need to move this over to just MicroBatchExecution but not sure about how apache folks will fewel about it.
…tTables on catalogs missing namespace capbalitity ### What changes were proposed in this pull request? For DSv2 catalog implementation that does not mix in the interface `SupportsNamespaces`, `SHOW SCHEMAS IN foo_catalog` fails with `MISSING_CATALOG_ABILITY.NAMESPACES`, we should catch this and return empty result instead of failing on `SparkConnectDatabaseMetaData#getSchemas|getTables` ### Why are the changes needed? Fix a bug in the Connect JDBC dirver. ### Does this PR introduce _any_ user-facing change? No, Connect JDBC dirver is an unreleased feature. ### How was this patch tested? UT is adjusted to cover the changes. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53246 from pan3793/SPARK-54537. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? Couples of minor fix for Connect JDBC driver - fix typos - use import instead of the inline full package class reference - a missing call `checkOpen` - remove duplicated scaladocs generated by AI. ### Why are the changes needed? Minor fix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53248 from pan3793/SPARK-54540. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? Fix the maxBytes config in tests ### Why are the changes needed? the testing configs `(1000, 4096)` were duplicated, IntMax should be tested instead ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53249 from zhengruifeng/test_grouped_conf. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? Fix the wrong example query in `WindowGroupLimit` ### Why are the changes needed? Fix the incorrect comments ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? <!-- No need test ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52993 from huangxiaopingRD/SPARK-54299. Authored-by: huangxiaoping <1754789345@qq.com> Signed-off-by: beliefer <beliefer@163.com>
### What changes were proposed in this pull request? I rename _LEGACY_ERROR_TEMP_1201 to more understandable name (UNRESOLVED_COLUMN_AMONG_FIELD_NAMES) and add sqlState ### Why are the changes needed? This is a frequent error that is not classified ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is a small refactoring, tests are already there ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53197 from aleksandr-chernousov-db/le_1201. Authored-by: Ubuntu <aleksandr.chernousov@your.hostname.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
… `TableProvider`
### What changes were proposed in this pull request?
This PR adds a new marker interface SupportsV1OverwriteWithSaveAsTable that data sources can implement to distinguish between DataFrameWriter V1 saveAsTable with SaveMode.Overwrite and DataFrameWriter V2 createOrReplace/replace operations.
* Added SupportsV1OverwriteWithSaveAsTable mixin interface to TableProvider
* Modified DataFrameWriter.saveAsTable to add an internal write option (`__v1_save_as_table_overwrite=true`) when the provider implements this interface and mode is Overwrite
* Added tests verifying the option is only added for providers that opt-in
### Why are the changes needed?
Spark's SaveMode.Overwrite is documented as:
```
* if data/table already exists, existing data is expected to be overwritten
* by the contents of the DataFrame.
```
It does not define the behaviour of overwriting the table metadata (schema, etc).
However, DataFrameWriter V1 creates a ReplaceTableAsSelect plan, which is the same as the plan of DataFrameWriterV2 createOrReplace API, which is documented as:
```
* The output table's schema, partition layout, properties, and other configuration
* will be based on the contents of the data frame and the configuration set on this
* writer. If the table exists, its configuration and data will be replaced.
```
Therefore, for calls via DataFrameWriter V2 createOrReplace, the metadata always needs to be replaced.
Datasources migrating from V1 to V2 might have interpreted it differently.
In particular, Delta Lake datasource interpretation of this API documentation of DataFrameWriter V1 is to not replace table schema, unless Delta-specific option "overwriteSchema" is set to true. Changing the bahaviour to the V2 semantics is unfriendly to the users, as it can cause corruption of the tables: an operations that overwrote only data before, will now also overwrite the table's schema, partitioning info and other properties.
Since the created plan is exactly the same, Delta used a very ugly hack to detect where the API call is coming from based on the stack trace of the call.
In Spark 4.1 in connect mode, this stopped working because planning and execution of the commands go decoupled, and the stack trace no longer contains this point where the plan got created.
To not introduce a behaviour change in the Delta datasource with Spark 4.1 in connect mode, Spark provides this new interface SupportsV1OverwriteWithSaveAsTable which will make DataFrameWriter V1 add an explicit storage option to indicate to Delta datasource that this call is coming from DataFrameWriter V1.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Spark tests added.
It was tested locally with Delta Lake side of changes. It cannot yet be raised in a Delta Lake PR, because Delta Lake master branch does not yet cross-compile with Spark 4.1 (WIP).
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code opus-4.5
Closes apache#53215 from juliuszsompolski/RequiresDataFrameWriterV1SaveAsTableOverwriteWriteOption.
Authored-by: Juliusz Sompolski <Juliusz Sompolski>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…cs call in makeNegative ### What changes were proposed in this pull request? In the method `org.apache.spark.status.LiveEntityHelpers.makeNegative`, it calls `createMetrics` with a wrong arguments order which passes metric values to wrong positions. The order is wrong starting from the `shufflePushReadMetrics. corruptMergedBlockChunks` ### Why are the changes needed? Passing argument values to wrong positions messes up the method call to `createMetrics` and eventually negates metrics with wrong values. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a test case. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53218 from jiwen624/makeNegative-arg-order. Authored-by: Eric Yang <jiwen624@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? Currently connection error in JDBCUtils.scala gets thrown as is Need to classify it to catch properly (to be able to add sqlState) ### Why are the changes needed? Right now this exception is not caught and unclassified ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tests are added in this PR ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53195 from aleksandr-chernousov-db/SPARK-54490. Authored-by: Ubuntu <aleksandr.chernousov@your.hostname.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? I rename _LEGACY_ERROR_TEMP_1133 to more understandable name (USER_SPECIFIED_AND_ACTUAL_SCHEMA_MISMATCH) and add sqlState ### Why are the changes needed? This is a frequent error that is not classified ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is a small refactoring, tests are already there ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53198 from aleksandr-chernousov-db/le_1133. Authored-by: Ubuntu <aleksandr.chernousov@your.hostname.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…annotation configs by default ### What changes were proposed in this pull request? This PR enables the annotation of the variant parquet logical type and shredded writes and reads by default. ### Why are the changes needed? 1. Having variant data annotated with the variant logical type is required by the parquet variant spec ([source](https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#variant-in-parquet)). This is necessary to adhere to the spec 2. Variant shredding brings in significant performance optimizations over regular unshredded variants, and should be the default mode. ### Does this PR introduce _any_ user-facing change? Yes, variant data written by Spark would be annotated with the variant logical type annotation and variant shredding would be enabled by default. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53164 from harshmotw-db/harshmotw-db/enable_variant_shredding. Lead-authored-by: Harsh Motwani <harsh.motwani@databricks.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…x length for BinaryType ### What changes were proposed in this pull request? This PR improves the JDBC type mapping for BinaryType in the Spark Connect JDBC client ### Why are the changes needed? - **Semantic correctness**: Types.VARBINARY (variable-length) better matches Spark's BinaryType semantics. - **Industry alignment**: SQL Server dialect already uses VARBINARY(MAX) for BinaryType . Trino JDBC driver uses VARBINARY with a maximum of 1 GB. MariaDB JDBC driver uses VARBINARY/LONGVARBINARY for blob types ### Does this PR introduce _any_ user-facing change? Yes, but minimal impact. Both BINARY and VARBINARY map to byte array types The precision change is within reasonable bounds ### How was this patch tested? Existing tests: All tests in `SparkConnectJdbcDataTypeSuite` pass. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53252 from vinodkc/br_SPARK-54206_followup_fix. Authored-by: vinodkc <vinod.kc.in@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ect post-merge builds ### What changes were proposed in this pull request? This PR adds missing python library 'zstandard' in Spark Connect post-merge builds. ### Why are the changes needed? Without zstandard library, Spark Connect server may not function properly as it checks the requirement of library. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53257 from HeartSaVioR/SPARK-54548. Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…dulerImpl.(executorLost|logExecutorLoss)` methods ### What changes were proposed in this pull request? This PR renamed “hostPort” to “host” in TaskSchedulerImpl. ### Why are the changes needed? Improve code clarity and prevent potential misuse. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53258 from Xtpacz/use-host-instead-of-hostPort. Authored-by: Shilong Duan <coldwater216@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ConnectStatement.close()` ### What changes were proposed in this pull request? This PR fixes two issues in `org.apache.spark.sql.connect.client.jdbc.SparkConnectStatement.close()`: - Added `try-catch` to silently handle `ConnectException` during `interruptOperation()` when the server is unavailable - Fixed bug: changed closed = false to closed = true at the end of the method ### Why are the changes needed? - ConnectException handling: The `SparkConnectStatement.close()` method should not throw exceptions during cleanup. Connection exceptions during cleanup are not actionable and only mask more important exceptions. - closed flag bug: Setting closed = false is incorrect and could allow reuse of closed statements. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests pass ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53260 from vinodkc/br_handle_close_exception. Authored-by: vinodkc <vinod.kc.in@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…e BINARY data type with `UTF_8`
### What changes were proposed in this pull request?
Fixed `SparkConnectResultSet.getString()` to properly convert BINARY data to UTF-8 strings instead of returning byte array object references (e.g., "[B<hashcode>").
### Why are the changes needed?
The current implementation violates JDBC specification behavior. Users calling getString() on BINARY columns expect UTF-8 decoded strings, not Java object references.
Before
```
SELECT binary('xDeAdBeEf')
spark-sql: `\xDeAdBeEf`
beeline with STS: `\xDeAdBeEf`
beeline with Connect Server: `[B4d518c66`
```
After
```
SELECT binary('xDeAdBeEf')
spark-sql: `\xDeAdBeEf`
beeline with STS: `\xDeAdBeEf`
beeline with Connect Server: `\xDeAdBeEf`
```
### Does this PR introduce _any_ user-facing change?
Yes. getString() on BINARY columns now returns UTF-8 decoded strings instead of byte array references like "[B1a2b3c4d".
### How was this patch tested?
Added new test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes apache#53262 from vinodkc/br_fix_getString_BINARY.
Authored-by: vinodkc <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…age on Yarn ### What changes were proposed in this pull request? Update the RocksDB state store to store its local data underneath `java.io.tmpdir` instead of going through `Utils.getLocalDir` when running on Yarn. This is done through a new util method `createExecutorLocalTempDir`, as there may be other uses cases for this behavior as well. ### Why are the changes needed? On YARN, the local RocksDB directory is placed in a directory created inside the root application folder such as ``` /tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/spark-<uuid>/StateStoreId(...) ``` The problem with this is that if an executor crashes for some reason (like OOM) and the shutdown hooks don't get run, this directory will stay around forever until the application finishes, which can cause jobs to slowly accumulate more and more temporary space until finally the node manager goes unhealthy. Because this data will only ever be accessed by the executor that created this directory, it would make sense to store the data inside the container folder, which will always get cleaned up by the node manager when that yarn container gets cleaned up. Yarn sets the `java.io.tmpdir` to a path inside this directory, such as ``` /tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/<container_id>/tmp/StateStoreId(...) ``` It looks like only Yarn setts the tmpdir property, and other resource managers (standalone and k8s) always rely on the local dirs setting/env vars. ### Does this PR introduce _any_ user-facing change? Shouldn't be any effective changes, other than preventing disk space from filling up on Node Managers under certain scenarios. ### How was this patch tested? New UT Closes apache#42301 from Kimahriman/rocksdb-tmp-dir. Authored-by: Adam Binford <adamq43@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…aTypeSuite ### What changes were proposed in this pull request? Added explicit stdout/stderr redirection to Redirect.DISCARD for the child Spark Connect server process in non-debug mode in SparkConnectServerUtils. ### Why are the changes needed? SparkConnectJdbcDataTypeSuite randomly hangs because the child server process blocks on write() calls when stdout/stderr pipe buffers fill up. Without consuming the output, the buffers reach capacity and cause the process to block indefinitely. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested verified `SparkConnectJdbcDataTypeSuite` no longer randomly hangs and all tests pass consistently. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53261 from vinodkc/br_handle_std_pipe. Authored-by: vinodkc <vinod.kc.in@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… a config ### What changes were proposed in this pull request? apache#52225 allow MERGE INTO to support case where assignment value is a struct with less fields than the assignment key, ie UPDATE SET big_struct = source.small_struct. This makes this feature off by default, and turned on via a config. ### Why are the changes needed? The change brought some interesting question, for example there is some ambiguity in user intent. Does the UPDATE SET * mean set all nested fields or top level columns? In the first case, missing fields are kept. In the second case, missing fields are nullified. I tried to make a choice in apache#53149 but after some feedback, it may be a bit controversial, choosing one interpretation over another. A SQLConf may not be the right choice, and instead we may need to introduce some new syntax, which require more discussion. ### Does this PR introduce _any_ user-facing change? No this feature is unreleased ### How was this patch tested? Existing unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53229 from szehon-ho/disable_merge_update_source_coercion. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… shuffle registration ### What changes were proposed in this pull request? This PR moves the `shuffleManager.registerShuffle()` call to occur after the initialization of `_shuffleMergeAllowed` in `ShuffleDependency`. ### Why are the changes needed? While `spark.shuffle.push.enabled` provides global control over push-based shuffle, there are scenarios requiring more granular control: - Mass spark application migration scenarios where different jobs may need different shuffle strategies - Remote shuffle manager(e.g. celeborn/uniffle) need shuffle-level fallback capabilities to push-based shuffle - Dynamic decision making based on shuffle characteristics during shuffle registration ### Does this PR introduce _any_ user-facing change? No, this is an internal refactoring that maintains backward compatibility. The default behavior remains unchanged. ### How was this patch tested? - Existing unit tests continue to pass - The change only affects the order of initialization, not the logic ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#51629 from gaoyajun02/SPARK-52923. Authored-by: gaoyajun02 <gaoyajun02@meituan.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request? This PR prevents failures during recaching failing write/refresh operations. ### Why are the changes needed? After recent changes in SPARK-54387, we may now mark write operations as failed even though they successfully committed to the table but the cache refresh was unsuccessful. ### Does this PR introduce _any_ user-facing change? Yes, `recacheByXXX` will no longer throw an exception if recaching fails. ### How was this patch tested? This PR comes with tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53143 from aokolnychyi/spark-54424. Authored-by: Anton Okolnychyi <aokolnychyi@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ing tests for connect compat test CI ### What changes were proposed in this pull request? This PR proposes to re-enable streaming tests for connect compatibility test CI. ### Why are the changes needed? They were disabled due to failure, but I can't reproduce these failures in both local and CI after installing zstandard. Code change to trigger compatibility test CI against test branch + install zstandard: apache/spark@master...HeartSaVioR:spark:WIP-investigate-ss-spark-connect-compat-test-failures-master-and-4.0 Code change to re-enable these tests during reproducing: apache/spark@branch-4.0...HeartSaVioR:spark:branch-4.0-SC-213385 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA run with the above reproducer setup: https://github.com/HeartSaVioR/spark/actions/runs/19807973545/job/56745231698 ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#53266 from HeartSaVioR/reenable-streaming-connect-tests. Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…ble a few spark connect tests ### What changes were proposed in this pull request? * This PR re-enables 3 tests, which only fail during the cross-version scenario: Spark Connect 4.0 client <-> 4.2 Spark Server * We fixed these tests on the 4.0 branch separately in another PR apache#53250 *‼️ This PR needs to be backported to `branch-4.1`. How can I make it possible? HyukjinKwon ### Why are the changes needed? To regain test coverage ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#53235 from xianzhe-databricks/enable-tests. Authored-by: xianzhe-databricks <xianzhe.ma@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…n filter failures ### What changes were proposed in this pull request? This PR refactors the error handling for Hive metastore partition filter failures by migrating from the legacy error code _LEGACY_ERROR_TEMP_2193 to a properly defined error condition INTERNAL_ERROR_HIVE_METASTORE_PARTITION_FILTER with SQL state 58030. The error message is restructured to include the underlying exception details. ### Why are the changes needed? The previous error message was verbose and lacked important diagnostic information. The legacy error code needed to be migrated to a proper error condition with an appropriate SQL state for better error categorization. ### Does this PR introduce _any_ user-facing change? Yes. Users will see an improved error message that includes the actual exception details and clearer guidance. ### How was this patch tested? Updated existing unit tests in HivePartitionFilteringSuite and ExternalCatalogSuite to verify the new error condition. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Sonnet 4.5 Closes apache#53212 from ganeshashree/SPARK-54501. Authored-by: Ganesha S <ganesha.s@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request? Enabled flake8 [F811](https://www.flake8rules.com/rules/F811.html) check on our repo and fixed reported issues. ### Why are the changes needed? I know upgrading lint system is a pain, but we should not just put it aside forever. Our pinned `flake8` version is not even usable on Python3.12+. During this "lint fix", I actually discovered a few real bugs - most of them are silently disabled unittests because there is a test method that has the same name (probably due to copy/paste). I think this result supported the idea that we should take lint more seriously. About `functions.log`, we got it wrong. It's not because `overload` does not work properly - it's because we have two `log` function in that gigantic file. The former one is [dead](https://app.codecov.io/gh/apache/spark/blob/master/python%2Fpyspark%2Fsql%2Ffunctions%2Fbuiltin.py#L3111). I just removed that one. Again, I really think we should upgrade our lint system. I'm trying to do it slowly - piece by piece, so that people's daily workflow is not impacted too much. I hope we can eventually move to a place where all linters are updated and people can be more confident about their changes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `flake8` test on major directories. CI should give more a comprehensive result. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53253 from gaogaotiantian/flake8-f811. Authored-by: Tian Gao <gaogaotiantian@hotmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…h no BEGIN/END are used
### What changes were proposed in this pull request?
When Exception Handlers which don't have BEGIN-END body are triggered, internal exception `java.util.NoSuchElementException` was thrown instead of executing properly or propagating/raising the new error if it happens in handler.
```
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
SELECT 1;
SELECT 1/0;
END
```
### Why are the changes needed?
Code was encountering a bug which throws internal error for what should be valid user code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `SqlScriptingExecutionSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes apache#53271 from miland-db/milan-dankovic_data/fix-no-body-handlers.
Authored-by: Milan Dankovic <milan.dankovic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…rces ### What changes were proposed in this pull request? Introducing the OffsetMap format to key source progress by source name, as opposed to ordinal in the logical plan ### Why are the changes needed? These changes are needed in order to enable source evolution on a streaming query (adding, removing, reordering sources) without requiring the user to set a new checkpoint directory ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#53123 from ericm-db/offset-map. Authored-by: ericm-db <eric.marnadi@databricks.com> Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
…hine' into jcowell/SPARK-51731-query-state-machine # Conflicts: # sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala # sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryListenerBus.scala # sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala # sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
What changes were proposed in this pull request?
onQueryExecutionStart. This query event is published every time a streaming query is triggered.StreamingQueryListeners with backwards compatibility.Why are the changes needed?
Currently,
StreamingQueryListeners does not notify users when a query is triggered; it only provides updates when a query starts, progresses, becomes idle, or terminates. By introducing an event that is emitted whenever a query is triggered, users will be informed of this occurrence. This new event allowsStreamingQueryListeners to function as a "state machine."In my use case, I plan to use this new event along with
QueryProgressandQueryIdleto develop a Maintenance Job Orchestration System. This system will programmatically submit and execute Spark commands, determining the eligibility of maintenance job executions based on the activity status of streaming queries.Bare bone example of Maintenance Job Orchestration System:
sequenceDiagram participant Scheduler as MaintenanceScheduler participant Orchestrator as MaintenanceJobOrchestrator participant Job as MaintenanceJob participant BackgroundExecutor as BackgroundTaskExecutor participant Listener as StreamingQueryListener Orchestrator->>Listener: Register consumer to update query status Scheduler->>Scheduler: scheduleJob(Job) alt MaintenanceJob is null Scheduler->>Scheduler: throw IllegalArgumentException else MaintenanceJob is not enabled Scheduler->>Scheduler: return else loop Submit to Orchestrator Queue on a schedule Scheduler->>Orchestrator: enqueue(MaintenanceJobRequest) end end loop Continuous Operation BackgroundExecutor->>Orchestrator: attempt to run jobs in queue Listener->>Orchestrator: isStreamingQueryActivelyRunning() Orchestrator->>Orchestrator: checkIfSafeToRunJob() alt A Spark job is currently executing Orchestrator->>Orchestrator: return else No Spark jobs are currently executing Orchestrator->>BackgroundExecutor: submit(job.execute(sparkSession)) BackgroundExecutor->>Job: execute() Job->>Orchestrator: return result Orchestrator->>Orchestrator: log job end end Listener->>Listener: shutdown() BackgroundExecutor->>BackgroundExecutor: shutdown() Orchestrator->>Orchestrator: shutdown() Scheduler->>Scheduler: shutdown()I envision several other valuable use cases, such as enhanced metrics, improved alerting capabilities, and potential optimizations within Spark itself.
Initially, I considered introducing a
QueryAwaitEventto complement theonQueryExecutionStart. This could provide a cleaner approach, avoiding the need to manage bothonQueryProgressandonQueryIdle. However, I believe addressing the current gap withonQueryExecutionStartalone is more effective.Does this PR introduce any user-facing change?
Yes/No. I tried my best to match how
QueryIdlewas added to maintain backwards compatibility &onQueryExecutionStartegress is disabled by default. Users will have to use spark conf to opt into this feature. Existing implementations should not break as a result of this.Users should do the following to opt-in:
To utilize the new method they just need to override it like so:
Scala:
Java:
Python:
How was this patch tested?
IN-PROGRESS
Was this patch authored or co-authored using generative AI tooling?
I utilize ChatGPT to get familiar with the Spark codebase since this is my first time diving in. Aside from that, all code is 100% authored by me.