1313
1414
1515class GBFSDatabasePopulateHelper (DatabasePopulateHelper ):
16- def __init__ (self , file_path ):
17- super ().__init__ (file_path )
16+ def __init__ (self , filepaths ):
17+ super ().__init__ (filepaths )
1818
1919 def filter_data (self ):
2020 """Filter out rows with Authentication Info and duplicate System IDs"""
21- self .df = self .df [pd .isna (self .df ["Authentication Info" ])]
21+ self .df = self .df [pd .isna (self .df ["Authentication Info URL " ])]
2222 self .df = self .df [~ self .df .duplicated (subset = "System ID" , keep = False )]
2323 self .logger .info (f"Data = { self .df } " )
2424
@@ -45,77 +45,80 @@ def deprecate_feeds(self, deprecated_feeds):
4545 self .logger .info (f"Deprecating feed with stable_id={ stable_id } " )
4646 gbfs_feed .status = "deprecated"
4747
48- def populate_db (self ):
48+ def populate_db (self , session , fetch_url = True ):
4949 """Populate the database with the GBFS feeds"""
5050 start_time = datetime .now ()
5151 configure_polymorphic_mappers ()
5252
5353 try :
54- with self . db . start_db_session () as session :
55- # Compare the database to the CSV file
56- df_from_db = generate_system_csv_from_db ( self .df , session )
57- added_or_updated_feeds , deprecated_feeds = compare_db_to_csv ( df_from_db , self . df , self . logger )
58-
59- self . deprecate_feeds ( deprecated_feeds )
60- if added_or_updated_feeds is None :
61- added_or_updated_feeds = self . df
62- for index , row in added_or_updated_feeds . iterrows ():
63- self .logger . info ( f"Processing row { index + 1 } of { len ( added_or_updated_feeds ) } " )
64- stable_id = self .get_stable_id ( row )
65- gbfs_feed = self . query_feed_by_stable_id ( session , stable_id , "gbfs" )
54+ # Compare the database to the CSV file
55+ df_from_db = generate_system_csv_from_db ( self . df , session )
56+ added_or_updated_feeds , deprecated_feeds = compare_db_to_csv ( df_from_db , self .df , self . logger )
57+
58+ self . deprecate_feeds ( deprecated_feeds )
59+ if added_or_updated_feeds is None :
60+ added_or_updated_feeds = self . df
61+ for index , row in added_or_updated_feeds . iterrows ():
62+ self . logger . info ( f"Processing row { index + 1 } of { len ( added_or_updated_feeds ) } " )
63+ stable_id = self .get_stable_id ( row )
64+ gbfs_feed = self .query_feed_by_stable_id ( session , stable_id , "gbfs" )
65+ if fetch_url :
6666 fetched_data = fetch_data (row ["Auto-Discovery URL" ], self .logger , ["system_information" ])
67- # If the feed already exists, update it. Otherwise, create a new feed.
68- if gbfs_feed :
69- self .logger .info (f"Updating feed { stable_id } - { row ['Name' ]} " )
70- else :
71- feed_id = generate_unique_id ()
72- self .logger .info (f"Creating new feed for { stable_id } - { row ['Name' ]} " )
73- gbfs_feed = Gbfsfeed (
74- id = feed_id ,
75- data_type = "gbfs" ,
76- stable_id = stable_id ,
77- created_at = datetime .now (pytz .utc ),
78- operational_status = "published" ,
79- )
80- gbfs_feed .externalids = [self .get_external_id (feed_id , row ["System ID" ])]
81- session .add (gbfs_feed )
82-
83- system_information_content = get_data_content (fetched_data .get ("system_information" ), self .logger )
84- gbfs_feed .license_url = get_license_url (system_information_content , self .logger )
85- gbfs_feed .feed_contact_email = (
86- system_information_content .get ("feed_contact_email" ) if system_information_content else None
67+ else :
68+ fetched_data = dict ()
69+ # If the feed already exists, update it. Otherwise, create a new feed.
70+ if gbfs_feed :
71+ self .logger .info (f"Updating feed { stable_id } - { row ['Name' ]} " )
72+ else :
73+ feed_id = generate_unique_id ()
74+ self .logger .info (f"Creating new feed for { stable_id } - { row ['Name' ]} " )
75+ gbfs_feed = Gbfsfeed (
76+ id = feed_id ,
77+ data_type = "gbfs" ,
78+ stable_id = stable_id ,
79+ created_at = datetime .now (pytz .utc ),
80+ operational_status = "published" ,
8781 )
88- gbfs_feed .system_id = str (row ["System ID" ]).strip ()
89- gbfs_feed .operator = row ["Name" ]
90- gbfs_feed .provider = row ["Name" ]
91- gbfs_feed .operator_url = row ["URL" ]
92- gbfs_feed .producer_url = row ["URL" ]
93- gbfs_feed .auto_discovery_url = row ["Auto-Discovery URL" ]
94- gbfs_feed .updated_at = datetime .now (pytz .utc )
95-
96- if not gbfs_feed .locations : # If locations are empty, create a new location (no overwrite)
97- country_code = self .get_safe_value (row , "Country Code" , "" )
98- municipality = self .get_safe_value (row , "Location" , "" )
99- location_id = self .get_location_id (country_code , None , municipality )
100- country = pycountry .countries .get (alpha_2 = country_code ) if country_code else None
101- location = session .get (Location , location_id ) or Location (
102- id = location_id ,
103- country_code = country_code ,
104- country = country .name if country else None ,
105- municipality = municipality ,
106- )
107- gbfs_feed .locations .clear ()
108- gbfs_feed .locations = [location ]
109-
110- self .logger .info (80 * "-" )
111-
112- # self.db.session.commit()
113- end_time = datetime .now ()
114- self .logger .info (f"Time taken: { end_time - start_time } seconds" )
82+ gbfs_feed .externalids = [self .get_external_id (feed_id , row ["System ID" ])]
83+ session .add (gbfs_feed )
84+
85+ system_information_content = get_data_content (fetched_data .get ("system_information" ), self .logger )
86+ gbfs_feed .license_url = get_license_url (system_information_content , self .logger )
87+ gbfs_feed .feed_contact_email = (
88+ system_information_content .get ("feed_contact_email" ) if system_information_content else None
89+ )
90+ gbfs_feed .system_id = str (row ["System ID" ]).strip ()
91+ gbfs_feed .operator = row ["Name" ]
92+ gbfs_feed .provider = row ["Name" ]
93+ gbfs_feed .operator_url = row ["URL" ]
94+ gbfs_feed .producer_url = row ["URL" ]
95+ gbfs_feed .auto_discovery_url = row ["Auto-Discovery URL" ]
96+ gbfs_feed .updated_at = datetime .now (pytz .utc )
97+
98+ if not gbfs_feed .locations : # If locations are empty, create a new location (no overwrite)
99+ country_code = self .get_safe_value (row , "Country Code" , "" )
100+ municipality = self .get_safe_value (row , "Location" , "" )
101+ location_id = self .get_location_id (country_code , None , municipality )
102+ country = pycountry .countries .get (alpha_2 = country_code ) if country_code else None
103+ location = session .get (Location , location_id ) or Location (
104+ id = location_id ,
105+ country_code = country_code ,
106+ country = country .name if country else None ,
107+ municipality = municipality ,
108+ )
109+ gbfs_feed .locations .clear ()
110+ gbfs_feed .locations = [location ]
111+
112+ session .flush ()
113+ self .logger .info (80 * "-" )
114+
115+ # self.db.session.commit()
116+ end_time = datetime .now ()
117+ self .logger .info (f"Time taken: { end_time - start_time } seconds" )
115118 except Exception as e :
116119 self .logger .error (f"Error populating the database: { e } " )
117120 raise e
118121
119122
120123if __name__ == "__main__" :
121- GBFSDatabasePopulateHelper (set_up_configs ()).populate_db ( )
124+ GBFSDatabasePopulateHelper (set_up_configs ()).initialize ( trigger_downstream_tasks = False )
0 commit comments