Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:

- name: run tests
run: |
coverage run -m pytest --create-db
coverage run -m pytest --create-db -x
coverage xml -o _shtrove_coverage.xml
env:
DATABASE_PASSWORD: postgres
Expand Down
5 changes: 2 additions & 3 deletions api/base/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ def initial(self, request, *args, **kwargs):
class RootView(views.APIView):
def get(self, request):
links = {
'rawdata': 'api:rawdatum-list',
'sources': 'api:source-list',
'users': 'api:user-list',
'status': 'api:status',
'rss': 'api:rss',
'atom': 'api:atom',
'rss': 'api:feeds.rss',
'atom': 'api:feeds.atom',
}
ret = {k: request.build_absolute_uri(reverse(v)) for k, v in links.items()}
return Response(ret)
10 changes: 0 additions & 10 deletions api/rawdata/serializers.py

This file was deleted.

7 changes: 0 additions & 7 deletions api/rawdata/urls.py

This file was deleted.

31 changes: 0 additions & 31 deletions api/rawdata/views.py

This file was deleted.

2 changes: 2 additions & 0 deletions api/sourceconfigs/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

from api.sourceconfigs.serializers import SourceConfigSerializer
from api.base import ShareViewSet
from api.pagination import CursorPagination

from share.models import SourceConfig


class SourceConfigViewSet(ShareViewSet, viewsets.ReadOnlyModelViewSet):
serializer_class = SourceConfigSerializer
pagination_class = CursorPagination

ordering = ('id', )

Expand Down
1 change: 0 additions & 1 deletion api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
urlpatterns = [
url('^$', RootView.as_view()),
url('^', include('api.banners.urls')),
url('^', include('api.rawdata.urls')),
url('^', include('api.sourceconfigs.urls')),
url('^', include('api.sources.urls')),
url('^', include('api.suids.urls')),
Expand Down
17 changes: 15 additions & 2 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,17 @@ def split(string, delim):
},
}

CELERY_RESULT_EXPIRES = 60 * 60 * 24 * 3 # 4 days
CELERY_RESULT_BACKEND = 'share.celery:CeleryDatabaseBackend'
CELERY_RESULT_EXPIRES = int(os.environ.get(
'CELERY_RESULT_EXPIRES',
60 * 60 * 24 * 3, # 3 days
))
# only successful tasks get the default expiration (above)
# -- failed tasks kept longer (see `share.celery`)
FAILED_CELERY_RESULT_EXPIRES = int(os.environ.get(
'FAILED_CELERY_RESULT_EXPIRES',
60 * 60 * 24 * 11, # 11 days
))

# Don't reject tasks that were present on a worker when it was killed
CELERY_TASK_REJECT_ON_WORKER_LOST = False
Expand All @@ -358,7 +367,7 @@ def split(string, delim):
CELERY_TASK_DEFAULT_ROUTING_KEY = 'share_default'

URGENT_TASK_QUEUES = {
'trove.digestive_tract.task__extract_and_derive': 'digestive_tract.urgent',
'trove.digestive_tract.task__derive': 'digestive_tract.urgent',
}


Expand Down Expand Up @@ -440,6 +449,10 @@ def route_urgent_task(name, args, kwargs, options, task=None, **kw):

SHARE_WEB_URL = os.environ.get('SHARE_WEB_URL', 'http://localhost:8003').rstrip('/') + '/'
SHARE_USER_AGENT = os.environ.get('SHARE_USER_AGENT', 'SHAREbot/{} (+{})'.format(VERSION, SHARE_WEB_URL))
SHARE_ADMIN_USERNAME = os.environ.get('SHARE_ADMIN_USERNAME', 'admin')
SHARE_ADMIN_PASSWORD = os.environ.get('SHARE_ADMIN_PASSWORD')
if DEBUG and (SHARE_ADMIN_PASSWORD is None):
SHARE_ADMIN_PASSWORD = 'password'

# Skip some of the more intensive operations on works that surpass these limits
SHARE_LIMITS = {
Expand Down
83 changes: 7 additions & 76 deletions share/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from django.apps import apps
from django.urls import re_path as url
from django.contrib import admin
from django.http import HttpResponseRedirect
from django.template.response import TemplateResponse
from django.urls import path, reverse
from django.utils.html import format_html
from django.urls import path

from oauth2_provider.models import AccessToken

Expand All @@ -15,7 +11,6 @@
CeleryTaskResult,
FeatureFlag,
IndexBackfill,
RawDatum,
ShareUser,
SiteBanner,
Source,
Expand Down Expand Up @@ -51,26 +46,6 @@ class ShareUserAdmin(admin.ModelAdmin):
search_fields = ['username']


@linked_fk('suid')
class RawDatumAdmin(admin.ModelAdmin):
show_full_result_count = False
list_select_related = ('suid__source_config', )
list_display = ('id', 'identifier', 'source_config_label', 'datestamp', 'date_created', 'date_modified', )
readonly_fields = ('datum__pre', 'sha256')
exclude = ('datum',)
paginator = TimeLimitedPaginator

def identifier(self, obj):
return obj.suid.identifier

def source_config_label(self, obj):
return obj.suid.source_config.label

def datum__pre(self, instance):
return format_html('<pre>{}</pre>', instance.datum)
datum__pre.short_description = 'datum' # type: ignore[attr-defined]


class AccessTokenAdmin(admin.ModelAdmin):
raw_id_fields = ('user',)
list_display = ('token', 'user', 'scope')
Expand All @@ -91,11 +66,10 @@ def save_model(self, request, obj, form, change):

@linked_fk('source')
class SourceConfigAdmin(admin.ModelAdmin):
list_display = ('label', 'source_', 'version', 'enabled', 'button_actions')
list_display = ('label', 'source_', 'version', 'enabled',)
list_select_related = ('source',)
readonly_fields = ('button_actions',)
search_fields = ['label', 'source__name', 'source__long_title']
actions = ['schedule_full_ingest']
actions = ['schedule_derive']

def source_(self, obj):
return obj.source.long_title
Expand All @@ -104,42 +78,10 @@ def enabled(self, obj):
return not obj.disabled
enabled.boolean = True # type: ignore[attr-defined]

@admin.action(description='schedule re-ingest of all raw data for each source config')
def schedule_full_ingest(self, request, queryset):
@admin.action(description='schedule re-derive of all cards for each selected source config')
def schedule_derive(self, request, queryset):
for _id in queryset.values_list('id', flat=True):
digestive_tract.task__schedule_extract_and_derive_for_source_config.delay(_id)

def get_urls(self):
return [
url(
r'^(?P<config_id>.+)/ingest/$',
self.admin_site.admin_view(self.start_ingest),
name='source-config-ingest'
)
] + super().get_urls()

def button_actions(self, obj):
return format_html(
' '.join((
('<a class="button" href="{ingest_href}">Ingest</a>' if not obj.disabled else ''),
)),
ingest_href=reverse('admin:source-config-ingest', args=[obj.pk]),
)
button_actions.short_description = 'Buttons' # type: ignore[attr-defined]

def start_ingest(self, request, config_id):
config = self.get_object(request, config_id)
if request.method == 'POST':
digestive_tract.task__schedule_extract_and_derive_for_source_config.delay(config.pk)
url = reverse(
'admin:share_sourceconfig_changelist',
current_app=self.admin_site.name,
)
return HttpResponseRedirect(url)
else:
context = self.admin_site.each_context(request)
context['source_config'] = config
return TemplateResponse(request, 'admin/start-ingest.html', context)
digestive_tract.task__schedule_derive_for_source_config.delay(_id)


@linked_fk('user')
Expand All @@ -157,26 +99,16 @@ def access_token(self, obj):
@linked_fk('source_config')
@linked_fk('focus_identifier')
@linked_many('formattedmetadatarecord_set', defer=('formatted_metadata',))
@linked_many('raw_data', defer=('datum',))
@linked_many('indexcard_set')
class SourceUniqueIdentifierAdmin(admin.ModelAdmin):
readonly_fields = ('identifier',)
paginator = TimeLimitedPaginator
actions = ('reingest', 'delete_cards_for_suid')
actions = ('delete_cards_for_suid',)
list_filter = (SourceConfigFilter,)
list_select_related = ('source_config',)
show_full_result_count = False
search_fields = ('identifier',)

def reingest(self, request, queryset):
_raw_id_queryset = (
RawDatum.objects
.latest_by_suid_queryset(queryset)
.values_list('id', flat=True)
)
for _raw_id in _raw_id_queryset:
digestive_tract.task__extract_and_derive.delay(raw_id=_raw_id)

def delete_cards_for_suid(self, request, queryset):
for suid in queryset:
digestive_tract.expel_suid(suid)
Expand Down Expand Up @@ -220,7 +152,6 @@ class FeatureFlagAdmin(admin.ModelAdmin):
admin_site.register(CeleryTaskResult, CeleryTaskResultAdmin)
admin_site.register(FeatureFlag, FeatureFlagAdmin)
admin_site.register(IndexBackfill, IndexBackfillAdmin)
admin_site.register(RawDatum, RawDatumAdmin)
admin_site.register(ShareUser, ShareUserAdmin)
admin_site.register(SiteBanner, SiteBannerAdmin)
admin_site.register(Source, SourceAdmin)
Expand Down
45 changes: 26 additions & 19 deletions share/celery.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import datetime
import functools
import logging


from celery import states
from celery.app.task import Context
from celery.backends.base import BaseDictBackend
from celery.utils.time import maybe_timedelta

from django.conf import settings
from django.db import transaction
from django.db.models import Q
from django.utils import timezone

import sentry_sdk
Expand Down Expand Up @@ -90,7 +92,10 @@ def _store_result(self, task_id, result, status, traceback=None, request=None, *

@die_on_unhandled
def cleanup(self, expires=None):
TaskResultCleaner(expires or self.expires).clean()
TaskResultCleaner(
success_ttl=(expires or self.expires),
nonsuccess_ttl=settings.FAILED_CELERY_RESULT_EXPIRES,
).clean()

@die_on_unhandled
def _get_task_meta_for(self, task_id):
Expand All @@ -111,20 +116,19 @@ class TaskResultCleaner:

TaskModel = CeleryTaskResult

TASK_TTLS = {
}

NO_ARCHIVE = {
}

def __init__(self, expires, bucket=None, delete=True, chunk_size=5000):
self.bucket = bucket
def __init__(self, success_ttl, nonsuccess_ttl=None, delete=True, chunk_size=5000):
self.chunk_size = chunk_size
self.delete = delete
self.expires = expires
self.success_ttl = success_ttl
self.nonsuccess_ttl = nonsuccess_ttl or success_ttl

def get_ttl(self, task_name):
return timezone.now() - maybe_timedelta(self.TASK_TTLS.get(task_name, self.expires))
@property
def success_cutoff(self) -> datetime.datetime:
return timezone.now() - maybe_timedelta(self.success_ttl)

@property
def nonsuccess_cutoff(self) -> datetime.datetime:
return timezone.now() - maybe_timedelta(self.nonsuccess_ttl)

def get_task_names(self):
qs = self.TaskModel.objects.values('task_name').annotate(name=GroupBy('task_name'))
Expand All @@ -137,12 +141,15 @@ def get_task_names(self):

def clean(self):
for name in self.get_task_names():
logger.debug('Looking for succeeded %s tasks modified before %s', name, self.get_ttl(name))

queryset = self.TaskModel.objects.filter(
task_name=name,
status=states.SUCCESS,
date_modified__lt=self.get_ttl(name)
success_q = Q(status=states.SUCCESS, date_modified__lt=self.success_cutoff)
nonsuccess_q = (
~Q(status=states.SUCCESS)
& Q(date_modified__lt=self.nonsuccess_cutoff)
)
queryset = (
self.TaskModel.objects
.filter(task_name=name)
.filter(success_q | nonsuccess_q)
)

if not queryset.exists():
Expand Down
28 changes: 0 additions & 28 deletions share/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,3 @@

class ShareException(Exception):
pass


class HarvestError(ShareException):
pass


class IngestError(ShareException):
pass


class TransformError(IngestError):
pass


class RegulateError(IngestError):
pass


class MergeRequired(IngestError):
"""A node disambiguated to multiple objects in the database.
"""
pass


class IngestConflict(IngestError):
"""Multiple data being ingested at the same time conflicted.
"""
pass
Loading