import json
import logging
import os
import pathlib
from os.path import dirname
from typing import Any, List

import azure.durable_functions as df
import azure.functions as func
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient

# Durable Functions app object. Anonymous auth level so the HTTP starter
# endpoint can be called without a function key.
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def HttpStart(req: func.HttpRequest, client):
    """HTTP starter: launch the orchestrator named in the route.

    Args:
        req: Incoming HTTP request; its JSON body is forwarded verbatim as
            the orchestration input.
        client: Durable Functions client injected by the decorator.

    Returns:
        The standard "check status" response with the management URLs for
        the new orchestration instance.
    """
    # json.loads returns whatever JSON value was posted (dict, list, str,
    # number, ...), so the annotation is Any — the original ": str" was wrong.
    payload: Any = json.loads(req.get_body().decode())
    instance_id = await client.start_new(
        req.route_params["functionName"], client_input=payload
    )

    # Lazy %-style args avoid formatting work when INFO logging is disabled.
    logging.info("Started orchestration with ID = '%s'.", instance_id)

    return client.create_check_status_response(req, instance_id)
@myApp.orchestration_trigger(context_name="context")
def E2_BackupSiteContent(context: df.DurableOrchestrationContext):
    """Orchestrator: back up every matching file under the input directory.

    Fans out one E2_CopyFileToBlob activity per file returned by
    E2_GetFileList, then fans in and returns the total bytes uploaded.
    """
    root_directory: str = context.get_input()

    if not root_directory:
        raise Exception("A directory path is required as input")

    file_paths = yield context.call_activity("E2_GetFileList", root_directory)

    # Fan out: schedule one copy activity per file, then wait for all of them.
    copy_tasks = [
        context.call_activity("E2_CopyFileToBlob", path) for path in file_paths
    ]
    byte_counts = yield context.task_all(copy_tasks)

    # Fan in: each activity returned the size of the file it uploaded.
    return sum(byte_counts)
# Storage connection string supplied by the Functions host environment.
# NOTE: may be None when running outside an Azure Functions context.
connect_str = os.getenv("AzureWebJobsStorage")

@myApp.activity_trigger(input_name="rootDirectory")
def E2_GetFileList(rootDirectory):
    """Activity: list the backup-worthy files beneath *rootDirectory*.

    Walks the tree and collects the full paths of every __init__.py and
    function.json that lives under a directory whose path contains "E2_"
    (i.e. the code for these orchestrator/activity functions).
    """
    wanted_names = ("__init__.py", "function.json")
    matches = []
    for dirpath, _dirnames, filenames in os.walk(rootDirectory):
        if "E2_" not in dirpath:
            continue  # only the E2_* function folders are backed up
        matches.extend(
            os.path.join(dirpath, fname)
            for fname in filenames
            if fname in wanted_names
        )
    return matches
@myApp.activity_trigger(input_name="filePath")
def E2_CopyFileToBlob(filePath):
    """Activity: upload one local file to the "backups" blob container.

    Args:
        filePath: Full path of the local file to upload.

    Returns:
        The size of the uploaded file in bytes.
    """
    # Use the client as a context manager so its HTTP transport/connection
    # pool is closed deterministically instead of leaking on every invocation.
    with BlobServiceClient.from_connection_string(connect_str) as blob_service_client:
        container_name = "backups"

        # Create the container; ignore the error if it already exists.
        try:
            blob_service_client.create_container(container_name)
        except ResourceExistsError:
            pass

        # Blob name is "<parent-dir>_<file-name>", built from the last two
        # path components of the source file.
        parent_dir, fname = pathlib.Path(filePath).parts[-2:]
        blob_name = parent_dir + "_" + fname
        blob_client = blob_service_client.get_blob_client(
            container=container_name, blob=blob_name
        )

        # Count the bytes before streaming the file up.
        byte_count = os.path.getsize(filePath)

        # NOTE(review): upload_blob raises ResourceExistsError when the blob
        # already exists (no overwrite=True), so re-running a backup over the
        # same tree fails on the second pass — confirm whether overwriting is
        # the intended behavior before changing it.
        with open(filePath, "rb") as data:
            blob_client.upload_blob(data)

    return byte_count