import csv
import io
+import datetime as dt
import dateutil.parser

from mapswipe_workers import auth


def transfer_results(project_id_list=None):
-    """
+    '''
    Downloads results from firebase,
    saves them to postgres and then deletes the results in firebase.
    This is implemented as a transactional operation as described in
    the Firebase docs to avoid missing newly generated results in
    Firebase during execution of this function.
-    """
+    '''

    # Firebase transaction function
    def transfer(current_results):
        if current_results is None:
-            logger.info(f"{project_id}: No results in Firebase")
+            logger.info(f'{project_id}: No results in Firebase')
            return dict()
        else:
            results_user_id_list = get_user_ids_from_results(current_results)
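The nested `transfer` function above is the update callback for a Firebase Realtime Database transaction. As a rough, standalone sketch of that pattern with the `firebase_admin` SDK (the reference path and the processing step are placeholders, not the project's actual code):

```python
# Minimal sketch of the Firebase transaction pattern, assuming a firebase_admin
# app has already been initialized elsewhere.
from firebase_admin import db


def transfer_and_clear(current_results):
    # The update callback receives the node's current value and returns the
    # value to write back; Firebase reruns it if the node changed concurrently.
    if current_results is None:
        return {}
    print(f"would process {len(current_results)} result groups here")  # placeholder step
    return {}  # an empty dict leaves nothing stored, i.e. the results are removed


results_ref = db.reference("v2/results/some_project_id")  # hypothetical path
results_ref.transaction(transfer_and_clear)
```

This mirrors the docstring's claim: results that arrive while the transfer is running cause the callback to run again on the fresh data, so they are not silently dropped.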
@@ -32,51 +33,41 @@ def transfer(current_results):

    if not project_id_list:
        # get project_ids from existing results if no project ids specified
-        project_id_list = fb_db.reference("v2/results/").get(shallow=True)
+        project_id_list = fb_db.reference('v2/results/').get(shallow=True)
        if not project_id_list:
            project_id_list = []
-            logger.info(f"There are no results to transfer.")
+            logger.info(f'There are no results to transfer.')

-    # get all project ids from postgres
-    # we will only transfer results for projects we have there
+    # get all project ids from postgres, we will only transfer results for projects we have there
    postgres_project_ids = get_projects_from_postgres()

    for project_id in project_id_list:
        if project_id not in postgres_project_ids:
-            logger.info(
-                f"{project_id}: This project is not in postgres."
-                f"We will not transfer results"
-            )
+            logger.info(f'{project_id}: This project is not in postgres. We will not transfer results')
            continue
-        elif "tutorial" in project_id:
-            logger.info(
-                f"{project_id}: these are results for a tutorial."
-                f"we will not transfer these"
-            )
+        elif 'tutorial' in project_id:
+            logger.info(f'{project_id}: these are results for a tutorial. we will not transfer these')
            continue

-        logger.info(f"{project_id}: Start transfering results")
+        logger.info(f'{project_id}: Start transfering results')

-        results_ref = fb_db.reference(f"v2/results/{project_id}")
+        results_ref = fb_db.reference(f'v2/results/{project_id}')
        truncate_temp_results()

        try:
            results_ref.transaction(transfer)
-            logger.info(f"{project_id}: Transfered results to postgres")
-        # TransactionAbortedError: A transaction was aborted
-        # after exceeding the maximum number of retries.
-        except fb_db.TransactionAbortedError:
+            logger.info(f'{project_id}: Transfered results to postgres')
+        except fb_db.TransactionError:
            logger.exception(
-                f"{project_id}: Firebase transaction for"
-                f"transfering results failed to commit"
+                f'{project_id}: Firebase transaction for transfering results failed to commit'
            )

    del fb_db
    return project_id_list

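For orientation, a hedged usage sketch of `transfer_results`; the import path is assumed from the package name and is not shown in this diff:

```python
# Hypothetical usage; the module path and project ids are assumptions for illustration.
from mapswipe_workers.firebase_to_postgres.transfer_results import transfer_results

# Transfer results for every project that currently has results in Firebase:
processed = transfer_results()

# Or limit the transfer to specific projects:
processed = transfer_results(project_id_list=["project_a", "project_b"])
print(f"projects considered: {processed}")
```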

def results_to_file(results, projectId):
-    """
+    '''
    Writes results to an in-memory file-like object
    formatted as a csv using the buffer module (StringIO).
    This can then be used by the COPY statement of Postgres
@@ -90,37 +81,32 @@ def results_to_file(results, projectId):
    -------
    results_file: io.StringIO
        The results in a StringIO buffer.
-    """
+    '''
    # If csv file is a file object, it should be opened with newline=''

-    results_file = io.StringIO("")
+    results_file = io.StringIO('')

-    w = csv.writer(results_file, delimiter="\t", quotechar="'")
+    w = csv.writer(
+        results_file,
+        delimiter='\t',
+        quotechar="'"
+    )

-    logger.info(
-        f"Got %s groups for project {projectId} to transfer" % len(results.items())
-    )
+    logger.info(f'Got %s groups for project {projectId} to transfer' % len(results.items()))
    for groupId, users in results.items():
        for userId, results in users.items():

-            # check if all attributes are set,
-            # if not don't transfer the results for this group
+            # check if all attributes are set, if not don't transfer the results for this group
            try:
-                timestamp = results["timestamp"]
-                start_time = results["startTime"]
-                end_time = results["endTime"]
-                results = results["results"]
+                timestamp = results['timestamp']
+                start_time = results['startTime']
+                end_time = results['endTime']
+                results = results['results']
            except KeyError as e:
                sentry.capture_exception_sentry(e)
-                sentry.capture_message_sentry(
-                    f"at least one missing attribute for:"
-                    f"{projectId}/{groupId}/{userId}, will skip this one"
-                )
+                sentry.capture_message_sentry(f'at least one missing attribute for: {projectId}/{groupId}/{userId}, will skip this one')
                logger.exception(e)
-                logger.warning(
-                    f"at least one missing attribute for:"
-                    f"{projectId}/{groupId}/{userId}, will skip this one"
-                )
+                logger.warning(f'at least one missing attribute for: {projectId}/{groupId}/{userId}, will skip this one')
                continue

            timestamp = dateutil.parser.parse(timestamp)
@@ -129,18 +115,16 @@ def results_to_file(results, projectId):

            if type(results) is dict:
                for taskId, result in results.items():
-                    w.writerow(
-                        [
-                            projectId,
-                            groupId,
-                            userId,
-                            taskId,
-                            timestamp,
-                            start_time,
-                            end_time,
-                            result,
-                        ]
-                    )
+                    w.writerow([
+                        projectId,
+                        groupId,
+                        userId,
+                        taskId,
+                        timestamp,
+                        start_time,
+                        end_time,
+                        result,
+                    ])
            elif type(results) is list:
                # TODO: optimize for performance
                # (make sure data from firebase is always a dict)
@@ -151,18 +135,16 @@ def results_to_file(results, projectId):
                    if result is None:
                        continue
                    else:
-                        w.writerow(
-                            [
-                                projectId,
-                                groupId,
-                                userId,
-                                taskId,
-                                timestamp,
-                                start_time,
-                                end_time,
-                                result,
-                            ]
-                        )
+                        w.writerow([
+                            projectId,
+                            groupId,
+                            userId,
+                            taskId,
+                            timestamp,
+                            start_time,
+                            end_time,
+                            result,
+                        ])
            else:
                raise TypeError

@@ -171,54 +153,54 @@ def results_to_file(results, projectId):


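The buffer approach described in the `results_to_file` docstring can be shown in isolation. This sketch uses invented row values and is not the project's code; it only illustrates writing tab-separated rows into an in-memory file object and rewinding it so a consumer (such as a COPY-style bulk load) can read it from the start:

```python
import csv
import io

# In-memory "file" that a csv.writer can write to.
buffer = io.StringIO("")
writer = csv.writer(buffer, delimiter="\t", quotechar="'")

# Invented rows standing in for (projectId, groupId, userId, taskId, timestamp, result).
writer.writerow(["project_a", "g100", "user_1", "t1", "2020-01-01 00:00:00", 1])
writer.writerow(["project_a", "g100", "user_1", "t2", "2020-01-01 00:00:05", 0])

buffer.seek(0)  # rewind so the next reader starts at the beginning
print(buffer.read())
```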
def save_results_to_postgres(results_file):
-    """
+    '''
    Saves results to a temporary table in postgres using the COPY Statement of Postgres
    for a more efficient import into the database.

    Parameters
    ----------
    results_file: io.StringIO
-    """
+    '''

    p_con = auth.postgresDB()
    columns = [
-        "project_id",
-        "group_id",
-        "user_id",
-        "task_id",
-        "timestamp",
-        "start_time",
-        "end_time",
-        "result",
-    ]
-    p_con.copy_from(results_file, "results_temp", columns)
+        'project_id',
+        'group_id',
+        'user_id',
+        'task_id',
+        'timestamp',
+        'start_time',
+        'end_time',
+        'result',
+    ]
+    p_con.copy_from(results_file, 'results_temp', columns)
    results_file.close()

-    query_insert_results = """
+    query_insert_results = '''
        INSERT INTO results
        SELECT * FROM results_temp
        ON CONFLICT (project_id,group_id,user_id,task_id)
        DO NOTHING;
-    """
+    '''
    p_con.query(query_insert_results)
-    del p_con
+    del(p_con)


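`auth.postgresDB()` is a project wrapper whose `copy_from` and `query` methods are not shown in this diff. Assuming it delegates to something roughly like psycopg2, the temp-table COPY plus `INSERT ... ON CONFLICT DO NOTHING` flow looks like this sketch (connection settings and the sample row are invented):

```python
import io

import psycopg2

# Hypothetical connection settings, not the project's configuration.
connection = psycopg2.connect("dbname=mapswipe user=postgres")

# One invented tab-separated row matching the columns of results_temp.
buffer = io.StringIO(
    "project_a\tg100\tuser_1\tt1\t"
    "2020-01-01 00:00:10\t2020-01-01 00:00:00\t2020-01-01 00:00:10\t1\n"
)

with connection, connection.cursor() as cursor:
    # Bulk-load the buffer into the temporary table (tab is copy_from's default separator).
    cursor.copy_from(buffer, "results_temp", columns=(
        "project_id", "group_id", "user_id", "task_id",
        "timestamp", "start_time", "end_time", "result",
    ))
    # Move the rows into the real table, silently skipping duplicates.
    cursor.execute(
        """
        INSERT INTO results
        SELECT * FROM results_temp
        ON CONFLICT (project_id, group_id, user_id, task_id) DO NOTHING;
        """
    )
connection.close()
```

Loading through a temporary table keeps the fast COPY path while still getting the duplicate handling that COPY itself does not offer.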
def truncate_temp_results():
    p_con = auth.postgresDB()
-    query_truncate_temp_results = """
+    query_truncate_temp_results = '''
        TRUNCATE results_temp
-    """
+    '''
    p_con.query(query_truncate_temp_results)
    del p_con

    return


def get_user_ids_from_results(results):
-    """
+    '''
    Get all users based on the ids provided in the results
-    """
+    '''

    user_ids = set([])
    for groupId, users in results.items():
@@ -229,14 +211,14 @@ def get_user_ids_from_results(results):


def get_projects_from_postgres():
-    """
+    '''
    Get the id of all projects in postgres
-    """
+    '''

    pg_db = auth.postgresDB()
-    sql_query = """
+    sql_query = '''
        SELECT project_id from projects;
-    """
+    '''
    raw_ids = pg_db.retr_query(sql_query, None)
    project_ids = [i[0] for i in raw_ids]

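`retr_query` appears to return rows as sequences with the project id at index 0, which the list comprehension above unpacks. A tiny invented illustration of that shape:

```python
# Invented rows, shaped like the result of a single-column SELECT.
raw_ids = [("project_a",), ("project_b",), ("project_c",)]
project_ids = [row[0] for row in raw_ids]
print(project_ids)  # ['project_a', 'project_b', 'project_c']
```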