Commit df7ff6f

Merge pull request #214820 from Dawn2111/prdawn/query-pushdown
query push down
2 parents 081cb9c + 4662973 commit df7ff6f

1 file changed: articles/synapse-analytics/spark/synapse-spark-sql-pool-import-export.md (+229, -10)

@@ -18,9 +18,10 @@ The Azure Synapse Dedicated SQL Pool Connector for Apache Spark in Azure Synapse
At a high level, the connector provides the following capabilities:

* Read from Azure Synapse Dedicated SQL Pool:
    * Read large data sets from Synapse Dedicated SQL Pool tables (Internal and External) and views.
    * Comprehensive predicate push down support, where filters on the DataFrame get mapped to corresponding SQL push down predicates.
    * Support for column pruning.
    * Support for query push down.
* Write to Azure Synapse Dedicated SQL Pool:
    * Ingest large volumes of data into Internal and External table types.
    * Supports the following DataFrame save mode preferences:
@@ -198,17 +199,17 @@ This section presents reference code templates to describe how to use and invoke
##### [Scala](#tab/scala)

```Scala
synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame
```

##### [Python](#tab/python)

```python
synapsesql(table_name: str="") -> org.apache.spark.sql.DataFrame
```
---
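Note that the table name argument now defaults to an empty string, so `synapsesql` can be invoked with no table when the source is instead supplied as a query via `Constants.QUERY` (described later in this article). As a minimal illustration of that either/or contract, here is a sketch using a hypothetical `resolve_read_source` helper that is not part of the connector's API:

```python
# Hypothetical sketch of the either/or contract behind the optional table name:
# a read source is a table OR a query, never both. resolve_read_source is an
# illustrative helper and is not part of the connector's API.
def resolve_read_source(table_name: str = "", query: str = ""):
    if table_name and query:
        raise ValueError("Table name and query cannot be specified at the same time.")
    if table_name:
        return ("table", table_name)
    if query:
        return ("query", query)
    raise ValueError("Provide either a table name or a query.")

# Example: a query-based read leaves table_name at its default empty string.
source_kind, source = resolve_read_source(query="select 1 as c")
```

The same rule is what the connector enforces when both a table name and `Constants.QUERY` are supplied.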

#### Read from a table using Azure AD based authentication

##### [Scala](#tab/scala1)

@@ -268,7 +269,13 @@ dfToReadFromTable.show()
```
---

#### Read from a query using Azure AD based authentication

> [!NOTE]
> Restrictions when reading from a query:
> * A table name and a query cannot be specified at the same time.
> * Only select queries are allowed; DDL and DML statements are not allowed.
> * The select and filter options on the DataFrame are not pushed down to the SQL dedicated pool when a query is specified.
> * Read from a query is only available in Spark 3.1 and 3.2; it is not available in Spark 2.4.

##### [Scala](#tab/scala2)

@@ -279,6 +286,98 @@ import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Read from a query
// The query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    //Name of the SQL Dedicated Pool or database where the query will run
    //The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    //to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to the storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Query from which data will be read
    option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>").
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
    //Name of the SQL Dedicated Pool or database where the query will run
    //The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    //to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to the storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Query from which data will be read
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")

//Show contents of the dataframes
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
```

##### [Python](#tab/python2)

```python
# Add required imports
import com.microsoft.spark.sqlanalytics
from com.microsoft.spark.sqlanalytics.Constants import Constants
from pyspark.sql.functions import col

# Name of the SQL Dedicated Pool or database where the query will run
# The database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

# Read from a query
# The query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
dfToReadFromQueryAsOption = (spark.read
    # Name of the SQL Dedicated Pool or database where the query will run
    # The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    .option(Constants.DATABASE, "<database_name>")
    # If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    # to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    .option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net")
    # Defaults to the storage path defined in the runtime configurations
    .option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    # Query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    .synapsesql()
)

dfToReadFromQueryAsArgument = (spark.read
    # Name of the SQL Dedicated Pool or database where the query will run
    # The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    .option(Constants.DATABASE, "<database_name>")
    # If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    # to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    .option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net")
    # Defaults to the storage path defined in the runtime configurations
    .option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    # Query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
)

# Show contents of the dataframes
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
```
---
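The read-from-query restrictions in the note above (select statements only, no DDL or DML) can be approximated with a client-side pre-check before submitting the read. A rough sketch, assuming a hypothetical `validate_read_query` helper that is not part of the connector:

```python
# Hypothetical pre-check mirroring the read-from-query restrictions: only
# select queries are accepted; DDL and DML statements are rejected. This is a
# simplified illustration, not the connector's actual validation.
def validate_read_query(query: str) -> bool:
    tokens = query.strip().lower().split()
    return bool(tokens) and tokens[0] == "select"

assert validate_read_query("select col, count(*) as cnt from dbo.t group by col")
assert not validate_read_query("drop table dbo.t")        # DDL rejected
assert not validate_read_query("update dbo.t set c = 1")  # DML rejected
```

Note that this first-token check is deliberately naive (for example, it would also reject a select that begins with a CTE), so treat it strictly as an illustration of the rule.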
#### Read from a table using basic authentication

##### [Scala](#tab/scala3)

```Scala
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//This example uses basic authentication; the Azure Active Directory based authentication approach is preferred.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
@@ -289,22 +388,23 @@ val dfToReadFromTable:DataFrame = spark.read.
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from which data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning, i.e., select only the required column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>").
    //Push-down filter criteria that gets translated to SQL push-down predicates.
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()
```

##### [Python](#tab/python3)

```python
# Add required imports
@@ -323,7 +423,7 @@ dfToReadFromTable = (spark.read
    .option(Constants.PASSWORD, "<user_password>")
    # Set name of the data source definition that is defined with database scoped credentials.
    # https://learn.microsoft.com/sql/t-sql/statements/create-external-data-source-transact-sql?view=sql-server-ver15&tabs=dedicated#h-create-external-data-source-to-access-data-in-azure-storage-using-the-abfs-interface
    # Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    .option(Constants.DATA_SOURCE, "<data_source_name>")
    # Three-part table name from which data will be read.
    .synapsesql("<database_name>.<schema_name>.<table_name>")
@@ -332,14 +432,133 @@ dfToReadFromTable = (spark.read
    # Push-down filter criteria that gets translated to SQL push-down predicates.
    .filter(col("Title").contains("E"))
    # Fetch a sample of 10 records
    .limit(10)
)

# Show contents of the dataframe
dfToReadFromTable.show()
```
---
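In the table-read path above, the `select`, `filter`, and `limit` calls are pushed down to the dedicated pool rather than evaluated in Spark. As a rough illustration of what that translation amounts to (hypothetical `build_pushdown_sql` helper; the connector's real translation is internal), the chained DataFrame operations collapse into one SQL statement:

```python
# Hypothetical illustration of predicate push down: column pruning, a filter,
# and a row limit rendered as a single T-SQL statement. Not the connector's
# actual implementation.
def build_pushdown_sql(table, columns, filter_col, prefix, limit):
    cols = ", ".join(columns)
    return (f"SELECT TOP {limit} {cols} FROM {table} "
            f"WHERE {filter_col} LIKE '{prefix}%'")

sql = build_pushdown_sql("dbo.Books", ["Title", "Id"], "Title", "E", 10)
# sql == "SELECT TOP 10 Title, Id FROM dbo.Books WHERE Title LIKE 'E%'"
```

Running the filter on the pool this way is what lets the connector stage only matching rows instead of the whole table.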

#### Read from a query using basic authentication

##### [Scala](#tab/scala4)

```Scala
//Use case is to read data from a query against the Synapse Dedicated SQL Pool DB
//This example uses basic authentication; the Azure Active Directory based authentication approach is preferred.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where the query will run
// The database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// The query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    //Name of the SQL Dedicated Pool or database where the query will run
    //The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    //to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query from which data will be read.
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>").
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
    //Name of the SQL Dedicated Pool or database where the query will run
    //The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    //to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query from which data will be read.
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")

//Show contents of the dataframes
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
```

##### [Python](#tab/python4)

```python
# Add required imports
import com.microsoft.spark.sqlanalytics
from com.microsoft.spark.sqlanalytics.Constants import Constants
from pyspark.sql.functions import col

# Name of the SQL Dedicated Pool or database where the query will run
# The database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

# Read from a query
# The query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
dfToReadFromQueryAsOption = (spark.read
    # Name of the SQL Dedicated Pool or database where the query will run
    # The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    .option(Constants.DATABASE, "<database_name>")
    # If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    # to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    .option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net")
    # Set database user name
    .option(Constants.USER, "<user_name>")
    # Set user's password to the database
    .option(Constants.PASSWORD, "<user_password>")
    # Set name of the data source definition that is defined with database scoped credentials.
    # https://docs.microsoft.com/sql/t-sql/statements/create-external-data-source-transact-sql?view=sql-server-ver15&tabs=dedicated#h-create-external-data-source-to-access-data-in-azure-storage-using-the-abfs-interface
    # Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    .option(Constants.DATA_SOURCE, "<data_source_name>")
    # Query from which data will be read.
    .option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    .synapsesql()
)

dfToReadFromQueryAsArgument = (spark.read
    # Name of the SQL Dedicated Pool or database where the query will run
    # The database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database - or as a Constant - Constants.DATABASE
    .option(Constants.DATABASE, "<database_name>")
    # If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument
    # to the `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    .option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net")
    # Set database user name
    .option(Constants.USER, "<user_name>")
    # Set user's password to the database
    .option(Constants.PASSWORD, "<user_password>")
    # Set name of the data source definition that is defined with database scoped credentials.
    # https://docs.microsoft.com/sql/t-sql/statements/create-external-data-source-transact-sql?view=sql-server-ver15&tabs=dedicated#h-create-external-data-source-to-access-data-in-azure-storage-using-the-abfs-interface
    # Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    .option(Constants.DATA_SOURCE, "<data_source_name>")
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
)

# Show contents of the dataframes
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()
```
---
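Several comments above note that, when `Constants.SERVER` is not provided, the connector falls back to the `<database_name>` component of the three-part table name to infer the dedicated SQL endpoint. A sketch of just the parsing step (hypothetical `split_three_part_name` helper; the actual endpoint inference is internal to the connector):

```python
# Hypothetical parser for the <database_name>.<schema_name>.<table_name>
# argument accepted by synapsesql(). Illustrative only; not part of the
# connector's API.
def split_three_part_name(name: str):
    parts = name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError("Expected <database_name>.<schema_name>.<table_name>")
    return tuple(parts)

database, schema, table = split_three_part_name("sqlpool1.dbo.orders")
# database == "sqlpool1"; this is the component the connector would use
# when inferring the endpoint in the absence of Constants.SERVER.
```

Passing a fully qualified three-part name is therefore the simplest way to avoid setting `Constants.SERVER` and `Constants.DATABASE` explicitly.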
### Write to Azure Synapse Dedicated SQL Pool

#### Write Request - `synapsesql` method signature
