
Commit d7e8139

Adelina Enache authored and committed
first commit
0 parents  commit d7e8139

File tree: 7 files changed, +269 −0 lines changed


.gitignore

Lines changed: 16 additions & 0 deletions
# Python ignores
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

.DS_Store

# Astro ignores
.astro/deploys
.astro/airflow/logs/
.astro/airflow/postgres/

# IDE configs
.vscode/
.idea/

README.md

Lines changed: 28 additions & 0 deletions
# Plugin - Intercom to S3

This plugin moves data from the [Intercom](https://developers.intercom.com/v2.0/docs) API to S3, based on the specified object.

## Hooks
### IntercomHook
This hook handles the authentication and the request to Intercom. Based on the [python-intercom](https://github.com/jkeyes/python-intercom) module.

### S3Hook
[Core Airflow S3Hook](https://pythonhosted.org/airflow/_modules/S3_hook.html) with the standard boto dependency.

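The IntercomHook (see `hooks/intercom_hook.py` below) reads the token from the Airflow connection's `extra` JSON under the key `personal_access_token`. A minimal setup sketch, assuming a placeholder connection id of `intercom_default`:

```python
# Sketch only: the conn_id and token value are placeholders.
import json

from airflow import settings
from airflow.models import Connection

session = settings.Session()
session.add(Connection(
    conn_id='intercom_default',
    conn_type='http',
    extra=json.dumps({'personal_access_token': 'YOUR_INTERCOM_TOKEN'}),
))
session.commit()
```
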
## Operators
### IntercomToS3Operator
This operator composes the logic for this plugin. It fetches the specified Intercom object and saves the result in an S3 bucket, under a specified key, in ndjson (newline-delimited JSON) format. The parameters it accepts are listed below; a usage sketch follows the list.

`intercom_conn_id`: The Intercom connection id from Airflow.
`intercom_obj`: The Intercom object to query.
`intercom_method`: *(optional)* Method to call from python-intercom. Defaults to "all".
`s3_conn_id`: The S3 connection id from Airflow.
`s3_bucket`: The destination S3 bucket.
`s3_key`: The destination S3 key.
`output`: Name of the temporary file where the results should be saved.
`fields`: *(optional)* List of fields to fetch from the object. If *None*, all fields of the object are fetched.
`replication_key_name`: *(optional)* Name of the replication key, if needed.
`replication_key_value`: *(optional)* Value of the replication key, if needed. The operator will import only results whose `replication_key_name` property is greater than or equal to the value of this param.
`**kwargs`: Extra params for the intercom method, if needed.
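
A minimal DAG sketch using the operator. The connection ids, bucket, key, and field names below are placeholders, and the import assumes the plugin package is importable from the Airflow plugins folder:

```python
from datetime import datetime

from airflow import DAG
from intercom_plugin.operators.intercom_to_s3_operator import IntercomToS3Operator

dag = DAG(
    dag_id='intercom_to_s3_example',        # placeholder DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
)

intercom_to_s3 = IntercomToS3Operator(
    task_id='intercom_users_to_s3',
    intercom_conn_id='intercom_default',    # placeholder Airflow connection
    intercom_obj='users',                   # python-intercom collection to pull
    intercom_method='all',
    s3_conn_id='s3_default',                # placeholder Airflow connection
    s3_bucket='my-data-lake',               # placeholder bucket
    s3_key='intercom/users.ndjson',         # placeholder destination key
    output='intercom_users.ndjson',         # placeholder temp-file name
    fields=['id', 'email'],                 # hypothetical field names
    dag=dag,
)
```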

__init__.py

Lines changed: 14 additions & 0 deletions
from airflow.plugins_manager import AirflowPlugin
from intercom_plugin.operators.intercom_to_s3_operator import IntercomToS3Operator
from intercom_plugin.hooks.intercom_hook import IntercomHook


class IntercomToS3Plugin(AirflowPlugin):
    name = "intercom_plugin"
    hooks = [IntercomHook]
    operators = [IntercomToS3Operator]
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
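
On legacy Airflow 1.x, registering the plugin this way typically also exposed the operator and hook under Airflow's own namespaces (a mechanism removed in Airflow 2.0), so a DAG could import them roughly like this:

```python
# Hypothetical 1.x plugin-manager import paths; on Airflow 2+ import directly
# from the intercom_plugin package instead.
from airflow.operators.intercom_plugin import IntercomToS3Operator
from airflow.hooks.intercom_plugin import IntercomHook
```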

hooks/__init__.py

Whitespace-only changes.

hooks/intercom_hook.py

Lines changed: 53 additions & 0 deletions
from intercom.client import Client
from airflow.hooks.base_hook import BaseHook


class IntercomHook(BaseHook):
    def __init__(
            self,
            conn_id,
            *args,
            **kwargs):
        self.conn_id = conn_id
        self._args = args
        self._kwargs = kwargs

        self.intercom = None

    def get_conn(self):
        """
        Initialize a python-intercom instance.
        """
        if self.intercom:
            return self.intercom

        self.connection = self.get_connection(self.conn_id)
        self.extras = self.connection.extra_dejson
        intercom = Client(
            personal_access_token=self.extras['personal_access_token'])
        self.intercom = intercom

        return intercom

    def run_query(self, model, method='all', **kwargs):
        """
        Run a query against Intercom.

        :param model: name of the Intercom model
        :param method: name of the python-intercom method to call
        :param kwargs: extra args required by the intercom method
        """
        intercom = self.get_conn()
        intercom_model = getattr(intercom, model)
        result = getattr(intercom_model, method)(**kwargs)

        return result

    def get_records(self, sql):
        pass

    def get_pandas_df(self, sql):
        pass

    def run(self, sql):
        pass
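
A minimal usage sketch of the hook on its own. The connection id `intercom_default` is a placeholder, and `users` is assumed to be one of the collections exposed by the python-intercom client:

```python
# Sketch only: conn_id "intercom_default" is a placeholder; attribute access
# on the returned records follows python-intercom resource conventions.
from intercom_plugin.hooks.intercom_hook import IntercomHook

hook = IntercomHook(conn_id='intercom_default')

# run_query('users', method='all') resolves to intercom.users.all()
# on the client returned by get_conn().
users = hook.run_query('users', method='all')
for user in users:
    print(user.email)
```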

operators/__init__.py

Whitespace-only changes.

operators/intercom_to_s3_operator.py

Lines changed: 158 additions & 0 deletions
import logging
import json
import collections.abc

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from intercom_plugin.hooks.intercom_hook import IntercomHook
from tempfile import NamedTemporaryFile


class IntercomToS3Operator(BaseOperator):
    """
    Make a query against Intercom and write the resulting data to s3.
    """

    @apply_defaults
    def __init__(
            self,
            intercom_conn_id,
            intercom_obj,
            intercom_method='all',
            s3_conn_id='',
            s3_bucket='',
            s3_key='',
            output='',
            fields=None,
            replication_key_name=None,
            replication_key_value=0,
            *args,
            **kwargs
    ):
        """
        Initialize the operator.

        :param intercom_conn_id: name of the Airflow connection that has
                                 your Intercom tokens
        :param intercom_obj: name of the Intercom object we are
                             fetching data from
        :param s3_conn_id: name of the Airflow connection that has
                           your Amazon S3 connection params
        :param s3_bucket: name of the destination S3 bucket
        :param s3_key: name of the destination file in the bucket
        :param output: name of the temporary file where the results
                       should be saved
        :param fields: *(optional)* list of fields that you want
                       to get from the object.
                       If *None*, then this will get all fields
                       for the object
        :param replication_key_name: *(optional)* name of the replication key,
                                     if needed.
        :param replication_key_value: *(optional)* value of the replication
                                      key, if needed. The operator will import
                                      only results whose replication_key_name
                                      property is greater than or equal to the
                                      value of this param.
        :param intercom_method: *(optional)* method to call from
                                python-intercom. Defaults to "all".
        :param kwargs: extra params for the intercom query, based on the
                       python-intercom module
        """
        super().__init__(*args, **kwargs)

        # TODO: update with get_conn(intercom_conn_id)
        self.intercom_conn_id = intercom_conn_id
        self.intercom_obj = intercom_obj
        self.intercom_method = intercom_method

        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.output = output

        self.fields = fields
        self.replication_key_name = replication_key_name
        self.replication_key_value = replication_key_value
        self._kwargs = kwargs

    def filter_fields(self, result):
        """
        Filter the fields of a resulting object.

        This will return an object with only the fields given
        as a parameter in the constructor.

        All fields are returned when the "fields" param is None.
        """
        if not self.fields:
            return result
        obj = {}
        for field in self.fields:
            obj[field] = result[field]
        return obj

    def filter(self, results):
        """
        Filter the results.

        This will filter the results if there's a replication key
        given as a param.
        """
        if not isinstance(results, collections.abc.Iterable):
            return json.loads(json.dumps(results, default=lambda o: o.__dict__))

        filtered = []
        for result in results:
            result_json = json.loads(json.dumps(result,
                                                default=lambda o: o.__dict__))

            if not self.replication_key_name or \
                    int(result_json[self.replication_key_name]) >= int(self.replication_key_value):
                filtered.append(self.filter_fields(result_json))
        logging.info(filtered)

        return filtered

    def execute(self, context):
        """
        Execute the operator.

        This will get all the data for a particular Intercom model
        and write it to a file.
        """
        logging.info("Prepping to gather data from Intercom")
        hook = IntercomHook(
            conn_id=self.intercom_conn_id,
        )

        # attempt to log in to Intercom;
        # if this process fails, it will raise an error right here
        hook.get_conn()

        logging.info(
            "Making request for"
            " {0} object".format(self.intercom_obj)
        )

        # fetch the results from Intercom and filter them
        results = hook.run_query(self.intercom_obj, self.intercom_method)
        filtered_results = self.filter(results)

        # write the results to a temporary file and save that file to s3
        with NamedTemporaryFile("w") as tmp:
            for result in filtered_results:
                tmp.write(json.dumps(result) + '\n')

            tmp.flush()

            dest_s3 = S3Hook(s3_conn_id=self.s3_conn_id)
            dest_s3.load_file(
                filename=tmp.name,
                key=self.s3_key,
                bucket_name=self.s3_bucket,
                replace=True
            )
            dest_s3.connection.close()

        logging.info("Query finished!")
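
The replication-key check in `filter()` above keeps only records whose replication field, cast to int, is at least `replication_key_value`. A self-contained sketch of that predicate, where the field name `updated_at` and the values are hypothetical:

```python
# Hypothetical records and cutoff, to illustrate the >= predicate only.
records = [
    {"id": 1, "updated_at": 100},
    {"id": 2, "updated_at": 250},
]
replication_key_name = "updated_at"
replication_key_value = 200

kept = [r for r in records
        if int(r[replication_key_name]) >= int(replication_key_value)]
print(kept)  # only the record with updated_at >= 200 is kept
```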
