Commit 0ec6d30
For large updates to store_apps table use regular UPDATE and avoid ON CONFLICT
1 parent 5c22840 commit 0ec6d30
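For context, a minimal sketch of the two statement shapes this commit switches between (the column names below are placeholders, not the SQL the repo actually generates). The old upsert path builds an INSERT ... ON CONFLICT, which must attempt an insert and resolve the conflict for every row; the new path issues a plain UPDATE keyed on (store, store_id), which the commit message suggests is cheaper for large batches of rows that already exist.

    # Sketch only; "name" and "installs" stand in for the real update columns.
    upsert_sql = """
        INSERT INTO store_apps (store, store_id, name, installs)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (store, store_id)
        DO UPDATE SET name = EXCLUDED.name, installs = EXCLUDED.installs
    """
    update_sql = """
        UPDATE store_apps
        SET name = %s, installs = %s
        WHERE store = %s AND store_id = %s
    """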

5 files changed: +183 -61 lines changed

adscrawler/app_stores/google.py

Lines changed: 3 additions & 3 deletions
@@ -106,13 +106,13 @@ def clean_google_play_app_df(apps_df: pd.DataFrame) -> pd.DataFrame:
         .fillna("0")
         .astype(int),
         category=apps_df["category"].str.lower(),
+        release_date=pd.to_datetime(
+            apps_df["release_date"], format="%b %d, %Y"
+        ).dt.date,
         store_last_updated=pd.to_datetime(
             apps_df["store_last_updated"],
             unit="s",
         ).fillna(apps_df["release_date"]),
-        release_date=pd.to_datetime(
-            apps_df["release_date"], format="%b %d, %Y"
-        ).dt.date,
     )
     if "developer_name" in apps_df.columns:
         apps_df.loc[apps_df["developer_name"].notna(), "developer_name"] = apps_df.loc[
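As a quick illustration of the parsing involved (toy values, not repo data): release_date strings arrive in the Play-store display format and are parsed with "%b %d, %Y", while store_last_updated arrives as Unix seconds.

    import pandas as pd

    # Toy values only, mimicking the two date formats handled above.
    release_date = pd.to_datetime(
        pd.Series(["Mar 14, 2021", "Jan 02, 2023"]), format="%b %d, %Y"
    ).dt.date
    store_last_updated = pd.to_datetime(
        pd.Series([1_700_000_000.0, float("nan")]), unit="s"
    )  # the missing row stays NaT; the real code backfills it from release_date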

adscrawler/app_stores/process_from_s3.py

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ def app_details_to_s3(
     df: pd.DataFrame,
     store: int,
 ) -> None:
-    logger.info(f"S3 upload app details {store=} start")
+    logger.info(f"S3 upload app_details {store=}, rows={df.shape[0]} start")
     if store is None:
         raise ValueError("store is required")
     s3_client = get_s3_client()

adscrawler/app_stores/scrape_stores.py

Lines changed: 42 additions & 54 deletions
@@ -43,6 +43,7 @@
 from adscrawler.dbcon.queries import (
     get_crawl_scenario_countries,
     get_store_app_columns,
+    prepare_for_psycopg,
     query_categories,
     query_collections,
     query_countries,
@@ -53,6 +54,7 @@
     query_store_id_map,
     query_store_id_map_cached,
     query_store_ids,
+    update_from_df,
     upsert_df,
 )
 from adscrawler.packages.storage import get_s3_client
@@ -113,7 +115,7 @@ def update_app_details(
     workers,
     process_icon,
     limit,
-    country_crawl_priority,
+    country_priority_group,
 ):
     """Process apps with dynamic work queue - simple and efficient."""
     log_info = f"Update app details: {store=}"
@@ -122,12 +124,12 @@ def update_app_details(
         store=store,
         database_connection=database_connection,
         limit=limit,
-        country_crawl_priority=country_crawl_priority,
+        country_priority_group=country_priority_group,
     )
     df = df.sort_values("country_code").reset_index(drop=True)
     logger.info(f"{log_info} start {len(df)} apps")
 
-    max_chunk_size = 10000
+    max_chunk_size = 5000
     chunks = []
     # Try keeping countries together for larger end S3 files
     for _country, country_df in df.groupby("country_code"):
@@ -696,6 +698,7 @@ def process_live_app_details(
     df_chunk: pd.DataFrame,
 ) -> None:
     for crawl_result, apps_df in results_df.groupby("crawl_result"):
+        logger.info(f"{store=} {crawl_result=} processing {len(apps_df)} apps for db")
         if crawl_result != 1:
             apps_df = apps_df[["store_id", "store", "crawled_at", "crawl_result"]]
         else:
@@ -725,100 +728,85 @@ def process_live_app_details(
             )
         except Exception:
             logger.exception("failed to process app icon")
-        apps_df = apps_df.convert_dtypes(dtype_backend="pyarrow")
-        apps_df = apps_df.replace({pd.NA: None})
+        # I think only coming from S3?
+        # apps_df = apps_df.convert_dtypes(dtype_backend="pyarrow")
+        # apps_df = apps_df.replace({pd.NA: None})
         apps_details_to_db(
             apps_df=apps_df,
             database_connection=database_connection,
+            crawl_result=crawl_result,
         )
 
 
 def apps_details_to_db(
     apps_df: pd.DataFrame,
     database_connection: PostgresCon,
+    crawl_result: int,
 ) -> None:
     key_columns = ["store", "store_id"]
     if (apps_df["crawl_result"] == 1).all() and apps_df["developer_id"].notna().all():
         apps_df = save_developer_info(apps_df, database_connection)
     insert_columns = [
         x for x in get_store_app_columns(database_connection) if x in apps_df.columns
     ]
-    # Update columns we always want the latest of
-    # Eg name, developer_id
-    store_apps_df = upsert_df(
+    apps_df = prepare_for_psycopg(apps_df)
+    return_rows = crawl_result == 1
+    logger.info(f"{crawl_result=} update store_apps table for {len(apps_df)} apps")
+    store_apps_df = update_from_df(
         table_name="store_apps",
         df=apps_df,
-        insert_columns=insert_columns,
+        update_columns=insert_columns,
        key_columns=key_columns,
         database_connection=database_connection,
-        return_rows=True,
+        return_rows=return_rows,
+    )
+    if store_apps_df is None or store_apps_df.empty or crawl_result != 1:
+        return
+    store_apps_df = store_apps_df.rename(columns={"id": "store_app"})
+    apps_df = pd.merge(
+        apps_df,
+        store_apps_df[["store_id", "store_app"]],
+        how="left",
+        validate="1:1",
+    )
+    upsert_store_apps_descriptions(apps_df, database_connection)
+    save_app_domains(
+        apps_df=apps_df,
+        database_connection=database_connection,
     )
-    if (
-        store_apps_df is not None
-        and not store_apps_df[store_apps_df["crawl_result"] == 1].empty
-    ):
-        store_apps_descriptions = store_apps_df[
-            store_apps_df["crawl_result"] == 1
-        ].copy()
-        store_apps_descriptions = pd.merge(
-            store_apps_descriptions,
-            apps_df[
-                [
-                    "store_id",
-                    "description",
-                    "description_short",
-                    "queried_language",
-                    "store_language_code",
-                ]
-            ],
-            on="store_id",
-        )
-        upsert_store_apps_descriptions(store_apps_descriptions, database_connection)
-    if store_apps_df is not None and not store_apps_df.empty:
-        store_apps_df = store_apps_df.rename(columns={"id": "store_app"})
-        apps_df = pd.merge(
-            apps_df,
-            store_apps_df[["store_id", "store_app"]],
-            how="left",
-            validate="1:1",
-        )
-        save_app_domains(
-            apps_df=apps_df,
-            database_connection=database_connection,
-        )
     return
 
 
 def upsert_store_apps_descriptions(
-    store_apps_descriptions: pd.DataFrame,
+    apps_df: pd.DataFrame,
     database_connection: PostgresCon,
 ) -> None:
     table_name = "store_apps_descriptions"
     languages_map = query_languages(database_connection)
-    store_apps_descriptions = pd.merge(
-        store_apps_descriptions,
+    apps_df = pd.merge(
+        apps_df,
         languages_map[["id", "language_slug"]],
         how="left",
         left_on="store_language_code",
         right_on="language_slug",
         validate="m:1",
     ).rename(columns={"id": "language_id"})
-    if store_apps_descriptions["language_id"].isna().any():
-        null_ids = store_apps_descriptions["language_id"].isna()
-        null_langs = store_apps_descriptions[
+    if apps_df["language_id"].isna().any():
+        null_ids = apps_df["language_id"].isna()
+        null_langs = apps_df[null_ids][
             ["store_id", "store_language_code"]
         ].drop_duplicates()
         logger.error(f"App descriptions dropping unknown language codes: {null_langs}")
-        store_apps_descriptions = store_apps_descriptions[~null_ids]
-    if store_apps_descriptions.empty:
+        apps_df = apps_df[~null_ids]
+    if apps_df.empty:
         logger.debug("Dropped all descriptions, no language id found")
         return
-    if "description_short" not in store_apps_descriptions.columns:
-        store_apps_descriptions["description_short"] = ""
+    if "description_short" not in apps_df.columns:
+        apps_df["description_short"] = ""
     key_columns = ["store_app", "language_id", "description", "description_short"]
     upsert_df(
         table_name=table_name,
-        df=store_apps_descriptions,
+        df=apps_df,
         insert_columns=key_columns,
         key_columns=key_columns,
         md5_key_columns=["description", "description_short"],

adscrawler/dbcon/queries.py

Lines changed: 136 additions & 2 deletions
@@ -118,6 +118,140 @@ def insert_df(
     return return_df
 
 
+def prepare_for_psycopg(df: pd.DataFrame) -> pd.DataFrame:
+    df = df.copy()
+    for col in df.select_dtypes(include=["datetimetz", "datetime64[ns]"]):
+        # Convert to object dtype first so it can hold None
+        df[col] = (
+            df[col]
+            .apply(lambda x: x.to_pydatetime() if pd.notna(x) else None)
+            .astype("object")
+        )
+    # Replace NaN (for floats, strings, etc.)
+    df = df.astype(object).where(pd.notna(df), None)
+    return df
+
+
+def update_from_df(
+    df: pd.DataFrame,
+    table_name: str,
+    database_connection: Connection,
+    key_columns: list[str],
+    update_columns: list[str],
+    return_rows: bool = False,
+    schema: str | None = None,
+    md5_key_columns: list[str] | None = None,
+    log: bool = False,
+) -> pd.DataFrame | None:
+    """Perform an UPDATE on a PostgreSQL table from a DataFrame.
+
+    Parameters
+    ----------
+    df : pandas.DataFrame
+        The DataFrame containing update data.
+    table_name : str
+        The name of the target table.
+    database_connection : Connection
+        The database connection object.
+    key_columns : list of str
+        Column name(s) on which to match for the UPDATE.
+    update_columns : list of str
+        Columns to update (excluding key columns).
+    return_rows : bool, optional
+        Whether to return the rows that were updated.
+    schema : str, optional
+        The name of the schema containing the target table.
+    md5_key_columns : list of str, optional
+        Key columns that use MD5 hashing in their index.
+    log : bool, optional
+        Print generated SQL statement for debugging.
+
+    Returns
+    -------
+    pd.DataFrame or None
+        DataFrame of updated rows if return_rows=True, else None.
+
+    """
+    raw_conn = database_connection.engine.raw_connection()
+    # Handle special date columns
+    if "crawled_date" in df.columns and df["crawled_date"].isna().all():
+        df["crawled_date"] = pd.to_datetime(df["crawled_date"]).dt.date
+        df["crawled_date"] = None
+    if "release_date" in df.columns and df["release_date"].isna().all():
+        df["release_date"] = None
+    # Build table identifier
+    table_identifier = Identifier(table_name)
+    if schema:
+        table_identifier = Composed([Identifier(schema), SQL("."), table_identifier])
+    # Build UPDATE SET clause for update_columns only
+    update_set = SQL(", ").join(
+        SQL("{0} = %s").format(Identifier(col)) for col in update_columns
+    )
+    # Build WHERE conditions for key_columns
+    if md5_key_columns:
+        where_conditions = SQL(" AND ").join(
+            (
+                SQL("md5({col}) = %s").format(col=Identifier(col))
+                if col in md5_key_columns
+                else SQL("{col} = %s").format(col=Identifier(col))
+            )
+            for col in key_columns
+        )
+    else:
+        where_conditions = SQL(" AND ").join(
+            SQL("{col} = %s").format(col=Identifier(col)) for col in key_columns
+        )
+    if return_rows:
+        update_query = SQL(
+            """
+            UPDATE {table}
+            SET {update_set}
+            WHERE {where_conditions}
+            RETURNING *
+            """
+        ).format(
+            table=table_identifier,
+            update_set=update_set,
+            where_conditions=where_conditions,
+        )
+    else:
+        update_query = SQL(
+            """
+            UPDATE {table}
+            SET {update_set}
+            WHERE {where_conditions}
+            """
+        ).format(
+            table=table_identifier,
+            update_set=update_set,
+            where_conditions=where_conditions,
+        )
+    if log:
+        logger.info(f"Update query: {update_query.as_string(raw_conn)}")
+    all_columns = update_columns + key_columns
+    with raw_conn.cursor() as cur:
+        # Prepare data
+        data = [
+            tuple(row) for row in df[all_columns].itertuples(index=False, name=None)
+        ]
+        if log:
+            logger.info(f"Update data sample: {data[:5] if len(data) > 5 else data}")
+        # Execute updates
+        if return_rows:
+            all_results = []
+            for row in data:
+                cur.execute(update_query, row)
+                result = cur.fetchall()
+                all_results.extend(result)
+            if all_results:
+                column_names = [desc[0] for desc in cur.description]
+                return_df = pd.DataFrame(all_results, columns=column_names)
+            else:
+                return_df = pd.DataFrame()
+        else:
+            cur.executemany(update_query, data)
+            return_df = None
+    raw_conn.commit()
+    return return_df
+
+
 def upsert_df(
     df: pd.DataFrame,
     table_name: str,
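
A usage sketch for the two new helpers (the example DataFrame and the "last_seen" column are made up; the call follows the signature added above): prepare_for_psycopg converts pandas NaT/NaN/pd.NA values to plain None so the driver can bind them, and update_from_df then issues the UPDATE row by row, either execute-per-row with RETURNING * when return_rows is set, or executemany otherwise.

    # Hypothetical example data; "last_seen" is a placeholder column name.
    df = pd.DataFrame(
        {
            "store": [1, 1],
            "store_id": ["com.example.a", "com.example.b"],
            "name": ["App A", None],
            "last_seen": pd.to_datetime(["2024-01-01", None]),
        }
    )
    df = prepare_for_psycopg(df)  # NaT / NaN become None for psycopg
    updated_rows = update_from_df(
        df=df,
        table_name="store_apps",
        database_connection=database_connection,  # a PostgresCon as used elsewhere
        key_columns=["store", "store_id"],
        update_columns=["name", "last_seen"],
        return_rows=True,  # per-row UPDATE ... RETURNING *, gathered into a DataFrame
    )
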
@@ -717,7 +851,7 @@ def get_crawl_scenario_countries(
 def query_store_apps_to_update(
     database_connection: PostgresCon,
     store: int,
-    country_crawl_priority: int,
+    country_priority_group: int,
     log_query=False,
     limit: int = 1000,
 ) -> pd.DataFrame:
@@ -744,7 +878,7 @@ def query_store_apps_to_update(
     )
     params = {
         "store": store,
-        "country_crawl_priority": country_crawl_priority,
+        "country_crawl_priority": country_priority_group,
         "short_update_ts": short_update_ts,
         "short_update_installs": short_update_installs,
         "short_update_ratings": short_update_ratings,

main.py

Lines changed: 1 addition & 1 deletion
@@ -376,7 +376,7 @@ def update_app_details(self, store: int, country_priority_group: int) -> None:
             use_ssh_tunnel=self.args.use_ssh_tunnel,
             workers=int(self.args.workers),
             process_icon=self.args.process_icons,
-            country_crawl_priority=self.args.country_priority_group,
+            country_priority_group=self.args.country_priority_group,
             limit=self.args.limit_query_rows,
         )
 
