     get_db_connection,
 )
 from adscrawler.dbcon.queries import (
+    delete_and_insert,
     query_categories,
     query_collections,
     query_countries,
+    query_languages,
     query_store_id_map,
     query_store_id_map_cached,
     upsert_df,
@@ -295,10 +297,14 @@ def manual_import_app_metrics_from_s3(
         use_ssh_tunnel=use_tunnel, config_key="madrone"
     )

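+    # NOTE: overrides the caller-supplied start_date for this manual backfill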
+    start_date = datetime.datetime.fromisoformat("2025-10-01").date()
     for snapshot_date in pd.date_range(start_date, end_date, freq="D"):
         snapshot_date = snapshot_date.date()
         for store in [1, 2]:
-            process_app_metrics_to_db(database_connection, store, snapshot_date)
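+            # retry once so a single transient failure does not halt the backfill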
+            try:
+                process_app_metrics_to_db(database_connection, store, snapshot_date)
+            except Exception:
+                process_app_metrics_to_db(database_connection, store, snapshot_date)


 def import_app_metrics_from_s3(
@@ -322,6 +328,11 @@ def process_app_metrics_to_db(
     make_s3_app_country_metrics_history(store, snapshot_date=snapshot_date)
     logger.info(f"date={snapshot_date}, store={store} agg df load")
     df = get_s3_agg_daily_snapshots(snapshot_date, snapshot_date, store)
+    if df.empty:
+        logger.warning(
+            f"No data found for S3 agg app metrics {store=} {snapshot_date=}"
+        )
+        return
     if store == 2:
         # Should be resolved from 11/1/2025
         problem_rows = df["store_id"].str.contains(".0", regex=False)
@@ -336,11 +347,6 @@ def process_app_metrics_to_db(
         df = df.drop_duplicates(
             ["snapshot_date", "country", "store_id"], keep="last"
         )
-    if df.empty:
-        logger.warning(
-            f"No data found for S3 agg app metrics {store=} {snapshot_date=}"
-        )
-        return
     logger.info(f"date={snapshot_date}, store={store} agg df prep")
     df = prep_app_metrics_history(
         df=df, store=store, database_connection=database_connection
@@ -439,6 +445,7 @@ def app_details_country_history_query(
     # lookback_date_str: str,
     snapshot_date_str: str,
 ) -> str:
+    bucket = CONFIG["s3"]["bucket"]
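+    # export bucket is now read from config instead of being hardcoded below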
     if store == 2:
         data_cols = """
             CAST(trackId AS VARCHAR) AS store_id,
@@ -500,7 +507,7 @@ def app_details_country_history_query(
                 PARTITION BY store_id, country
                 ORDER BY crawled_at DESC, {extra_sort_column}
             ) = 1
-    ) TO 's3://adscrawler/agg-data/app_country_metrics/store={store}/snapshot_date={snapshot_date_str}/'
+    ) TO 's3://{bucket}/agg-data/app_country_metrics/store={store}/snapshot_date={snapshot_date_str}/'
     (FORMAT PARQUET,
     PARTITION_BY (country),
     ROW_GROUP_SIZE 100000,
@@ -571,7 +578,7 @@ def process_ranks_from_s3(
         logger.info(
             f"DuckDB {store=} period_start={period_date_str} {country=} files={len(country_parquet_paths)}"
         )
-        wdf = process_parquets_and_insert(
+        wdf = query_store_collection_ranks(
             country_parquet_paths=country_parquet_paths,
             period=period,
             s3_config_key=s3_config_key,
@@ -632,7 +639,7 @@ def process_ranks_from_s3(
     )


-def process_parquets_and_insert(
+def query_store_collection_ranks(
     country_parquet_paths: list[str],
     period: str,
     s3_config_key: str,
@@ -687,3 +694,95 @@ def manual_download_rankings(
     s3_client.download_file(bucket, s3_key, str(local_path))
     df = pd.read_parquet(local_path)
     return df
+
+
+def import_keywords_from_s3(
+    start_date: datetime.date, end_date: datetime.date, database_connection: PostgresCon
+) -> None:
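+    """Import daily keyword rank crawls from S3 into frontend.app_keyword_ranks_daily."""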
+    language = "en"
+    country_map = query_countries(database_connection)
+    languages_map = query_languages(database_connection)
+    language_dict = languages_map.set_index("language_slug")["id"].to_dict()
+    _language_key = language_dict[language]
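+    # language id is resolved but not yet used downstream (hence the underscore)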
+    s3_config_key = "s3"
+    bucket = CONFIG[s3_config_key]["bucket"]
+    for snapshot_date in pd.date_range(start_date, end_date, freq="D"):
+        snapshot_date = snapshot_date.date()
+        for store in [1, 2]:
+            s3_loc = "raw-data/keywords"
+            s3_key = f"{s3_loc}/store={store}/crawled_date={snapshot_date}/"
+            parquet_paths = get_parquet_paths_by_prefix(bucket, s3_key)
+            if len(parquet_paths) == 0:
+                logger.warning(f"No parquet paths found for {s3_key}")
+                continue
+            df = query_keywords_from_s3(parquet_paths, s3_config_key)
+            store_id_map = query_store_id_map_cached(database_connection, store)
+            df["store_app"] = df["store_id"].map(
+                store_id_map.set_index("store_id")["id"].to_dict()
+            )
+            df["country"] = df["country"].map(
+                country_map.set_index("alpha2")["id"].to_dict()
+            )
+            if df["store_app"].isna().any():
+                check_and_insert_new_apps(
+                    database_connection=database_connection,
+                    dicts=df.to_dict(orient="records"),
+                    crawl_source="keywords",
+                    store=store,
+                )
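+                # re-query the store_id map so newly inserted apps resolve to ids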
+                store_id_map = query_store_id_map_cached(database_connection, store)
+                df["store_app"] = df["store_id"].map(
+                    store_id_map.set_index("store_id")["id"].to_dict()
+                )
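+            # delete-then-insert keyed on crawled_date keeps reruns idempotent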
+            delete_and_insert(
+                df=df,
+                table_name="app_keyword_ranks_daily",
+                schema="frontend",
+                database_connection=database_connection,
+                delete_by_keys=["crawled_date"],
+                insert_columns=[
+                    "country",
+                    "keyword_id",
+                    "crawled_date",
+                    "store_app",
+                    "app_rank",
+                ],
+                delete_keys_have_duplicates=True,
+            )
+
+
+def query_keywords_from_s3(
+    parquet_paths: list[str],
+    s3_config_key: str,
+) -> pd.DataFrame:
+    """Query keyword ranks from S3 parquet files, keeping only the latest crawl."""
+    period_query = f"""WITH all_data AS (
+        SELECT * FROM read_parquet({parquet_paths})
+    ),
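+    -- latest crawl timestamp per (store, country, keyword_id, rank)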
+    latest_per_keyword AS (
+        SELECT
+            store,
+            country,
+            keyword_id,
+            rank,
+            MAX(crawled_at) AS latest_crawled_at
+        FROM all_data
+        GROUP BY store, country, keyword_id, rank
+    )
+    SELECT
+        ar.crawled_date,
+        ar.country,
+        ar.store,
+        ar.rank AS app_rank,
+        ar.keyword_id,
+        ar.store_id
+    FROM all_data ar
+    JOIN latest_per_keyword lp
+        ON ar.keyword_id = lp.keyword_id
+        AND ar.store = lp.store
+        AND ar.country = lp.country
+        AND ar.rank = lp.rank
+        AND ar.crawled_at = lp.latest_crawled_at;
+    """
+    duckdb_con = get_duckdb_connection(s3_config_key)
+    return duckdb_con.execute(period_query).df()