Skip to content

Commit 6b82b3d

Browse files
authored
[DISCO-3977] Fetch and store engagement data for ADM and Wikipedia (#1335)
* [DISCO-3977] Fetch and store engagement data for ADM and Wikipedia * move blob name to config file * consolidate adm and wiki filemanagers into one
1 parent 523701c commit 6b82b3d

File tree

15 files changed

+538
-17
lines changed

15 files changed

+538
-17
lines changed

merino/configs/default.toml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ gcs_bq_project = ""
303303

304304
gcs_storage_project = ""
305305

306+
# MERINO_ENGAGEMENT__BLOB_NAME
307+
# Path within the GCS bucket to the engagement data blob.
308+
blob_name = "suggest-merino-exports/engagement/latest.json"
309+
306310
[default.icon]
307311
# Subdirectory within bucket for favicons
308312
favicons_root = "favicons"
@@ -574,6 +578,10 @@ cron_interval_sec = 60
574578
# Time between re-syncs of Remote Settings data, in seconds. Defaults to 1 hour.
575579
resync_interval_sec = 3600
576580

581+
# MERINO_PROVIDERS__ADM__ENGAGEMENT_RESYNC_INTERVAL_SEC
582+
# Time between re-syncs of engagement data from GCS, in seconds. Defaults to 1 hour.
583+
engagement_resync_interval_sec = 3600
584+
577585
# MERINO_PROVIDERS__ADM__SCORE
578586
# Ranking score for this provider as a floating point number. Defaults to 0.31.
579587
# This is intentionally set to be greater than the score in Remote Settings to fix
@@ -760,6 +768,15 @@ query_timeout_sec = 5.0
760768
# The ranking score for this provider as a floating point number. Defaults to 0.23.
761769
score = 0.23
762770

771+
# MERINO_PROVIDERS__WIKIPEDIA__CRON_INTERVAL_SEC
772+
# The interval of the engagement data cron job tick (in seconds).
773+
# Should be more frequent than `engagement_resync_interval_sec` to allow retries.
774+
cron_interval_sec = 60
775+
776+
# MERINO_PROVIDERS__WIKIPEDIA__ENGAGEMENT_RESYNC_INTERVAL_SEC
777+
# Time between re-syncs of Wikipedia engagement data from GCS, in seconds. Defaults to 1 hour.
778+
engagement_resync_interval_sec = 3600
779+
763780

764781
[default.providers.polygon]
765782
# MERINO_PROVIDERS__POLYGON__TYPE

merino/providers/suggest/adm/backends/protocol.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ class FormFactor(Enum):
1717
PHONE = 1
1818

1919

20+
class EngagementData(BaseModel):
21+
"""Model for engagement data file content"""
22+
23+
amp: dict[str, dict[str, str | int]]
24+
amp_aggregated: dict[str, int]
25+
26+
2027
class SuggestionContent(BaseModel):
2128
"""Class that holds the result from a fetch operation."""
2229

merino/providers/suggest/adm/provider.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@
1212

1313
from merino.optimizers.models import EngagementMetrics, ThompsonCandidate
1414
from merino.optimizers.thompson import ThompsonSampler
15-
from merino.providers.suggest.adm.backends.protocol import FormFactor
15+
from merino.providers.suggest.adm.backends.protocol import EngagementData, FormFactor
16+
from merino.utils.gcs.engagement.filemanager import EngagementFilemanager
1617
from merino.utils import cron
1718
from merino.providers.suggest.adm.backends.protocol import AdmBackend, SuggestionContent
18-
from merino.providers.suggest.base import BaseProvider, BaseSuggestion, SuggestionRequest
19+
from merino.providers.suggest.base import (
20+
BaseProvider,
21+
BaseSuggestion,
22+
SuggestionRequest,
23+
)
1924

2025
logger = logging.getLogger(__name__)
2126

@@ -82,6 +87,11 @@ class Provider(BaseProvider):
8287
resync_interval_sec: float
8388
min_attempted_count: int
8489
thompson: ThompsonSampler | None = None
90+
engagement_data: EngagementData
91+
filemanager: EngagementFilemanager
92+
engagement_resync_interval_sec: float
93+
last_engagement_fetch_at: float
94+
engagement_cron_task: asyncio.Task
8595

8696
def __init__(
8797
self,
@@ -90,6 +100,9 @@ def __init__(
90100
name: str,
91101
resync_interval_sec: float,
92102
cron_interval_sec: float,
103+
engagement_gcs_bucket: str,
104+
engagement_blob_name: str,
105+
engagement_resync_interval_sec: float,
93106
enabled_by_default: bool = True,
94107
min_attempted_count: int = 0,
95108
thompson: ThompsonSampler | None = None,
@@ -105,6 +118,13 @@ def __init__(
105118
self._enabled_by_default = enabled_by_default
106119
self.min_attempted_count = min_attempted_count
107120
self.thompson = thompson
121+
self.engagement_data = EngagementData(amp={}, amp_aggregated={})
122+
self.engagement_resync_interval_sec = engagement_resync_interval_sec
123+
self.last_engagement_fetch_at = 0
124+
self.filemanager = EngagementFilemanager(
125+
gcs_bucket_path=engagement_gcs_bucket,
126+
blob_name=engagement_blob_name,
127+
)
108128
super().__init__(**kwargs)
109129

110130
async def initialize(self) -> None:
@@ -132,15 +152,46 @@ async def initialize(self) -> None:
132152
# reference to it.
133153
self.cron_task = asyncio.create_task(cron_job())
134154

155+
engagement_cron_job = cron.Job(
156+
name="resync_engagement_data",
157+
interval=self.cron_interval_sec,
158+
condition=self._should_fetch_engagement,
159+
task=self._fetch_engagement_data,
160+
)
161+
self.engagement_cron_task = asyncio.create_task(engagement_cron_job())
162+
135163
def _should_fetch(self) -> bool:
136164
"""Check if it should fetch data from Remote Settings."""
137165
return (time.time() - self.last_fetch_at) >= self.resync_interval_sec
138166

167+
def _should_fetch_engagement(self) -> bool:
168+
"""Check if it should fetch engagement data from GCS."""
169+
return (time.time() - self.last_engagement_fetch_at) >= self.engagement_resync_interval_sec
170+
139171
async def _fetch(self) -> None:
140172
"""Fetch suggestions, keywords, and icons from Remote Settings."""
141173
self.suggestion_content = await self.backend.fetch()
142174
self.last_fetch_at = time.time()
143175

176+
async def _fetch_engagement_data(self) -> None:
177+
"""Fetch engagement data from GCS and store it in memory.
178+
179+
If the fetch returns no data, `last_engagement_fetch_at` is not updated
180+
so the cron job retries on the next tick.
181+
"""
182+
try:
183+
data = await self.filemanager.get_file()
184+
if data is None:
185+
logger.warning("Engagement data fetch returned None, will retry on next tick")
186+
return
187+
self.engagement_data = EngagementData.model_validate(data.model_dump())
188+
self.last_engagement_fetch_at = time.time()
189+
except Exception as e:
190+
logger.warning(
191+
"Failed to fetch engagement data from GCS",
192+
extra={"error": str(e)},
193+
)
194+
144195
def hidden(self) -> bool: # noqa: D102
145196
return False
146197

merino/providers/suggest/manager.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,12 +185,14 @@ def _create_provider(provider_id: str, setting: Settings) -> BaseProvider:
185185
if settings.providers.adm.thompson.enabled:
186186
thompson = ThompsonSampler(
187187
config=ThompsonConfig(
188-
dummy_candidate=EngagementMetrics(
189-
engaged=settings.providers.adm.thompson.dummy_engaged_count,
190-
attempted=settings.providers.adm.thompson.dummy_attempted_count,
191-
)
192-
if settings.providers.adm.thompson.dummy_enabled
193-
else None,
188+
dummy_candidate=(
189+
EngagementMetrics(
190+
engaged=settings.providers.adm.thompson.dummy_engaged_count,
191+
attempted=settings.providers.adm.thompson.dummy_attempted_count,
192+
)
193+
if settings.providers.adm.thompson.dummy_enabled
194+
else None
195+
),
194196
)
195197
)
196198
return AdmProvider(
@@ -202,6 +204,9 @@ def _create_provider(provider_id: str, setting: Settings) -> BaseProvider:
202204
enabled_by_default=setting.enabled_by_default,
203205
min_attempted_count=settings.providers.adm.thompson.min_attempted_count,
204206
thompson=thompson,
207+
engagement_gcs_bucket=settings.engagement.gcs_storage_bucket,
208+
engagement_blob_name=settings.engagement.blob_name,
209+
engagement_resync_interval_sec=setting.engagement_resync_interval_sec,
205210
)
206211
case ProviderType.GEOLOCATION:
207212
return GeolocationProvider(
@@ -236,6 +241,10 @@ def _create_provider(provider_id: str, setting: Settings) -> BaseProvider:
236241
name=provider_id,
237242
query_timeout_sec=setting.query_timeout_sec,
238243
enabled_by_default=setting.enabled_by_default,
244+
engagement_gcs_bucket=settings.engagement.gcs_storage_bucket,
245+
engagement_blob_name=settings.engagement.blob_name,
246+
engagement_resync_interval_sec=setting.engagement_resync_interval_sec,
247+
cron_interval_sec=setting.cron_interval_sec,
239248
)
240249
case ProviderType.POLYGON:
241250
cache = (

merino/providers/suggest/wikipedia/backends/protocol.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
from typing import Any, Protocol
44

5+
from pydantic import BaseModel
6+
7+
8+
class EngagementData(BaseModel):
9+
"""Model for Wikipedia engagement data from GCS."""
10+
11+
wiki_aggregated: dict[str, int]
12+
513

614
class WikipediaBackend(Protocol):
715
"""Protocol for a Wikipedia backend that this provider depends on.

merino/providers/suggest/wikipedia/provider.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""The provider for the dynamic Wikipedia integration."""
22

3+
import asyncio
34
import logging
5+
import time
46
from typing import Any, Final
57

68
from pydantic import HttpUrl
@@ -13,8 +15,10 @@
1315
SuggestionRequest,
1416
Category,
1517
)
16-
from merino.providers.suggest.wikipedia.backends.protocol import WikipediaBackend
18+
from merino.providers.suggest.wikipedia.backends.protocol import EngagementData, WikipediaBackend
19+
from merino.utils.gcs.engagement.filemanager import EngagementFilemanager
1720
from merino.providers.suggest.wikipedia.backends.utils import get_language_code
21+
from merino.utils import cron
1822

1923
# The Wikipedia icon backed by Merino's image CDN.
2024
# TODO: Use a better way to fetch this icon URL instead of hardcoding it here.
@@ -48,11 +52,21 @@ class Provider(BaseProvider):
4852
backend: WikipediaBackend
4953
score: float
5054
title_block_list: set[str]
55+
engagement_data: EngagementData
56+
filemanager: EngagementFilemanager
57+
engagement_resync_interval_sec: float
58+
cron_interval_sec: float
59+
last_engagement_fetch_at: float
60+
engagement_cron_task: asyncio.Task
5161

5262
def __init__(
5363
self,
5464
backend: WikipediaBackend,
5565
title_block_list: set[str],
66+
engagement_gcs_bucket: str,
67+
engagement_blob_name: str,
68+
engagement_resync_interval_sec: float,
69+
cron_interval_sec: float,
5670
name: str = "wikipedia",
5771
enabled_by_default: bool = True,
5872
query_timeout_sec: float = settings.providers.wikipedia.query_timeout_sec,
@@ -67,11 +81,50 @@ def __init__(
6781
self._enabled_by_default = enabled_by_default
6882
self._query_timeout_sec = query_timeout_sec
6983
self.score = score
84+
self.engagement_data = EngagementData(wiki_aggregated={})
85+
self.engagement_resync_interval_sec = engagement_resync_interval_sec
86+
self.cron_interval_sec = cron_interval_sec
87+
self.last_engagement_fetch_at = 0
88+
self.filemanager = EngagementFilemanager(
89+
gcs_bucket_path=engagement_gcs_bucket,
90+
blob_name=engagement_blob_name,
91+
)
7092
super().__init__(**kwargs)
7193

7294
async def initialize(self) -> None:
73-
"""Initialize Wikipedia provider."""
74-
return
95+
"""Initialize Wikipedia provider and start engagement data cron job."""
96+
engagement_cron_job = cron.Job(
97+
name="resync_wikipedia_engagement_data",
98+
interval=self.cron_interval_sec,
99+
condition=self._should_fetch_engagement,
100+
task=self._fetch_engagement_data,
101+
)
102+
self.engagement_cron_task = asyncio.create_task(engagement_cron_job())
103+
104+
def _should_fetch_engagement(self) -> bool:
105+
"""Check if it should fetch Wikipedia engagement data from GCS."""
106+
return (time.time() - self.last_engagement_fetch_at) >= self.engagement_resync_interval_sec
107+
108+
async def _fetch_engagement_data(self) -> None:
109+
"""Fetch Wikipedia engagement data from GCS and store it in memory.
110+
111+
If the fetch returns no data, `last_engagement_fetch_at` is not updated
112+
so the cron job retries on the next tick.
113+
"""
114+
try:
115+
data = await self.filemanager.get_file()
116+
if data is None:
117+
logger.warning(
118+
"Wikipedia engagement data fetch returned None, will retry on next tick"
119+
)
120+
return
121+
self.engagement_data = EngagementData.model_validate(data.model_dump())
122+
self.last_engagement_fetch_at = time.time()
123+
except Exception as e:
124+
logger.warning(
125+
"Failed to fetch Wikipedia engagement data from GCS",
126+
extra={"error": str(e)},
127+
)
75128

76129
def hidden(self) -> bool: # noqa: D102
77130
"""Whether this provider is hidden or not."""
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# This Source Code Form is subject to the terms of the Mozilla Public
2+
# License, v. 2.0. If a copy of the MPL was not distributed with this
3+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
"""GCS engagement data utilities."""
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# This Source Code Form is subject to the terms of the Mozilla Public
2+
# License, v. 2.0. If a copy of the MPL was not distributed with this
3+
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
"""Filemanager for retrieving engagement data from GCS."""
5+
6+
import logging
7+
from json import JSONDecodeError
8+
9+
import orjson
10+
from gcloud.aio.storage import Blob, Bucket, Storage
11+
from pydantic import BaseModel
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
class EngagementData(BaseModel):
17+
"""Model for the full engagement data file stored in GCS."""
18+
19+
amp: dict[str, dict[str, str | int]] = {}
20+
amp_aggregated: dict[str, int] = {}
21+
wiki_aggregated: dict[str, int] = {}
22+
23+
24+
class EngagementFilemanager:
25+
"""Filemanager for fetching engagement data from GCS asynchronously."""
26+
27+
gcs_bucket_path: str
28+
blob_name: str
29+
gcs_client: Storage | None
30+
bucket: Bucket | None
31+
32+
def __init__(self, gcs_bucket_path: str, blob_name: str) -> None:
33+
""":param gcs_bucket_path: GCS bucket name to fetch from.
34+
:param blob_name: Name of the blob in the GCS bucket.
35+
"""
36+
self.gcs_bucket_path = gcs_bucket_path
37+
self.blob_name = blob_name
38+
self.gcs_client = None
39+
self.bucket = None
40+
41+
def get_bucket(self) -> Bucket:
42+
"""Lazily instantiate the GCS client and return the configured bucket."""
43+
if self.bucket is not None:
44+
return self.bucket
45+
46+
if self.gcs_client is None:
47+
self.gcs_client = Storage()
48+
49+
self.bucket = Bucket(storage=self.gcs_client, name=self.gcs_bucket_path)
50+
return self.bucket
51+
52+
async def get_file(self) -> EngagementData | None:
53+
"""Fetch the engagement data file from GCS and return a validated model instance."""
54+
try:
55+
bucket = self.get_bucket()
56+
blob: Blob = await bucket.get_blob(self.blob_name)
57+
blob_data = await blob.download()
58+
return EngagementData.model_validate(orjson.loads(blob_data))
59+
except (JSONDecodeError, ValueError) as e:
60+
logger.error(f"Failed to decode engagement data JSON: {e}")
61+
except Exception as e:
62+
logger.error(f"Error fetching engagement data file {self.blob_name}: {e}")
63+
return None

0 commit comments

Comments
 (0)