Skip to content

Commit 25fe9aa

Browse files
authored
fix: batch get_actors to avoid pg timeouts during large exports (#42546)
1 parent 15fc282 commit 25fe9aa

File tree

3 files changed: +67 −27 lines

posthog/hogql_queries/actor_strategies.py

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,41 +51,52 @@ class PersonStrategy(ActorStrategy):
5151
origin = "persons"
5252
origin_id = "id"
5353

54+
# batching is needed to prevent timeouts when reading from Postgres
55+
BATCH_SIZE = 1000
56+
5457
# This is hand written instead of using the ORM because the ORM was blowing up the memory on exports and taking forever
55-
def get_actors(self, actor_ids, order_by: str = "") -> dict[str, dict]:
56-
# If actor queries start quietly dying again, this might need batching at some point
57-
# but currently works with 800,000 persondistinctid entries (May 24, 2024)
58+
def get_actors(self, actor_ids, sort_by_created_at_descending: bool = False) -> dict[str, dict]:
5859
person_table = Person._meta.db_table
5960
pdi_table = PersonDistinctId._meta.db_table
60-
persons_query = f"""SELECT {person_table}.id, {person_table}.uuid, {person_table}.properties, {person_table}.is_identified, {person_table}.created_at
61-
FROM {person_table}
62-
WHERE {person_table}.uuid = ANY(%(uuids)s)
63-
AND {person_table}.team_id = %(team_id)s"""
64-
if order_by:
65-
persons_query += f" ORDER BY {order_by}"
66-
6761
conn = connections[READ_DB_FOR_PERSONS]
6862

63+
actor_ids_list = list(actor_ids)
64+
all_people: list = []
65+
all_distinct_ids: list = []
66+
6967
with conn.cursor() as cursor:
70-
cursor.execute(
71-
persons_query,
72-
{"uuids": list(actor_ids), "team_id": self.team.pk},
73-
)
74-
people = cursor.fetchall()
75-
cursor.execute(
76-
f"""SELECT {pdi_table}.person_id, {pdi_table}.distinct_id
77-
FROM {pdi_table}
78-
WHERE {pdi_table}.person_id = ANY(%(people_ids)s)
79-
AND {pdi_table}.team_id = %(team_id)s""",
80-
{"people_ids": [x[0] for x in people], "team_id": self.team.pk},
81-
)
82-
distinct_ids = cursor.fetchall()
68+
for i in range(0, len(actor_ids_list), self.BATCH_SIZE):
69+
batch = actor_ids_list[i : i + self.BATCH_SIZE]
70+
persons_query = f"""SELECT {person_table}.id, {person_table}.uuid, {person_table}.properties, {person_table}.is_identified, {person_table}.created_at
71+
FROM {person_table}
72+
WHERE {person_table}.uuid = ANY(%(uuids)s)
73+
AND {person_table}.team_id = %(team_id)s"""
74+
cursor.execute(persons_query, {"uuids": batch, "team_id": self.team.pk})
75+
all_people.extend(cursor.fetchall())
76+
77+
if sort_by_created_at_descending:
78+
from datetime import datetime
79+
80+
min_dt = datetime.min
81+
all_people.sort(key=lambda p: (-(p[4] or min_dt).timestamp(), str(p[1])))
82+
83+
person_ids = [x[0] for x in all_people]
84+
for i in range(0, len(person_ids), self.BATCH_SIZE):
85+
batch = person_ids[i : i + self.BATCH_SIZE]
86+
cursor.execute(
87+
f"""SELECT {pdi_table}.person_id, {pdi_table}.distinct_id
88+
FROM {pdi_table}
89+
WHERE {pdi_table}.person_id = ANY(%(people_ids)s)
90+
AND {pdi_table}.team_id = %(team_id)s""",
91+
{"people_ids": batch, "team_id": self.team.pk},
92+
)
93+
all_distinct_ids.extend(cursor.fetchall())
8394

84-
person_id_to_raw_person_and_set: dict[int, tuple] = {person[0]: (person, []) for person in people}
95+
person_id_to_raw_person_and_set: dict[int, tuple] = {person[0]: (person, []) for person in all_people}
8596

86-
for pdid in distinct_ids:
97+
for pdid in all_distinct_ids:
8798
person_id_to_raw_person_and_set[pdid[0]][1].append(pdid[1])
88-
del distinct_ids
99+
del all_distinct_ids
89100

90101
person_uuid_to_person = {
91102
str(person[1]): {

posthog/hogql_queries/test/test_actors_query_runner.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,3 +748,32 @@ def test_direct_actors_query_uses_latest_person_data_after_property_deletion(sel
748748
PersonsArgMaxVersion.V2,
749749
"Direct ActorsQuery should use PersonsArgMaxVersion.V2 for latest person data",
750750
)
751+
752+
def test_person_strategy_batches_large_actor_sets(self):
753+
"""Verify that PersonStrategy.get_actors batches queries."""
754+
from posthog.hogql_queries.actor_strategies import PersonStrategy
755+
from posthog.hogql_queries.insights.paginators import HogQLHasMorePaginator
756+
757+
# Create 5 persons
758+
person_uuids = []
759+
for i in range(5):
760+
person = _create_person(
761+
properties={"email": f"batch_test_{i}@example.com"},
762+
team=self.team,
763+
distinct_ids=[f"batch-test-{UUIDT()}-{i}"],
764+
is_identified=True,
765+
)
766+
person_uuids.append(str(person.uuid))
767+
flush_persons_and_events()
768+
769+
query = ActorsQuery()
770+
paginator = HogQLHasMorePaginator(limit=100, offset=0)
771+
strategy = PersonStrategy(team=self.team, query=query, paginator=paginator)
772+
773+
# Temporarily set a small batch size to verify batching works
774+
with patch.object(PersonStrategy, "BATCH_SIZE", 2):
775+
result = strategy.get_actors(person_uuids)
776+
777+
self.assertEqual(len(result), 5)
778+
for uuid in person_uuids:
779+
self.assertIn(uuid, result)

posthog/queries/actor_base_query.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ def get_serialized_people(
293293
team: Team, people_ids: list[Any], value_per_actor_id: Optional[dict[str, float]] = None, distinct_id_limit=1000
294294
) -> list[SerializedPerson]:
295295
persons_dict = PersonStrategy(team, ActorsQuery(), HogQLHasMorePaginator()).get_actors(
296-
people_ids, order_by="created_at DESC, uuid"
296+
people_ids, sort_by_created_at_descending=True
297297
)
298298
from posthog.api.person import get_person_name_helper
299299

0 commit comments

Comments (0)