Skip to content

Commit f29ce2a

Browse files
finishing class 04
1 parent 6495602 commit f29ce2a

File tree

3 files changed

+58
-3
lines changed

3 files changed

+58
-3
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,7 @@ airflow-worker.pid
1616

1717
# ENVIRONMENT VARIABLES
1818
venv
19-
.env
19+
.env
20+
21+
# LOCAL DATA
22+
datalake

airflow/hook/twitter_hook.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ def __init__(self, start_time, end_time, query, conn_id=None) -> None:
1414
super().__init__(http_conn_id=self.conn_id)
1515

1616
def create_url(self) -> str:
17-
timestamp_format = '%Y-%m-%dT%H:%M:%S.00Z'
18-
1917
start_time = self.start_time
2018
end_time = self.end_time
2119
query = self.query
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import sys
2+
sys.path.append('airflow')
3+
4+
from airflow.models import DAG, TaskInstance, BaseOperator
5+
from hook.twitter_hook import TwitterHook
6+
from datetime import datetime, timedelta
7+
from os.path import join
8+
from pathlib import Path
9+
import json
10+
11+
class TwitterOperator(BaseOperator):
    """Operator that writes the output of a ``TwitterHook`` run to a file.

    Every item yielded by ``TwitterHook(start_time, end_time, query).run()``
    is serialized as one JSON document per line in ``file_path``.
    """

    def __init__(self, file_path, start_time, end_time, query, **kwargs):
        """Store the extraction parameters and initialise the base operator.

        Args:
            file_path: Destination file for the JSON-lines output.
            start_time: Passed unchanged to ``TwitterHook``.
            end_time: Passed unchanged to ``TwitterHook``.
            query: Passed unchanged to ``TwitterHook``.
            **kwargs: Forwarded to ``BaseOperator.__init__`` (e.g. ``task_id``).
        """
        self.file_path = file_path
        self.start_time = start_time
        self.end_time = end_time
        self.query = query
        super().__init__(**kwargs)

    def create_parent_folder(self):
        """Create the directory that will hold ``file_path`` if it is missing."""
        Path(self.file_path).parent.mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        """Run the hook and write each yielded item as one JSON line.

        Args:
            context: Accepted for the operator interface; not read here.
        """
        self.create_parent_folder()
        # ensure_ascii=False emits non-ASCII characters verbatim, so the file
        # must be opened as UTF-8 explicitly: the platform default encoding
        # may be unable to encode them.
        with open(self.file_path, 'w', encoding='utf-8') as output_file:
            hook = TwitterHook(self.start_time, self.end_time, self.query)
            for page in hook.run():
                json.dump(page, output_file, ensure_ascii=False)
                output_file.write('\n')
32+
33+
if __name__ == '__main__':
    # Manual run: build a throwaway DAG with a single TwitterOperator and
    # call its execute() directly.
    timestamp_format = '%Y-%m-%dT%H:%M:%S.00Z'

    # Read the clock once so the time window, the partition folder and the
    # file name all describe the same instant, even if the run starts right
    # before midnight.
    now = datetime.now()
    today = now.date()

    # NOTE(review): the format ends in a literal 'Z', but datetime.now() is
    # naive local time -- confirm whether UTC is expected here.
    # .date() drops the time of day, so start_time is midnight of yesterday.
    start_time = (now + timedelta(-1)).date().strftime(timestamp_format)
    end_time = now.strftime(timestamp_format)
    query = "datascience"

    with DAG(dag_id='TwitterTest', start_date=now) as dag:
        to = TwitterOperator(
            task_id='test_run',
            file_path=join(
                'datalake',
                'twitter_datascience',
                f'extract_date={today}',
                f'datascience_{today.strftime("%Y%m%d")}.json'
            ),
            start_time=start_time,
            end_time=end_time,
            query=query
        )
        ti = TaskInstance(task=to)
        # TwitterOperator.execute does not read its argument; the task id is
        # passed only to satisfy the signature.
        to.execute(ti.task_id)

0 commit comments

Comments
 (0)