Skip to content

Commit fa20669

Browse files
authored
fix(sports): move bulk write out of action accumulation loop (#1344)
Previously, a bulk request was issued on every loop iteration for all actions accumulated so far, so a job with n events made n bulk requests and wrote n*(n+1)/2 actions in total instead of making a single request with n actions.
1 parent 281c810 commit fa20669

File tree

2 files changed

+54
-11
lines changed
  • merino/providers/suggest/sports/backends/sportsdata/common
  • tests/unit/providers/suggest/sports/backends/common

2 files changed

+54
-11
lines changed

merino/providers/suggest/sports/backends/sportsdata/common/elastic.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -787,17 +787,17 @@ async def store_events(
787787
actions.append(action)
788788
if event.status.is_in_progress():
789789
logger.info(f"{LOGGING_TAG} ## Live Game: {event.terms}")
790-
791-
try:
792-
start = datetime.now()
793-
await helpers.async_bulk(client=self.client, actions=actions, stats_only=False)
794-
logger.info(
795-
f"{LOGGING_TAG}⏱ sports.time.load.events [{sport.name}] in [{(datetime.now() - start).microseconds}μs]"
796-
)
797-
except Exception as ex:
798-
raise SportsDataError(
799-
f"Could not load data into elasticSearch for {sport.name}:{index} [{ex}]"
800-
) from ex
790+
# Bulk-write collected actions
791+
try:
792+
start = datetime.now()
793+
await helpers.async_bulk(client=self.client, actions=actions, stats_only=False)
794+
logger.info(
795+
f"{LOGGING_TAG}⏱ sports.time.load.events [{sport.name}] in [{(datetime.now() - start).microseconds}μs]"
796+
)
797+
except Exception as ex:
798+
raise SportsDataError(
799+
f"Could not load data into elasticSearch for {sport.name}:{index} [{ex}]"
800+
) from ex
801801
start = datetime.now()
802802
try:
803803
await self.store_meta("last_update", datetime.now(tz=timezone.utc).isoformat())

tests/unit/providers/suggest/sports/backends/common/test_elastic.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,49 @@ async def test_store_event_fail_and_metrics_captured(
164164
assert len(list(filter(lambda x: "sports.time.load.refresh_indexes" in x, calls))) == 1
165165

166166

167+
@pytest.mark.asyncio
168+
async def test_store_events_bulk_called_once_for_multiple_events(
169+
sport_data_store: SportsDataStore,
170+
es_client: AsyncMock,
171+
mocker: MockerFixture,
172+
) -> None:
173+
"""Test that store_events calls async_bulk once with all events, not once per event.
174+
175+
Regression test for a bug where async_bulk was called inside the action collection
176+
loop, resulting in n*(n+1)/2 writes per job rather than n.
177+
"""
178+
action_count = 3
179+
mock_async_bulk = mocker.patch(
180+
f"{SportsDataStore.__module__}.helpers.async_bulk",
181+
new_callable=AsyncMock,
182+
return_value=(action_count, []),
183+
)
184+
nfl = NFL(settings=settings.providers.sports)
185+
nfl.events = {
186+
i: Event(
187+
sport="football",
188+
id=i,
189+
terms="test",
190+
date=datetime.datetime.now(),
191+
original_date="2025-09-22",
192+
home_team={"key": f"home{i}"},
193+
home_score=0,
194+
away_team={"key": f"away{i}"},
195+
away_score=0,
196+
status=GameStatus.Scheduled,
197+
expiry=datetime.datetime.now(),
198+
updated=datetime.datetime.now(),
199+
)
200+
for i in range(action_count)
201+
}
202+
203+
await sport_data_store.store_events(sport=nfl, language_code="en")
204+
205+
assert mock_async_bulk.call_count == 1
206+
actions = mock_async_bulk.call_args.kwargs["actions"]
207+
assert len(actions) == action_count
208+
209+
167210
@freezegun.freeze_time("2025-09-22T12:00:00Z")
168211
@pytest.mark.asyncio
169212
async def test_sports_search_event_hits(

0 commit comments

Comments
 (0)