Skip to content

Commit 38c740d

Browse files
Ken Lippold
authored and committed
Decoupled ETL settings
1 parent 2d68fd9 commit 38c740d

11 files changed

+332
-67
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# Generated by Django 5.2b1 on 2025-03-28 18:25
2+
3+
import django.db.models.deletion
4+
from django.db import migrations, models
5+
6+
7+
class Migration(migrations.Migration):
    """Split the single ETL configuration slot into extractor/transformer/loader.

    For both ``datasource`` and ``linkeddatastream`` this migration:

    * renames ``etl_configuration_settings`` to
      ``extractor_configuration_settings``;
    * renames the ``etl_configuration`` foreign key to
      ``extractor_configuration`` and gives it its new ``related_name``;
    * adds nullable ``transformer_configuration`` / ``loader_configuration``
      foreign keys together with their ``*_settings`` JSON fields.

    It also adds ``etl_configuration_model`` to ``etlconfiguration``,
    back-filling existing rows with ``"datasource"``.
    """

    dependencies = [
        ("etl", "0001_initial"),
    ]

    operations = [
        migrations.RenameField(
            model_name="datasource",
            old_name="etl_configuration_settings",
            new_name="extractor_configuration_settings",
        ),
        migrations.RenameField(
            model_name="linkeddatastream",
            old_name="etl_configuration_settings",
            new_name="extractor_configuration_settings",
        ),
        # The foreign key is renamed rather than removed and re-added so that
        # existing rows keep their configuration link. The settings JSON above
        # is carried over to the extractor slot, so the configuration it
        # belongs to has to follow it; dropping the column would leave
        # orphaned settings behind.
        # NOTE(review): this assumes every pre-existing configuration should
        # land in the extractor slot - confirm against production data.
        migrations.RenameField(
            model_name="datasource",
            old_name="etl_configuration",
            new_name="extractor_configuration",
        ),
        migrations.AlterField(
            model_name="datasource",
            name="extractor_configuration",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.DO_NOTHING,
                related_name="data_source_extractor_set",
                to="etl.etlconfiguration",
            ),
        ),
        migrations.RenameField(
            model_name="linkeddatastream",
            old_name="etl_configuration",
            new_name="extractor_configuration",
        ),
        migrations.AlterField(
            model_name="linkeddatastream",
            name="extractor_configuration",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.DO_NOTHING,
                related_name="linked_datastreams_extractor",
                to="etl.etlconfiguration",
            ),
        ),
        migrations.AddField(
            model_name="datasource",
            name="loader_configuration",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.DO_NOTHING,
                related_name="data_source_loader_set",
                to="etl.etlconfiguration",
            ),
        ),
        migrations.AddField(
            model_name="datasource",
            name="loader_configuration_settings",
            field=models.JSONField(blank=True, null=True),
        ),
        migrations.AddField(
            model_name="datasource",
            name="transformer_configuration",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.DO_NOTHING,
                related_name="data_source_transformer_set",
                to="etl.etlconfiguration",
            ),
        ),
        migrations.AddField(
            model_name="datasource",
            name="transformer_configuration_settings",
            field=models.JSONField(blank=True, null=True),
        ),
        # The default is only used to back-fill existing rows;
        # preserve_default=False keeps it out of the final field definition.
        migrations.AddField(
            model_name="etlconfiguration",
            name="etl_configuration_model",
            field=models.CharField(default="datasource", max_length=255),
            preserve_default=False,
        ),
        migrations.AddField(
            model_name="linkeddatastream",
            name="loader_configuration",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.DO_NOTHING,
                related_name="linked_datastreams_loader",
                to="etl.etlconfiguration",
            ),
        ),
        migrations.AddField(
            model_name="linkeddatastream",
            name="loader_configuration_settings",
            field=models.JSONField(blank=True, null=True),
        ),
        migrations.AddField(
            model_name="linkeddatastream",
            name="transformer_configuration",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.DO_NOTHING,
                related_name="linked_datastreams_transformer",
                to="etl.etlconfiguration",
            ),
        ),
        migrations.AddField(
            model_name="linkeddatastream",
            name="transformer_configuration_settings",
            field=models.JSONField(blank=True, null=True),
        ),
    ]

etl/models/data_source.py

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@
33
from typing import Literal, Optional
44
from django.db import models
55
from django.db.models import Q
6-
from iam.models import Workspace
76
from iam.models.utils import PermissionChecker
8-
from sta.models import Datastream
9-
from .etl_system import EtlSystem
10-
from .etl_configuration import EtlConfiguration
117

128
if typing.TYPE_CHECKING:
139
from django.contrib.auth import get_user_model
@@ -40,11 +36,11 @@ def visible(self, user: "User"):
4036
class DataSource(models.Model, PermissionChecker):
4137
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
4238
workspace = models.ForeignKey(
43-
Workspace, related_name="data_sources", on_delete=models.DO_NOTHING
39+
"iam.Workspace", related_name="data_sources", on_delete=models.DO_NOTHING
4440
)
4541
name = models.CharField(max_length=255)
4642
etl_system = models.ForeignKey(
47-
EtlSystem, related_name="data_sources", on_delete=models.DO_NOTHING
43+
"EtlSystem", related_name="data_sources", on_delete=models.DO_NOTHING
4844
)
4945
interval = models.PositiveIntegerField(blank=True, null=True)
5046
interval_units = models.CharField(max_length=255, blank=True, null=True)
@@ -56,14 +52,30 @@ class DataSource(models.Model, PermissionChecker):
5652
last_run_message = models.TextField(blank=True, null=True)
5753
last_run = models.DateTimeField(blank=True, null=True)
5854
next_run = models.DateTimeField(blank=True, null=True)
59-
etl_configuration = models.ForeignKey(
60-
EtlConfiguration,
61-
related_name="data_sources",
55+
extractor_configuration = models.ForeignKey(
56+
"EtlConfiguration",
57+
related_name="data_source_extractor_set",
6258
on_delete=models.DO_NOTHING,
6359
blank=True,
6460
null=True,
6561
)
66-
etl_configuration_settings = models.JSONField(blank=True, null=True)
62+
extractor_configuration_settings = models.JSONField(blank=True, null=True)
63+
transformer_configuration = models.ForeignKey(
64+
"EtlConfiguration",
65+
related_name="data_source_transformer_set",
66+
on_delete=models.DO_NOTHING,
67+
blank=True,
68+
null=True,
69+
)
70+
transformer_configuration_settings = models.JSONField(blank=True, null=True)
71+
loader_configuration = models.ForeignKey(
72+
"EtlConfiguration",
73+
related_name="data_source_loader_set",
74+
on_delete=models.DO_NOTHING,
75+
blank=True,
76+
null=True,
77+
)
78+
loader_configuration_settings = models.JSONField(blank=True, null=True)
6779

6880
objects = DataSourceQuerySet.as_manager()
6981

@@ -101,13 +113,29 @@ class LinkedDatastream(models.Model):
101113
DataSource, related_name="linked_datastreams", on_delete=models.DO_NOTHING
102114
)
103115
datastream = models.OneToOneField(
104-
Datastream, related_name="data_source", on_delete=models.DO_NOTHING
116+
"sta.Datastream", related_name="data_source", on_delete=models.DO_NOTHING
117+
)
118+
extractor_configuration = models.ForeignKey(
119+
"EtlConfiguration",
120+
related_name="linked_datastreams_extractor",
121+
on_delete=models.DO_NOTHING,
122+
blank=True,
123+
null=True,
124+
)
125+
extractor_configuration_settings = models.JSONField(blank=True, null=True)
126+
transformer_configuration = models.ForeignKey(
127+
"EtlConfiguration",
128+
related_name="linked_datastreams_transformer",
129+
on_delete=models.DO_NOTHING,
130+
blank=True,
131+
null=True,
105132
)
106-
etl_configuration = models.ForeignKey(
107-
EtlConfiguration,
108-
related_name="linked_datastreams",
133+
transformer_configuration_settings = models.JSONField(blank=True, null=True)
134+
loader_configuration = models.ForeignKey(
135+
"EtlConfiguration",
136+
related_name="linked_datastreams_loader",
109137
on_delete=models.DO_NOTHING,
110138
blank=True,
111139
null=True,
112140
)
113-
etl_configuration_settings = models.JSONField(blank=True, null=True)
141+
loader_configuration_settings = models.JSONField(blank=True, null=True)

etl/models/etl_configuration.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from django.db.models import Q
66
from iam.models import Workspace
77
from iam.models.utils import PermissionChecker
8-
from .etl_system_platform import EtlSystemPlatform
8+
from .data_source import DataSource, LinkedDatastream
99

1010
if typing.TYPE_CHECKING:
1111
from django.contrib.auth import get_user_model
@@ -41,14 +41,31 @@ def visible(self, user: Optional["User"]):
4141
class EtlConfiguration(models.Model, PermissionChecker):
4242
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
4343
etl_system_platform = models.ForeignKey(
44-
EtlSystemPlatform,
44+
"EtlSystemPlatform",
4545
related_name="etl_configurations",
4646
on_delete=models.DO_NOTHING,
4747
)
4848
name = models.CharField(max_length=255)
49+
etl_configuration_model = models.CharField(max_length=255)
4950
etl_configuration_type = models.CharField(max_length=255)
5051
etl_configuration_schema = models.JSONField()
5152

53+
@property
def data_sources(self):
    """Return the data sources that reference this configuration.

    A data source matches when this instance is its extractor,
    transformer, or loader configuration. The queryset is made distinct
    before it is returned.
    """
    uses_this_configuration = (
        models.Q(extractor_configuration=self)
        | models.Q(transformer_configuration=self)
        | models.Q(loader_configuration=self)
    )
    matching_sources = DataSource.objects.filter(uses_this_configuration)
    return matching_sources.distinct()
60+
61+
@property
def linked_datastreams(self):
    """Return the linked datastreams that reference this configuration.

    A linked datastream matches when this instance is its extractor,
    transformer, or loader configuration. The queryset is made distinct
    before it is returned.
    """
    uses_this_configuration = (
        models.Q(extractor_configuration=self)
        | models.Q(transformer_configuration=self)
        | models.Q(loader_configuration=self)
    )
    matching_links = LinkedDatastream.objects.filter(uses_this_configuration)
    return matching_links.distinct()
68+
5269
objects = EtlConfigurationQuerySet.as_manager()
5370

5471
@classmethod
@@ -74,17 +91,17 @@ def delete(self, *args, **kwargs):
7491

7592
@staticmethod
def delete_contents(filter_arg: models.Model, filter_suffix: Optional[str]):
    """Clear every ETL configuration reference that matches ``filter_arg``.

    For each of the extractor, transformer and loader configuration
    foreign keys on ``DataSource`` and ``LinkedDatastream``, rows whose
    configuration matches ``filter_arg`` have that foreign key set to
    ``None``.

    Args:
        filter_arg: Object to match against. With no ``filter_suffix`` it
            is compared to the configuration foreign key itself.
        filter_suffix: Optional lookup path appended to each foreign key
            name (``<relation>__<filter_suffix>``) so the match is made on
            a field reached through the configuration.
    """
    configuration_relations = (
        "extractor_configuration",
        "transformer_configuration",
        "loader_configuration",
    )

    for relation in configuration_relations:
        # The filter lookup may traverse the relation, but update() only
        # accepts fields on the model itself. The nulled field must always
        # be the bare foreign key name, never the suffixed lookup.
        lookup = f"{relation}__{filter_suffix}" if filter_suffix else relation
        for model in (DataSource, LinkedDatastream):
            model.objects.filter(**{lookup: filter_arg}).update(
                **{relation: None}
            )

etl/schemas/data_source.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ class LinkedDatastreamProperties(BaseGetResponse, Schema):
2424

2525

2626
class LinkedDatastreamFields(Schema):
27-
etl_configuration_id: Optional[uuid.UUID] = None
28-
etl_configuration_settings: Optional[dict] = None
27+
extractor_configuration_id: Optional[uuid.UUID] = None
28+
extractor_configuration_settings: Optional[dict] = None
29+
transformer_configuration_id: Optional[uuid.UUID] = None
30+
transformer_configuration_settings: Optional[dict] = None
31+
loader_configuration_id: Optional[uuid.UUID] = None
32+
loader_configuration_settings: Optional[dict] = None
2933

3034

3135
class LinkedDatastreamGetResponse(BaseGetResponse, LinkedDatastreamFields):
@@ -53,8 +57,12 @@ class DataSourceFields(Schema):
5357
last_run_message: Optional[str] = Field(None, max_length=255)
5458
last_run: Optional[datetime] = None
5559
next_run: Optional[datetime] = None
56-
etl_configuration_id: Optional[uuid.UUID] = None
57-
etl_configuration_settings: Optional[dict] = None
60+
extractor_configuration_id: Optional[uuid.UUID] = None
61+
extractor_configuration_settings: Optional[dict] = None
62+
transformer_configuration_id: Optional[uuid.UUID] = None
63+
transformer_configuration_settings: Optional[dict] = None
64+
loader_configuration_id: Optional[uuid.UUID] = None
65+
loader_configuration_settings: Optional[dict] = None
5866

5967

6068
class DataSourceGetResponse(BaseGetResponse, DataSourceFields):

etl/schemas/etl_configuration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
class EtlConfigurationFields(Schema):
99
name: str = Field(..., max_length=255)
10-
etl_configuration_type: Literal["DataSource", "Datastream"] = Field(
10+
etl_configuration_model: Literal["datasource", "datastream"]
11+
etl_configuration_type: Literal["extractor", "transformer", "loader"] = Field(
1112
..., alias="type"
1213
)
1314
etl_configuration_schema: dict = Field(..., alias="schema")

0 commit comments

Comments
 (0)