11import csv
22import io
3- import datetime as dt
43import dateutil .parser
54
65from mapswipe_workers import auth
98
109
1110def transfer_results (project_id_list = None ):
12- '''
11+ """
1312 Download results from Firebase,
1413 save them to Postgres and then delete the results in Firebase.
1514 This is implemented as a transactional operation as described in
1615 the Firebase docs to avoid missing newly generated results in
1716 Firebase during execution of this function.
18- '''
17+ """
1918
2019 # Firebase transaction function
2120 def transfer (current_results ):
2221 if current_results is None :
23- logger .info (f' { project_id } : No results in Firebase' )
22+ logger .info (f" { project_id } : No results in Firebase" )
2423 return dict ()
2524 else :
2625 results_user_id_list = get_user_ids_from_results (current_results )
@@ -33,41 +32,51 @@ def transfer(current_results):
3332
3433 if not project_id_list :
3534 # get project_ids from existing results if no project ids specified
36- project_id_list = fb_db .reference (' v2/results/' ).get (shallow = True )
35+ project_id_list = fb_db .reference (" v2/results/" ).get (shallow = True )
3736 if not project_id_list :
3837 project_id_list = []
39- logger .info (f' There are no results to transfer.' )
38+ logger .info (f" There are no results to transfer." )
4039
41- # get all project ids from postgres, we will only transfer results for projects we have there
40+ # get all project ids from postgres
41+ # we will only transfer results for projects we have there
4242 postgres_project_ids = get_projects_from_postgres ()
4343
4444 for project_id in project_id_list :
4545 if project_id not in postgres_project_ids :
46- logger .info (f'{ project_id } : This project is not in postgres. We will not transfer results' )
46+ logger .info (
47+ f"{ project_id } : This project is not in postgres."
48+ f"We will not transfer results"
49+ )
4750 continue
48- elif 'tutorial' in project_id :
49- logger .info (f'{ project_id } : these are results for a tutorial. we will not transfer these' )
51+ elif "tutorial" in project_id :
52+ logger .info (
53+ f"{ project_id } : these are results for a tutorial."
54+ f"we will not transfer these"
55+ )
5056 continue
5157
52- logger .info (f' { project_id } : Start transfering results' )
58+ logger .info (f" { project_id } : Start transfering results" )
5359
54- results_ref = fb_db .reference (f' v2/results/{ project_id } ' )
60+ results_ref = fb_db .reference (f" v2/results/{ project_id } " )
5561 truncate_temp_results ()
5662
5763 try :
5864 results_ref .transaction (transfer )
59- logger .info (f'{ project_id } : Transfered results to postgres' )
60- except fb_db .TransactionError :
65+ logger .info (f"{ project_id } : Transfered results to postgres" )
66+ # TransactionAbortedError: a transaction was aborted
67+ # after exceeding the maximum number of retries.
68+ except fb_db .TransactionAbortedError :
6169 logger .exception (
62- f'{ project_id } : Firebase transaction for transfering results failed to commit'
70+ f"{ project_id } : Firebase transaction for"
71+ f"transferingresults failed to commit"
6372 )
6473
6574 del fb_db
6675 return project_id_list
6776
6877
6978def results_to_file (results , projectId ):
70- '''
79+ """
7180 Writes results to an in-memory file-like object
7281 formatted as a csv using the buffer module (StringIO).
7382 This can then be used by the COPY statement of Postgres
@@ -81,32 +90,37 @@ def results_to_file(results, projectId):
8190 -------
8291 results_file: io.StringIO
8392 The results in an StringIO buffer.
84- '''
93+ """
8594 # If csv file is a file object, it should be opened with newline=''
8695
87- results_file = io .StringIO ('' )
96+ results_file = io .StringIO ("" )
8897
89- w = csv .writer (
90- results_file ,
91- delimiter = '\t ' ,
92- quotechar = "'"
93- )
98+ w = csv .writer (results_file , delimiter = "\t " , quotechar = "'" )
9499
95- logger .info (f'Got %s groups for project { projectId } to transfer' % len (results .items ()))
100+ logger .info (
101+ f"Got %s groups for project { projectId } to transfer" % len (results .items ())
102+ )
96103 for groupId , users in results .items ():
97104 for userId , results in users .items ():
98105
99- # check if all attributes are set, if not don't transfer the results for this group
106+ # check if all attributes are set,
107+ # if not don't transfer the results for this group
100108 try :
101- timestamp = results [' timestamp' ]
102- start_time = results [' startTime' ]
103- end_time = results [' endTime' ]
104- results = results [' results' ]
109+ timestamp = results [" timestamp" ]
110+ start_time = results [" startTime" ]
111+ end_time = results [" endTime" ]
112+ results = results [" results" ]
105113 except KeyError as e :
106114 sentry .capture_exception_sentry (e )
107- sentry .capture_message_sentry (f'at least one missing attribute for: { projectId } /{ groupId } /{ userId } , will skip this one' )
115+ sentry .capture_message_sentry (
116+ f"at least one missing attribute for:"
117+ f"{ projectId } /{ groupId } /{ userId } , will skip this one"
118+ )
108119 logger .exception (e )
109- logger .warning (f'at least one missing attribute for: { projectId } /{ groupId } /{ userId } , will skip this one' )
120+ logger .warning (
121+ f"at least one missing attribute for:"
122+ f"{ projectId } /{ groupId } /{ userId } , will skip this one"
123+ )
110124 continue
111125
112126 timestamp = dateutil .parser .parse (timestamp )
@@ -115,16 +129,18 @@ def results_to_file(results, projectId):
115129
116130 if type (results ) is dict :
117131 for taskId , result in results .items ():
118- w .writerow ([
119- projectId ,
120- groupId ,
121- userId ,
122- taskId ,
123- timestamp ,
124- start_time ,
125- end_time ,
126- result ,
127- ])
132+ w .writerow (
133+ [
134+ projectId ,
135+ groupId ,
136+ userId ,
137+ taskId ,
138+ timestamp ,
139+ start_time ,
140+ end_time ,
141+ result ,
142+ ]
143+ )
128144 elif type (results ) is list :
129145 # TODO: optimize for performance
130146 # (make sure data from firebase is always a dict)
@@ -135,16 +151,18 @@ def results_to_file(results, projectId):
135151 if result is None :
136152 continue
137153 else :
138- w .writerow ([
139- projectId ,
140- groupId ,
141- userId ,
142- taskId ,
143- timestamp ,
144- start_time ,
145- end_time ,
146- result ,
147- ])
154+ w .writerow (
155+ [
156+ projectId ,
157+ groupId ,
158+ userId ,
159+ taskId ,
160+ timestamp ,
161+ start_time ,
162+ end_time ,
163+ result ,
164+ ]
165+ )
148166 else :
149167 raise TypeError
150168
@@ -153,54 +171,54 @@ def results_to_file(results, projectId):
153171
154172
def save_results_to_postgres(results_file):
    """
    Load results into Postgres through the ``results_temp`` table.

    The buffer is first copied into ``results_temp`` with the database
    wrapper's ``copy_from``; afterwards every row of ``results_temp`` is
    inserted into ``results``. Rows that collide on
    (project_id, group_id, user_id, task_id) are left untouched.
    The buffer is closed as soon as it has been copied.

    Parameters
    ----------
    results_file: io.StringIO
        Buffer handed to ``copy_from``; closed by this function.
    """

    # Column names passed to copy_from for the results_temp table.
    staging_columns = [
        "project_id",
        "group_id",
        "user_id",
        "task_id",
        "timestamp",
        "start_time",
        "end_time",
        "result",
    ]

    pg_db = auth.postgresDB()
    pg_db.copy_from(results_file, "results_temp", staging_columns)
    results_file.close()

    # Move the staged rows into the final table, skipping existing keys.
    pg_db.query(
        """
        INSERT INTO results
            SELECT * FROM results_temp
        ON CONFLICT (project_id,group_id,user_id,task_id)
        DO NOTHING;
    """
    )
    del pg_db
187205
188206
def truncate_temp_results():
    """
    Remove all rows from the ``results_temp`` table.
    """
    pg_db = auth.postgresDB()
    pg_db.query(
        """
        TRUNCATE results_temp
    """
    )
    del pg_db
198216
199217
200218def get_user_ids_from_results (results ):
201- '''
219+ """
202220 Get all users based on the ids provided in the results
203- '''
221+ """
204222
205223 user_ids = set ([])
206224 for groupId , users in results .items ():
@@ -211,14 +229,14 @@ def get_user_ids_from_results(results):
211229
212230
213231def get_projects_from_postgres ():
214- '''
232+ """
215233 Get the id of all projects in postgres
216- '''
234+ """
217235
218236 pg_db = auth .postgresDB ()
219- sql_query = '''
237+ sql_query = """
220238 SELECT project_id from projects;
221- '''
239+ """
222240 raw_ids = pg_db .retr_query (sql_query , None )
223241 project_ids = [i [0 ] for i in raw_ids ]
224242
0 commit comments