Skip to content

Commit 8dcb6e4

Browse files
committed
[DOP-28871] Add artificial filters by Operation/Run id
1 parent a914979 commit 8dcb6e4

File tree

4 files changed

+94
-23
lines changed

4 files changed

+94
-23
lines changed

data_rentgen/db/repositories/operation.py

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
from data_rentgen.db.models import Operation, OperationStatus, OperationType
1212
from data_rentgen.db.repositories.base import Repository
1313
from data_rentgen.dto import OperationDTO, PaginationDTO
14-
from data_rentgen.utils.uuid import extract_timestamp_from_uuid
14+
from data_rentgen.utils.uuid import extract_timestamp_from_uuid, get_max_uuid, get_min_uuid
1515

1616
insert_statement = insert(Operation).on_conflict_do_nothing()
1717
update_statement = update(Operation)
1818

1919
get_list_by_run_ids_query = select(Operation).where(
20+
Operation.id >= bindparam("min_id"),
2021
Operation.created_at >= bindparam("since"),
2122
Operation.run_id == any_(bindparam("run_ids")),
2223
)
@@ -36,6 +37,7 @@
3637
func.count(Operation.id.distinct()).label("total_operations"),
3738
)
3839
.where(
40+
Operation.id >= bindparam("min_id"),
3941
Operation.created_at >= bindparam("since"),
4042
Operation.run_id == any_(bindparam("run_ids")),
4143
)
@@ -101,25 +103,43 @@ async def paginate(
101103
# do not use `tuple_(Operation.created_at, Operation.id).in_(...),
102104
# as this is too complex filter for Postgres to make an optimal query plan
103105
where = []
106+
107+
# created_at and id are always correlated,
108+
# and primary key starts with id, so we need to apply filter on both
109+
# to get the most optimal query plan
104110
if operation_ids:
105111
min_operation_created_at = extract_timestamp_from_uuid(min(operation_ids))
106112
max_operation_created_at = extract_timestamp_from_uuid(max(operation_ids))
107-
min_created_at = max(since, min_operation_created_at) if since else min_operation_created_at
108-
max_created_at = min(until, max_operation_created_at) if until else max_operation_created_at
113+
# narrow created_at range
114+
min_created_at = max(filter(None, [since, min_operation_created_at]))
115+
max_created_at = min(filter(None, [until, max_operation_created_at]))
109116
where = [
110117
Operation.created_at >= min_created_at,
111118
Operation.created_at <= max_created_at,
119+
Operation.id == any_(list(operation_ids)), # type: ignore[arg-type]
120+
]
121+
122+
elif run_id:
123+
run_created_at = extract_timestamp_from_uuid(run_id)
124+
# narrow created_at range
125+
min_created_at = max(filter(None, [since, run_created_at]))
126+
where = [
127+
Operation.run_id == run_id,
128+
Operation.created_at >= min_created_at,
129+
Operation.id >= get_min_uuid(min_created_at),
130+
]
131+
132+
elif since:
133+
where = [
134+
Operation.created_at >= since,
135+
Operation.id >= get_min_uuid(since),
136+
]
137+
138+
if until and not operation_ids:
139+
where += [
140+
Operation.created_at <= until,
141+
Operation.id <= get_max_uuid(until),
112142
]
113-
else:
114-
if since:
115-
where.append(Operation.created_at >= since)
116-
if until:
117-
where.append(Operation.created_at <= until)
118-
119-
if run_id:
120-
where.append(Operation.run_id == run_id)
121-
if operation_ids:
122-
where.append(Operation.id == any_(list(operation_ids))) # type: ignore[arg-type]
123143

124144
query = select(Operation).where(*where)
125145
order_by: list[UnaryExpression] = [Operation.created_at.desc(), Operation.id.desc()]
@@ -151,6 +171,7 @@ async def list_by_run_ids(
151171
result = await self._session.scalars(
152172
query,
153173
{
174+
"min_id": get_min_uuid(min_operation_created_at),
154175
"since": min_operation_created_at,
155176
"run_ids": list(run_ids),
156177
},
@@ -175,11 +196,13 @@ async def get_stats_by_run_ids(self, run_ids: Collection[UUID]) -> dict[UUID, Ro
175196
if not run_ids:
176197
return {}
177198

199+
# All operations are created after run
200+
since = extract_timestamp_from_uuid(min(run_ids))
178201
query_result = await self._session.execute(
179202
get_stats_by_run_ids,
180203
{
181-
# All operations are created after run
182-
"since": extract_timestamp_from_uuid(min(run_ids)),
204+
"since": since,
205+
"min_id": get_min_uuid(since),
183206
"run_ids": list(run_ids),
184207
},
185208
)

data_rentgen/db/repositories/run.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from data_rentgen.db.repositories.base import Repository
2424
from data_rentgen.db.utils.search import make_tsquery, ts_match, ts_rank
2525
from data_rentgen.dto import PaginationDTO, RunDTO
26-
from data_rentgen.utils.uuid import extract_timestamp_from_uuid
26+
from data_rentgen.utils.uuid import extract_timestamp_from_uuid, get_max_uuid, get_min_uuid
2727

2828
# Do not use `tuple_(Run.created_at, Run.id).in_(...),
2929
# as this is too complex filter for Postgres to make an optimal query plan.
@@ -41,6 +41,7 @@
4141
get_list_by_job_ids_query = (
4242
select(Run)
4343
.where(
44+
Run.id >= bindparam("min_id"),
4445
Run.created_at >= bindparam("since"),
4546
Run.job_id == any_(bindparam("job_ids")),
4647
)
@@ -97,24 +98,34 @@ async def paginate(
9798
# do not use `tuple_(Run.created_at, Run.id).in_(...),
9899
# as this is too complex filter for Postgres to make an optimal query plan
99100
where = []
101+
102+
# created_at and id are always correlated,
103+
# and primary key starts with id, so we need to apply filter on both
104+
# to get the most optimal query plan
100105
if run_ids:
101106
min_run_created_at = extract_timestamp_from_uuid(min(run_ids))
102107
max_run_created_at = extract_timestamp_from_uuid(max(run_ids))
103-
min_created_at = max(since, min_run_created_at) if since else min_run_created_at
104-
max_created_at = min(until, max_run_created_at) if until else max_run_created_at
108+
# narrow created_at range
109+
min_created_at = max(filter(None, [since, min_run_created_at]))
110+
max_created_at = min(filter(None, [until, max_run_created_at]))
105111
where = [
106112
Run.created_at >= min_created_at,
107113
Run.created_at <= max_created_at,
108114
Run.id == any_(list(run_ids)), # type: ignore[arg-type]
109115
]
110116
else:
111117
if since:
112-
where.append(Run.created_at >= since)
118+
where = [
119+
Run.created_at >= since,
120+
Run.id >= get_min_uuid(since),
121+
]
122+
113123
if until:
114-
where.append(Run.created_at <= until)
124+
where += [
125+
Run.created_at <= until,
126+
Run.id <= get_max_uuid(until),
127+
]
115128

116-
if run_ids:
117-
where.append(Run.id == any_(list(run_ids))) # type: ignore[arg-type]
118129
if job_id:
119130
where.append(Run.job_id == job_id)
120131
if parent_run_id:
@@ -181,7 +192,14 @@ async def list_by_job_ids(self, job_ids: Collection[int], since: datetime, until
181192
# until is rarely used, avoid making query too complicated
182193
query = query.where(Run.created_at <= until)
183194

184-
result = await self._session.scalars(query, {"since": since, "job_ids": list(job_ids)})
195+
result = await self._session.scalars(
196+
query,
197+
{
198+
"min_id": get_min_uuid(since),
199+
"since": since,
200+
"job_ids": list(job_ids),
201+
},
202+
)
185203
return list(result.all())
186204

187205
async def fetch_bulk(self, runs_dto: list[RunDTO]) -> list[tuple[RunDTO, Run | None]]:

data_rentgen/utils/uuid.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,21 @@ def _build_uuidv7(timestamp: int, node: int) -> NewUUID:
8080
return NewUUID(int=uuid_int)
8181

8282

83+
# Lower bound helper: builds the smallest UUID value whose leading 48 bits
# encode the given timestamp in milliseconds.
def get_min_uuid(timestamp: datetime) -> NewUUID:
84+
"""Get minimal possible UUID for timestamp"""
85+
# Seconds -> milliseconds; int() drops any sub-millisecond fraction.
timestamp_int = int(timestamp.timestamp() * 1000)
86+
# Keep the low 48 bits of the millisecond value and shift them left by 80,
# so they become the most significant bits; the 80 bits below stay zero.
uuid_int = (timestamp_int & 0xFFFFFFFFFFFF) << 80
87+
return NewUUID(int=uuid_int)
88+
89+
90+
# Upper bound helper: builds the largest UUID value whose leading 48 bits
# encode the given timestamp in milliseconds.
def get_max_uuid(timestamp: datetime) -> NewUUID:
91+
"""Get maximal possible UUID for timestamp"""
92+
# Seconds -> milliseconds; int() drops any sub-millisecond fraction.
timestamp_int = int(timestamp.timestamp() * 1000)
93+
# Same 48-bit timestamp prefix as get_min_uuid: mask, then shift left by 80.
uuid_int = (timestamp_int & 0xFFFFFFFFFFFF) << 80
94+
# Fill every bit below the timestamp prefix with ones.
uuid_int |= 0xFFFFFFFFFFFFFFFFFFFF
95+
return NewUUID(int=uuid_int)
96+
97+
8398
def generate_static_uuid(data: str) -> BaseUUID:
8499
"""Generate static UUID for data. Each function call returns the same UUID value.
85100

tests/test_consumer/test_utils/test_uuid.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
from datetime import UTC, datetime, timedelta
2+
from uuid import UUID
23

34
from data_rentgen.utils.uuid import (
45
generate_incremental_uuid,
56
generate_new_uuid,
67
generate_static_uuid,
8+
get_max_uuid,
9+
get_min_uuid,
710
)
811

912

@@ -70,3 +73,15 @@ def test_generate_incremental_uuid_sorted_like_timestamp():
7073
uuid1 = generate_incremental_uuid(current, "test")
7174
uuid2 = generate_incremental_uuid(following, "test")
7275
assert uuid1 < uuid2
76+
77+
78+
# Checks that get_min_uuid keeps only the millisecond timestamp prefix.
def test_get_min_uuid():
79+
# 123456 microseconds: only the 123 whole milliseconds should be kept.
timestamp = datetime(2025, 9, 21, 23, 35, 49, 123456, tzinfo=UTC)
80+
uuid = get_min_uuid(timestamp)
81+
# 0x01996ea23883 == 1758497749123 ms since the Unix epoch
# (2025-09-21 23:35:49.123 UTC); all bits after the prefix are zero.
assert uuid == UUID("01996ea2-3883-0000-0000-000000000000")
82+
83+
84+
# Checks that get_max_uuid keeps the millisecond prefix and sets the rest to ones.
def test_get_max_uuid():
85+
# 123456 microseconds: only the 123 whole milliseconds should be kept.
timestamp = datetime(2025, 9, 21, 23, 35, 49, 123456, tzinfo=UTC)
86+
uuid = get_max_uuid(timestamp)
87+
# Same 0x01996ea23883 millisecond prefix as in the min case;
# all bits after the prefix are ones.
assert uuid == UUID("01996ea2-3883-ffff-ffff-ffffffffffff")

0 commit comments

Comments
 (0)