12 changes: 12 additions & 0 deletions share/models/ingest.py
@@ -197,6 +197,18 @@ def latest_by_suid_queryset(self, suid_queryset) -> models.QuerySet:
            .values('latest_rawdatum_id')
        ))

    def latest_for_each_suid(self) -> models.QuerySet:
        # only the latest datum for each described resource
        _latest_pk_subquery = models.Subquery(
            self.filter(suid_id=models.OuterRef('suid_id'))
            .order_by(Coalesce('datestamp', 'date_created').desc(nulls_last=True))
            .values('pk')
            [:1]
        )
        return self.annotate(
            latest_same_suid=_latest_pk_subquery,
        ).filter(pk=models.F('latest_same_suid'))
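For context, the new manager method composes with further filtering like any other queryset method; a minimal usage sketch (hypothetical, using the `share_db` alias that appears later in this PR):

    # hypothetical: the latest datum per suid, keeping only not-yet-expired rows
    _latest_unexpired = share_db.RawDatum.objects.latest_for_each_suid().filter(
        expiration_date__gt=datetime.date.today(),
    )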


class RawDatum(models.Model):

51 changes: 51 additions & 0 deletions trove/management/commands/migrate_rawdatum_expiration.py
@@ -0,0 +1,51 @@
import datetime
import time

from django.db.models import OuterRef

from trove.util.django import pk_chunked

from share import models as share_db
from share.management.commands import BaseShareCommand
from trove import models as trove_db


class Command(BaseShareCommand):
    # copy all non-null values from `RawDatum.expiration_date` to `SupplementaryIndexcardRdf.expiration_date`
    # (while being overly cautious to avoid joins on `RawDatum` or `SourceUniqueIdentifier`)
    # meant to be run after trove migration 0008_expiration_dates, before share.RawDatum is deleted

    def add_arguments(self, parser):
        parser.add_argument('--chunk-size', type=int, default=666)
        parser.add_argument('--today', type=datetime.date.fromisoformat, default=datetime.date.today())
        parser.add_argument('--continue-after', type=str, default=None)

    def handle(self, *args, chunk_size: int, today: datetime.date, continue_after, **kwargs):
        _before = time.perf_counter()
        _total_updated = 0
        _raw_qs = (
            share_db.RawDatum.objects.latest_for_each_suid()
            .filter(expiration_date__gt=today) # ignore the expired (and the non-expiring)
        )
        if continue_after is not None:
            _raw_qs = _raw_qs.filter(pk__gt=continue_after)
        for _raw_pk_chunk in pk_chunked(_raw_qs, chunk_size):
A reviewer (repository member) commented on this loop:
Rewrote this loop slightly to be non-modifying, and did some benchmarking. A little slow to start due to _raw_qs and the size of the RawDatum table, but appears workable. EXPLAIN ANALYZE'd the slowest query, and found no apparent way to speed it up:

                                                                                QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Index Scan using share_rawdatum_expiration_idx on share_rawdatum  (cost=0.57..42489486.91 rows=3096 width=1069) (actual time=0.284..13910.434 rows=483200 loops=1)
   Index Cond: (expiration_date > '2025-06-16'::date)
   Filter: (id = (SubPlan 2))
   Rows Removed by Filter: 172539
   SubPlan 1
     ->  Limit  (cost=67.95..67.95 rows=1 width=12) (actual time=0.006..0.006 rows=1 loops=483200)
           ->  Sort  (cost=67.95..68.20 rows=102 width=12) (actual time=0.005..0.005 rows=1 loops=483200)
                 Sort Key: (COALESCE(u0.datestamp, u0.date_created)) DESC NULLS LAST
                 Sort Method: quicksort  Memory: 25kB
                 ->  Index Scan using share_rawdatum_01248af3 on share_rawdatum u0  (cost=0.57..67.44 rows=102 width=12) (actual time=0.002..0.004 rows=4 loops=483200)
                       Index Cond: (suid_id = share_rawdatum.suid_id)
   SubPlan 2
     ->  Limit  (cost=67.95..67.95 rows=1 width=12) (actual time=0.014..0.014 rows=1 loops=655739)
           ->  Sort  (cost=67.95..68.20 rows=102 width=12) (actual time=0.014..0.014 rows=1 loops=655739)
                 Sort Key: (COALESCE(u0_1.datestamp, u0_1.date_created)) DESC NULLS LAST
                 Sort Method: quicksort  Memory: 25kB
                 ->  Index Scan using share_rawdatum_01248af3 on share_rawdatum u0_1  (cost=0.57..67.44 rows=102 width=12) (actual time=0.005..0.012 rows=5 loops=655739)
                       Index Cond: (suid_id = share_rawdatum.suid_id)
 Planning time: 6.497 ms
 Execution time: 13939.482 ms
(20 rows)
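For anyone reproducing the benchmark, a plan like this can be captured from a Django shell with the queryset's built-in explain() (a sketch, assuming PostgreSQL; note that analyze=True actually executes the query):

    print(
        share_db.RawDatum.objects.latest_for_each_suid()
        .filter(expiration_date__gt=datetime.date.today())
        .explain(analyze=True)
    )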

            _supp_qs = trove_db.SupplementaryIndexcardRdf.objects.filter(
                from_raw_datum_id__in=_raw_pk_chunk,
                expiration_date__isnull=True, # avoid overwriting non-null values
            )
            _updated_count = _supp_qs.update(
                expiration_date=share_db.RawDatum.objects.filter(
                    id=OuterRef('from_raw_datum_id'),
                ).values('expiration_date'),
            )
            _total_updated += _updated_count
            _last_pk = _raw_pk_chunk[-1]
            _elapsed = time.perf_counter() - _before
            self.stdout.write(
                f'{_elapsed:.2f}: migrated {_updated_count} of {len(_raw_pk_chunk)} --continue-after={_last_pk}',
            )
        _total_seconds = time.perf_counter() - _before
        self.stdout.write(
            self.style.SUCCESS(f'done! migrated {_total_updated} in {_total_seconds}s'),
        )
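A run of the command might look like the following (hypothetical pk value; the --continue-after value printed with each chunk lets an interrupted run resume where it left off):

    python manage.py migrate_rawdatum_expiration --chunk-size 500 --today 2025-06-16 --continue-after 123456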
36 changes: 36 additions & 0 deletions trove/migrations/0008_expiration_dates.py
@@ -0,0 +1,36 @@
# Generated by Django 3.2.25 on 2025-06-09 15:42

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('trove', '0007_rawdata_fks_do_nothing'),
    ]

    operations = [
        migrations.AddField(
            model_name='archivedindexcardrdf',
            name='expiration_date',
            field=models.DateField(blank=True, help_text='An (optional) date when this description will no longer be valid.', null=True),
        ),
        migrations.AddField(
            model_name='latestindexcardrdf',
            name='expiration_date',
            field=models.DateField(blank=True, help_text='An (optional) date when this description will no longer be valid.', null=True),
        ),
        migrations.AddField(
            model_name='supplementaryindexcardrdf',
            name='expiration_date',
            field=models.DateField(blank=True, help_text='An (optional) date when this description will no longer be valid.', null=True),
        ),
        migrations.AddIndex(
            model_name='latestindexcardrdf',
            index=models.Index(fields=['expiration_date'], name='trove_lates_expirat_92ac89_idx'),
        ),
        migrations.AddIndex(
            model_name='supplementaryindexcardrdf',
            index=models.Index(fields=['expiration_date'], name='trove_suppl_expirat_3ea6e1_idx'),
        ),
    ]
14 changes: 14 additions & 0 deletions trove/models/indexcard.py
@@ -248,6 +248,7 @@ def update_rdf(
            defaults={
                'rdf_as_turtle': _rdf_as_turtle,
                'focus_iri': focus_iri,
                'expiration_date': from_raw_datum.expiration_date,
            },
        )
        if (not _archived_created) and (_archived.rdf_as_turtle != _rdf_as_turtle):
@@ -260,6 +261,7 @@
                'turtle_checksum_iri': _turtle_checksum_iri,
                'rdf_as_turtle': _rdf_as_turtle,
                'focus_iri': focus_iri,
                'expiration_date': from_raw_datum.expiration_date,
            },
        )
        return _latest_indexcard_rdf
@@ -282,6 +284,7 @@ def update_supplementary_rdf(
                'turtle_checksum_iri': _turtle_checksum_iri,
                'rdf_as_turtle': _rdf_as_turtle,
                'focus_iri': focus_iri,
                'expiration_date': from_raw_datum.expiration_date,
            },
        )
        return _supplement_rdf
@@ -307,6 +310,13 @@ class IndexcardRdf(models.Model):
    focus_iri = models.TextField() # exact iri used in rdf_as_turtle
    rdf_as_turtle = models.TextField() # TODO: store elsewhere by checksum

    # optional:
    expiration_date = models.DateField(
        null=True,
        blank=True,
        help_text='An (optional) date when this description will no longer be valid.',
    )

    def as_rdf_tripledict(self) -> rdf.RdfTripleDictionary:
        return rdf.tripledict_from_turtle(self.rdf_as_turtle)

@@ -344,6 +354,7 @@ class Meta:
        ]
        indexes = [
            models.Index(fields=('modified',)), # for OAI-PMH selective harvest
            models.Index(fields=['expiration_date']), # for expiring
        ]


@@ -373,6 +384,9 @@ class Meta:
                name='%(app_label)s_%(class)s_uniq_supplement',
            ),
        ]
        indexes = [
            models.Index(fields=['expiration_date']), # for expiring
        ]
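The "# for expiring" comments on these indexes suggest a periodic cleanup lookup; a sketch of what such a query might look like (hypothetical, not part of this diff):

    # hypothetical: find descriptions whose expiration date has passed
    _expired_qs = trove_db.LatestIndexcardRdf.objects.filter(
        expiration_date__lte=datetime.date.today(),
    )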


class DerivedIndexcard(models.Model):
29 changes: 29 additions & 0 deletions trove/util/django.py
@@ -0,0 +1,29 @@
from __future__ import annotations
from collections.abc import Iterator


__all__ = ('pk_chunked',)


def pk_chunked(queryset, chunksize: int) -> Iterator[list]:
    '''pk_chunked: get primary key values, in chunks, for the given queryset

    yields non-empty lists of primary keys up to `chunksize` long
    '''
    _ordered_qs = queryset.order_by('pk')
    _prior_end_pk = None
    while True: # for each chunk:
        _qs = (
            _ordered_qs
            if _prior_end_pk is None
            else _ordered_qs.filter(pk__gt=_prior_end_pk)
        )
        # load primary key values only
        _pks = list(_qs.values_list('pk', flat=True)[:chunksize])
        if not _pks:
            break # done
        _end_pk = _pks[-1]
        if (_prior_end_pk is not None) and (_end_pk <= _prior_end_pk):
            raise RuntimeError(f'sentinel pks not ascending?? got {_end_pk} after {_prior_end_pk}')
        _prior_end_pk = _end_pk
        yield _pks
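A usage sketch (hypothetical model and handler), showing how the pk-greater-than sentinel keeps each chunk query an index range scan rather than an ever-growing OFFSET:

    # hypothetical usage: process a large queryset in pk-ordered chunks
    for _pk_chunk in pk_chunked(SomeModel.objects.filter(active=True), 500):
        do_something_with(_pk_chunk)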