Skip to content

Commit 60f90b1

Browse files
committed
#281 #280 and fix exception for Firebase transfer function: use TransactionAbortedError
1 parent ffd0dab commit 60f90b1

File tree

2 files changed

+117
-82
lines changed

2 files changed

+117
-82
lines changed

mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py

Lines changed: 92 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import csv
22
import io
3-
import datetime as dt
43
import dateutil.parser
54

65
from mapswipe_workers import auth
@@ -9,18 +8,18 @@
98

109

1110
def transfer_results(project_id_list=None):
12-
'''
11+
"""
1312
Download results from firebase,
1413
saves them to postgres and then deletes the results in firebase.
1514
This is implemented as a transactional operation as described in
1615
the Firebase docs to avoid missing new generated results in
1716
Firebase during execution of this function.
18-
'''
17+
"""
1918

2019
# Firebase transaction function
2120
def transfer(current_results):
2221
if current_results is None:
23-
logger.info(f'{project_id}: No results in Firebase')
22+
logger.info(f"{project_id}: No results in Firebase")
2423
return dict()
2524
else:
2625
results_user_id_list = get_user_ids_from_results(current_results)
@@ -33,41 +32,51 @@ def transfer(current_results):
3332

3433
if not project_id_list:
3534
# get project_ids from existing results if no project ids specified
36-
project_id_list = fb_db.reference('v2/results/').get(shallow=True)
35+
project_id_list = fb_db.reference("v2/results/").get(shallow=True)
3736
if not project_id_list:
3837
project_id_list = []
39-
logger.info(f'There are no results to transfer.')
38+
logger.info(f"There are no results to transfer.")
4039

41-
# get all project ids from postgres, we will only transfer results for projects we have there
40+
# get all project ids from postgres
41+
# we will only transfer results for projects we have there
4242
postgres_project_ids = get_projects_from_postgres()
4343

4444
for project_id in project_id_list:
4545
if project_id not in postgres_project_ids:
46-
logger.info(f'{project_id}: This project is not in postgres. We will not transfer results')
46+
logger.info(
47+
f"{project_id}: This project is not in postgres."
48+
f"We will not transfer results"
49+
)
4750
continue
48-
elif 'tutorial' in project_id:
49-
logger.info(f'{project_id}: these are results for a tutorial. we will not transfer these')
51+
elif "tutorial" in project_id:
52+
logger.info(
53+
f"{project_id}: these are results for a tutorial."
54+
f"we will not transfer these"
55+
)
5056
continue
5157

52-
logger.info(f'{project_id}: Start transfering results')
58+
logger.info(f"{project_id}: Start transfering results")
5359

54-
results_ref = fb_db.reference(f'v2/results/{project_id}')
60+
results_ref = fb_db.reference(f"v2/results/{project_id}")
5561
truncate_temp_results()
5662

5763
try:
5864
results_ref.transaction(transfer)
59-
logger.info(f'{project_id}: Transfered results to postgres')
60-
except fb_db.TransactionError:
65+
logger.info(f"{project_id}: Transfered results to postgres")
66+
# TransactionAbortedError:A transaction was aborted
67+
# after exceeding the maximum number of retries.
68+
except fb_db.TransactionAbortedError:
6169
logger.exception(
62-
f'{project_id}: Firebase transaction for transfering results failed to commit'
70+
f"{project_id}: Firebase transaction for"
71+
f"transferingresults failed to commit"
6372
)
6473

6574
del fb_db
6675
return project_id_list
6776

6877

6978
def results_to_file(results, projectId):
70-
'''
79+
"""
7180
Writes results to an in-memory file like object
7281
formatted as a csv using the buffer module (StringIO).
7382
This can be then used by the COPY statement of Postgres
@@ -81,32 +90,37 @@ def results_to_file(results, projectId):
8190
-------
8291
results_file: io.StingIO
8392
The results in an StringIO buffer.
84-
'''
93+
"""
8594
# If csv file is a file object, it should be opened with newline=''
8695

87-
results_file = io.StringIO('')
96+
results_file = io.StringIO("")
8897

89-
w = csv.writer(
90-
results_file,
91-
delimiter='\t',
92-
quotechar="'"
93-
)
98+
w = csv.writer(results_file, delimiter="\t", quotechar="'")
9499

95-
logger.info(f'Got %s groups for project {projectId} to transfer' % len(results.items()))
100+
logger.info(
101+
f"Got %s groups for project {projectId} to transfer" % len(results.items())
102+
)
96103
for groupId, users in results.items():
97104
for userId, results in users.items():
98105

99-
# check if all attributes are set, if not don't transfer the results for this group
106+
# check if all attributes are set,
107+
# if not don't transfer the results for this group
100108
try:
101-
timestamp = results['timestamp']
102-
start_time = results['startTime']
103-
end_time = results['endTime']
104-
results = results['results']
109+
timestamp = results["timestamp"]
110+
start_time = results["startTime"]
111+
end_time = results["endTime"]
112+
results = results["results"]
105113
except KeyError as e:
106114
sentry.capture_exception_sentry(e)
107-
sentry.capture_message_sentry(f'at least one missing attribute for: {projectId}/{groupId}/{userId}, will skip this one')
115+
sentry.capture_message_sentry(
116+
f"at least one missing attribute for:"
117+
f"{projectId}/{groupId}/{userId}, will skip this one"
118+
)
108119
logger.exception(e)
109-
logger.warning(f'at least one missing attribute for: {projectId}/{groupId}/{userId}, will skip this one')
120+
logger.warning(
121+
f"at least one missing attribute for:"
122+
f"{projectId}/{groupId}/{userId}, will skip this one"
123+
)
110124
continue
111125

112126
timestamp = dateutil.parser.parse(timestamp)
@@ -115,16 +129,18 @@ def results_to_file(results, projectId):
115129

116130
if type(results) is dict:
117131
for taskId, result in results.items():
118-
w.writerow([
119-
projectId,
120-
groupId,
121-
userId,
122-
taskId,
123-
timestamp,
124-
start_time,
125-
end_time,
126-
result,
127-
])
132+
w.writerow(
133+
[
134+
projectId,
135+
groupId,
136+
userId,
137+
taskId,
138+
timestamp,
139+
start_time,
140+
end_time,
141+
result,
142+
]
143+
)
128144
elif type(results) is list:
129145
# TODO: optimize for performance
130146
# (make sure data from firebase is always a dict)
@@ -135,16 +151,18 @@ def results_to_file(results, projectId):
135151
if result is None:
136152
continue
137153
else:
138-
w.writerow([
139-
projectId,
140-
groupId,
141-
userId,
142-
taskId,
143-
timestamp,
144-
start_time,
145-
end_time,
146-
result,
147-
])
154+
w.writerow(
155+
[
156+
projectId,
157+
groupId,
158+
userId,
159+
taskId,
160+
timestamp,
161+
start_time,
162+
end_time,
163+
result,
164+
]
165+
)
148166
else:
149167
raise TypeError
150168

@@ -153,54 +171,54 @@ def results_to_file(results, projectId):
153171

154172

155173
def save_results_to_postgres(results_file):
156-
'''
174+
"""
157175
Saves results to a temporary table in postgres using the COPY Statement of Postgres
158176
for a more efficient import into the database.
159177
160178
Parameters
161179
----------
162180
results_file: io.StringIO
163-
'''
181+
"""
164182

165183
p_con = auth.postgresDB()
166184
columns = [
167-
'project_id',
168-
'group_id',
169-
'user_id',
170-
'task_id',
171-
'timestamp',
172-
'start_time',
173-
'end_time',
174-
'result',
175-
]
176-
p_con.copy_from(results_file, 'results_temp', columns)
185+
"project_id",
186+
"group_id",
187+
"user_id",
188+
"task_id",
189+
"timestamp",
190+
"start_time",
191+
"end_time",
192+
"result",
193+
]
194+
p_con.copy_from(results_file, "results_temp", columns)
177195
results_file.close()
178196

179-
query_insert_results = '''
197+
query_insert_results = """
180198
INSERT INTO results
181199
SELECT * FROM results_temp
182200
ON CONFLICT (project_id,group_id,user_id,task_id)
183201
DO NOTHING;
184-
'''
202+
"""
185203
p_con.query(query_insert_results)
186-
del(p_con)
204+
del p_con
187205

188206

189207
def truncate_temp_results():
190208
p_con = auth.postgresDB()
191-
query_truncate_temp_results = '''
209+
query_truncate_temp_results = """
192210
TRUNCATE results_temp
193-
'''
211+
"""
194212
p_con.query(query_truncate_temp_results)
195213
del p_con
196214

197215
return
198216

199217

200218
def get_user_ids_from_results(results):
201-
'''
219+
"""
202220
Get all users based on the ids provided in the results
203-
'''
221+
"""
204222

205223
user_ids = set([])
206224
for groupId, users in results.items():
@@ -211,14 +229,14 @@ def get_user_ids_from_results(results):
211229

212230

213231
def get_projects_from_postgres():
214-
'''
232+
"""
215233
Get the id of all projects in postgres
216-
'''
234+
"""
217235

218236
pg_db = auth.postgresDB()
219-
sql_query = '''
237+
sql_query = """
220238
SELECT project_id from projects;
221-
'''
239+
"""
222240
raw_ids = pg_db.retr_query(sql_query, None)
223241
project_ids = [i[0] for i in raw_ids]
224242

mapswipe_workers/scripts/archive_old_projects.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,32 @@ def get_old_projects():
1414
return projects
1515

1616

17-
def move_project_data_to_v2(project_id, project_data):
17+
def move_project_data_to_v2(project_id):
1818
"""
1919
Copy project information from old path to v2/projects in Firebase.
2020
Add status=archived attribute.
21+
Use Firebase transaction function for this.
2122
"""
22-
project_data["status"] = "archived"
23+
24+
# Firebase transaction function
25+
def transfer(current_data):
26+
current_data["status"] = "archived"
27+
current_data["projectType"] = 1
28+
fb_db.reference("v2/projects/{0}".format(project_id)).set(current_data)
29+
return dict()
30+
2331
fb_db = auth.firebaseDB()
24-
fb_db.reference("v2/projects/{0}".format(project_id)).set(project_data)
25-
fb_db.reference("projects/{0}".format(project_id)).set({})
26-
logger.info(f"moved old project to v2: {project_id}")
32+
projects_ref = fb_db.reference(f"projects/{project_id}")
33+
try:
34+
projects_ref.transaction(transfer)
35+
logger.info(f"{project_id}: Transfered project to v2 and delete in old path")
36+
return True
37+
except fb_db.TransactionAbortedError:
38+
logger.exception(
39+
f"{project_id}: Firebase transaction"
40+
f"for transferring project failed to commit"
41+
)
42+
return False
2743

2844

2945
def delete_old_groups(project_id):
@@ -59,9 +75,10 @@ def archive_old_projects():
5975

6076
projects = get_old_projects()
6177
for project_id in projects.keys():
62-
project_data = projects[project_id]
63-
move_project_data_to_v2(project_id, project_data)
64-
delete_old_groups(project_id)
78+
if move_project_data_to_v2(project_id):
79+
delete_old_groups(project_id)
80+
else:
81+
logger.info(f"didn't delete project and groups for project: {project_id}")
6582

6683
delete_other_old_data()
6784

0 commit comments

Comments
 (0)