Skip to content

Commit 3a2c848

Browse files
committed
Infer from schema on error
1 parent 58bfb30 commit 3a2c848

File tree

3 files changed

+86
-42
lines changed

3 files changed

+86
-42
lines changed

aircan/dags/api_ckan_import_to_bq_v2.py

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
# Local imports
99
from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
10-
from aircan.dependencies.utils import aircan_status_update
10+
from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update
1111

1212
# Third-party library imports
1313
from airflow import DAG
@@ -16,7 +16,7 @@
1616
from airflow.models import Variable
1717
from airflow.operators.python_operator import PythonOperator
1818
from airflow.utils.dates import days_ago
19-
19+
import traceback
2020

2121
args = {
2222
'start_date': days_ago(0),
@@ -71,29 +71,19 @@ def task_import_resource_to_bq(**context):
7171
logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
7272
ckan_conf = context['params'].get('ckan_config', {})
7373
ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
74-
dag_run_id = context['dag_run'].run_id
74+
dag_run_id = context['run_id']
7575
res_id = ckan_conf.get('resource_id')
76-
try:
77-
bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
78-
status_dict = {
79-
'dag_run_id': dag_run_id,
80-
'resource_id': res_id,
81-
'state': 'complete',
82-
'message': 'Data ingestion completed successfully for "{res_id}".'.format(
83-
res_id=res_id),
84-
'clear_logs': True
85-
}
86-
aircan_status_update(ckan_site_url, ckan_api_key, status_dict)
87-
except Exception as e:
88-
status_dict = {
89-
'dag_run_id': dag_run_id,
90-
'resource_id': res_id,
91-
'state': 'failed',
92-
'message': str(e),
93-
'clear_logs': True
94-
}
95-
aircan_status_update(ckan_site_url, ckan_api_key, status_dict)
96-
raise Exception(str(e))
76+
ckan_conf['dag_run_id'] = dag_run_id
77+
bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
78+
status_dict = {
79+
'dag_run_id': dag_run_id,
80+
'resource_id': res_id,
81+
'state': 'complete',
82+
'message': 'Data ingestion completed successfully for "{res_id}".'.format(
83+
res_id=res_id),
84+
'clear_logs': True
85+
}
86+
aircan_status_update(ckan_site_url, ckan_api_key, status_dict)
9787

9888
import_resource_to_bq_task = PythonOperator(
9989
task_id='import_resource_to_bq_v2',

aircan/dependencies/google_cloud/bigquery_handler_v2.py

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from google.cloud import bigquery
22
import google.api_core.exceptions
3-
from aircan.dependencies.utils import AirflowCKANException, aircan_status_update
3+
from aircan.dependencies.utils import AirflowCKANException, aircan_status_update_nhs as aircan_status_update
44
import json
55
import logging
66

@@ -13,34 +13,51 @@ def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf):
1313
try:
1414
client = bigquery.Client()
1515

16-
job_config = bigquery.LoadJobConfig()
16+
try:
17+
job_config = bigquery.LoadJobConfig()
1718

18-
schema = bq_schema_from_table_schema(table_schema)
19-
job_config.schema = schema
19+
schema = bq_schema_from_table_schema(table_schema)
20+
job_config.schema = schema
2021

21-
job_config.skip_leading_rows = 1
22-
job_config.source_format = bigquery.SourceFormat.CSV
23-
# overwrite a Table
24-
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
25-
# set 'True' for schema autodetect but turning it off since we define schema explicitly when publishing data using datapub
26-
# job_config.autodetect = True
27-
load_job = client.load_table_from_uri(
28-
gcs_path, table_id, job_config=job_config
29-
)
22+
job_config.skip_leading_rows = 1
23+
job_config.source_format = bigquery.SourceFormat.CSV
24+
# overwrite a Table
25+
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
26+
# set 'True' for schema autodetect but turning it off since we define schema explicitly when publishing data using datapub
27+
# job_config.autodetect = True
28+
load_job = client.load_table_from_uri(
29+
gcs_path, table_id, job_config=job_config
30+
)
3031

31-
load_job.result() # Waits for table load to complete.
32-
destination_table = client.get_table(table_id)
32+
load_job.result() # Waits for table load to complete.
33+
destination_table = client.get_table(table_id)
34+
except Exception as e:
35+
job_config = bigquery.LoadJobConfig()
36+
37+
job_config.skip_leading_rows = 1
38+
job_config.source_format = bigquery.SourceFormat.CSV
39+
# overwrite a Table
40+
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
41+
# set 'True' for schema autodetect but turning it off since we define schema explicitly when publishing data using datapub
42+
# job_config.autodetect = True
43+
load_job = client.load_table_from_uri(
44+
gcs_path, table_id, job_config=job_config
45+
)
46+
load_job.result() # Waits for table load to complete.
47+
destination_table = client.get_table(table_id)
3348
status_dict = {
3449
'res_id': ckan_conf.get('resource_id'),
3550
'state': 'progress',
36-
'message': 'Data ingestion is in progress.'
51+
'message': 'Data ingestion is in progress.',
52+
'dag_run_id': ckan_conf.get('dag_run_id')
3753
}
3854
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
3955
if destination_table:
4056
status_dict = {
4157
'res_id': ckan_conf.get('resource_id'),
4258
'state': 'complete',
43-
'message': "Ingession Completed"
59+
'message': "Ingession Completed",
60+
'dag_run_id': ckan_conf.get('dag_run_id')
4461
}
4562
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
4663
return {'success': True, 'message': 'BigQuery Table created successfully.'}
@@ -60,6 +77,7 @@ def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf):
6077
logging.info(e)
6178
status_dict = {
6279
'res_id': ckan_conf.get('resource_id'),
80+
'dag_run_id': ckan_conf.get('dag_run_id'),
6381
'state': 'failed',
6482
'message': str(e)
6583
}

aircan/dependencies/utils.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,42 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
7272
time(hour, minute, second, microsecond, tzinfo=timezone.TIMEZONE),
7373
)
7474

75+
def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
76+
"""
77+
Update aircan run status like pending, error, process, complete
78+
on ckan with message.
79+
"""
80+
logging.info('Updating data loading status')
81+
try:
82+
request_data = {
83+
'dag_run_id': status_dict.get('dag_run_id', ''),
84+
'resource_id': status_dict.get('res_id', ''),
85+
'state': status_dict.get('state', ''),
86+
'last_updated': str(datetime.utcnow()),
87+
'message': status_dict.get('message', ''),
88+
}
89+
90+
if status_dict.get('error', False):
91+
request_data.update({'error': {
92+
'message' : status_dict.get('error', '')
93+
}})
94+
95+
url = urljoin(site_url, '/api/3/action/aircan_status_update')
96+
response = requests.post(url,
97+
data=json.dumps(request_data),
98+
headers={'Content-Type': 'application/json',
99+
'Authorization': ckan_api_key})
100+
print(response.text)
101+
if response.status_code == 200:
102+
resource_json = response.json()
103+
logging.info('Loading status updated successfully in CKAN.')
104+
return {'success': True}
105+
else:
106+
print(response.json())
107+
return response.json()
108+
except Exception as e:
109+
logging.error('Failed to update status in CKAN. {0}'.format(e))
110+
75111
def aircan_status_update(site_url, ckan_api_key, status_dict):
76112
"""
77113
Update aircan run status like pending, error, process, complete
@@ -325,4 +361,4 @@ def join_path(path, *paths):
325361
"""
326362
for p in paths:
327363
path = os.path.join(path, p)
328-
return path
364+
return path

0 commit comments

Comments
 (0)