Skip to content

Commit c5e2669

Browse files
authored
Merge pull request #877 from aaxelb/feature/eng-8038--migrate-expirations
[ENG-8221][ENG-8038] less database data; part 1
2 parents 4a73dfe + 08d4804 commit c5e2669

File tree

5 files changed

+142
-0
lines changed

5 files changed

+142
-0
lines changed

share/models/ingest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,18 @@ def latest_by_suid_queryset(self, suid_queryset) -> models.QuerySet:
197197
.values('latest_rawdatum_id')
198198
))
199199

200+
def latest_for_each_suid(self) -> models.QuerySet:
    '''latest_for_each_suid: keep only the most recent row per `suid_id`

    "most recent" is decided by `datestamp`, falling back to `date_created`
    when `datestamp` is null (newest first, nulls sorted last)

    returns a queryset annotated with `latest_same_suid` (the pk of the newest
    row sharing this row's `suid_id`) and filtered to rows where that pk is
    the row's own pk
    '''
    # newest first; rows where both dates are null sort to the end
    _newest_first = Coalesce('datestamp', 'date_created').desc(nulls_last=True)
    # correlated on the outer row's suid: its siblings, newest first
    _same_suid_qs = self.filter(suid_id=models.OuterRef('suid_id')).order_by(_newest_first)
    # NOTE(review): no secondary ordering here -- if two rows of one suid share
    # the same coalesced date, which one counts as "latest" is up to the database
    _newest_pk = models.Subquery(_same_suid_qs.values('pk')[:1])
    _annotated_qs = self.annotate(latest_same_suid=_newest_pk)
    # a row survives only if it is the newest of its own suid
    return _annotated_qs.filter(pk=models.F('latest_same_suid'))
211+
200212

201213
class RawDatum(models.Model):
202214

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import datetime
2+
import time
3+
4+
from django.db.models import OuterRef
5+
6+
from trove.util.django import pk_chunked
7+
8+
from share import models as share_db
9+
from share.management.commands import BaseShareCommand
10+
from trove import models as trove_db
11+
12+
13+
class Command(BaseShareCommand):
    # one-off data copy: fill `SupplementaryIndexcardRdf.expiration_date` from the
    # non-null `RawDatum.expiration_date` of the raw datum each supplement came from
    # (deliberately chunked by primary key, avoiding joins on `RawDatum` or
    # `SourceUniqueIdentifier`)
    # meant to be run after trove migration 0008_expiration_dates, before share.RawDatum is deleted

    def add_arguments(self, parser):
        '''register command-line options

        --chunk-size: how many raw-datum pks to handle per update query
        --today: ISO date; only expiration dates after this day are copied
        --continue-after: resume a prior run, skipping raw-datum pks up to this one
        '''
        parser.add_argument('--chunk-size', type=int, default=666)
        parser.add_argument(
            '--today',
            type=datetime.date.fromisoformat,
            default=datetime.date.today(),
        )
        parser.add_argument('--continue-after', type=str, default=None)

    def handle(self, *args, chunk_size: int, today: datetime.date, continue_after, **kwargs):
        '''copy expiration dates chunk by chunk, writing progress to stdout

        each progress line ends with a `--continue-after=<pk>` value that can be
        passed back in to resume from that point
        '''
        _started_at = time.perf_counter()
        # only the latest datum per suid, and only those expiring after `today`
        # (a null expiration date never satisfies `__gt`, so non-expiring data is skipped too)
        _expiring_raw_qs = share_db.RawDatum.objects.latest_for_each_suid().filter(
            expiration_date__gt=today,
        )
        if continue_after is not None:
            _expiring_raw_qs = _expiring_raw_qs.filter(pk__gt=continue_after)
        _migrated_total = 0
        for _pk_chunk in pk_chunked(_expiring_raw_qs, chunk_size):
            # per-row subquery: the expiration date of the supplement's own raw datum
            _raw_expiration_date = (
                share_db.RawDatum.objects
                .filter(id=OuterRef('from_raw_datum_id'))
                .values('expiration_date')
            )
            _migrated_now = (
                trove_db.SupplementaryIndexcardRdf.objects
                .filter(
                    from_raw_datum_id__in=_pk_chunk,
                    expiration_date__isnull=True,  # never overwrite a date already set
                )
                .update(expiration_date=_raw_expiration_date)
            )
            _migrated_total += _migrated_now
            _resume_pk = _pk_chunk[-1]
            _seconds_so_far = time.perf_counter() - _started_at
            self.stdout.write(
                f'{_seconds_so_far:.2f}: migrated {_migrated_now} of {len(_pk_chunk)} --continue-after={_resume_pk}',
            )
        _seconds_total = time.perf_counter() - _started_at
        _done_message = f'done! migrated {_migrated_total} in {_seconds_total}s'
        self.stdout.write(self.style.SUCCESS(_done_message))
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Generated by Django 3.2.25 on 2025-06-09 15:42
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
    # adds a nullable `expiration_date` to the three indexcard-rdf models
    # (archived, latest, supplementary) and indexes it on latest and supplementary

    dependencies = [
        ('trove', '0007_rawdata_fks_do_nothing'),
    ]

    operations = [
        # -- new nullable column on each model --
        migrations.AddField(
            model_name='archivedindexcardrdf',
            name='expiration_date',
            field=models.DateField(
                blank=True,
                help_text='An (optional) date when this description will no longer be valid.',
                null=True,
            ),
        ),
        migrations.AddField(
            model_name='latestindexcardrdf',
            name='expiration_date',
            field=models.DateField(
                blank=True,
                help_text='An (optional) date when this description will no longer be valid.',
                null=True,
            ),
        ),
        migrations.AddField(
            model_name='supplementaryindexcardrdf',
            name='expiration_date',
            field=models.DateField(
                blank=True,
                help_text='An (optional) date when this description will no longer be valid.',
                null=True,
            ),
        ),
        # -- indexes on the new column (no index is added for the archived model) --
        migrations.AddIndex(
            model_name='latestindexcardrdf',
            index=models.Index(
                fields=['expiration_date'],
                name='trove_lates_expirat_92ac89_idx',
            ),
        ),
        migrations.AddIndex(
            model_name='supplementaryindexcardrdf',
            index=models.Index(
                fields=['expiration_date'],
                name='trove_suppl_expirat_3ea6e1_idx',
            ),
        ),
    ]

trove/models/indexcard.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ def update_rdf(
248248
defaults={
249249
'rdf_as_turtle': _rdf_as_turtle,
250250
'focus_iri': focus_iri,
251+
'expiration_date': from_raw_datum.expiration_date,
251252
},
252253
)
253254
if (not _archived_created) and (_archived.rdf_as_turtle != _rdf_as_turtle):
@@ -260,6 +261,7 @@ def update_rdf(
260261
'turtle_checksum_iri': _turtle_checksum_iri,
261262
'rdf_as_turtle': _rdf_as_turtle,
262263
'focus_iri': focus_iri,
264+
'expiration_date': from_raw_datum.expiration_date,
263265
},
264266
)
265267
return _latest_indexcard_rdf
@@ -282,6 +284,7 @@ def update_supplementary_rdf(
282284
'turtle_checksum_iri': _turtle_checksum_iri,
283285
'rdf_as_turtle': _rdf_as_turtle,
284286
'focus_iri': focus_iri,
287+
'expiration_date': from_raw_datum.expiration_date,
285288
},
286289
)
287290
return _supplement_rdf
@@ -307,6 +310,13 @@ class IndexcardRdf(models.Model):
307310
focus_iri = models.TextField() # exact iri used in rdf_as_turtle
308311
rdf_as_turtle = models.TextField() # TODO: store elsewhere by checksum
309312

313+
# optional:
314+
expiration_date = models.DateField(
315+
null=True,
316+
blank=True,
317+
help_text='An (optional) date when this description will no longer be valid.',
318+
)
319+
310320
def as_rdf_tripledict(self) -> rdf.RdfTripleDictionary:
311321
return rdf.tripledict_from_turtle(self.rdf_as_turtle)
312322

@@ -344,6 +354,7 @@ class Meta:
344354
]
345355
indexes = [
346356
models.Index(fields=('modified',)), # for OAI-PMH selective harvest
357+
models.Index(fields=['expiration_date']), # for expiring
347358
]
348359

349360

@@ -373,6 +384,9 @@ class Meta:
373384
name='%(app_label)s_%(class)s_uniq_supplement',
374385
),
375386
]
387+
indexes = [
388+
models.Index(fields=['expiration_date']), # for expiring
389+
]
376390

377391

378392
class DerivedIndexcard(models.Model):

trove/util/django.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from __future__ import annotations
2+
from collections.abc import Iterator
3+
4+
5+
__all__ = ('pk_chunked',)
6+
7+
8+
def pk_chunked(queryset, chunksize: int) -> Iterator[list]:
    '''pk_chunked: get primary key values, in chunks, for the given queryset

    yields non-empty lists of primary keys up to `chunksize` long,
    in ascending pk order (any ordering already on `queryset` is replaced)

    each chunk is fetched with its own query, filtered to pks greater than
    the last pk of the previous chunk

    raises ValueError if `chunksize` is less than 1, and RuntimeError if a
    chunk's last pk is not greater than the previous chunk's last pk
    (both raised while iterating, since this is a generator)
    '''
    if chunksize < 1:
        # a zero-length slice is always empty, which would look exactly like
        # an empty queryset and silently end the iteration
        raise ValueError(f'chunksize must be a positive integer (got {chunksize!r})')
    _ordered_qs = queryset.order_by('pk')
    _prior_end_pk = None
    while True:  # for each chunk:
        _qs = (
            _ordered_qs
            if _prior_end_pk is None
            else _ordered_qs.filter(pk__gt=_prior_end_pk)
        )
        # load primary key values only
        _pks = list(_qs.values_list('pk', flat=True)[:chunksize])
        if not _pks:
            break  # done
        _end_pk = _pks[-1]
        # sanity check: without strictly ascending end pks the loop could never finish
        if (_prior_end_pk is not None) and (_end_pk <= _prior_end_pk):
            raise RuntimeError(f'sentinel pks not ascending?? got {_end_pk} after {_prior_end_pk}')
        _prior_end_pk = _end_pk
        yield _pks

0 commit comments

Comments
 (0)