From 766c16e6eb3a5480211fc003dbfae41c394d9402 Mon Sep 17 00:00:00 2001 From: Alexander Dusenbery Date: Thu, 23 Jan 2025 16:39:15 -0500 Subject: [PATCH 1/3] feat: new spike on a persisted pipeline pattern --- enterprise_access/apps/pipeline/__init__.py | 0 enterprise_access/apps/pipeline/admin.py | 3 + enterprise_access/apps/pipeline/apps.py | 6 + .../apps/pipeline/migrations/0001_initial.py | 96 ++++++++ .../apps/pipeline/migrations/__init__.py | 0 enterprise_access/apps/pipeline/models.py | 233 ++++++++++++++++++ enterprise_access/apps/pipeline/tests.py | 3 + enterprise_access/apps/pipeline/views.py | 3 + enterprise_access/settings/base.py | 1 + 9 files changed, 345 insertions(+) create mode 100644 enterprise_access/apps/pipeline/__init__.py create mode 100644 enterprise_access/apps/pipeline/admin.py create mode 100644 enterprise_access/apps/pipeline/apps.py create mode 100644 enterprise_access/apps/pipeline/migrations/0001_initial.py create mode 100644 enterprise_access/apps/pipeline/migrations/__init__.py create mode 100644 enterprise_access/apps/pipeline/models.py create mode 100644 enterprise_access/apps/pipeline/tests.py create mode 100644 enterprise_access/apps/pipeline/views.py diff --git a/enterprise_access/apps/pipeline/__init__.py b/enterprise_access/apps/pipeline/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/enterprise_access/apps/pipeline/admin.py b/enterprise_access/apps/pipeline/admin.py new file mode 100644 index 000000000..8c38f3f3d --- /dev/null +++ b/enterprise_access/apps/pipeline/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/enterprise_access/apps/pipeline/apps.py b/enterprise_access/apps/pipeline/apps.py new file mode 100644 index 000000000..f11aaf144 --- /dev/null +++ b/enterprise_access/apps/pipeline/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class PipelineConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'enterprise_access.apps.pipeline' diff --git a/enterprise_access/apps/pipeline/migrations/0001_initial.py b/enterprise_access/apps/pipeline/migrations/0001_initial.py new file mode 100644 index 000000000..b6c06dd86 --- /dev/null +++ b/enterprise_access/apps/pipeline/migrations/0001_initial.py @@ -0,0 +1,96 @@ +# Generated by Django 4.2.17 on 2025-01-24 14:24 + +from django.db import migrations, models +import django.db.models.deletion +import django.utils.timezone +import jsonfield.fields +import model_utils.fields +import uuid + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='AddToppingsStep', + fields=[ + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')), + ('is_removed', models.BooleanField(default=False)), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False, unique=True)), + ('input_data', jsonfield.fields.JSONField(blank=True)), + ('output_data', jsonfield.fields.JSONField(blank=True, editable=False, null=True)), + ('succeeded_at', models.DateTimeField(blank=True, null=True)), + ('failed_at', models.DateTimeField(blank=True, null=True)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='PizzaPipeline', + fields=[ + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')), + ('is_removed', models.BooleanField(default=False)), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False, unique=True)), + ('input_data', jsonfield.fields.JSONField(blank=True)), + ('output_data', jsonfield.fields.JSONField(blank=True, editable=False, null=True)), + ('succeeded_at', models.DateTimeField(blank=True, null=True)), + ('failed_at', models.DateTimeField(blank=True, null=True)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='StretchDoughStep', + fields=[ + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')), + ('is_removed', models.BooleanField(default=False)), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False, unique=True)), + ('input_data', jsonfield.fields.JSONField(blank=True)), + ('output_data', jsonfield.fields.JSONField(blank=True, editable=False, null=True)), + ('succeeded_at', models.DateTimeField(blank=True, null=True)), + ('failed_at', models.DateTimeField(blank=True, null=True)), + ('pipeline_record', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='pipeline.pizzapipeline')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='BakePizzaStep', + fields=[ + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')), + ('is_removed', models.BooleanField(default=False)), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False, unique=True)), + ('input_data', jsonfield.fields.JSONField(blank=True)), + ('output_data', jsonfield.fields.JSONField(blank=True, editable=False, null=True)), + ('succeeded_at', models.DateTimeField(blank=True, null=True)), + ('failed_at', models.DateTimeField(blank=True, null=True)), + ('pipeline_record', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='pipeline.pizzapipeline')), + ('preceding_step', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='pipeline.addtoppingsstep')), + ], + options={ + 'abstract': False, + }, + ), + migrations.AddField( + model_name='addtoppingsstep', + name='pipeline_record', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='pipeline.pizzapipeline'), + ), + migrations.AddField( + model_name='addtoppingsstep', + name='preceding_step', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='pipeline.stretchdoughstep'), + ), + ] diff --git a/enterprise_access/apps/pipeline/migrations/__init__.py b/enterprise_access/apps/pipeline/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/enterprise_access/apps/pipeline/models.py b/enterprise_access/apps/pipeline/models.py new file mode 100644 index 000000000..fab3b2212 --- /dev/null +++ b/enterprise_access/apps/pipeline/models.py @@ -0,0 +1,233 @@ +""" Models to support pipelines/workflows.. """ + +import collections +from uuid import uuid4 + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.db import models +from django.dispatch import receiver +from django.utils import timezone +from django.utils.translation import gettext_lazy as _ +from jsonfield.encoder import JSONEncoder +from jsonfield.fields import JSONField +from model_utils.models import SoftDeletableModel, TimeStampedModel +from simple_history.models import HistoricalRecords +from simple_history.utils import bulk_update_with_history + + +class AbstractUnitOfWork(TimeStampedModel, SoftDeletableModel): + """ + An abstract models that encapsulates the following: + * input data + * process_input() function to do the actual work + * output data + + .. no_pii: This model has no PII + """ + + class Meta: + abstract = True + + uuid = models.UUIDField( + primary_key=True, + default=uuid4, + editable=False, + unique=True, + ) + input_data = JSONField( + blank=True, + null=False, + ) + output_data = JSONField( + blank=True, + null=True, + editable=False, + ) + succeeded_at = models.DateTimeField( + null=True, + blank=True, + ) + failed_at = models.DateTimeField( + null=True, + blank=True, + ) + + def process_input(self, **kwargs): + raise NotImplementedError + + def execute(self, **kwargs): + try: + result = self.process_input(**kwargs) + self.output_data = result + self.succeeded_at = timezone.now() + except: + self.failed_at = timezone.now() + result = {} + + self.save() + return result + + +class AbstractPipeline(AbstractUnitOfWork): + """ + """ + class Meta: + abstract = True + + steps = [] + + def get_input_data_for_step_type(self, step_type): + raise NotImplementedError + + def process_input(self, **kwargs): + if self.succeeded_at: + return + + accumulated_output = {} + + preceding_step_record = None + for PipelineStep in self.steps: + input_data = self.get_input_data_for_step_type(PipelineStep) + kwargs = { + 'pipeline_record': self, + 'defaults': { + 'input_data': input_data, + } + } + if preceding_step_record: + kwargs['preceding_step'] = preceding_step_record + + step_record, _ = PipelineStep.objects.get_or_create(**kwargs) + preceding_step_record = step_record + if step_record.succeeded_at: + accumulated_output.update(step_record.output_data) + continue + + step_output = step_record.execute(**accumulated_output) + accumulated_output.update(step_output) + + return accumulated_output + + +class AbstractPipelineStep(AbstractUnitOfWork): + """ + """ + class Meta: + abstract = True + + @property + def pipeline(self): + """ + Concrete implementations should define a FK field to + the concrete pipeline model, named `pipeline_record`. + """ + return self.pipeline_record + + @property + def preceding_step(self): + """ + Concrete implementations can define a FK field to + the preceding, concrete step model, called `preceding_step`. + If this step is the first, it won't have a preceding step. + """ + return getattr(self, 'preceding_step', None) + + +class PizzaPipelineStep(AbstractPipelineStep): + class Meta: + abstract = True + + pipeline_record = models.ForeignKey( + 'PizzaPipeline', + null=False, + on_delete=models.CASCADE, + ) + + +class StretchDoughStep(PizzaPipelineStep): + def process_input(self, **kwargs): + print('Stretching dough...') + crust_style = self.input_data.get('crust_style') + print(crust_style) + + result = {**self.input_data} + result.update(kwargs) + result['is_saucy'] = crust_style != 'thin' + return result + + +class AddToppingsStep(PizzaPipelineStep): + preceding_step = models.ForeignKey( + StretchDoughStep, + on_delete=models.CASCADE, + ) + + def process_input(self, **kwargs): + """ + Depends on output from prior step. + """ + print('Adding toppings to whole pizza...') + print(self.input_data.get('whole_pie')) + + print('Adding toppings to half pizza...') + print(self.input_data.get('half_pie')) + + print('Sauciness:') + print(kwargs.get('is_saucy')) + + result = {**self.input_data} + result.update(kwargs) + return result + + +class BakePizzaStep(PizzaPipelineStep): + preceding_step = models.ForeignKey( + AddToppingsStep, + on_delete=models.CASCADE, + ) + + def process_input(self, **kwargs): + """ + Depends on output from prior step. + """ + print('Baking...') + doneness = self.input_data.get('doneness') + print(doneness) + + result = {**self.input_data} + result.update(kwargs) + return result + +class PizzaPipeline(AbstractPipeline): + """ + Concrete pipeline to bake a pizza. + """ + steps = [ + StretchDoughStep, + AddToppingsStep, + BakePizzaStep, + ] + + def get_input_data_for_step_type(self, step_type): + if step_type == StretchDoughStep: + return self.input_data.get('stretch_dough') + elif step_type == AddToppingsStep: + return self.input_data.get('toppings') + elif step_type == BakePizzaStep: + return self.input_data.get('bake') + + @classmethod + def test_input(cls): + return { + 'stretch_dough': { + 'crust_style': 'thin', + }, + 'toppings': { + 'whole_pie': ['bacon'], + 'half_pie': ['pineapple'], + }, + 'bake': { + 'doneness': 'well_done', + } + } diff --git a/enterprise_access/apps/pipeline/tests.py b/enterprise_access/apps/pipeline/tests.py new file mode 100644 index 000000000..7ce503c2d --- /dev/null +++ b/enterprise_access/apps/pipeline/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/enterprise_access/apps/pipeline/views.py b/enterprise_access/apps/pipeline/views.py new file mode 100644 index 000000000..91ea44a21 --- /dev/null +++ b/enterprise_access/apps/pipeline/views.py @@ -0,0 +1,3 @@ +from django.shortcuts import render + +# Create your views here. diff --git a/enterprise_access/settings/base.py b/enterprise_access/settings/base.py index b897535e9..e4b82c06f 100644 --- a/enterprise_access/settings/base.py +++ b/enterprise_access/settings/base.py @@ -78,6 +78,7 @@ def root(*path_fragments): 'enterprise_access.apps.content_assignments', 'enterprise_access.apps.enterprise_groups', 'enterprise_access.apps.bffs', + 'enterprise_access.apps.pipeline', ) INSTALLED_APPS += THIRD_PARTY_APPS From 016049aeafe9334393050afa6594fb6f464c5804 Mon Sep 17 00:00:00 2001 From: Alexander Dusenbery Date: Fri, 24 Jan 2025 14:48:51 -0500 Subject: [PATCH 2/3] checkpoint: using attrs and cattrs --- enterprise_access/apps/pipeline/models.py | 189 +++++++++++++++++----- 1 file changed, 147 insertions(+), 42 deletions(-) diff --git a/enterprise_access/apps/pipeline/models.py b/enterprise_access/apps/pipeline/models.py index fab3b2212..15cf029ac 100644 --- a/enterprise_access/apps/pipeline/models.py +++ b/enterprise_access/apps/pipeline/models.py @@ -3,6 +3,8 @@ import collections from uuid import uuid4 +from attrs import asdict, define, field, make_class, Factory +from cattrs import structure, unstructure from django.conf import settings from django.core.exceptions import ValidationError from django.db import models @@ -16,6 +18,11 @@ from simple_history.utils import bulk_update_with_history +@define +class Empty: + pass + + class AbstractUnitOfWork(TimeStampedModel, SoftDeletableModel): """ An abstract models that encapsulates the following: @@ -29,6 +36,9 @@ class AbstractUnitOfWork(TimeStampedModel, SoftDeletableModel): class Meta: abstract = True + input_class = Empty + output_class = Empty + uuid = models.UUIDField( primary_key=True, default=uuid4, @@ -53,13 +63,22 @@ class Meta: blank=True, ) + @property + def input_object(self): + return self.input_class(**self.input_data) + + @property + def output_object(self): + return self.output_class(**self.output_data) + def process_input(self, **kwargs): raise NotImplementedError def execute(self, **kwargs): + breakpoint() try: result = self.process_input(**kwargs) - self.output_data = result + self.output_data = asdict(result) self.succeeded_at = timezone.now() except: self.failed_at = timezone.now() @@ -76,23 +95,30 @@ class Meta: abstract = True steps = [] + input_class = Empty + output_class = Empty - def get_input_data_for_step_type(self, step_type): - raise NotImplementedError + @property + def input_object(self): + return structure(self.input_data, self.input_class) + + def get_input_object_for_step_type(self, step_type): + return getattr(self.input_object, step_type.input_class.KEY, None) def process_input(self, **kwargs): if self.succeeded_at: return - accumulated_output = {} + accumulated_output = self.output_class() preceding_step_record = None for PipelineStep in self.steps: - input_data = self.get_input_data_for_step_type(PipelineStep) + breakpoint() + input_object = self.get_input_object_for_step_type(PipelineStep) kwargs = { 'pipeline_record': self, 'defaults': { - 'input_data': input_data, + 'input_data': asdict(input_object), } } if preceding_step_record: @@ -101,11 +127,19 @@ def process_input(self, **kwargs): step_record, _ = PipelineStep.objects.get_or_create(**kwargs) preceding_step_record = step_record if step_record.succeeded_at: - accumulated_output.update(step_record.output_data) + setattr( + accumulated_output, + PipelineStep.output_class.KEY, + step_record.output_object, + ) continue - step_output = step_record.execute(**accumulated_output) - accumulated_output.update(step_output) + step_output = step_record.execute() + setattr( + accumulated_output, + PipelineStep.output_class.KEY, + step_output, + ) return accumulated_output @@ -145,19 +179,54 @@ class Meta: ) +@define +class StretchDoughInput: + KEY = 'stretch_dough_input' + + crust_style: str + + +@define +class StretchDoughOutput: + KEY = 'stretch_dough_output' + + is_saucy: bool = False + + class StretchDoughStep(PizzaPipelineStep): + input_class = StretchDoughInput + output_class = StretchDoughOutput + def process_input(self, **kwargs): print('Stretching dough...') - crust_style = self.input_data.get('crust_style') - print(crust_style) + print(self.input_object) - result = {**self.input_data} - result.update(kwargs) - result['is_saucy'] = crust_style != 'thin' - return result + output_object = self.output_class( + is_saucy=(crust_style != 'thin'), + ) + return asdict(output_object) + + +@define +class ToppingsInput: + KEY = 'toppings_input' + + whole_pie: list[str] = Factory(list) + left_half_pie: list[str] = Factory(list) + right_half_pie: list[str] = Factory(list) + + +@define +class ToppingsOutput: + KEY = 'toppings_output' + + is_awesome: bool = False class AddToppingsStep(PizzaPipelineStep): + input_class = ToppingsInput + output_class = ToppingsOutput + preceding_step = models.ForeignKey( StretchDoughStep, on_delete=models.CASCADE, @@ -167,21 +236,40 @@ def process_input(self, **kwargs): """ Depends on output from prior step. """ + breakpoint() print('Adding toppings to whole pizza...') - print(self.input_data.get('whole_pie')) + print(self.input_object.whole_pie) - print('Adding toppings to half pizza...') - print(self.input_data.get('half_pie')) + print('Adding toppings to left half of pizza...') + print(self.input_object.left_half_pie) - print('Sauciness:') - print(kwargs.get('is_saucy')) + print('Adding toppings to right half of pizza...') + print(self.input_object.right_half_pie) - result = {**self.input_data} - result.update(kwargs) - return result + output_object = self.output_class( + is_awesome=True, + ) + return output_object + + +@define +class BakeInput: + KEY = 'bake_input' + + doneness: str = 'regular' + + +@define +class BakeOutput: + KEY = 'bake_output' + + structural_integrity: str = 'good' class BakePizzaStep(PizzaPipelineStep): + input_class = BakeInput + output_class = BakeInput + preceding_step = models.ForeignKey( AddToppingsStep, on_delete=models.CASCADE, @@ -192,12 +280,32 @@ def process_input(self, **kwargs): Depends on output from prior step. """ print('Baking...') - doneness = self.input_data.get('doneness') - print(doneness) + print(self.input_object.doneness) + + output_object = self.output_class(structural_integrity='excellent') + return output_object + + + +PizzaPipelineInput = make_class( + 'PizzaPipelineInput', + { + StretchDoughInput.KEY: field(type=StretchDoughInput), + ToppingsInput.KEY: field(type=ToppingsInput), + BakeInput.KEY: field(type=BakeInput), + }, +) + + +PizzaPipelineOutput = make_class( + 'PizzaPipelineOutput', + { + StretchDoughOutput.KEY: field(type=StretchDoughOutput, default=None), + ToppingsOutput.KEY: field(type=ToppingsOutput, default=None), + BakeOutput.KEY: field(type=BakeOutput, default=None), + }, +) - result = {**self.input_data} - result.update(kwargs) - return result class PizzaPipeline(AbstractPipeline): """ @@ -208,26 +316,23 @@ class PizzaPipeline(AbstractPipeline): AddToppingsStep, BakePizzaStep, ] - - def get_input_data_for_step_type(self, step_type): - if step_type == StretchDoughStep: - return self.input_data.get('stretch_dough') - elif step_type == AddToppingsStep: - return self.input_data.get('toppings') - elif step_type == BakePizzaStep: - return self.input_data.get('bake') + input_class = PizzaPipelineInput + output_class = PizzaPipelineOutput @classmethod - def test_input(cls): - return { - 'stretch_dough': { + def run_test(cls): + test_input = { + StretchDoughInput.KEY: { 'crust_style': 'thin', }, - 'toppings': { + ToppingsInput.KEY: { 'whole_pie': ['bacon'], - 'half_pie': ['pineapple'], + 'left_half_pie': ['pineapple'], }, - 'bake': { + BakeInput.KEY: { 'doneness': 'well_done', } } + pipeline = cls.objects.create(input_data=test_input) + pipeline.execute() + return pipeline From d7bccaf392ee0852a970ef1c85e4ff7c701f0035 Mon Sep 17 00:00:00 2001 From: Alexander Dusenbery Date: Mon, 27 Jan 2025 10:25:38 -0500 Subject: [PATCH 3/3] feat: admin classes, pass along accumulated output --- enterprise_access/apps/pipeline/admin.py | 57 ++++++++++++++++++- ...er_addtoppingsstep_output_data_and_more.py | 34 +++++++++++ enterprise_access/apps/pipeline/models.py | 46 +++++++++------ 3 files changed, 120 insertions(+), 17 deletions(-) create mode 100644 enterprise_access/apps/pipeline/migrations/0002_alter_addtoppingsstep_output_data_and_more.py diff --git a/enterprise_access/apps/pipeline/admin.py b/enterprise_access/apps/pipeline/admin.py index 8c38f3f3d..461dbc8a7 100644 --- a/enterprise_access/apps/pipeline/admin.py +++ b/enterprise_access/apps/pipeline/admin.py @@ -1,3 +1,58 @@ from django.contrib import admin -# Register your models here. +from .models import ( + PizzaPipeline, + StretchDoughStep, + AddToppingsStep, + BakePizzaStep, +) + + +@admin.register(PizzaPipeline) +class PizzaPipelineAdmin(admin.ModelAdmin): + list_display = [ + 'uuid', + 'succeeded_at', + 'failed_at', + 'input_data', + ] + + list_filter = [ + ('succeeded_at', admin.EmptyFieldListFilter), + ('failed_at', admin.EmptyFieldListFilter), + ] + + readonly_fields = [ + 'output_data', + 'succeeded_at', + 'failed_at', + ] + + ordering = ['-succeeded_at'] + + +@admin.register(StretchDoughStep) +class StretchDoughStepAdmin(admin.ModelAdmin): + readonly_fields = [ + 'output_data', + 'succeeded_at', + 'failed_at', + ] + + +@admin.register(AddToppingsStep) +class AddToppingsStepAdmin(admin.ModelAdmin): + readonly_fields = [ + 'output_data', + 'succeeded_at', + 'failed_at', + ] + + +@admin.register(BakePizzaStep) +class BakePizzaStepAdmin(admin.ModelAdmin): + readonly_fields = [ + 'output_data', + 'succeeded_at', + 'failed_at', + ] diff --git a/enterprise_access/apps/pipeline/migrations/0002_alter_addtoppingsstep_output_data_and_more.py b/enterprise_access/apps/pipeline/migrations/0002_alter_addtoppingsstep_output_data_and_more.py new file mode 100644 index 000000000..74d27c8fa --- /dev/null +++ b/enterprise_access/apps/pipeline/migrations/0002_alter_addtoppingsstep_output_data_and_more.py @@ -0,0 +1,34 @@ +# Generated by Django 4.2.17 on 2025-01-27 15:20 + +from django.db import migrations +import jsonfield.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ('pipeline', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='addtoppingsstep', + name='output_data', + field=jsonfield.fields.JSONField(blank=True, null=True), + ), + migrations.AlterField( + model_name='bakepizzastep', + name='output_data', + field=jsonfield.fields.JSONField(blank=True, null=True), + ), + migrations.AlterField( + model_name='pizzapipeline', + name='output_data', + field=jsonfield.fields.JSONField(blank=True, null=True), + ), + migrations.AlterField( + model_name='stretchdoughstep', + name='output_data', + field=jsonfield.fields.JSONField(blank=True, null=True), + ), + ] diff --git a/enterprise_access/apps/pipeline/models.py b/enterprise_access/apps/pipeline/models.py index 15cf029ac..94c9adad8 100644 --- a/enterprise_access/apps/pipeline/models.py +++ b/enterprise_access/apps/pipeline/models.py @@ -52,7 +52,6 @@ class Meta: output_data = JSONField( blank=True, null=True, - editable=False, ) succeeded_at = models.DateTimeField( null=True, @@ -71,13 +70,15 @@ def input_object(self): def output_object(self): return self.output_class(**self.output_data) - def process_input(self, **kwargs): + def process_input(self, accumulated_output=None, **kwargs): raise NotImplementedError - def execute(self, **kwargs): - breakpoint() + def execute(self, accumulated_output=None, **kwargs): try: - result = self.process_input(**kwargs) + result = self.process_input( + accumulated_output=accumulated_output, + **kwargs, + ) self.output_data = asdict(result) self.succeeded_at = timezone.now() except: @@ -87,6 +88,9 @@ def execute(self, **kwargs): self.save() return result + def __str__(self): + return str(self.uuid) + str(self.input_class) + str(self.output_class) + class AbstractPipeline(AbstractUnitOfWork): """ @@ -113,18 +117,17 @@ def process_input(self, **kwargs): preceding_step_record = None for PipelineStep in self.steps: - breakpoint() input_object = self.get_input_object_for_step_type(PipelineStep) - kwargs = { + step_record_kwargs = { 'pipeline_record': self, 'defaults': { 'input_data': asdict(input_object), } } if preceding_step_record: - kwargs['preceding_step'] = preceding_step_record + step_record_kwargs['preceding_step'] = preceding_step_record - step_record, _ = PipelineStep.objects.get_or_create(**kwargs) + step_record, _ = PipelineStep.objects.get_or_create(**step_record_kwargs) preceding_step_record = step_record if step_record.succeeded_at: setattr( @@ -134,7 +137,7 @@ def process_input(self, **kwargs): ) continue - step_output = step_record.execute() + step_output = step_record.execute(accumulated_output=accumulated_output) setattr( accumulated_output, PipelineStep.output_class.KEY, @@ -197,7 +200,7 @@ class StretchDoughStep(PizzaPipelineStep): input_class = StretchDoughInput output_class = StretchDoughOutput - def process_input(self, **kwargs): + def process_input(self, accumulated_output=None, **kwargs): print('Stretching dough...') print(self.input_object) @@ -232,11 +235,9 @@ class AddToppingsStep(PizzaPipelineStep): on_delete=models.CASCADE, ) - def process_input(self, **kwargs): + def process_input(self, accumulated_output=None, **kwargs): """ - Depends on output from prior step. """ - breakpoint() print('Adding toppings to whole pizza...') print(self.input_object.whole_pie) @@ -268,20 +269,33 @@ class BakeOutput: class BakePizzaStep(PizzaPipelineStep): input_class = BakeInput - output_class = BakeInput + output_class = BakeOutput preceding_step = models.ForeignKey( AddToppingsStep, on_delete=models.CASCADE, ) - def process_input(self, **kwargs): + def _are_toppings_awesome(self, accumulated_output): + if not accumulated_output: + return False + + toppings_output = getattr(accumulated_output, ToppingsOutput.KEY, None) + if not toppings_output: + return False + + return toppings_output.is_awesome + + def process_input(self, accumulated_output=None, **kwargs): """ Depends on output from prior step. """ print('Baking...') print(self.input_object.doneness) + if self._are_toppings_awesome(accumulated_output): + print('Baking extra good because the toppings were awesome!') + output_object = self.output_class(structural_integrity='excellent') return output_object