
Commit 88ec61b

Merge pull request #212973 from dearandyxu/master
SQL CDC
2 parents 113cb33 + a901bb4 commit 88ec61b

File tree

3 files changed: +216 -0 lines changed


articles/data-factory/connector-azure-sql-database.md

Lines changed: 69 additions & 0 deletions
@@ -810,6 +810,8 @@ Settings specific to Azure SQL Database are available in the **Source Options**
**Incremental date column**: When using the incremental extract feature, you must choose the date/time column that you wish to use as the watermark in your source table.

**Enable native change data capture (Preview)**: Use this option to tell ADF to process only the delta data captured by [SQL change data capture technology](https://learn.microsoft.com/sql/relational-databases/track-changes/about-change-data-capture-sql-server) since the last time the pipeline executed. With this option, the delta data, including row inserts, updates, and deletions, is loaded automatically; no incremental date column is required. You need to [enable change data capture](https://learn.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server) on Azure SQL DB before using this option in ADF. For more information about this option in ADF, see [native change data capture](#native-change-data-capture).

**Start reading from beginning**: Setting this option with incremental extract will instruct ADF to read all rows on first execution of a pipeline with incremental extract turned on.

### Sink transformation
@@ -932,6 +934,73 @@ When you copy data from/to Azure SQL Database with [Always Encrypted](/sql/relat

>[!NOTE]
> Currently, Azure SQL Database [**Always Encrypted**](/sql/relational-databases/security/encryption/always-encrypted-database-engine?view=sql-server-ver15&preserve-view=true) is only supported for source transformation in mapping data flows.

## Native change data capture

Azure Data Factory supports native change data capture capabilities for SQL Server, Azure SQL DB, and Azure SQL MI. The changed data, including row inserts, updates, and deletions in SQL stores, can be automatically detected and extracted by an ADF mapping data flow. With the no-code experience in mapping data flows, users can easily achieve a data replication scenario from SQL stores by appending a database as the destination store. In addition, users can compose any data transformation logic in between to achieve an incremental ETL scenario from SQL stores.

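
As a prerequisite, change data capture has to be enabled on the source database and on every table that ADF will read. A minimal T-SQL sketch is shown below; the table name `dbo.Customers` is an illustrative assumption, and the required permissions depend on your environment (see the enable-CDC article linked earlier in this article).

```sql
-- Enable CDC at the database level (requires elevated permissions,
-- for example membership in db_owner on Azure SQL Database).
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on an illustrative table. @supports_net_changes = 1 requires
-- a primary key (or a unique index passed via @index_name) and is what
-- makes net changes available to downstream readers.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Customers',
    @role_name = NULL,
    @supports_net_changes = 1;
```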

Make sure you keep the pipeline and activity names unchanged, so that the checkpoint can be recorded by ADF and changed data is captured automatically from the last run. If you change the pipeline name or activity name, the checkpoint is reset, and the next run either starts from the beginning or captures changes from now on. If you do want to change the pipeline name or activity name but still keep the checkpoint so that changed data is captured from the last run automatically, use your own checkpoint key in the data flow activity.

This feature works the same way when you debug the pipeline. Be aware that the checkpoint is reset when you refresh your browser during a debug run. After you are satisfied with the result from the debug run, you can publish and trigger the pipeline. The first time you trigger the published pipeline, it automatically restarts from the beginning or captures changes from now on.

In the monitoring section, you can always rerun a pipeline. When you do, the changed data is always captured from the previous checkpoint of the selected pipeline run.

### Example 1:

When you directly chain a source transformation that references a SQL CDC enabled dataset with a sink transformation that references a database in a mapping data flow, the changes that happen on the SQL source are automatically applied to the target database, giving you an easy data replication scenario between databases. You can use the update method in the sink transformation to select whether to allow inserts, updates, or deletes on the target database. An example script in mapping data flow is shown below.

```json
source(output(
    id as integer,
    name as string
  ),
  allowSchemaDrift: true,
  validateSchema: false,
  enableNativeCdc: true,
  netChanges: true,
  skipInitialLoad: false,
  isolationLevel: 'READ_UNCOMMITTED',
  format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
  validateSchema: false,
  deletable: true,
  insertable: true,
  updateable: true,
  upsertable: true,
  keys: ['id'],
  format: 'table',
  skipDuplicateMapInputs: true,
  skipDuplicateMapOutputs: true,
  errorHandlingOption: 'stopOnFirstError') ~> sink1
```

### Example 2:

If you want an ETL scenario instead of data replication between databases via SQL CDC, you can use expressions in mapping data flow, including isInsert(1), isUpdate(1), and isDelete(1), to differentiate rows by operation type. The following example script for mapping data flow derives a column whose value is 1 for inserted rows, 2 for updated rows, and 3 for deleted rows, so that downstream transformations can process the delta data.

```json
source(output(
    id as integer,
    name as string
  ),
  allowSchemaDrift: true,
  validateSchema: false,
  enableNativeCdc: true,
  netChanges: true,
  skipInitialLoad: false,
  isolationLevel: 'READ_UNCOMMITTED',
  format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
  validateSchema: false,
  skipDuplicateMapInputs: true,
  skipDuplicateMapOutputs: true) ~> sink1
```

### Known limitation:

* Only **net changes** from SQL CDC are loaded by ADF, via [cdc.fn_cdc_get_net_changes_](https://learn.microsoft.com/sql/relational-databases/system-functions/cdc-fn-cdc-get-net-changes-capture-instance-transact-sql?source=recommendations). See the sketch below for what this query pattern looks like.

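
To illustrate what "net changes" means, the sketch below queries the net changes function directly in T-SQL. The capture instance name `dbo_Customers` is an assumption based on the default `<schema>_<table>` naming, not something defined by this connector; ADF issues the equivalent query for you, so this is only for exploration.

```sql
-- Query net changes for an assumed capture instance named dbo_Customers
-- (the default <schema>_<table> naming). At most one row is returned per
-- changed key, even if that row was modified several times in the LSN range.
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_Customers');
DECLARE @to_lsn binary(10) = sys.fn_cdc_get_max_lsn();

SELECT *
FROM cdc.fn_cdc_get_net_changes_dbo_Customers(@from_lsn, @to_lsn, N'all');
```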
## Next steps
For a list of data stores supported as sources and sinks by the copy activity, see [Supported data stores and formats](copy-activity-overview.md#supported-data-stores-and-formats).

articles/data-factory/connector-azure-sql-managed-instance.md

Lines changed: 73 additions & 0 deletions
@@ -782,6 +782,11 @@ The below table lists the properties supported by Azure SQL Managed Instance sou

| Query | If you select Query as input, specify a SQL query to fetch data from source, which overrides any table you specify in dataset. Using queries is a great way to reduce rows for testing or lookups.<br><br>**Order By** clause is not supported, but you can set a full SELECT FROM statement. You can also use user-defined table functions. **select * from udfGetData()** is a UDF in SQL that returns a table that you can use in data flow.<br>Query example: `Select * from MyTable where customerId > 1000 and customerId < 2000`| No | String | query |
| Batch size | Specify a batch size to chunk large data into reads. | No | Integer | batchSize |
| Isolation Level | Choose one of the following isolation levels:<br>- Read Committed<br>- Read Uncommitted (default)<br>- Repeatable Read<br>- Serializable<br>- None (ignore isolation level) | No | <small>READ_COMMITTED<br/>READ_UNCOMMITTED<br/>REPEATABLE_READ<br/>SERIALIZABLE<br/>NONE</small> |isolationLevel |
| Enable incremental extract | Use this option to tell ADF to process only rows that have changed since the last time the pipeline executed. | No | - |- |
| Incremental date column | When using the incremental extract feature, you must choose the date/time column that you wish to use as the watermark in your source table. | No | - |- |
| Enable native change data capture (Preview) | Use this option to tell ADF to process only the delta data captured by [SQL change data capture technology](https://learn.microsoft.com/sql/relational-databases/track-changes/about-change-data-capture-sql-server) since the last time the pipeline executed. With this option, the delta data, including row inserts, updates, and deletions, is loaded automatically; no incremental date column is required. You need to [enable change data capture](https://learn.microsoft.com/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server) on Azure SQL MI before using this option in ADF. For more information about this option in ADF, see [native change data capture](#native-change-data-capture). | No | - |- |
| Start reading from beginning | Setting this option with incremental extract instructs ADF to read all rows on the first execution of a pipeline with incremental extract turned on. | No | - |- |

> [!TIP]
> The [common table expression (CTE)](/sql/t-sql/queries/with-common-table-expression-transact-sql?view=sql-server-ver15&preserve-view=true) in SQL is not supported in the mapping data flow **Query** mode, because the prerequisite of using this mode is that queries can be used in the SQL query FROM clause but CTEs cannot do this.
@@ -910,5 +915,73 @@ When you copy data from/to SQL Managed Instance with [Always Encrypted](/sql/rel

>[!NOTE]
>Currently, SQL Managed Instance [**Always Encrypted**](/sql/relational-databases/security/encryption/always-encrypted-database-engine?view=sql-server-ver15&preserve-view=true) is only supported for source transformation in mapping data flows.

## Native change data capture

Azure Data Factory supports native change data capture capabilities for SQL Server, Azure SQL DB, and Azure SQL MI. The changed data, including row inserts, updates, and deletions in SQL stores, can be automatically detected and extracted by an ADF mapping data flow. With the no-code experience in mapping data flows, users can easily achieve a data replication scenario from SQL stores by appending a database as the destination store. In addition, users can compose any data transformation logic in between to achieve an incremental ETL scenario from SQL stores.

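
Before pointing ADF at the source, it can help to confirm that CDC is actually enabled on the managed instance database and on the tables you plan to read. A minimal T-SQL check is sketched below; it only uses standard SQL Server catalog views, and nothing in it is specific to this connector.

```sql
-- Is CDC enabled at the database level?
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();

-- Which tables in the current database are tracked by CDC?
SELECT s.name AS schema_name, t.name AS table_name
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id
WHERE t.is_tracked_by_cdc = 1;
```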

Make sure you keep the pipeline and activity names unchanged, so that the checkpoint can be recorded by ADF and changed data is captured automatically from the last run. If you change the pipeline name or activity name, the checkpoint is reset, and the next run either starts from the beginning or captures changes from now on. If you do want to change the pipeline name or activity name but still keep the checkpoint so that changed data is captured from the last run automatically, use your own checkpoint key in the data flow activity.

This feature works the same way when you debug the pipeline. Be aware that the checkpoint is reset when you refresh your browser during a debug run. After you are satisfied with the result from the debug run, you can publish and trigger the pipeline. The first time you trigger the published pipeline, it automatically restarts from the beginning or captures changes from now on.

In the monitoring section, you can always rerun a pipeline. When you do, the changed data is always captured from the previous checkpoint of the selected pipeline run.

### Example 1:

When you directly chain a source transformation that references a SQL CDC enabled dataset with a sink transformation that references a database in a mapping data flow, the changes that happen on the SQL source are automatically applied to the target database, giving you an easy data replication scenario between databases. You can use the update method in the sink transformation to select whether to allow inserts, updates, or deletes on the target database. An example script in mapping data flow is shown below.

```json
source(output(
    id as integer,
    name as string
  ),
  allowSchemaDrift: true,
  validateSchema: false,
  enableNativeCdc: true,
  netChanges: true,
  skipInitialLoad: false,
  isolationLevel: 'READ_UNCOMMITTED',
  format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
  validateSchema: false,
  deletable: true,
  insertable: true,
  updateable: true,
  upsertable: true,
  keys: ['id'],
  format: 'table',
  skipDuplicateMapInputs: true,
  skipDuplicateMapOutputs: true,
  errorHandlingOption: 'stopOnFirstError') ~> sink1
```

### Example 2:

If you want an ETL scenario instead of data replication between databases via SQL CDC, you can use expressions in mapping data flow, including isInsert(1), isUpdate(1), and isDelete(1), to differentiate rows by operation type. The following example script for mapping data flow derives a column whose value is 1 for inserted rows, 2 for updated rows, and 3 for deleted rows, so that downstream transformations can process the delta data.

```json
source(output(
    id as integer,
    name as string
  ),
  allowSchemaDrift: true,
  validateSchema: false,
  enableNativeCdc: true,
  netChanges: true,
  skipInitialLoad: false,
  isolationLevel: 'READ_UNCOMMITTED',
  format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
  validateSchema: false,
  skipDuplicateMapInputs: true,
  skipDuplicateMapOutputs: true) ~> sink1
```

### Known limitation:

* Only **net changes** from SQL CDC are loaded by ADF, via [cdc.fn_cdc_get_net_changes_](https://learn.microsoft.com/sql/relational-databases/system-functions/cdc-fn-cdc-get-net-changes-capture-instance-transact-sql?source=recommendations). A quick way to check that your capture instances support net changes is sketched below.

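
Because ADF reads net changes, each capture instance presumably needs to have been created with net changes support, which in turn requires a primary key or a unique index on the source table. One way to inspect your CDC configuration is sketched below.

```sql
-- List the CDC configuration of every capture instance in the current database.
-- Check that supports_net_changes is 1 for the tables ADF will read.
EXEC sys.sp_cdc_help_change_data_capture;
```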
## Next steps
For a list of data stores supported as sources and sinks by the copy activity, see [Supported data stores](copy-activity-overview.md#supported-data-stores-and-formats).
