22import io
33
44import dateutil .parser
5+ import psycopg2
56
67from mapswipe_workers import auth
78from mapswipe_workers .definitions import logger , sentry
89from mapswipe_workers .firebase_to_postgres import update_data
910
1011
1112def transfer_results (project_id_list = None ):
12- """
13- Download results from firebase,
14- saves them to postgres and then deletes the results in firebase.
15- This is implemented as a transactional operation as described in
16- the Firebase docs to avoid missing new generated results in
17- Firebase during execution of this function.
18- """
19-
20- # Firebase transaction function
21- def transfer (current_results ):
22- if current_results is None :
23- logger .info (f"{ project_id } : No results in Firebase" )
24- return dict ()
25- else :
26- results_user_id_list = get_user_ids_from_results (current_results )
27- update_data .update_user_data (results_user_id_list )
28- results_file = results_to_file (current_results , project_id )
29- save_results_to_postgres (results_file )
30- return dict ()
13+ """Transfer results for one project after the other.
3114
32- fb_db = auth .firebaseDB ()
33-
34- if not project_id_list :
15+ Will only trigger the transfer of results for projects
16+ that are defined in the postgres database.
17+ Will not transfer results for tutorials and
18+ for projects which are not set up in postgres.
19+ """
20+ if project_id_list is None :
3521 # get project_ids from existing results if no project ids specified
22+ fb_db = auth .firebaseDB ()
3623 project_id_list = fb_db .reference ("v2/results/" ).get (shallow = True )
37- if not project_id_list :
24+ if project_id_list is None :
3825 project_id_list = []
3926 logger .info ("There are no results to transfer." )
4027
41- # get all project ids from postgres,
42- # we will only transfer results for projects we have there
28+ # Get all project ids from postgres.
29+ # We will only transfer results for projects we in postgres.
4330 postgres_project_ids = get_projects_from_postgres ()
4431
4532 for project_id in project_id_list :
@@ -55,24 +42,107 @@ def transfer(current_results):
5542 f"We will not transfer these"
5643 )
5744 continue
45+ else :
46+ logger .info (f"{ project_id } : Start transfer results" )
47+ fb_db = auth .firebaseDB ()
48+ results_ref = fb_db .reference (f"v2/results/{ project_id } " )
49+ results = results_ref .get ()
50+ del fb_db
51+ transfer_results_for_project (project_id , results )
52+
53+ return project_id_list
54+
55+
56+ def transfer_results_for_project (project_id , results ):
57+ """Transfer the results for a specific project.
58+
59+ Save results into an in-memory file.
60+ Copy the results to postgres.
61+ Delete results in firebase.
5862
59- logger .info (f"{ project_id } : Start transfering results" )
63+ We are NOT using a Firebase transaction functions here anymore.
64+ This has caused problems, in situations where a lot of mappers are
65+ uploading results to Firebase at the same time. Basically, this is
66+ due to the behaviour of Firebase Transaction function:
6067
61- results_ref = fb_db .reference (f"v2/results/{ project_id } " )
68+ "If another client writes to this location
69+ before the new value is successfully saved,
70+ the update function is called again with the new current value,
71+ and the write will be retried."
72+
73+ (source: https://firebase.google.com/docs/reference/admin/python/firebase_admin.db#firebase_admin.db.Reference.transaction) # noqa
74+
75+ Using Firebase transaction on the group level
76+ has turned out to be too slow when using "normal" queries,
77+ e.g. without using threading. Threading should be avoided here
78+ as well to not run into unforeseen errors.
79+
80+ For more details see issue #478.
81+ """
82+
83+ if results is None :
84+ logger .info (f"{ project_id } : No results in Firebase" )
85+ else :
86+ # First we check for new users in Firebase.
87+ # The user_id is used as a key in the postgres database for the results
88+ # and thus users need to be inserted before results get inserted.
89+ results_user_id_list = get_user_ids_from_results (results )
90+ update_data .update_user_data (results_user_id_list )
91+
92+ try :
93+ # Results are dumped into an in-memory file.
94+ # This allows us to use the COPY statement to insert many
95+ # results at relatively high speed.
96+ results_file = results_to_file (results , project_id )
6297 truncate_temp_results ()
98+ save_results_to_postgres (results_file )
99+ except psycopg2 .errors .ForeignKeyViolation as e :
100+ sentry .capture_exception (e )
101+ sentry .capture_message (
102+ "could not transfer results to postgres due to ForeignKeyViolation: "
103+ f"{ project_id } "
104+ )
105+ logger .exception (e )
106+ logger .warning (
107+ "could not transfer results to postgres due to ForeignKeyViolation: "
108+ f"{ project_id } "
109+ )
110+ except Exception as e :
111+ sentry .capture_exception (e )
112+ sentry .capture_message (f"could not transfer results to postgres: { project_id } " )
113+ logger .exception (e )
114+ logger .warning (f"could not transfer results to postgres: { project_id } " )
115+ else :
116+ # It is important here that we first insert results into postgres
117+ # and then delete these results from Firebase.
118+ # In case something goes wrong during the insert, results in Firebase
119+ # will not get deleted.
120+ delete_results_from_firebase (project_id , results )
121+ logger .info (f"{ project_id } : Transferred results to postgres" )
122+
123+
124+ def delete_results_from_firebase (project_id , results ):
125+ """Delete results from Firebase using update function.
126+
127+ We use the update method of firebase instead of delete.
128+ Update allows to delete items at multiple locations at the same time
129+ and is much faster.
130+ """
63131
64- try :
65- results_ref .transaction (transfer )
66- logger .info (f"{ project_id } : Transfered results to postgres" )
67- except fb_db .TransactionAbortedError :
68- logger .exception (
69- f"{ project_id } : Firebase transaction for "
70- f"transfering results failed to commit"
71- )
72- sentry .capture_exception ()
132+ fb_db = auth .firebaseDB ()
73133
74- del fb_db
75- return project_id_list
134+ # we will use a multi-location update to delete the entries
135+ # therefore we create a dict with the items we want to delete
136+ data = {}
137+ for group_id , users in results .items ():
138+ for user_id , result in users .items ():
139+ key = f"{ group_id } /{ user_id } "
140+ data [key ] = None
141+
142+ results_ref = fb_db .reference (f"v2/results/{ project_id } /" )
143+ results_ref .update (data )
144+
145+ logger .info (f"removed results for project { project_id } " )
76146
77147
78148def results_to_file (results , projectId ):
@@ -200,7 +270,6 @@ def save_results_to_postgres(results_file):
200270 Saves results to a temporary table in postgres
201271 using the COPY Statement of Postgres
202272 for a more efficient import into the database.
203-
204273 Parameters
205274 ----------
206275 results_file: io.StringIO
0 commit comments