Skip to content

Commit 6495602

Browse files
finishing class 03
1 parent 8db1fe7 commit 6495602

File tree

3 files changed

+92
-0
lines changed

3 files changed

+92
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ __pycache__
77
# AIRFLOW
88
logs
99
airflow.db
10+
airflow.cfg
11+
webserver_config.py
1012
airflow-webserver.pid
1113
standalone_admin_password.txt
1214
dump.rdb

airflow/hook/fields.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"tweet_fields": [
3+
"author_id",
4+
"conversation_id",
5+
"created_at",
6+
"id",
7+
"in_reply_to_user_id",
8+
"public_metrics",
9+
"lang",
10+
"text"
11+
],
12+
"user_fields": [
13+
"id",
14+
"name",
15+
"username",
16+
"created_at"
17+
]
18+
}

airflow/hook/twitter_hook.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from os.path import join
2+
from pathlib import Path
3+
from airflow.providers.http.hooks.http import HttpHook
4+
from requests import Request
5+
from datetime import datetime, timedelta
6+
import json
7+
8+
class TwitterHook(HttpHook):
    """HTTP hook that queries the ``/2/tweets/search/recent`` endpoint.

    The hook builds a search URL from a query string and a time window,
    requests it through the session of the configured HTTP connection, and
    follows ``next_token`` pagination up to ``MAX_PAGES`` pages.
    """

    # Upper bound on the number of pages (first page included) that
    # ``paginate`` will fetch for a single search.
    MAX_PAGES = 10

    def __init__(self, start_time, end_time, query, conn_id=None) -> None:
        """Store the search parameters and initialise the underlying hook.

        Args:
            start_time: Lower bound of the search window, already formatted
                as the string to place in the ``start_time`` URL parameter.
            end_time: Upper bound of the search window, already formatted
                as the string to place in the ``end_time`` URL parameter.
            query: Search expression placed in the ``query`` URL parameter.
            conn_id: HTTP connection id; falls back to ``'twitter_default'``
                when omitted or falsy.
        """
        self.start_time = start_time
        self.end_time = end_time
        self.query = query
        self.conn_id = conn_id or 'twitter_default'
        super().__init__(http_conn_id=self.conn_id)

    def create_url(self) -> str:
        """Return the full search URL for the configured query and window.

        The tweet and user field lists are read from ``fields.json``, which
        must sit in the same directory as this module.

        NOTE(review): ``self.base_url`` is read here; ``run`` calls
        ``get_conn()`` first, presumably because that is what populates it --
        confirm before calling this method on its own.
        """
        fields_path = Path(__file__).parent / 'fields.json'
        # Context manager so the file handle is closed deterministically
        # instead of being left for the garbage collector.
        with fields_path.open(encoding='utf-8') as fields_file:
            fields = json.load(fields_file)

        tweet_fields = f'tweet.fields={",".join(fields["tweet_fields"])}'
        user_fields = (
            'expansions=author_id'
            f'&user.fields={",".join(fields["user_fields"])}'
        )

        # NOTE(review): the values below are interpolated verbatim, without
        # URL-encoding; a query containing '&' or '#' would alter the URL.
        return (
            f'{self.base_url}/2/tweets/search/recent'
            f'?query={self.query}'
            f'&{tweet_fields}'
            f'&{user_fields}'
            f'&start_time={self.start_time}'
            f'&end_time={self.end_time}'
        )

    def connect_to_endpoint(self, url, session):
        """Send a GET request for ``url`` through ``session``.

        Args:
            url: Fully built URL to request.
            session: Session object returned by ``get_conn()``.

        Returns:
            Whatever ``run_and_check`` returns for the prepared request.
        """
        prepared = session.prepare_request(Request("GET", url))
        self.log.info('URL: %s', url)

        return self.run_and_check(session, prepared, {})

    def paginate(self, url_raw, session) -> list:
        """Fetch the first page and follow ``next_token`` links.

        Args:
            url_raw: Search URL without any ``next_token`` parameter.
            session: Session object returned by ``get_conn()``.

        Returns:
            A list with the decoded JSON body of every page fetched, in
            request order; it holds at most ``MAX_PAGES`` entries.
        """
        page = self.connect_to_endpoint(url_raw, session).json()
        pages = [page]

        # Stop when the API reports no further page or the cap is reached.
        while 'next_token' in page.get('meta', {}) and len(pages) < self.MAX_PAGES:
            next_token = page['meta']['next_token']
            # Always extend the raw URL so tokens never accumulate.
            url = f'{url_raw}&next_token={next_token}'
            page = self.connect_to_endpoint(url, session).json()
            pages.append(page)

        return pages

    def run(self):
        """Open the connection, build the URL and return all fetched pages."""
        # get_conn() is called before create_url(), matching the original
        # statement order.
        session = self.get_conn()
        url_raw = self.create_url()

        return self.paginate(url_raw, session)
63+
64+
if __name__ == '__main__':
    # Manual smoke test: search the last day of tweets for one keyword and
    # pretty-print every page returned by the hook.

    # Local import: only this script section needs ``timezone``.
    from datetime import timezone

    timestamp_format = '%Y-%m-%dT%H:%M:%S.00Z'

    # The format ends in a literal "Z" (UTC designator), so the timestamps
    # must come from a UTC clock rather than the machine's local time.
    # Reading the clock once keeps both bounds consistent with each other.
    now = datetime.now(timezone.utc)

    # Going through ``date()`` drops the time of day, so the window starts
    # at 00:00:00 of the previous day.
    start_time = (now + timedelta(-1)).date().strftime(timestamp_format)
    end_time = now.strftime(timestamp_format)
    query = "datascience"

    for pg in TwitterHook(start_time, end_time, query).run():
        print(json.dumps(pg, indent=4, sort_keys=True))

0 commit comments

Comments
 (0)