@@ -153,7 +153,7 @@ You create linked services in a data factory to link your data stores and comput
153
153
ls_name = ' storageLinkedService'
154
154
155
155
# IMPORTANT: specify the name and key of your Azure Storage account.
156
- storage_string = SecureString(' DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>' )
156
+ storage_string = SecureString(value = ' DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>' )
157
157
158
158
ls_azure_storage = AzureStorageLinkedService(connection_string = storage_string)
159
159
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
@@ -222,10 +222,7 @@ Add the following code to the **Main** method that **triggers a pipeline run**.
222
222
223
223
``` python
224
224
# Create a pipeline run.
225
- run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name,
226
- {
227
- }
228
- )
225
+ run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters = {})
229
226
```
230
227
231
228
## Monitor a pipeline run
@@ -237,8 +234,12 @@ To monitor the pipeline run, add the following code the **Main** method:
237
234
time.sleep(30 )
238
235
pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
239
236
print (" \n\t Pipeline run status: {} " .format(pipeline_run.status))
240
- activity_runs_paged = list (adf_client.activity_runs.list_by_pipeline_run(rg_name, df_name, pipeline_run.run_id, datetime.now() - timedelta(1 ), datetime.now() + timedelta(1 )))
241
- print_activity_run_details(activity_runs_paged[0 ])
237
+ filter_params = RunFilterParameters(
238
+ last_updated_after = datetime.now() - timedelta(1 ), last_updated_before = datetime.now() + timedelta(1 ))
239
+ query_response = adf_client.activity_runs.query_by_pipeline_run(
240
+ rg_name, df_name, pipeline_run.run_id, filter_params)
241
+ print_activity_run_details(query_response.value[0 ])
242
+
242
243
```
243
244
244
245
Now, add the following statement to invoke the ** main** method when the program is run:
@@ -334,7 +335,7 @@ def main():
334
335
335
336
# Specify the name and key of your Azure Storage account
336
337
storage_string = SecureString(
337
- ' DefaultEndpointsProtocol=https;AccountName=<storage account name>;AccountKey=<storage account key>' )
338
+ value = ' DefaultEndpointsProtocol=https;AccountName=<storage account name>;AccountKey=<storage account key>' )
338
339
339
340
ls_azure_storage = AzureStorageLinkedService(
340
341
connection_string = storage_string)
@@ -348,15 +349,15 @@ def main():
348
349
blob_path = ' adfv2tutorial/input'
349
350
blob_filename = ' input.txt'
350
351
ds_azure_blob = AzureBlobDataset(
351
- ds_ls, folder_path = blob_path, file_name = blob_filename)
352
+ linked_service_name = ds_ls, folder_path = blob_path, file_name = blob_filename)
352
353
ds = adf_client.datasets.create_or_update(
353
354
rg_name, df_name, ds_name, ds_azure_blob)
354
355
print_item(ds)
355
356
356
357
# Create an Azure blob dataset (output)
357
358
dsOut_name = ' ds_out'
358
359
output_blobpath = ' adfv2tutorial/output'
359
- dsOut_azure_blob = AzureBlobDataset(ds_ls, folder_path = output_blobpath)
360
+ dsOut_azure_blob = AzureBlobDataset(linked_service_name = ds_ls, folder_path = output_blobpath)
360
361
dsOut = adf_client.datasets.create_or_update(
361
362
rg_name, df_name, dsOut_name, dsOut_azure_blob)
362
363
print_item(dsOut)
@@ -379,19 +380,18 @@ def main():
379
380
print_item(p)
380
381
381
382
# Create a pipeline run
382
- run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name,
383
- {
384
- }
385
- )
383
+ run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters = {})
386
384
387
385
# Monitor the pipeline run
388
386
time.sleep(30 )
389
387
pipeline_run = adf_client.pipeline_runs.get(
390
388
rg_name, df_name, run_response.run_id)
391
389
print (" \n\t Pipeline run status: {} " .format(pipeline_run.status))
392
- activity_runs_paged = list (adf_client.activity_runs.list_by_pipeline_run(
393
- rg_name, df_name, pipeline_run.run_id, datetime.now() - timedelta(1 ), datetime.now() + timedelta(1 )))
394
- print_activity_run_details(activity_runs_paged[0 ])
390
+ filter_params = RunFilterParameters(
391
+ last_updated_after = datetime.now() - timedelta(1 ), last_updated_before = datetime.now() + timedelta(1 ))
392
+ query_response = adf_client.activity_runs.query_by_pipeline_run(
393
+ rg_name, df_name, pipeline_run.run_id, filter_params)
394
+ print_activity_run_details(query_response.value[0 ])
395
395
396
396
397
397
# Start the main method
0 commit comments