-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTweet Link Scraper.py
More file actions
258 lines (222 loc) · 8.73 KB
/
Tweet Link Scraper.py
File metadata and controls
258 lines (222 loc) · 8.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import urllib.request
from bs4 import BeautifulSoup as bs, Comment
import re, json
from IPython.display import clear_output
import threading
import sys
import queue
import datetime
import time
import getopt
from urllib.parse import urljoin
from pymongo import MongoClient
from multiprocessing import Process, Queue, Lock
import multiprocessing
import requests
import eventlet
eventlet.monkey_patch(socket=True)
# CONFIG
json_file_location = 'single.json'  # newline-delimited JSON: one tweet object per line
num_of_threads = 20    # worker threads per process (overridable via -t/--threads)
num_of_processes = 4   # worker processes (overridable via -p/--processes)
mongo_server = None    # NOTE(review): never read below — MongoClient() uses localhost defaults
mongo_port = None

# Globals
status = (0, 0, None, None, None)  # (passed, failed, elapsed, items/sec, last url); used to print status report on main thread
poison_pill = 'EOF'  # Used to poison threads in different processes via shared queue.
EOF = 'EOF'
f = open(json_file_location, encoding='utf8')  # consumed by get_tweet(); fill_queue_async opens its own handle
lock = Lock()
input_lock = Lock()  # serializes reads of the shared file handle `f`
passed = 0
failed = 0
client = MongoClient()
db = client['tweet-link-data']
links = db['links']
# ensure_index() was deprecated and removed in PyMongo 4.x; create_index() is
# the supported equivalent and is a no-op when the index already exists.
links.create_index("url", unique=True)
def chunkify(lst, n):
    """Split *lst* round-robin into *n* chunks: chunk k holds lst[k], lst[k+n], ..."""
    chunks = []
    for offset in range(n):
        chunks.append(lst[offset::n])
    return chunks
def get_tweet():
    """Return the next tweet (parsed dict) that contains at least one URL.

    Reads line-delimited JSON from the shared module-level handle `f`,
    serialized across threads with `input_lock`. Returns the EOF sentinel at
    end of file or on unreadable input.
    """
    global f
    with input_lock:
        try:
            while True:
                line = f.readline()
                if line == '':
                    return EOF  # end of file
                tweet = json.loads(line)
                # Skip tweets that carry no links.
                if len(tweet['entities']['urls']) > 0:
                    return tweet
        except (ValueError, KeyError, TypeError, OSError):
            # ValueError covers json.JSONDecodeError (malformed line); KeyError/
            # TypeError cover tweets missing the 'entities' structure; OSError
            # covers a closed/broken handle. The original bare `except:` also
            # swallowed KeyboardInterrupt/SystemExit, which should propagate.
            return EOF
def visible(element):
    """Return True if a parsed text node should count as visible page content.

    Rejects text nested in non-content tags, and stray comment / IE
    conditional-comment markup that survives parsing.
    """
    if element.parent.name in ['style', 'script', '[document]', 'head', 'title', 'meta']:
        return False
    # The brackets must be escaped: the original pattern '<![endif].*' made
    # [endif] a character class matching any single letter from "endif",
    # not the literal "[endif]" token.
    elif re.match(r'<!--.*-->', str(element)) or re.match(r'<!\[endif\].*', str(element)):
        return False
    return True
def proc_worker(q, r, num_of_threads):
    """Run this process's pool of daemon worker threads and block until all finish."""
    pool = []
    for _ in range(num_of_threads):
        worker = threading.Thread(target=thread_worker, args=(q, r))
        worker.daemon = True
        worker.start()
        pool.append(worker)
    sys.stdout.flush()
    for worker in pool:
        worker.join()
def main(argv):
    """Entry point: parse CLI args, start the reader and return-processing
    threads plus the worker processes, then poll process liveness while
    refreshing a status line every 2 seconds."""
    handle_args(argv)
    # Truncate output files from any previous run.
    open('failed.txt', 'w+').close()
    open('output.txt', 'w+').close()
    # Queues for cross Process/Thread communication
    # q - tweets are read in from json file to this queue, which are then retrieved by threads in other processes.
    q = Queue() # tweet queue
    # r - (return) extracted site text data and relevant info extracted by all active threads/processes are pushed to this
    # queue and popped by a thread in the main processes for post processing / stat display.
    r = Queue() # return queue
    # Main process - thread that reads tweet data into a process-safe and thread-safe queue for distributed computing.
    t = threading.Thread(target=fill_queue_async, args=(q,))
    t.daemon = True
    t.start()
    # Main process - thread for processing return values and status output
    rt = threading.Thread(target=return_worker, args=(r,))
    rt.daemon = True
    rt.start()
    processes = []
    for i in range(num_of_processes):
        p = Process(target=proc_worker, args=(q,r,num_of_threads))
        p.start()
        processes.append(p)
    # Wait until the reader thread has queued the entire input file.
    t.join()
    # do stat printing stuff
    try:
        while True:
            finished = True
            for p in processes:
                if p.is_alive():
                    #print("Process {} is alive.".format(p.name))
                    finished = False
            clear_output(wait=True)  # IPython/notebook display helper
            print("Passed: {}, Failed: {}, Time elapsed: {}, items/second: {} \n{}".format(status[0], status[1], status[2], status[3], status[4]))
            print("")
            sys.stdout.flush()
            time.sleep(2)
            if finished:
                # NOTE(review): returns without closing the module-level file
                # handle `f` (only the Ctrl-C path below reaches f.close()).
                return
    except KeyboardInterrupt:
        # Ctrl-C: tear down worker processes immediately.
        for p in processes:
            p.terminate()
    f.close()
    print("Done.")
def handle_args(argv):
    """Parse CLI flags into module-level config.

    Supported: -p/--processes (int), -t/--threads (int), -f/--file (path).
    Exits with status 2 and a usage message on malformed arguments (the
    original exited silently, and left debug print/sleep noise on -t).
    """
    global num_of_processes, num_of_threads, json_file_location
    try:
        opts, args = getopt.getopt(argv, "p:t:f:", ["processes=", "threads=", "file="])
    except getopt.GetoptError:
        print("usage: Tweet Link Scraper.py [-p processes] [-t threads] [-f file]")
        sys.exit(2)
    for opt, arg in opts:
        if opt in ('-p', '--processes'):
            num_of_processes = int(arg)
        elif opt in ('-t', '--threads'):
            num_of_threads = int(arg)
        elif opt in ('-f', '--file'):
            json_file_location = arg
def fill_queue_async(q):
    """Stream tweets that contain links from the input file into queue `q`.

    Runs on a dedicated daemon thread in the main process; returns when the
    file is exhausted. Caps the queue at ~10000 pending items, backing off
    briefly when full instead of busy-spinning.
    """
    with open(json_file_location, encoding='utf8') as infile:
        while True:
            if q.qsize() < 10000:
                line = infile.readline()
                if not line:
                    return  # end of input
                tweet = json.loads(line)
                # Only enqueue tweets that actually carry URLs to scrape.
                if len(tweet['entities']['urls']) > 0:
                    q.put(tweet)
            else:
                # The original printed 'Queue full.' in a tight loop here,
                # pegging a CPU core and flooding stdout; sleep briefly instead.
                time.sleep(0.1)
def return_worker(r):
    """Drain the return queue forever (daemon thread): persist each scraped
    doc to disk/Mongo and refresh the global `status` tuple that main()'s
    display loop prints."""
    global status
    start_time = datetime.datetime.now()
    passed = 0  # docs that carried extracted text/images
    failed = 0  # docs where scraping yielded nothing
    while True:
        item = r.get()  # blocks until a worker thread pushes a result
        error = do_queue_work(item)
        if error == 0:
            # Write/insert into wherever
            with open('output.txt', 'a+') as f:
                f.write(json.dumps(item) + "\n")
            try:
                links.insert_one(item)
            except:
                # Best-effort Mongo insert: duplicate-key errors (unique `url`
                # index) and connection failures are deliberately ignored.
                pass
            passed += 1
        else:
            with open('failed.txt', 'a+') as f:
                f.write(item['url']+"\n")
            failed += 1
        time_delta = datetime.datetime.now() - start_time
        # Guard against division by ~0 in the first instant after start.
        seconds_elapsed = time_delta.total_seconds() if time_delta.total_seconds() > 0 else 1
        average_item_per_second = int((passed + failed)/seconds_elapsed)
        #clear_output()
        # 'url' is absent only when building the doc failed very early, in
        # which case fall back to the tweet's shortened link.
        status = (passed, failed, time_delta, average_item_per_second, item['url'] if 'url' in item else 'Failed: {}'.format(item['tiny_url']))
        sys.stdout.flush()
def do_queue_work(item):
    """Classify a scraped doc: return 0 if it carries any extracted content
    ('texts' or 'images' key present), 1 otherwise."""
    has_content = 'texts' in item or 'images' in item
    return 0 if has_content else 1
def thread_worker(q, r):
    """Worker thread: pop tweets from `q`, fetch each linked page, extract the
    visible text and image URLs, and push the resulting doc onto `r`.

    Exits when the queue is empty, a sentinel/falsy item is seen, or pushing a
    result times out (main-process consumer gone).
    """
    global passed, failed, total
    while True:
        if q.empty():
            # No more work queued for this process.
            return
        try:
            tweet = q.get(timeout=1)
            if not tweet:
                return
            for sub_urls in tweet['entities']['urls']:
                doc = {}
                doc['id'] = tweet['id']
                doc['tiny_url'] = sub_urls['url']
                doc['url'] = sub_urls['expanded_url']
                try:
                    # eventlet.Timeout guards against requests hanging past its
                    # own timeout (e.g. slow streaming bodies).
                    with eventlet.Timeout(10):
                        response = requests.get(doc['url'], timeout=10)
                        if response.status_code != requests.codes.ok:
                            raise ValueError('Response not Ok. {} Error occurred.'.format(response.status_code))
                        html = response.text  # Read source HTML from response
                        soup = bs(html, 'html.parser')  # BeautifulSoup extracts text from the html.
                        # Remove comment nodes so they don't leak into the text.
                        comments = soup.findAll(text=lambda text: isinstance(text, Comment))
                        [comment.extract() for comment in comments]
                        text = soup.findAll(text=True)
                        # Filter out text that isn't 'primary content' via the
                        # tag blacklist in visible(), then keep only strings
                        # with more than 10 space-separated tokens.
                        # TODO: make this more efficient. All over the place right now.
                        text_items = filter(visible, text)
                        visible_text = [item.strip() for item in text_items if item != '\n' and len(item.strip().split(' ')) > 10]
                        doc['texts'] = visible_text
                        doc['images'] = list(set([tag['src'] for tag in soup.findAll('img') if len(tag['src']) < 100]))
                        # Resolve relative image paths against the page URL.
                        # BUG FIX: the original called urljoin(url, img) with an
                        # undefined name `url`; the NameError was swallowed by
                        # the bare except, so images were never absolutized.
                        doc['images'] = [urljoin(doc['url'], img) for img in doc['images']]
                except (Exception, eventlet.Timeout):
                    # Could not fetch/parse this address; the partial doc is
                    # still pushed below so return_worker records it as failed.
                    # eventlet.Timeout subclasses BaseException, so it must be
                    # named explicitly now that the bare except is narrowed.
                    pass
                finally:
                    try:
                        r.put(doc, timeout=2)
                    except Exception:
                        return
        except Exception:
            # queue.Empty from q.get(timeout=1), or a malformed tweet dict.
            return
if __name__ == '__main__':
    # Run the scraper with the CLI args (script name excluded).
    main(sys.argv[1:])