Commit 4f490cd

Remove upsert for domains since most are not new
1 parent c8fd6af commit 4f490cd

4 files changed (+78, -36 lines)
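
The rationale in the commit title: most extracted domains already exist in the domains table, so blanket-upserting all of them does needless writes on every crawl. The change below reads the table once and upserts only rows that are actually missing. A minimal before/after sketch of the pattern (the upsert_df calls are shown as comments, since they need a live database):

import pandas as pd

known = pd.DataFrame({"domain_name": ["known.com", "old.net"]})
extracted = pd.DataFrame({"domain_name": ["known.com", "new.org"]})

# Before: every extracted domain went through upsert, even though most
# rows already existed:
#     upsert_df(table_name="domains", df=extracted, ...)

# After: anti-join against the known domains first, upsert only the remainder.
new_only = extracted[~extracted["domain_name"].isin(known["domain_name"])]
#     upsert_df(table_name="domains", df=new_only, ...)  # one row instead of two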

adscrawler/app_stores/process_from_s3.py

Lines changed: 1 addition & 1 deletion
@@ -600,7 +600,7 @@ def process_parquets_and_insert(
     country_parquet_paths: list[str],
     period: str,
     s3_config_key: str,
-):
+) -> pd.DataFrame:
     """Process parquet files for a specific country and insert into the database."""
     period_query = f"""WITH all_data AS (
         SELECT * FROM read_parquet({country_parquet_paths})
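
For context on the query above: DuckDB's read_parquet accepts a list of paths and scans them as a single relation, which is what the f-string interpolation of country_parquet_paths relies on (a Python list's repr is valid DuckDB list syntax). A minimal sketch with hypothetical paths:

import duckdb

country_parquet_paths = ["data/us.parquet", "data/de.parquet"]  # hypothetical
con = duckdb.connect()
df = con.sql(
    f"SELECT * FROM read_parquet({country_parquet_paths})"
).df()  # materializes to a pandas DataFrame, matching the new return annotation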

adscrawler/app_stores/scrape_stores.py

Lines changed: 61 additions & 32 deletions
@@ -44,6 +44,7 @@
     get_crawl_scenario_countries,
     get_store_app_columns,
     prepare_for_psycopg,
+    query_all_domains,
     query_categories,
     query_collections,
     query_countries,
@@ -68,7 +69,7 @@ def process_chunk(
     use_ssh_tunnel: bool,
     process_icon: bool,
     total_rows: int,
-):
+) -> None:
     chunk_info = f"{store=} chunk={df_chunk.index[0]}-{df_chunk.index[-1]}/{total_rows}"
     logger.info(f"{chunk_info} start")
     database_connection = get_db_connection(use_ssh_tunnel=use_ssh_tunnel)
@@ -110,14 +111,14 @@ def process_chunk(


 def update_app_details(
-    database_connection,
-    store,
-    use_ssh_tunnel,
-    workers,
-    process_icon,
-    limit,
-    country_priority_group,
-):
+    database_connection: PostgresCon,
+    store: int,
+    use_ssh_tunnel: bool,
+    workers: int,
+    process_icon: bool,
+    limit: int,
+    country_priority_group: int,
+) -> None:
     """Process apps with dynamic work queue - simple and efficient."""
     log_info = f"{store=} update app details"

@@ -177,7 +178,7 @@ def update_app_details(
                 )
             except Exception as e:
                 failed_count += 1
-                logger.error(f"Chunk {chunk_idx} failed: {e}")
+                logger.exception(f"Chunk {chunk_idx} failed: {e}")
     logger.info(f"{log_info} completed={completed_count} failed={failed_count}")
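
On the logging change: logger.exception logs at error level like logger.error, but also attaches the active traceback, so a chunk failure keeps its stack trace. A minimal stdlib-logging sketch (the project may use loguru instead, whose logger has the same method):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    1 / 0
except Exception as e:
    logger.error(f"Chunk 3 failed: {e}")      # message only
    logger.exception(f"Chunk 3 failed: {e}")  # message plus full traceback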
@@ -513,6 +514,33 @@ def crawl_developers_for_new_store_ids(
         logger.exception(f"{row_info=} failed!")


+def check_and_insert_domains(
+    domains_df: pd.DataFrame,
+    app_urls: pd.DataFrame,
+    database_connection: PostgresCon,
+) -> pd.DataFrame:
+    """Adds missing ad domains to the database and returns updated domain DataFrame."""
+    missing_ad_domains = app_urls[
+        (~app_urls["url"].isin(domains_df["domain_name"])) & (app_urls["url"].notna())
+    ]
+    if not missing_ad_domains.empty:
+        new_ad_domains = (
+            missing_ad_domains[["url"]]
+            .drop_duplicates()
+            .rename(columns={"url": "domain_name"})
+        )
+        new_ad_domains = upsert_df(
+            table_name="domains",
+            df=new_ad_domains,
+            insert_columns=["domain_name"],
+            key_columns=["domain_name"],
+            database_connection=database_connection,
+            return_rows=True,
+        )
+        domains_df = pd.concat([new_ad_domains, domains_df])
+    return domains_df
+
+
 def save_app_domains(
     apps_df: pd.DataFrame,
     database_connection: PostgresCon,
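
A self-contained walkthrough of the new helper's contract, with toy frames: null URLs are excluded, only unseen domains would be upserted, and the returned frame is the union of old and new rows (the upsert_df round trip is stubbed out since it needs a database):

import pandas as pd

domains_df = pd.DataFrame({"id": [1], "domain_name": ["known.com"]})
app_urls = pd.DataFrame(
    {"store_app": [10, 11, 12], "url": ["known.com", "new.org", None]}
)

# Same filter as check_and_insert_domains: unseen, non-null URLs only.
missing = app_urls[
    (~app_urls["url"].isin(domains_df["domain_name"])) & (app_urls["url"].notna())
]
new_rows = missing[["url"]].drop_duplicates().rename(columns={"url": "domain_name"})
new_rows["id"] = [2]  # stand-in for ids returned by upsert_df(..., return_rows=True)
print(pd.concat([new_rows, domains_df]))  # "new.org" and "known.com" both present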
@@ -522,31 +550,30 @@ def save_app_domains(
         return
     urls_na = apps_df["url"].isna()
     app_urls = apps_df[~urls_na][["store_app", "url"]].drop_duplicates()
+    if app_urls.empty:
+        logger.warning("No app urls found")
+        return
     app_urls["url"] = app_urls["url"].apply(lambda x: extract_domains(x))
-    insert_columns = ["domain_name"]
-    domain_ids_df = upsert_df(
-        table_name="domains",
-        df=app_urls.rename(columns={"url": "domain_name"}),
-        insert_columns=insert_columns,
-        key_columns=["domain_name"],
+    all_domains_df = query_all_domains(database_connection=database_connection)
+    all_domains_df = check_and_insert_domains(
+        domains_df=all_domains_df,
+        app_urls=app_urls,
         database_connection=database_connection,
-        return_rows=True,
-    ).rename(columns={"id": "pub_domain"})
-    if domain_ids_df is not None and not domain_ids_df.empty:
-        app_urls = pd.merge(
-            app_urls,
-            domain_ids_df,
-            how="left",
-            left_on="url",
-            right_on="domain_name",
-            validate="m:1",
-        )
+    )
+    domain_ids_df = app_urls.merge(
+        all_domains_df.rename(columns={"id": "pub_domain"}),
+        left_on="url",
+        right_on="domain_name",
+        how="left",
+        validate="m:1",
+    )
+    if not domain_ids_df.empty:
         insert_columns = ["store_app", "pub_domain"]
         key_columns = ["store_app"]
         upsert_df(
             table_name="app_urls_map",
             insert_columns=insert_columns,
-            df=app_urls,
+            df=domain_ids_df[["store_app", "pub_domain"]],
             key_columns=key_columns,
             database_connection=database_connection,
         )
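
The validate="m:1" flag on the merge guards the assumption that each URL resolves to at most one domains row; if the domains table ever held duplicates, pandas would raise rather than silently fan out store_app rows. A runnable sketch of the failure mode it catches:

import pandas as pd

app_urls = pd.DataFrame({"store_app": [10, 11], "url": ["a.com", "a.com"]})
domains = pd.DataFrame({"pub_domain": [1, 2], "domain_name": ["a.com", "a.com"]})

try:
    app_urls.merge(
        domains, left_on="url", right_on="domain_name", how="left", validate="m:1"
    )
except pd.errors.MergeError as e:
    print(f"caught: {e}")  # right side is not unique on domain_name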
@@ -644,9 +671,9 @@ def save_developer_info(
     apps_df: pd.DataFrame,
     database_connection: PostgresCon,
 ) -> pd.DataFrame:
-    assert apps_df["developer_id"].to_numpy()[
-        0
-    ], f"{apps_df['store_id']} Missing Developer ID"
+    assert apps_df["developer_id"].to_numpy()[0], (
+        f"{apps_df['store_id']} Missing Developer ID"
+    )
     df = (
         apps_df[["store", "developer_id", "developer_name"]]
         .rename(columns={"developer_name": "name"})
@@ -827,7 +854,9 @@ def upsert_store_apps_descriptions(
     # upsert_keywords(keywords_df, database_connection)


-def upsert_keywords(keywords_df: pd.DataFrame, database_connection: PostgresCon):
+def upsert_keywords(
+    keywords_df: pd.DataFrame, database_connection: PostgresCon
+) -> None:
     table_name = "keywords"
     insert_columns = ["keyword_text"]
     key_columns = ["keyword_text"]

adscrawler/dbcon/queries.py

Lines changed: 12 additions & 0 deletions
@@ -1129,6 +1129,18 @@ def query_all_store_app_descriptions(
     return df


+@lru_cache(maxsize=1)
+def query_all_domains(database_connection: PostgresCon) -> pd.DataFrame:
+    sel_query = """SELECT
+        *
+    FROM
+        domains
+    ;
+    """
+    df = pd.read_sql(sel_query, con=database_connection.engine)
+    return df
+
+
 @lru_cache(maxsize=1)
 def query_ad_domains(database_connection: PostgresCon) -> pd.DataFrame:
     sel_query = """WITH all_ad_domains AS (
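
One caveat with @lru_cache(maxsize=1) on query_all_domains: the first call's result is reused for the life of the process, so rows inserted later won't appear in subsequent calls unless the cache is cleared (the new check_and_insert_domains compensates within a call by concatenating the fresh rows onto the frame it was given). A minimal illustration of the caching semantics:

from functools import lru_cache

@lru_cache(maxsize=1)
def query_all_domains(connection: str) -> list[str]:
    print("hitting the database")  # stand-in for the pd.read_sql call
    return ["known.com"]

query_all_domains("db")          # prints: hitting the database
query_all_domains("db")          # cached: no query is issued
query_all_domains.cache_clear()  # would force a fresh read
query_all_domains("db")          # prints again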

adscrawler/packages/storage.py

Lines changed: 4 additions & 3 deletions
@@ -469,9 +469,10 @@ def download_app_by_store_id(
         logger.error(f"S3 only has failed apk for {store_id=}, no version_code")
     if version_str:
         df = df[df["version_code"] == version_str]
+        final_version_str = version_str
     else:
         df = df.sort_values(by="version_code", ascending=False)
-        version_str: str = df["version_code"].to_numpy()[0]
+        final_version_str: str = df["version_code"].to_numpy()[0]
     key = df["key"].to_numpy()[0]
     filename = key.split("/")[-1]
     extension = filename.split(".")[-1]
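
The rename to final_version_str looks cosmetic but fixes a real wart: the old code re-annotated the version_str parameter inside one branch (version_str: str = ...), shadowing the signature's type, which can trip static type checkers. Binding a fresh name in both branches keeps the data flow explicit. A sketch of the pattern, with an assumed signature since the full function isn't shown:

# Assumed shape for illustration; the real function takes more parameters.
def resolve_version(version_str: str | None, newest_available: str) -> str:
    if version_str:
        final_version_str = version_str       # caller pinned a version
    else:
        final_version_str = newest_available  # fall back to the newest on S3
    return final_version_str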
@@ -495,7 +496,7 @@ def download_app_by_store_id(
         raise FileNotFoundError(f"{downloaded_file_path=} after download not found")
     final_path = move_downloaded_app_to_main_dir(downloaded_file_path)
     logger.info(f"{func_info} to local finished")
-    return final_path, version_str
+    return final_path, final_version_str


 def download_s3_app_by_key(
@@ -586,4 +587,4 @@ def get_duckdb_connection(s3_config_key: str) -> duckdb.DuckDBPyConnection:
     return duckdb_con


-S3_CLIENTS = {}
+S3_CLIENTS: dict = {}
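
The annotation on S3_CLIENTS is presumably a mypy fix: an empty dict literal at module scope gives the checker nothing to infer an element type from, producing a "Need type annotation" error. The bare dict annotation silences it; a parameterized form would be tighter (the value type is a guess, since it isn't shown in the diff):

# Without the annotation, mypy reports something like:
#   error: Need type annotation for "S3_CLIENTS"  [var-annotated]
S3_CLIENTS: dict = {}

# Tighter, if values are e.g. boto3 S3 clients keyed by config name (hypothetical):
# S3_CLIENTS: dict[str, object] = {}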
