Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions CleanTrigger1/clean.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import logging
import os
import pandas as pd
from azure.storage.blob import ContentSettings
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobServiceClient
from io import StringIO

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
blob_service_client = BlobServiceClient(
account_url=f"https://{blob_account_name}.blob.core.windows.net",
credential={"account_name": f"{blob_account_name}", "account_key":f"{blob_account_key}"}
)
out_blob_container_name = os.getenv("C1")

def clean(req_body):
blob_obj,filename = extract_blob_props(req_body[0]['data']['url'] )
df = pd.read_csv(StringIO(blob_obj.content))
df = pd.read_csv(StringIO(blob_obj.readall().decode('UTF-8')))
result = clean_blob(df,filename)
return result

Expand All @@ -24,7 +25,8 @@ def extract_blob_props(url):
in_container_name = url.rsplit('/',2)[-2]

# remove file extension from blob name
readblob = block_blob_service.get_blob_to_text(in_container_name,blob_file_name)
container_client = blob_service_client.get_container_client(container=in_container_name)
readblob = container_client.download_blob(blob=blob_file_name)
return readblob, blob_file_name

def clean_blob(df, blob_file_name):
Expand All @@ -37,5 +39,6 @@ def clean_blob(df, blob_file_name):
outcsv = df2.to_csv(index=False)

cleaned_blob_file_name = "cleaned_" +blob_file_name
block_blob_service.create_blob_from_text(out_blob_container_name, cleaned_blob_file_name, outcsv)
container_client = blob_service_client.get_container_client(container=out_blob_container_name)
container_client.upload_blob(cleaned_blob_file_name, outcsv)
return "Success"
17 changes: 10 additions & 7 deletions CleanTrigger2/clean.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
import logging
import os
import pandas as pd
from azure.storage.blob import ContentSettings
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobServiceClient
from io import StringIO

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
blob_service_client = BlobServiceClient(
account_url=f"https://{blob_account_name}.blob.core.windows.net",
credential={"account_name": f"{blob_account_name}", "account_key":f"{blob_account_key}"}
)
out_blob_container_name = os.getenv("C2")

def clean(req_body):
blob_obj,filename = extract_blob_props(req_body[0]['data']['url'])
df = pd.read_csv(StringIO(blob_obj.content))
df = pd.read_csv(StringIO(blob_obj.readall().decode('UTF-8')))
result = clean_blob(df, filename)
return result

def extract_blob_props(url):
blob_file_name = url.rsplit('/',1)[-1]
in_container_name = url.rsplit('/',2)[-2]
readblob = block_blob_service.get_blob_to_text(in_container_name,blob_file_name)
container_client = blob_service_client.get_container_client(container=in_container_name)
readblob = container_client.download_blob(blob=blob_file_name)
return readblob, blob_file_name

def clean_blob(df, blob_file_name):
Expand All @@ -32,6 +34,7 @@ def clean_blob(df, blob_file_name):
outcsv = df2.to_csv(index=False)

cleaned_blob_file_name = "cleaned_" +blob_file_name
block_blob_service.create_blob_from_text(out_blob_container_name, cleaned_blob_file_name, outcsv)
container_client = blob_service_client.get_container_client(container=out_blob_container_name)
container_client.upload_blob(cleaned_blob_file_name, outcsv)
return "Success"

7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ Using this sample we demonstrate a real use case where this is used to perform c

- Deploy through Azure CLI
- Open AZ CLI and run `az group create -l [region] -n [resourceGroupName]` to create a resource group in your Azure subscription (i.e. [region] could be westus2, eastus, etc.)
- Run `az group deployment create --name [deploymentName] --resource-group [resourceGroupName] --template-file azuredeploy.json`
- Run `az deployment group create --name [deploymentName] --resource-group [resourceGroupName] --template-file azure-deploy-linux-app-plan.json`

- Deploy Function App
- [Create/Activate virtual environment](https://docs.microsoft.com/en-us/azure/azure-functions/functions-create-first-function-python#create-and-activate-a-virtual-environment)
- Run `func azure functionapp publish [functionAppName] --build-native-deps`
- Run `func azure functionapp publish [functionAppName] --build-native-deps`

- Deploy through Azure CLI
- Run `az group deployment create -g resource_group_name --template-file azure-deploy-event-grid-subscription.json`

### Test

Expand Down
12 changes: 7 additions & 5 deletions Reconcile/clean.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import logging
import os
import pandas as pd
from azure.storage.blob import ContentSettings
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobServiceClient
from io import StringIO
from . import fetch_blob as fetching_service

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
blob_service_client = BlobServiceClient(
account_url=f"https://{blob_account_name}.blob.core.windows.net",
credential={"account_name": f"{blob_account_name}", "account_key":f"{blob_account_key}"}
)
out_blob_container_name = os.getenv("FINAL")
container_client = blob_service_client.get_container_client(container=out_blob_container_name)

# Clean blob flow from event grid events
# This function will call all the other functions in clean.py
Expand Down Expand Up @@ -38,5 +40,5 @@ def fetch_blobs(batch_id,file_2_container_name,file_1_container_name):
def final_reconciliation(f2_df, f1_df,batch_id):
outcsv = f2_df.to_csv(index=False)
cleaned_blob_file_name = "reconciled_" + batch_id
block_blob_service.create_blob_from_text(out_blob_container_name, cleaned_blob_file_name, outcsv)
container_client.upload_blob(name=cleaned_blob_file_name, data=outcsv)
return "Success"
16 changes: 10 additions & 6 deletions Reconcile/fetch_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@
import os
import collections
import pandas as pd
from azure.storage.blob import ContentSettings
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobServiceClient
from io import StringIO
#kill $(lsof -t -i :7071)

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
blob_service_client = BlobServiceClient(
account_url=f"https://{blob_account_name}.blob.core.windows.net",
credential={"account_name": f"{blob_account_name}", "account_key":f"{blob_account_key}"}
)

def blob_dict_to_df(my_ordered_dict, filter_string):
logging.warning('blob_dict_to_df')
filtered_dict = {k:v for k,v in my_ordered_dict.items() if filter_string in k}
logging.warning(filtered_dict)
container_key = list(filtered_dict.keys())[0]
latest_file = list(filtered_dict.values())[0]
blobstring = block_blob_service.get_blob_to_text(container_key, latest_file).content
container_client = blob_service_client.get_container_client(container=container_key)
blobstring = container_client.download_blob(blob=latest_file).readall().decode('UTF-8')
df = pd.read_csv(StringIO(blobstring),dtype=str)
return df

Expand All @@ -36,8 +38,10 @@ def blob_to_dict(batchId,*args):
file_names = []
for container in container_list:
logging.warning('FOR LOOP')
generator = block_blob_service.list_blobs(container)
container_client = blob_service_client.get_container_client(container)
generator = container_client.list_blobs()
logging.warning(list(generator))
generator = container_client.list_blobs()
for file in generator:
if "cleaned" in file.name:
file_names.append(file.name)
Expand Down
14 changes: 4 additions & 10 deletions azure-deploy-linux-app-plan.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@
"defaultValue": "WestUSLinuxDynamicPlan",
"type": "String"
},
"siteName1": {
"defaultValue": "customerendpointdh1.azurewebsites.net",
"type": "String"
},
"siteName2": {
"defaultValue": "customerendpointdh2.azurewebsites.net",
"type": "String"
},
"outputBlobContainerName" : {
"defaultValue": "cleaned",
"type": "String"
Expand All @@ -38,7 +30,9 @@
"variables": {
"storageAccountid": "[concat(resourceGroup().id,'/providers/','Microsoft.Storage/storageAccounts/', parameters('storageName'))]",
"container1" : "raw",
"container2" : "cleaned"
"container2" : "cleaned",
"siteName1" : "[concat(parameters('functionapp1'),'.azurewebsites.net')]",
"siteName2" : "[concat(parameters('functionapp2'),'.azurewebsites.net')]"
},
"resources": [
{
Expand Down Expand Up @@ -202,7 +196,7 @@
},
{
"type": "Microsoft.Web/sites/hostNameBindings",
"name": "[concat(parameters('functionapp1'), '/', parameters('siteName1'))]",
"name": "[concat(parameters('functionapp1'), '/', variables('siteName1'))]",
"apiVersion": "2016-08-01",
"location": "West US",
"properties": {
Expand Down
12 changes: 7 additions & 5 deletions blob_to_smart_contract/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import numpy as np
import os
import pandas as pd
from azure.storage.blob import ContentSettings
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobServiceClient
from io import StringIO
from adal import AuthenticationContext
from . import fetch_blob as fetching_service
Expand All @@ -25,9 +24,12 @@
#%%
blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
blob_service_client = BlobServiceClient(
account_url=f"https://{blob_account_name}.blob.core.windows.net",
credential={"account_name": f"{blob_account_name}", "account_key":f"{blob_account_key}"}
)
out_blob_final = os.getenv("OutBlobFinal")
container_client = blob_service_client.get_container_client(container=out_blob_final)
#%%
AUTHORITY = 'https://login.microsoftonline.com/gemtudev.onmicrosoft.com'

Expand Down Expand Up @@ -139,7 +141,7 @@ def create_json_blob(json_array):
#myarray = np.asarray(json_array).tolist()
myarray = pd.Series(json_array).to_json(orient='values')
blob_file_name = "df_to_json.json"
block_blob_service.create_blob_from_text(out_blob_final, blob_file_name, myarray)
container_client.upload_blob(name=blob_file_name, data=myarray)
return 'Success'
#%%
#testPayload3b = json.dumps(testPayload3)
Expand Down
16 changes: 10 additions & 6 deletions blob_to_smart_contract/fetch_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import collections
import pandas as pd
import numpy as np
from azure.storage.blob import ContentSettings
from azure.storage.blob import BlockBlobService
from azure.storage.blob import BlobServiceClient
from io import StringIO
#kill $(lsof -t -i :7071)

blob_account_name = os.getenv("BlobAccountName")
blob_account_key = os.getenv("BlobAccountKey")
block_blob_service = BlockBlobService(account_name=blob_account_name,
account_key=blob_account_key)
blob_service_client = BlobServiceClient(
account_url=f"https://{blob_account_name}.blob.core.windows.net",
credential={"account_name": f"{blob_account_name}", "account_key":f"{blob_account_key}"}
)

def blob_dict_to_df(my_ordered_dict, filter_string):
logging.warning('blob_dict_to_df')
Expand All @@ -21,7 +22,8 @@ def blob_dict_to_df(my_ordered_dict, filter_string):
logging.warning(filtered_dict)
container_key = list(filtered_dict.keys())[0]
latest_file = list(filtered_dict.values())[0]
blobstring = block_blob_service.get_blob_to_text(container_key, latest_file).content
container_client = blob_service_client.get_container_client(container=container_key)
blobstring = container_client.download_blob(blob=latest_file).readall().decode('UTF-8')
df = pd.read_csv(StringIO(blobstring),dtype=str)
df = df.replace(np.nan, '', regex=True)
df["initstate"] = df["finalresult"].map(lambda x: "0" if "no" in x else "2")
Expand All @@ -42,8 +44,10 @@ def blob_to_dict(*args):
file_names = []
for container in container_list:
logging.warning('FOR LOOP')
generator = block_blob_service.list_blobs(container)
container_client = blob_service_client.get_container_client(container)
generator = container_client.list_blobs()
logging.warning(list(generator))
generator = container_client.list_blobs()
for file in generator:
file_names.append(file.name)
logging.info(file_names[ii])
Expand Down
14 changes: 7 additions & 7 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
adal==1.2.7
asn1crypto==0.24.0
astroid==2.1.0
azure-common==1.1.18
azure-functions==1.0.0a5
azure-functions-worker==1.0.0a6
azure-storage-blob==1.4.0
azure-storage-common==1.4.0
azure-common==1.1.28
azure-functions==1.10.1
azure-functions-worker==1.1.9
azure-storage-blob==12.11.0
certifi==2018.11.29
cffi==1.11.5
chardet==3.0.4
cryptography==2.5
cycler==0.10.0
grpcio==1.14.2
grpcio-tools==1.14.2
grpcio==1.33.1
grpcio-tools==1.33.1
idna==2.8
isort==4.3.4
kiwisolver==1.0.1
Expand Down