-
Notifications
You must be signed in to change notification settings - Fork 185
Expand file tree
/
Copy pathfunction_app.py
More file actions
138 lines (110 loc) · 5.59 KB
/
function_app.py
File metadata and controls
138 lines (110 loc) · 5.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import azure.functions as func
import json
from utils import get_file_names
from process_files import generate_sas_url, get_blob_service_client, analyze_layout, save_analysis_results
from model_paystubs import model_paystubs
from model_loanforms import model_loanforms
from model_loanagreements import model_loanagreements
# Function app instance that the blob-trigger / Cosmos DB output decorators in
# this module register against. HTTP auth level is set to ANONYMOUS.
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@app.blob_trigger(arg_name="myblob", path="data/loanagreements/{name}",
                  connection="STORAGE_CONNECTION_STRING")
@app.cosmos_db_output(arg_name="outputDocument",
                      database_name="ContosoDB",
                      container_name="LoanAgreements",
                      connection="COSMOS_CONNECTION_STRING")
def ProcessLoanAgreements(myblob: func.InputStream, outputDocument: func.Out[func.Document]):
    """
    Handle a blob that lands under 'data/loanagreements'.

    A blob whose file name ends with '.json' is parsed, modelled with
    model_loanagreements and written to the Cosmos DB output binding
    (ContosoDB / LoanAgreements). Any other blob is sent through layout
    analysis via a SAS URL, and the analysis results are handed to
    save_analysis_results for the same container path.

    Parameters:
    myblob (func.InputStream): The input blob stream.
    outputDocument (func.Out[func.Document]): The output document for Cosmos DB.

    Raises:
    json.JSONDecodeError: If a '.json' blob does not contain valid JSON.
    """
    container_name = 'data/loanagreements'

    # Extract file name and root from the blob name
    file_name, file_root = get_file_names(myblob.name)

    # JSON blobs are modelled and stored; they never go through layout analysis.
    # NOTE(review): presumably these are the results written by
    # save_analysis_results below, re-triggering this function - confirm.
    if file_name.endswith('.json'):
        data = json.loads(myblob.read())
        # Model the loan agreement data
        document = model_loanagreements(data)
        # Save the modelled document to Cosmos DB
        outputDocument.set(func.Document.from_dict(document))
        return

    # Only the analysis path talks to Blob Storage, so the client is created
    # here rather than up front; the JSON path above does not need it.
    blob_service_client = get_blob_service_client()
    # Generate SAS URL for the blob
    sas_url = generate_sas_url(blob_service_client, container_name, file_name)
    # Analyze the layout of the file using the SAS URL
    analysis_results = analyze_layout(sas_url=sas_url)
    # Save the analysis results
    save_analysis_results(blob_service_client, container_name, file_root, analysis_results)
@app.blob_trigger(arg_name="myblob", path="data/loanform/{name}",
                  connection="STORAGE_CONNECTION_STRING")
@app.cosmos_db_output(arg_name="outputDocument",
                      database_name="ContosoDB",
                      container_name="LoanForms",
                      connection="COSMOS_CONNECTION_STRING")
def ProcessLoanForms(myblob: func.InputStream, outputDocument: func.Out[func.Document]):
    """
    Handle a blob that lands under 'data/loanform'.

    A blob whose file name ends with '.json' is parsed, modelled with
    model_loanforms and written to the Cosmos DB output binding
    (ContosoDB / LoanForms). Any other blob is sent through layout analysis
    via a SAS URL, and the analysis results are handed to
    save_analysis_results for the same container path.

    Parameters:
    myblob (func.InputStream): The input blob stream.
    outputDocument (func.Out[func.Document]): The output document for Cosmos DB.

    Raises:
    json.JSONDecodeError: If a '.json' blob does not contain valid JSON.
    """
    # Must match the folder in the blob trigger path above: this function is
    # only invoked for blobs under 'data/loanform', so the SAS URL and the
    # saved results have to target that same folder.
    # NOTE(review): if the storage folder is really named 'loanforms', change
    # BOTH the trigger path and this value together.
    container_name = 'data/loanform'

    # Extract file name and root from the blob name
    file_name, file_root = get_file_names(myblob.name)

    # JSON blobs are modelled and stored; they never go through layout analysis.
    # NOTE(review): presumably these are the results written by
    # save_analysis_results below, re-triggering this function - confirm.
    if file_name.endswith('.json'):
        data = json.loads(myblob.read())
        # Model the loan form data
        document = model_loanforms(data)
        # Save the modelled document to Cosmos DB
        outputDocument.set(func.Document.from_dict(document))
        return

    # Only the analysis path talks to Blob Storage, so the client is created
    # here rather than up front; the JSON path above does not need it.
    blob_service_client = get_blob_service_client()
    # Generate SAS URL for the blob
    sas_url = generate_sas_url(blob_service_client, container_name, file_name)
    # Analyze the layout of the file using the SAS URL
    analysis_results = analyze_layout(sas_url=sas_url)
    # Save the analysis results
    save_analysis_results(blob_service_client, container_name, file_root, analysis_results)
@app.blob_trigger(arg_name="myblob", path="data/paystubs/{name}",
                  connection="STORAGE_CONNECTION_STRING")
@app.cosmos_db_output(arg_name="outputDocument",
                      database_name="ContosoDB",
                      container_name="PayStubs",
                      connection="COSMOS_CONNECTION_STRING")
def ProcessPayStubs(myblob: func.InputStream, outputDocument: func.Out[func.Document]):
    """
    Handle a blob that lands under 'data/paystubs'.

    A blob whose file name ends with '.json' is parsed, modelled with
    model_paystubs and written to the Cosmos DB output binding
    (ContosoDB / PayStubs). Any other blob is sent through layout analysis
    via a SAS URL, and the analysis results are handed to
    save_analysis_results for the same container path.

    Parameters:
    myblob (func.InputStream): The input blob stream.
    outputDocument (func.Out[func.Document]): The output document for Cosmos DB.

    Raises:
    json.JSONDecodeError: If a '.json' blob does not contain valid JSON.
    """
    container_name = 'data/paystubs'

    # Extract file name and root from the blob name
    file_name, file_root = get_file_names(myblob.name)

    # JSON blobs are modelled and stored; they never go through layout analysis.
    # NOTE(review): presumably these are the results written by
    # save_analysis_results below, re-triggering this function - confirm.
    if file_name.endswith('.json'):
        data = json.loads(myblob.read())
        # Model the paystubs data
        document = model_paystubs(data)
        # Save the modelled document to Cosmos DB
        outputDocument.set(func.Document.from_dict(document))
        return

    # Only the analysis path talks to Blob Storage, so the client is created
    # here rather than up front; the JSON path above does not need it.
    blob_service_client = get_blob_service_client()
    # Generate SAS URL for the blob
    sas_url = generate_sas_url(blob_service_client, container_name, file_name)
    # Analyze the layout of the file using the SAS URL
    analysis_results = analyze_layout(sas_url=sas_url)
    # Save the analysis results back to the blob storage
    save_analysis_results(blob_service_client, container_name, file_root, analysis_results)