
Commit f4b5944

committed
support upsert for all sql families
1 parent d17414b commit f4b5944

File tree

4 files changed: +119 −62 lines


articles/data-factory/connector-azure-sql-data-warehouse.md

Lines changed: 22 additions & 2 deletions
@@ -8,7 +8,7 @@ ms.service: data-factory
 ms.subservice: data-movement
 ms.custom: synapse
 ms.topic: conceptual
-ms.date: 12/29/2021
+ms.date: 01/14/2022
 ---

 # Copy and transform data in Azure Synapse Analytics by using Azure Data Factory or Synapse pipelines
@@ -456,8 +456,13 @@ To copy data to Azure Synapse Analytics, set the sink type in Copy Activity to *
 | tableOption | Specifies whether to [automatically create the sink table](copy-activity-overview.md#auto-create-sink-tables) if not exists based on the source schema. Allowed values are: `none` (default), `autoCreate`. |No |
 | disableMetricsCollection | The service collects metrics such as Azure Synapse Analytics DWUs for copy performance optimization and recommendations, which introduce additional master DB access. If you are concerned with this behavior, specify `true` to turn it off. | No (default is `false`) |
 | maxConcurrentConnections |The upper limit of concurrent connections established to the data store during the activity run. Specify a value only when you want to limit concurrent connections.| No |
+| WriteBehavior | Specify the write behavior for the copy activity to load data into Azure Synapse Analytics. <br/> Allowed values are **Insert** and **Upsert**. By default, the service uses insert to load data. | No |
+| upsertSettings | Specify the group of settings for the write behavior. <br/> Applies when the WriteBehavior option is `Upsert`. | No |
+| ***Under `upsertSettings`:*** | | |
+| keys | Specify the column names for unique row identification. Either a single key or a series of keys can be used. If not specified, the primary key is used. | No |
+| interimSchemaName | Specify the interim schema for creating the interim table. Note: the user needs permission to create and delete tables. By default, the interim table shares the same schema as the sink table. | No |

-#### Azure Synapse Analytics sink example
+#### Example 1: Azure Synapse Analytics sink

 ```json
 "sink": {
@@ -473,6 +478,21 @@ To copy data to Azure Synapse Analytics, set the sink type in Copy Activity to *
 }
 ```

+#### Example 2: Upsert data
+
+```json
+"sink": {
+    "type": "SqlDWSink",
+    "writeBehavior": "Upsert",
+    "upsertSettings": {
+        "keys": [
+            "<column name>"
+        ],
+        "interimSchemaName": "<interim schema name>"
+    }
+}
+```
+
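+The upsert behavior configured above matches incoming rows to sink rows on the columns listed under `keys` (or the primary key when `keys` is omitted), updating matches and inserting the rest. A minimal Python sketch of these semantics — purely illustrative, not the service's implementation:
+
+```python
+def upsert(sink_rows, incoming_rows, keys):
+    """Update sink rows whose key columns match an incoming row; insert the rest.
+
+    sink_rows / incoming_rows: lists of dicts standing in for table rows.
+    keys: column names used for unique row identification.
+    """
+    # Index existing sink rows by their key tuple for O(1) match lookup.
+    index = {tuple(row[k] for k in keys): i for i, row in enumerate(sink_rows)}
+    for row in incoming_rows:
+        key = tuple(row[k] for k in keys)
+        if key in index:
+            sink_rows[index[key]].update(row)   # key exists: update in place
+        else:
+            index[key] = len(sink_rows)         # new key: insert
+            sink_rows.append(row)
+    return sink_rows
+```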
 ## Parallel copy from Azure Synapse Analytics

 The Azure Synapse Analytics connector in copy activity provides built-in data partitioning to copy data in parallel. You can find data partitioning options on the **Source** tab of the copy activity.

articles/data-factory/connector-azure-sql-database.md

Lines changed: 2 additions & 2 deletions
@@ -562,7 +562,8 @@ Learn more details from [Invoke a stored procedure from a SQL sink](#invoke-a-st
 }
 ]
 ```
-**Example 1: Upsert data**
+
+**Example 3: Upsert data**

 ```json
 "activities":[
@@ -588,7 +589,6 @@ Learn more details from [Invoke a stored procedure from a SQL sink](#invoke-a-st
 "sink": {
     "type": "AzureSqlSink",
     "tableOption": "autoCreate",
-    "writeBatchSize": 100000,
     "writeBehavior": "upsert",
     "upsertSettings": {
         "useTempDB": true,

articles/data-factory/connector-azure-sql-managed-instance.md

Lines changed: 47 additions & 29 deletions
@@ -8,7 +8,7 @@ ms.topic: conceptual
 ms.author: jianleishen
 author: jianleishen
 ms.custom: synapse
-ms.date: 12/28/2021
+ms.date: 01/14/2022
 ---

 # Copy and transform data in Azure SQL Managed Instance using Azure Data Factory or Synapse Analytics
@@ -488,6 +488,12 @@ To copy data to SQL Managed Instance, the following properties are supported in
 | writeBatchSize |Number of rows to insert into the SQL table *per batch*.<br/>Allowed values are integers for the number of rows. By default, the service dynamically determines the appropriate batch size based on the row size. |No |
 | writeBatchTimeout |This property specifies the wait time for the batch insert operation to complete before it times out.<br/>Allowed values are for the timespan. An example is "00:30:00," which is 30 minutes. |No |
 | maxConcurrentConnections |The upper limit of concurrent connections established to the data store during the activity run. Specify a value only when you want to limit concurrent connections.| No |
+| WriteBehavior | Specify the write behavior for the copy activity to load data into Azure SQL Managed Instance. <br/> Allowed values are **Insert** and **Upsert**. By default, the service uses insert to load data. | No |
+| upsertSettings | Specify the group of settings for the write behavior. <br/> Applies when the WriteBehavior option is `Upsert`. | No |
+| ***Under `upsertSettings`:*** | | |
+| useTempDB | Specify whether to use a global temporary table or a physical table as the interim table for upsert. <br>By default, the service uses a global temporary table as the interim table (value `true`). | No |
+| interimSchemaName | Specify the interim schema for creating the interim table if a physical table is used. Note: the user needs permission to create and delete tables. By default, the interim table shares the same schema as the sink table. <br/> Applies when the useTempDB option is `False`. | No |
+| keys | Specify the column names for unique row identification. Either a single key or a series of keys can be used. If not specified, the primary key is used. | No |

 **Example 1: Append data**

@@ -562,6 +568,45 @@ Learn more details from [Invoke a stored procedure from a SQL MI sink](#invoke-a
 ]
 ```

+**Example 3: Upsert data**
+
+```json
+"activities":[
+    {
+        "name": "CopyToAzureSqlMI",
+        "type": "Copy",
+        "inputs": [
+            {
+                "referenceName": "<input dataset name>",
+                "type": "DatasetReference"
+            }
+        ],
+        "outputs": [
+            {
+                "referenceName": "<SQL Managed Instance output dataset name>",
+                "type": "DatasetReference"
+            }
+        ],
+        "typeProperties": {
+            "source": {
+                "type": "<source type>"
+            },
+            "sink": {
+                "type": "SqlMISink",
+                "tableOption": "autoCreate",
+                "writeBehavior": "upsert",
+                "upsertSettings": {
+                    "useTempDB": true,
+                    "keys": [
+                        "<column name>"
+                    ]
+                }
+            }
+        }
+    }
+]
+```
+
 ## Parallel copy from SQL MI

 The Azure SQL Managed Instance connector in copy activity provides built-in data partitioning to copy data in parallel. You can find data partitioning options on the **Source** tab of the copy activity.
@@ -646,34 +691,7 @@ Appending data is the default behavior of the SQL Managed Instance sink connecto

 ### Upsert data

-**Option 1:** When you have a large amount of data to copy, you can bulk load all records into a staging table by using the copy activity, then run a stored procedure activity to apply a [MERGE](/sql/t-sql/statements/merge-transact-sql) or INSERT/UPDATE statement in one shot.
-
-Copy activity currently doesn't natively support loading data into a database temporary table. There is an advanced way to set it up with a combination of multiple activities, refer to [Optimize SQL Database Bulk Upsert scenarios](https://github.com/scoriani/azuresqlbulkupsert). Below shows a sample of using a permanent table as staging.
-
-As an example, you can create a pipeline with a **Copy activity** chained with a **Stored Procedure activity**. The former copies data from your source store into an Azure SQL Managed Instance staging table, for example, **UpsertStagingTable**, as the table name in the dataset. Then the latter invokes a stored procedure to merge source data from the staging table into the target table and clean up the staging table.
-
-:::image type="content" source="./media/connector-azure-sql-database/azure-sql-database-upsert.png" alt-text="Upsert":::
-
-In your database, define a stored procedure with MERGE logic, like the following example, which is pointed to from the previous stored procedure activity. Assume that the target is the **Marketing** table with three columns: **ProfileID**, **State**, and **Category**. Do the upsert based on the **ProfileID** column.
-
-```sql
-CREATE PROCEDURE [dbo].[spMergeData]
-AS
-BEGIN
-  MERGE TargetTable AS target
-  USING UpsertStagingTable AS source
-  ON (target.[ProfileID] = source.[ProfileID])
-  WHEN MATCHED THEN
-    UPDATE SET State = source.State
-  WHEN NOT MATCHED THEN
-    INSERT ([ProfileID], [State], [Category])
-    VALUES (source.ProfileID, source.State, source.Category);
-
-  TRUNCATE TABLE UpsertStagingTable
-END
-```
-
-**Option 2:** You can choose to [invoke a stored procedure within the copy activity](#invoke-a-stored-procedure-from-a-sql-sink). This approach runs each batch (as governed by the `writeBatchSize` property) in the source table instead of using bulk insert as the default approach in the copy activity.
+Copy activity now natively supports loading data into a database temporary table, then updating the data in the sink table if the key exists and inserting new data otherwise.
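+The sentence above compresses a two-phase flow: the service first bulk loads all incoming rows into an interim table (a global temporary table by default, or a physical table when `useTempDB` is false), then applies one merge pass against the sink and removes the interim data. A Python sketch of that lifecycle, with in-memory lists standing in for tables (illustrative only; the service performs this in T-SQL):
+
+```python
+def staged_upsert(sink_table, incoming_rows, keys):
+    """Sketch of the connector's native upsert: stage rows in an interim
+    'table', merge them into the sink on the key columns, then clean up."""
+    interim = list(incoming_rows)  # phase 1: bulk load into the interim table
+
+    # phase 2: one merge pass from interim into sink
+    by_key = {tuple(r[k] for k in keys): r for r in sink_table}
+    for row in interim:
+        key = tuple(row[k] for k in keys)
+        if key in by_key:
+            by_key[key].update(row)   # key exists: update the sink row
+        else:
+            sink_table.append(row)    # new key: insert
+
+    interim.clear()                   # phase 3: interim table dropped/truncated
+    return sink_table
+```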

 ### Overwrite the entire table

articles/data-factory/connector-sql-server.md

Lines changed: 48 additions & 29 deletions
@@ -8,7 +8,7 @@ ms.service: data-factory
 ms.subservice: data-movement
 ms.topic: conceptual
 ms.custom: synapse
-ms.date: 12/20/2021
+ms.date: 01/14/2022
 ---

 # Copy and transform data to and from SQL Server by using Azure Data Factory or Azure Synapse Analytics
@@ -358,6 +358,12 @@ To copy data to SQL Server, set the sink type in the copy activity to **SqlSink*
 | writeBatchSize |Number of rows to insert into the SQL table *per batch*.<br/>Allowed values are integers for the number of rows. By default, the service dynamically determines the appropriate batch size based on the row size. |No |
 | writeBatchTimeout |This property specifies the wait time for the batch insert operation to complete before it times out.<br/>Allowed values are for the timespan. An example is "00:30:00" for 30 minutes. If no value is specified, the timeout defaults to "02:00:00". |No |
 | maxConcurrentConnections |The upper limit of concurrent connections established to the data store during the activity run. Specify a value only when you want to limit concurrent connections.| No |
+| WriteBehavior | Specify the write behavior for the copy activity to load data into a SQL Server database. <br/> Allowed values are **Insert** and **Upsert**. By default, the service uses insert to load data. | No |
+| upsertSettings | Specify the group of settings for the write behavior. <br/> Applies when the WriteBehavior option is `Upsert`. | No |
+| ***Under `upsertSettings`:*** | | |
+| useTempDB | Specify whether to use a global temporary table or a physical table as the interim table for upsert. <br>By default, the service uses a global temporary table as the interim table (value `true`). | No |
+| interimSchemaName | Specify the interim schema for creating the interim table if a physical table is used. Note: the user needs permission to create and delete tables. By default, the interim table shares the same schema as the sink table. <br/> Applies when the useTempDB option is `False`. | No |
+| keys | Specify the column names for unique row identification. Either a single key or a series of keys can be used. If not specified, the primary key is used. | No |

 **Example 1: Append data**

@@ -432,6 +438,46 @@ Learn more details from [Invoke a stored procedure from a SQL sink](#invoke-a-st
 ]
 ```

+**Example 3: Upsert data**
+
+```json
+"activities":[
+    {
+        "name": "CopyToSQLServer",
+        "type": "Copy",
+        "inputs": [
+            {
+                "referenceName": "<input dataset name>",
+                "type": "DatasetReference"
+            }
+        ],
+        "outputs": [
+            {
+                "referenceName": "<SQL Server output dataset name>",
+                "type": "DatasetReference"
+            }
+        ],
+        "typeProperties": {
+            "source": {
+                "type": "<source type>"
+            },
+            "sink": {
+                "type": "SqlSink",
+                "tableOption": "autoCreate",
+                "writeBehavior": "upsert",
+                "upsertSettings": {
+                    "useTempDB": true,
+                    "keys": [
+                        "<column name>"
+                    ]
+                }
+            }
+        }
+    }
+]
+```
+
 ## Parallel copy from SQL database

 The SQL Server connector in copy activity provides built-in data partitioning to copy data in parallel. You can find data partitioning options on the **Source** tab of the copy activity.
@@ -516,34 +562,7 @@ Appending data is the default behavior of this SQL Server sink connector. the se

 ### Upsert data

-**Option 1:** When you have a large amount of data to copy, you can bulk load all records into a staging table by using the copy activity, then run a stored procedure activity to apply a [MERGE](/sql/t-sql/statements/merge-transact-sql) or INSERT/UPDATE statement in one shot.
-
-Copy activity currently doesn't natively support loading data into a database temporary table. There is an advanced way to set it up with a combination of multiple activities, refer to [Optimize SQL Database Bulk Upsert scenarios](https://github.com/scoriani/azuresqlbulkupsert). Below shows a sample of using a permanent table as staging.
-
-As an example, you can create a pipeline with a **Copy activity** chained with a **Stored Procedure activity**. The former copies data from your source store into a SQL Server staging table, for example, **UpsertStagingTable**, as the table name in the dataset. Then the latter invokes a stored procedure to merge source data from the staging table into the target table and clean up the staging table.
-
-:::image type="content" source="./media/connector-azure-sql-database/azure-sql-database-upsert.png" alt-text="Upsert":::
-
-In your database, define a stored procedure with MERGE logic, like the following example, which is pointed to from the previous stored procedure activity. Assume that the target is the **Marketing** table with three columns: **ProfileID**, **State**, and **Category**. Do the upsert based on the **ProfileID** column.
-
-```sql
-CREATE PROCEDURE [dbo].[spMergeData]
-AS
-BEGIN
-  MERGE TargetTable AS target
-  USING UpsertStagingTable AS source
-  ON (target.[ProfileID] = source.[ProfileID])
-  WHEN MATCHED THEN
-    UPDATE SET State = source.State
-  WHEN NOT MATCHED THEN
-    INSERT ([ProfileID], [State], [Category])
-    VALUES (source.ProfileID, source.State, source.Category);
-
-  TRUNCATE TABLE UpsertStagingTable
-END
-```
-
-**Option 2:** You can choose to [invoke a stored procedure within the copy activity](#invoke-a-stored-procedure-from-a-sql-sink). This approach runs each batch (as governed by the `writeBatchSize` property) in the source table instead of using bulk insert as the default approach in the copy activity.
+Copy activity now natively supports loading data into a database temporary table, then updating the data in the sink table if the key exists and inserting new data otherwise.

 ### Overwrite the entire table
