
Commit 46583d5 ("ic")

0 parents (root commit)

File tree

6 files changed (+184, -0 lines)


.gitignore

Lines changed: 1 addition & 0 deletions
.DS_Store

__init__.py

Lines changed: 15 additions & 0 deletions
from airflow.plugins_manager import AirflowPlugin
from mysql_plugin.hooks.astro_mysql_hook import AstroMySqlHook
from mysql_plugin.operators.mysql_to_s3_operator import MySQLToS3Operator


class MySQLToS3Plugin(AirflowPlugin):
    name = "MySQLToS3Plugin"
    operators = [MySQLToS3Operator]
    # Leave in for explicitness
    hooks = [AstroMySqlHook]
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
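
With the plugin placed in Airflow's plugins folder, Airflow 1.x exposes the registered operator and hook under the plugin's name. A minimal import sketch (the exact module path depends on the Airflow version; shown here under the 1.8+ plugin-registration convention, so these paths are assumptions):

    # Assumed Airflow 1.x plugin import convention; paths are illustrative.
    from airflow.operators.MySQLToS3Plugin import MySQLToS3Operator
    from airflow.hooks.MySQLToS3Plugin import AstroMySqlHook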

hooks/__init__.py

Whitespace-only changes.

hooks/astro_mysql_hook.py

Lines changed: 13 additions & 0 deletions
from airflow.hooks.mysql_hook import MySqlHook


class AstroMySqlHook(MySqlHook):
    def get_schema(self, table):
        query = \
            """
            SELECT COLUMN_NAME, COLUMN_TYPE
            FROM COLUMNS
            WHERE TABLE_NAME = '{0}';
            """.format(table)
        # Point the hook at information_schema so the COLUMNS table resolves.
        self.schema = 'information_schema'
        return super().get_records(query)
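
As a rough usage sketch, the hook returns one record per column of the given table from information_schema.COLUMNS. Assuming the MySQL connection's Extra sets {"cursor": "dictcursor"} so records come back as dictionaries (the connection id and table name below are hypothetical):

    # Illustrative only: 'my_mysql_conn' and 'orders' are placeholders.
    hook = AstroMySqlHook('my_mysql_conn')
    for column in hook.get_schema('orders'):
        print(column['COLUMN_NAME'], column['COLUMN_TYPE'])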

operators/__init__.py

Whitespace-only changes.

operators/mysql_to_s3_operator.py

Lines changed: 155 additions & 0 deletions
from airflow.models import BaseOperator
from airflow.hooks.S3_hook import S3Hook
from mysql_plugin.hooks.astro_mysql_hook import AstroMySqlHook

from airflow.utils.decorators import apply_defaults
import json
import logging


class MySQLToS3Operator(BaseOperator):
    """
    MySQL to S3 Operator

    NOTE: When using the MySQLToS3Operator, it is necessary to set the cursor
    to "dictcursor" in the MySQL connection settings within "Extra"
    (e.g. {"cursor": "dictcursor"}). To avoid invalid characters, it is also
    recommended to specify the character encoding (e.g. {"charset": "utf8"}).

    NOTE: Because this operator accesses a single database via concurrent
    connections, it is advised that a connection pool be used to control
    requests. - https://airflow.incubator.apache.org/concepts.html#pools

    :param mysql_conn_id:   The input MySQL connection id.
    :type mysql_conn_id:    string
    :param mysql_table:     The input MySQL table to pull data from.
    :type mysql_table:      string
    :param s3_conn_id:      The destination s3 connection id.
    :type s3_conn_id:       string
    :param s3_bucket:       The destination s3 bucket.
    :type s3_bucket:        string
    :param s3_key:          The destination s3 key.
    :type s3_key:           string
    :param package_schema:  *(optional)* Whether or not to pull the schema
                            information for the table as well as the data.
    :type package_schema:   boolean
    :param incremental_key: *(optional)* The incrementing key to filter the
                            source data with. Currently only accepts a column
                            with a type of timestamp.
    :type incremental_key:  string
    :param start:           *(optional)* The start date to filter records with
                            based on the incremental_key. Only required if
                            using the incremental_key field.
    :type start:            timestamp (YYYY-MM-DD HH:MM:SS)
    :param end:             *(optional)* The end date to filter records with
                            based on the incremental_key. Only required if
                            using the incremental_key field.
    :type end:              timestamp (YYYY-MM-DD HH:MM:SS)
    """

    template_fields = ['start', 'end', 's3_key']

    @apply_defaults
    def __init__(self,
                 mysql_conn_id,
                 mysql_table,
                 s3_conn_id,
                 s3_bucket,
                 s3_key,
                 package_schema=False,
                 incremental_key=None,
                 start=None,
                 end=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.mysql_table = mysql_table
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.package_schema = package_schema
        self.incremental_key = incremental_key
        self.start = start
        self.end = end

    def execute(self, context):
        hook = AstroMySqlHook(self.mysql_conn_id)
        self.get_records(hook)
        if self.package_schema:
            self.get_schema(hook, self.mysql_table)

    def get_schema(self, hook, table):
        logging.info('Initiating schema retrieval.')
        results = list(hook.get_schema(table))
        # Build a {column_name: column_type} mapping from the
        # information_schema records returned by the hook.
        output_dict = {}
        for i in results:
            new = []
            new_dict = {}
            for n in i:
                if n == 'COLUMN_NAME':
                    new.insert(0, i[n])
                else:
                    new.insert(1, i[n])
            new = [i for i in new if i.islower()]
            if len(new) == 2:
                new_dict[new[0]] = new[1]
            output_dict.update(new_dict)
        self.s3_upload(str(output_dict), schema=True)

    def get_records(self, hook):
        logging.info('Initiating record retrieval.')
        logging.info('Start Date: {0}'.format(self.start))
        logging.info('End Date: {0}'.format(self.end))

        if all([self.incremental_key, self.start, self.end]):
            query_filter = """ WHERE {0} >= '{1}' AND {0} < '{2}'
                           """.format(self.incremental_key, self.start, self.end)

        if all([self.incremental_key, self.start]) and not self.end:
            query_filter = """ WHERE {0} >= '{1}'
                           """.format(self.incremental_key, self.start)

        if not self.incremental_key:
            query_filter = ''

        query = \
            """
            SELECT *
            FROM {0}
            {1}
            """.format(self.mysql_table, query_filter)

        # Perform query and convert returned tuple to list
        results = list(hook.get_records(query))
        logging.info('Successfully performed query.')

        # Iterate through the list of dictionaries (one dict per row queried)
        # and convert datetime and date values to strings
        # (e.g. datetime(2017, 8, 1) --> "2017-08-01 00:00:00").
        results = [dict([k, str(v)] if v is not None else [k, v]
                   for k, v in i.items()) for i in results]
        # Serialize as newline-delimited JSON for upload to S3.
        results = '\n'.join([json.dumps(i) for i in results])
        self.s3_upload(results)
        return results

    def s3_upload(self, results, schema=False):
        s3 = S3Hook(s3_conn_id=self.s3_conn_id)
        key = '{0}'.format(self.s3_key)
        # If the file being uploaded to s3 is a schema, append "_schema" to
        # the end of the file name.
        if schema and key[-5:] == '.json':
            key = key[:-5] + '_schema' + key[-5:]
        if schema and key[-4:] == '.csv':
            key = key[:-4] + '_schema' + key[-4:]
        s3.load_string(
            string_data=results,
            bucket_name=self.s3_bucket,
            key=key,
            replace=True
        )
        s3.connection.close()
        logging.info('File uploaded to s3')
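
For reference, a minimal DAG sketch wiring the operator up. All connection ids, the bucket, table, key, and schedule below are hypothetical placeholders; the MySQL connection's Extra is assumed to contain {"cursor": "dictcursor", "charset": "utf8"} as the docstring notes, and start, end, and s3_key are rendered per run because they appear in template_fields:

    # Illustrative DAG only; ids, bucket, table, and schedule are placeholders.
    from datetime import datetime, timedelta
    from airflow import DAG
    # Assumed Airflow 1.x plugin import path (registered under the plugin name).
    from airflow.operators.MySQLToS3Plugin import MySQLToS3Operator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2017, 8, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('mysql_to_s3_example',
              default_args=default_args,
              schedule_interval='@daily')

    orders_to_s3 = MySQLToS3Operator(
        task_id='orders_to_s3',
        mysql_conn_id='my_mysql_conn',
        mysql_table='orders',
        s3_conn_id='my_s3_conn',
        s3_bucket='my-data-bucket',
        s3_key='orders/{{ ds }}.json',      # rendered per run via template_fields
        package_schema=True,
        incremental_key='updated_at',
        start='{{ ds }} 00:00:00',
        end='{{ tomorrow_ds }} 00:00:00',
        dag=dag)

Under these assumptions, each run uploads orders/<date>.json and, because package_schema is True, orders/<date>_schema.json to the bucket, since s3_upload appends "_schema" before the ".json" extension for schema files.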
