-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtweetDB.py
More file actions
277 lines (247 loc) · 9.59 KB
/
tweetDB.py
File metadata and controls
277 lines (247 loc) · 9.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import sqlite3
import time
from datetime import datetime
from tweet import Tweet
from writer import Writer
class TweetDB:
    """
    SQLite3-backed storage for collected tweets and their writers.

    Three tables are used:
        * ``Tweet``           - one row per tweet, keyed by ``tweet_id``
        * ``Writer``          - one row per writer, keyed by ``user_id``
        * ``SearchKey_Tweet`` - (tweet_id, searchKey) pairs recording which
          search key(s) a tweet was collected under

    Timestamps are stored as seconds since the epoch.
    """

    def __init__(self, stock_market_name):
        """
        Open (or create) the database and prepare a cursor.

        Args:
            `stock_market_name` (str): stock market name that will be
                used as the database name. After creation there will be
                a ``<stock_market_name>.db`` file in the directory.
                Pass `':memory:'` to create the database in memory.
        """
        # NOTE: an empty filename makes SQLite open a *temporary on-disk*
        # database, so ':memory:' has to be passed through unchanged to
        # really get an in-memory database.
        if stock_market_name == ':memory:':
            self.name = ':memory:'
        else:
            self.name = stock_market_name + '.db'
        self.conn = sqlite3.connect(self.name)
        self.c = self.conn.cursor()

    def create_tables(self):
        """Create the needed tables and indexes if they do not exist."""
        with self.conn:
            self.c.execute(
                """
                CREATE TABLE IF NOT EXISTS Tweet (
                    tweet_id INTEGER PRIMARY KEY,
                    writer TEXT NOT NULL,
                    post_date INTEGER NOT NULL,
                    body TEXT NOT NULL,
                    comment_num INTEGER,
                    retweet_num INTEGER,
                    like_num INTEGER
                )
                """
            )
            # Create index on post_date
            self.c.execute(
                "CREATE INDEX IF NOT EXISTS IX_post_date ON Tweet (post_date);")
            # Create index on writer
            self.c.execute(
                "CREATE INDEX IF NOT EXISTS IX_writer ON Tweet (writer);")
            # NOTE: the column name `followerer` is a typo for `follower`,
            # but it is kept as-is so that database files created earlier
            # keep the same schema. Inserts are positional, so they are
            # not affected by the column name.
            self.c.execute(
                """
                CREATE TABLE IF NOT EXISTS Writer (
                    user_id TEXT PRIMARY KEY,
                    username TEXT NOT NULL,
                    following INTEGER NOT NULL,
                    followerer INTEGER NOT NULL,
                    tweet_count INTEGER,
                    bio_text TEXT,
                    location TEXT,
                    website TEXT,
                    birthdate INTEGER,
                    joined INTEGER
                )
                """
            )
            self.c.execute(
                """
                CREATE TABLE IF NOT EXISTS SearchKey_Tweet (
                    tweet_id INTEGER,
                    searchKey TEXT,
                    PRIMARY KEY (tweet_id, searchKey)
                )
                """
            )

    def _insert_writer_executer(self, writer: "Writer") -> None:
        """
        Execute an insert operation for a single writer.

        Commit is needed after execution. If a writer with the same
        `user_id` already exists, a message is printed and the row is
        skipped.

        Args:
            `writer` (Writer): Writer instance to insert
        """
        try:
            self.c.execute(
                """
                INSERT INTO Writer VALUES (
                    :user_id,
                    :username,
                    :following,
                    :follower,
                    :tweet_count,
                    :bio_text,
                    :location,
                    :website,
                    :birthdate,
                    :joined
                )
                """,
                {
                    'user_id': writer.user_id,
                    'username': writer.username,
                    'following': writer.following,
                    'follower': writer.follower,
                    'tweet_count': writer.tweet_count,
                    'bio_text': writer.bio_text,
                    'location': writer.location,
                    'website': writer.website,
                    # Birth date is optional; store NULL when it is unknown.
                    'birthdate': self.struct_to_seconds(writer.born) if writer.born else None,
                    'joined': time.mktime(writer.joined)
                }
            )
        except sqlite3.IntegrityError:
            print(f"Existing writer: {writer.user_id}")

    def insert_writers(self, writers):
        """
        Insert multiple writers in a single transaction.

        Args:
            `writers` (Iterable): Writer instances to insert

        Raises:
            ValueError if one of the writers is not an instance
                of Writer. In that case none of the writers will be
                inserted.
        """
        # Materialize first: validating a one-shot iterable (e.g. a
        # generator) would otherwise exhaust it before the insert loop.
        writers = list(writers)
        if not all(isinstance(writer, Writer) for writer in writers):
            raise ValueError(Writer)
        with self.conn:
            for writer in writers:
                self._insert_writer_executer(writer)

    def _insert_tweet_executer(self, tweet: "Tweet") -> None:
        """
        Execute an insert operation for a single tweet.

        Inserts the tweet row and the (tweet_id, searchKey) pair.
        Commit is needed after execution. Already existing rows are
        reported with a printed message and skipped.

        Args:
            `tweet` (Tweet): Tweet instance to insert
        """
        try:
            self.c.execute(
                """
                INSERT INTO Tweet VALUES (
                    :tweet_id,
                    :writer,
                    :post_date,
                    :body,
                    :comment_num,
                    :retweet_num,
                    :like_num
                )
                """,
                {
                    'tweet_id': tweet.tweet_id,
                    'writer': tweet.writer,
                    'post_date': time.mktime(tweet.post_date),
                    'body': tweet.body,
                    'comment_num': tweet.comment_num,
                    'retweet_num': tweet.retweet_num,
                    'like_num': tweet.like_num
                }
            )
        except sqlite3.IntegrityError:
            print(f"Existing tweet: {tweet.tweet_id}")
        # The pair is inserted even when the tweet itself already exists,
        # so the same tweet can be linked to more than one search key.
        try:
            self.c.execute(
                """
                INSERT INTO SearchKey_Tweet VALUES (
                    :tweet_id,
                    :searchKey
                )
                """,
                {
                    'tweet_id': tweet.tweet_id,
                    'searchKey': tweet.searchKey
                }
            )
        except sqlite3.IntegrityError:
            print(
                f"Existing tweet-searchKey pair: {tweet.tweet_id}-{tweet.searchKey}")

    def insert_tweet(self, tweet: "Tweet") -> None:
        """
        Insert a single tweet and commit.

        Args:
            `tweet` (Tweet): Tweet instance to insert

        Raises:
            TypeError if input tweet is not a Tweet instance
        """
        if not isinstance(tweet, Tweet):
            raise TypeError(Tweet)
        with self.conn:
            self._insert_tweet_executer(tweet)

    def insert_tweets(self, tweets):
        """
        Insert multiple tweets in a single transaction.

        Args:
            `tweets` (iterable): any iterable that contains Tweet

        Raises:
            ValueError if input tweets contains at least one value that
                is not a Tweet instance. In that case, none of the tweets
                will be inserted. Be careful while choosing the number of
                tweets that will be inserted at once.
        """
        # Materialize first: validating a one-shot iterable (e.g. a
        # generator) would otherwise exhaust it before the insert loop.
        tweets = list(tweets)
        if not all(isinstance(tweet, Tweet) for tweet in tweets):
            raise ValueError(Tweet)
        with self.conn:
            for tweet in tweets:
                # Use the executer directly: insert_tweet() opens its own
                # `with self.conn` block and would commit after every
                # tweet, splitting the batch into many transactions.
                self._insert_tweet_executer(tweet)

    def _get_tweets_by_query(self, query: str, searchKey: str, params=()) -> list:
        """
        Receive tweets from the database with the given query.

        Args:
            `query` (str): SQLite query that will be executed
                (e.g. `"SELECT * FROM Tweet"`). Its result columns must
                be in the column order of the Tweet table.
            `searchKey` (str): search key assigned to every returned Tweet
            `params` (sequence or dict): values bound to the placeholders
                of `query`, if it has any

        Returns:
            A list that contains Tweet instances created from the
            executed `query`
        """
        self.c.execute(query, params)
        return [
            Tweet(tweet_id=row[0],
                  writer=row[1],
                  post_date=time.localtime(row[2]),
                  body=row[3],
                  searchKey=searchKey,
                  comment_num=row[4],
                  retweet_num=row[5],
                  like_num=row[6])
            for row in self.c.fetchall()
        ]

    def get_tweet(self, tweet_id, searchKey) -> "Tweet":
        """
        Return the tweet with the given id.

        Args:
            `tweet_id`: id of the tweet to look up
            `searchKey` (str): search key assigned to the returned Tweet

        Raises:
            IndexError if there is no tweet with the given `tweet_id`
        """
        # Bound parameter instead of string formatting: no SQL injection.
        return self._get_tweets_by_query(
            "SELECT * FROM Tweet WHERE tweet_id=?", searchKey, (tweet_id,))[0]

    def get_searchKey_tweets(self, searchKey) -> list:
        """
        Return every tweet that was collected under `searchKey`.

        Args:
            `searchKey` (str): search key to look up

        Returns:
            A list of Tweet instances (empty if there is no match)
        """
        # `IN` is required here: with `=` the subquery is evaluated as a
        # scalar and only its first row would be matched.
        return self._get_tweets_by_query(
            """
            SELECT *
            FROM Tweet
            WHERE tweet_id IN (
                SELECT tweet_id
                FROM SearchKey_Tweet
                WHERE searchKey=?
            )
            """,
            searchKey,
            (searchKey,)
        )

    def get_searchKeys(self):
        """Return the distinct search keys as a list of 1-tuples."""
        self.c.execute("SELECT DISTINCT searchKey FROM SearchKey_Tweet")
        return self.c.fetchall()

    def get_companies_by_query(self, query):
        """
        Execute `query` as-is and return all resulting rows.

        The query is not sanitized in any way, so it must never be built
        from untrusted input.

        Args:
            `query` (str): SQLite query that will be executed

        Returns:
            A list of row tuples
        """
        self.c.execute(query)
        return self.c.fetchall()

    @staticmethod
    def struct_to_seconds(time_struct: time.struct_time):
        """
        Convert the date part of a time struct to seconds since 1970-01-01.

        Only year, month and day are used; the time of day is ignored.
        The result is negative for dates before 1970.

        Args:
            `time_struct` (time.struct_time): time struct to convert

        Returns:
            Number of whole days since 1970-01-01, expressed in seconds
        """
        epoch = datetime(1970, 1, 1)
        t = datetime(time_struct.tm_year, time_struct.tm_mon,
                     time_struct.tm_mday)
        return (t - epoch).days * 24 * 3600

    def close_DB(self):
        """Close the database connection."""
        self.conn.close()