
Commit a302e58

SQL CDC1
1 parent c5fcc7a commit a302e58

3 files changed, +147 -3 lines changed


articles/data-factory/connector-azure-sql-database.md

Lines changed: 7 additions & 3 deletions
@@ -944,6 +944,8 @@ When you debug the pipeline, this feature works the same. Be aware that the chec
In the monitoring section, you can always rerun a pipeline. When you do so, the changed data is always captured from the previous checkpoint of your selected pipeline run.

### Example 1

When you directly chain a source transform that references a SQL CDC-enabled dataset to a sink transform that references a database in a mapping data flow, the changes that happen on the SQL source are automatically applied to the target database, so you can easily set up data replication between databases. You can use the update method in the sink transform to choose whether to allow insert, update, or delete on the target database. An example mapping data flow script follows.

```json
@@ -971,7 +973,9 @@ source1 sink(allowSchemaDrift: true,
errorHandlingOption: 'stopOnFirstError') ~> sink1
```

### Example 2

If you want to enable an ETL scenario instead of data replication between databases via SQL CDC, you can use expressions in mapping data flow, including isInsert(1), isUpdate(1), and isDelete(1), to differentiate rows by operation type. The following example mapping data flow script derives one column whose value is 1 for inserted rows, 2 for updated rows, and 3 for deleted rows, so that downstream transforms can process the delta data.

```json
source(output(
@@ -992,9 +996,9 @@ derivedColumn1 sink(allowSchemaDrift: true,
skipDuplicateMapOutputs: true) ~> sink1
```

### Known limitation

* Only **net changes** from SQL CDC are loaded by ADF, via [cdc.fn_cdc_get_net_changes_](https://learn.microsoft.com/sql/relational-databases/system-functions/cdc-fn-cdc-get-net-changes-capture-instance-transact-sql?source=recommendations); see the sketch below for what a net-changes read looks like.
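
To make the net-changes behavior concrete, here is a minimal T-SQL sketch of a direct net-changes read, assuming a hypothetical CDC-enabled table `dbo.MyTable` with the default capture instance `dbo_MyTable`. ADF performs the equivalent read for you; this is only to illustrate what "net changes" means.

```sql
-- Hypothetical capture instance dbo_MyTable of a CDC-enabled table dbo.MyTable.
DECLARE @from_lsn binary(10), @to_lsn binary(10);

-- LSN range covering everything captured so far for this capture instance.
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_MyTable');
SET @to_lsn   = sys.fn_cdc_get_max_lsn();

-- A net-changes read returns at most one row per key for the LSN range
-- (__$operation: 1 = delete, 2 = insert, 4 = update), not every intermediate change.
SELECT *
FROM cdc.fn_cdc_get_net_changes_dbo_MyTable(@from_lsn, @to_lsn, N'all');
```
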
## Next steps

articles/data-factory/connector-azure-sql-managed-instance.md

Lines changed: 69 additions & 0 deletions
@@ -782,6 +782,11 @@ The below table lists the properties supported by Azure SQL Managed Instance sou
| Query | If you select Query as input, specify a SQL query to fetch data from source, which overrides any table you specify in dataset. Using queries is a great way to reduce rows for testing or lookups.<br><br>**Order By** clause is not supported, but you can set a full SELECT FROM statement. You can also use user-defined table functions. **select * from udfGetData()** is a UDF in SQL that returns a table that you can use in data flow.<br>Query example: `Select * from MyTable where customerId > 1000 and customerId < 2000`| No | String | query |
| Batch size | Specify a batch size to chunk large data into reads. | No | Integer | batchSize |
| Isolation Level | Choose one of the following isolation levels:<br>- Read Committed<br>- Read Uncommitted (default)<br>- Repeatable Read<br>- Serializable<br>- None (ignore isolation level) | No | <small>READ_COMMITTED<br/>READ_UNCOMMITTED<br/>REPEATABLE_READ<br/>SERIALIZABLE<br/>NONE</small> |isolationLevel |
| Enable incremental extract | Use this option to tell ADF to only process rows that have changed since the last time the pipeline executed. | No | - | - |
| Incremental date column | When using the incremental extract feature, you must choose the date/time column that you wish to use as the watermark in your source table. | No | - | - |
| Enable native change data capture (Preview) | Use this option to tell ADF to only process delta data captured by [SQL change data capture technology](https://learn.microsoft.com/sql/relational-databases/track-changes/about-change-data-capture-sql-server) since the last time the pipeline executed. With this option, the delta data, including row inserts, updates, and deletions, is loaded automatically without requiring an incremental date column. You need to [enable change data capture](https://learn.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server) on Azure SQL MI before using this option in ADF; a minimal T-SQL sketch is shown after this table. For more details about this option in ADF, see [native change data capture](#native-change-data-capture). | No | - | - |
| Start reading from beginning | Setting this option with incremental extract instructs ADF to read all rows on the first execution of a pipeline with incremental extract turned on. | No | - | - |
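
The sketch below shows, at a high level, how change data capture might be enabled on the source before you turn on this option in ADF. It is a minimal T-SQL sketch under the assumption of a hypothetical source table `dbo.MyTable`; see the linked enable/disable article for the full set of parameters and options.

```sql
-- Run in the source database (hypothetical table dbo.MyTable).
EXEC sys.sp_cdc_enable_db;

-- Enable CDC for the table; the default capture instance is named dbo_MyTable.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'MyTable',
    @role_name     = NULL;   -- NULL = no gating role; pass a role name to restrict access

-- Confirm that the database and the table are tracked by CDC.
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME();
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = N'MyTable';
```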

> [!TIP]
> The [common table expression (CTE)](/sql/t-sql/queries/with-common-table-expression-transact-sql?view=sql-server-ver15&preserve-view=true) in SQL is not supported in the mapping data flow **Query** mode, because the prerequisite of using this mode is that queries can be used in the SQL query FROM clause but CTEs cannot do this.
@@ -910,5 +915,69 @@ When you copy data from/to SQL Managed Instance with [Always Encrypted](/sql/rel
>[!NOTE]
>Currently, SQL Managed Instance [**Always Encrypted**](/sql/relational-databases/security/encryption/always-encrypted-database-engine?view=sql-server-ver15&preserve-view=true) is only supported for source transformation in mapping data flows.
## Native change data capture

Azure Data Factory supports native change data capture capabilities for SQL Server, Azure SQL DB, and Azure SQL MI. Changed data, including row inserts, updates, and deletions in SQL stores, can be automatically detected and extracted by an ADF mapping data flow. With the no-code experience in mapping data flow, you can easily achieve data replication from SQL stores by appending a database as the destination store. You can also compose any data transformation logic in between to achieve an incremental ETL scenario from SQL stores.

Make sure to keep the pipeline and activity names unchanged, so that ADF can record the checkpoint and automatically get the changed data from the last run. If you change the pipeline name or activity name, the checkpoint is reset, and in the next run you either start from the beginning or get changes from now on. If you do want to change the pipeline name or activity name but still keep the checkpoint so that changed data from the last run is picked up automatically, use your own checkpoint key in the data flow activity.

When you debug the pipeline, this feature works the same way. Be aware that the checkpoint is reset when you refresh your browser during a debug run. After you are satisfied with the result of the debug run, you can publish and trigger the pipeline. The first time you trigger the published pipeline, it automatically restarts from the beginning or gets changes from now on.

In the monitoring section, you can always rerun a pipeline. When you do so, the changed data is always captured from the previous checkpoint of your selected pipeline run.

### Example 1

When you directly chain a source transform that references a SQL CDC-enabled dataset to a sink transform that references a database in a mapping data flow, the changes that happen on the SQL source are automatically applied to the target database, so you can easily set up data replication between databases. You can use the update method in the sink transform to choose whether to allow insert, update, or delete on the target database. An example mapping data flow script follows.

```json
source(output(
        id as integer,
        name as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    enableNativeCdc: true,
    netChanges: true,
    skipInitialLoad: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false,
    deletable: true,
    insertable: true,
    updateable: true,
    upsertable: true,
    keys: ['id'],
    format: 'table',
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true,
    errorHandlingOption: 'stopOnFirstError') ~> sink1
```

### Example 2

If you want to enable an ETL scenario instead of data replication between databases via SQL CDC, you can use expressions in mapping data flow, including isInsert(1), isUpdate(1), and isDelete(1), to differentiate rows by operation type. The following example mapping data flow script derives one column whose value is 1 for inserted rows, 2 for updated rows, and 3 for deleted rows, so that downstream transforms can process the delta data.

```json
source(output(
        id as integer,
        name as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    enableNativeCdc: true,
    netChanges: true,
    skipInitialLoad: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
    validateSchema: false,
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink1
```
## Next steps
For a list of data stores supported as sources and sinks by the copy activity, see [Supported data stores](copy-activity-overview.md#supported-data-stores-and-formats).

articles/data-factory/connector-sql-server.md

Lines changed: 71 additions & 0 deletions
@@ -690,6 +690,13 @@ The below table lists the properties supported by SQL Server source. You can edi
| Query | If you select Query as input, specify a SQL query to fetch data from source, which overrides any table you specify in dataset. Using queries is a great way to reduce rows for testing or lookups.<br><br>**Order By** clause is not supported, but you can set a full SELECT FROM statement. You can also use user-defined table functions. **select * from udfGetData()** is a UDF in SQL that returns a table that you can use in data flow.<br>Query example: `Select * from MyTable where customerId > 1000 and customerId < 2000`| No | String | query |
| Batch size | Specify a batch size to chunk large data into reads. | No | Integer | batchSize |
| Isolation Level | Choose one of the following isolation levels:<br>- Read Committed<br>- Read Uncommitted (default)<br>- Repeatable Read<br>- Serializable<br>- None (ignore isolation level) | No | <small>READ_COMMITTED<br/>READ_UNCOMMITTED<br/>REPEATABLE_READ<br/>SERIALIZABLE<br/>NONE</small> |isolationLevel |
| Enable incremental extract | Use this option to tell ADF to only process rows that have changed since the last time the pipeline executed. | No | - | - |
| Incremental date column | When using the incremental extract feature, you must choose the date/time column that you wish to use as the watermark in your source table. | No | - | - |
| Enable native change data capture (Preview) | Use this option to tell ADF to only process delta data captured by [SQL change data capture technology](https://learn.microsoft.com/sql/relational-databases/track-changes/about-change-data-capture-sql-server) since the last time the pipeline executed. With this option, the delta data, including row inserts, updates, and deletions, is loaded automatically without requiring an incremental date column. You need to [enable change data capture](https://learn.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server) on SQL Server before using this option in ADF; a minimal T-SQL sketch is shown after this table. For more details about this option in ADF, see [native change data capture](#native-change-data-capture). | No | - | - |
| Start reading from beginning | Setting this option with incremental extract instructs ADF to read all rows on the first execution of a pipeline with incremental extract turned on. | No | - | - |
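
As a reference, the following minimal T-SQL sketch shows how change data capture might be enabled on a SQL Server source before you turn on this option in ADF. The table `dbo.MyTable` is a hypothetical example; see the linked enable/disable article for the full details.

```sql
-- Run in the source database (hypothetical table dbo.MyTable).
-- Note: on SQL Server the capture/cleanup jobs require SQL Server Agent to be running.
EXEC sys.sp_cdc_enable_db;

-- Enable CDC for the table; the default capture instance is named dbo_MyTable.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'MyTable',
    @role_name     = NULL;   -- NULL = no gating role; pass a role name to restrict access

-- Confirm that the database and the table are tracked by CDC.
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME();
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = N'MyTable';
```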

> [!TIP]
> The [common table expression (CTE)](/sql/t-sql/queries/with-common-table-expression-transact-sql?view=sql-server-ver15&preserve-view=true) in SQL is not supported in the mapping data flow **Query** mode, because the prerequisite of using this mode is that queries can be used in the SQL query FROM clause but CTEs cannot do this.
@@ -821,6 +828,70 @@ When you copy data from/to SQL Server with [Always Encrypted](/sql/relational-da
>[!NOTE]
>Currently, SQL Server [**Always Encrypted**](/sql/relational-databases/security/encryption/always-encrypted-database-engine?view=sql-server-ver15&preserve-view=true) is only supported for source transformation in mapping data flows.
## Native change data capture

Azure Data Factory supports native change data capture capabilities for SQL Server, Azure SQL DB, and Azure SQL MI. Changed data, including row inserts, updates, and deletions in SQL stores, can be automatically detected and extracted by an ADF mapping data flow. With the no-code experience in mapping data flow, you can easily achieve data replication from SQL stores by appending a database as the destination store. You can also compose any data transformation logic in between to achieve an incremental ETL scenario from SQL stores.

Make sure to keep the pipeline and activity names unchanged, so that ADF can record the checkpoint and automatically get the changed data from the last run. If you change the pipeline name or activity name, the checkpoint is reset, and in the next run you either start from the beginning or get changes from now on. If you do want to change the pipeline name or activity name but still keep the checkpoint so that changed data from the last run is picked up automatically, use your own checkpoint key in the data flow activity.

When you debug the pipeline, this feature works the same way. Be aware that the checkpoint is reset when you refresh your browser during a debug run. After you are satisfied with the result of the debug run, you can publish and trigger the pipeline. The first time you trigger the published pipeline, it automatically restarts from the beginning or gets changes from now on.

In the monitoring section, you can always rerun a pipeline. When you do so, the changed data is always captured from the previous checkpoint of your selected pipeline run.

### Example 1

When you directly chain a source transform that references a SQL CDC-enabled dataset to a sink transform that references a database in a mapping data flow, the changes that happen on the SQL source are automatically applied to the target database, so you can easily set up data replication between databases. You can use the update method in the sink transform to choose whether to allow insert, update, or delete on the target database. An example mapping data flow script follows.

```json
source(output(
        id as integer,
        name as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    enableNativeCdc: true,
    netChanges: true,
    skipInitialLoad: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false,
    deletable: true,
    insertable: true,
    updateable: true,
    upsertable: true,
    keys: ['id'],
    format: 'table',
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true,
    errorHandlingOption: 'stopOnFirstError') ~> sink1
```

### Example 2

If you want to enable an ETL scenario instead of data replication between databases via SQL CDC, you can use expressions in mapping data flow, including isInsert(1), isUpdate(1), and isDelete(1), to differentiate rows by operation type. The following example mapping data flow script derives one column whose value is 1 for inserted rows, 2 for updated rows, and 3 for deleted rows, so that downstream transforms can process the delta data.

```json
source(output(
        id as integer,
        name as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    enableNativeCdc: true,
    netChanges: true,
    skipInitialLoad: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
    validateSchema: false,
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink1
```
## Troubleshoot connection issues

1. Configure your SQL Server instance to accept remote connections. Start **SQL Server Management Studio**, right-click **server**, and select **Properties**. Select **Connections** from the list, and select the **Allow remote connections to this server** check box.
