Skip to content

Commit 6da5452

Browse files
committed
remove unnecessary upserts in favor of update to avoid deadlocks
1 parent 4f490cd commit 6da5452

File tree

2 files changed

+53
-31
lines changed

2 files changed

+53
-31
lines changed

adscrawler/app_stores/scrape_stores.py

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
get_crawl_scenario_countries,
4545
get_store_app_columns,
4646
prepare_for_psycopg,
47+
query_all_developers,
4748
query_all_domains,
4849
query_categories,
4950
query_collections,
@@ -514,6 +515,32 @@ def crawl_developers_for_new_store_ids(
514515
logger.exception(f"{row_info=} failed!")
515516

516517

518+
def check_and_insert_developers(
    developers_df: pd.DataFrame,
    apps_df: pd.DataFrame,
    database_connection: "PostgresCon",
) -> pd.DataFrame:
    """Insert developers that are not yet known and return the full developer set.

    A developer is identified by the pair ``(store, developer_id)``, which is
    also the key used for the upsert below. Rows of ``apps_df`` without a
    ``developer_id`` are ignored.

    Args:
        developers_df: Developers already known; must contain the ``store``
            and ``developer_id`` columns.
        apps_df: Apps carrying ``store``, ``developer_id`` and
            ``developer_name`` columns.
        database_connection: Connection handed through to ``upsert_df``.

    Returns:
        ``developers_df`` unchanged when nothing is missing, otherwise the
        rows returned by the upsert concatenated in front of ``developers_df``.
    """
    key_columns = ["store", "developer_id"]

    candidates = apps_df[apps_df["developer_id"].notna()]
    if candidates.empty:
        return developers_df

    # Compare on the composite key: the same developer_id may legitimately
    # exist under several stores, so matching on developer_id alone would
    # wrongly treat a developer as known and skip the insert.
    known_keys = pd.MultiIndex.from_frame(developers_df[key_columns])
    candidate_keys = pd.MultiIndex.from_frame(candidates[key_columns])
    missing_devs = candidates[~candidate_keys.isin(known_keys)]
    if missing_devs.empty:
        return developers_df

    # De-duplicate on the key (not on the name as well) so that a developer
    # appearing with two different names is sent to the upsert only once.
    new_devs = (
        missing_devs[["store", "developer_id", "developer_name"]]
        .drop_duplicates(subset=key_columns)
        .rename(columns={"developer_name": "name"})
    )
    inserted_devs = upsert_df(
        table_name="developers",
        df=new_devs,
        insert_columns=["store", "developer_id", "name"],
        key_columns=key_columns,
        database_connection=database_connection,
        return_rows=True,
    )
    return pd.concat([inserted_devs, developers_df])
542+
543+
517544
def check_and_insert_domains(
518545
domains_df: pd.DataFrame,
519546
app_urls: pd.DataFrame,
@@ -671,39 +698,22 @@ def save_developer_info(
671698
apps_df: pd.DataFrame,
672699
database_connection: PostgresCon,
673700
) -> pd.DataFrame:
674-
assert apps_df["developer_id"].to_numpy()[0], (
675-
f"{apps_df['store_id']} Missing Developer ID"
701+
all_developers_df = query_all_developers(database_connection=database_connection)
702+
all_developers_df = check_and_insert_developers(
703+
developers_df=all_developers_df,
704+
apps_df=apps_df,
705+
database_connection=database_connection,
676706
)
677-
df = (
678-
apps_df[["store", "developer_id", "developer_name"]]
679-
.rename(columns={"developer_name": "name"})
680-
.drop_duplicates()
707+
apps_df = pd.merge(
708+
apps_df,
709+
all_developers_df.rename(columns={"id": "developer"})[
710+
["store", "developer_id", "developer"]
711+
],
712+
how="left",
713+
left_on=["store", "developer_id"],
714+
right_on=["store", "developer_id"],
715+
validate="m:1",
681716
)
682-
table_name = "developers"
683-
insert_columns = ["store", "developer_id", "name"]
684-
key_columns = ["store", "developer_id"]
685-
686-
try:
687-
dev_df = upsert_df(
688-
table_name=table_name,
689-
df=df,
690-
insert_columns=insert_columns,
691-
key_columns=key_columns,
692-
database_connection=database_connection,
693-
return_rows=True,
694-
)
695-
apps_df = pd.merge(
696-
apps_df,
697-
dev_df.rename(columns={"id": "developer"})[
698-
["store", "developer_id", "developer"]
699-
],
700-
how="left",
701-
left_on=["store", "developer_id"],
702-
right_on=["store", "developer_id"],
703-
validate="m:1",
704-
)
705-
except Exception as error:
706-
logger.error(f"Developer insert failed with error {error}")
707717
return apps_df
708718

709719

adscrawler/dbcon/queries.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,18 @@ def upsert_df(
376376
return return_df
377377

378378

379+
@lru_cache(maxsize=1)
def query_all_developers(database_connection: PostgresCon) -> pd.DataFrame:
    """Return every row of the ``developers`` table as a DataFrame.

    The frame has the columns ``id``, ``store``, ``name`` and
    ``developer_id``. Because of ``lru_cache`` the argument must be hashable,
    and repeated calls with the same connection return the very same
    DataFrame object without querying again.

    NOTE(review): rows inserted after the first call will not show up in the
    cached result, and in-place changes by one caller are visible to all
    others -- confirm callers tolerate both.
    """
    sel_query = """SELECT
        id, store, name, developer_id
        FROM developers
        ;
        """
    return pd.read_sql(sel_query, database_connection.engine)
389+
390+
379391
def query_developers(
380392
database_connection: PostgresCon,
381393
store: int,

0 commit comments

Comments
 (0)