Skip to content

Commit 2343f78

Browse files
authored
Add history events for changes made during deployments (#1600)
* Add history events for changes made during deployments
* Save history inline
* Remove save history events callable from deployment context
1 parent 9e8fc80 commit 2343f78

File tree

4 files changed: 275 additions and 8 deletions

4 files changed

+275
-8
lines changed

datajunction-server/datajunction_server/api/deployments.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import asyncio
66
import logging
7-
from typing import Callable
87
import uuid
98

109
from fastapi import Depends, BackgroundTasks, Request
@@ -16,7 +15,6 @@
1615
from datajunction_server.internal.caching.cachelib_cache import get_cache
1716
from datajunction_server.internal.caching.interface import Cache
1817
from datajunction_server.service_clients import QueryServiceClient
19-
from datajunction_server.api.helpers import get_save_history
2018
from datajunction_server.models.deployment import (
2119
DeploymentResult,
2220
DeploymentSpec,
@@ -159,7 +157,6 @@ async def create_deployment(
159157
session: AsyncSession = Depends(get_session),
160158
current_user: User = Depends(get_current_user),
161159
query_service_client: QueryServiceClient = Depends(get_query_service_client),
162-
save_history: Callable = Depends(get_save_history),
163160
cache: Cache = Depends(get_cache),
164161
validate_access: access.ValidateAccessFn = Depends(
165162
validate_access,
@@ -176,7 +173,6 @@ async def create_deployment(
176173
current_user=current_user,
177174
request=request,
178175
query_service_client=query_service_client,
179-
save_history=save_history,
180176
validate_access=validate_access,
181177
background_tasks=background_tasks,
182178
cache=cache,

datajunction-server/datajunction_server/internal/deployment/orchestrator.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@
8989
LinkType,
9090
)
9191
from datajunction_server.models.history import ActivityType
92+
from datajunction_server.database.history import History
93+
from datajunction_server.internal.history import EntityType
9294
from datajunction_server.models.node import (
9395
NodeStatus,
9496
NodeType,
@@ -638,7 +640,6 @@ async def _bulk_delete_links(self, to_delete, node_spec) -> list[DeploymentResul
638640
and link.role == delete_link.role
639641
):
640642
link_ids_to_delete.append(link.id) # type: ignore
641-
# await self.session.delete(link)
642643
delete_results.append(
643644
DeploymentResult(
644645
name=link_name,
@@ -647,6 +648,23 @@ async def _bulk_delete_links(self, to_delete, node_spec) -> list[DeploymentResul
647648
operation=DeploymentResult.Operation.DELETE,
648649
),
649650
)
651+
652+
# Track history for link deletion
653+
self.session.add(
654+
History(
655+
entity_type=EntityType.LINK,
656+
entity_name=node.name,
657+
node=node.name,
658+
activity_type=ActivityType.DELETE,
659+
details={
660+
"dimension_node": delete_link.rendered_dimension_node,
661+
"role": delete_link.role,
662+
"deployment_id": self.deployment_id,
663+
},
664+
user=self.context.current_user.username,
665+
),
666+
)
667+
650668
if link_ids_to_delete:
651669
await self.session.execute(
652670
DimensionLink.__table__.delete().where(
@@ -754,6 +772,31 @@ async def _create_or_update_dimension_link(
754772
)
755773
role_suffix = f"[{link_spec.role}]" if link_spec.role else ""
756774
await self.session.flush()
775+
776+
# Track history for dimension link create/update (skip REFRESH/NOOP)
777+
if activity_type in (ActivityType.CREATE, ActivityType.UPDATE):
778+
link_details = {
779+
"dimension_node": dimension_node.name,
780+
"link_type": link_spec.type,
781+
"role": link_spec.role,
782+
"deployment_id": self.deployment_id,
783+
}
784+
if link_spec.type == LinkType.JOIN:
785+
join_link = cast(DimensionJoinLinkSpec, link_spec)
786+
link_details["join_type"] = join_link.join_type
787+
link_details["join_on"] = join_link.rendered_join_on
788+
789+
self.session.add(
790+
History(
791+
entity_type=EntityType.LINK,
792+
entity_name=new_revision.name,
793+
node=new_revision.name,
794+
activity_type=activity_type,
795+
details=link_details,
796+
user=self.context.current_user.username,
797+
),
798+
)
799+
757800
return DeploymentResult(
758801
name=f"{new_revision.name} -> {dimension_node.name}" + role_suffix,
759802
deploy_type=DeploymentResult.Type.LINK,
@@ -1132,6 +1175,24 @@ async def _create_cubes_from_validation(
11321175
)
11331176
)
11341177

1178+
# Track history for cube create/update operations
1179+
activity_type = ActivityType.UPDATE if existing else ActivityType.CREATE
1180+
self.session.add(
1181+
History(
1182+
entity_type=EntityType.NODE,
1183+
entity_name=cube_spec.rendered_name,
1184+
node=cube_spec.rendered_name,
1185+
activity_type=activity_type,
1186+
details={
1187+
"version": new_node.current_version,
1188+
"deployment_id": self.deployment_id,
1189+
"metrics": cube_spec.rendered_metrics,
1190+
"dimensions": cube_spec.rendered_dimensions,
1191+
},
1192+
user=self.context.current_user.username,
1193+
),
1194+
)
1195+
11351196
# Create deployment result
11361197
deployment_result = DeploymentResult(
11371198
name=cube_spec.rendered_name,
@@ -1219,12 +1280,16 @@ async def _delete_nodes(self, to_delete: list[NodeSpec]) -> list[DeploymentResul
12191280
]
12201281

12211282
async def _deploy_delete_node(self, name: str) -> DeploymentResult:
1283+
async def add_history(event, session):
1284+
"""Add history to session without committing"""
1285+
session.add(event)
1286+
12221287
try:
12231288
await hard_delete_node(
12241289
name=name,
12251290
session=self.session,
12261291
current_user=self.context.current_user,
1227-
save_history=self.context.save_history,
1292+
save_history=add_history,
12281293
)
12291294
return DeploymentResult(
12301295
name=name,
@@ -1523,6 +1588,23 @@ async def _process_valid_node_deploy(
15231588
self.session.add(new_node)
15241589
self.session.add(new_revision)
15251590
await self.session.flush()
1591+
1592+
# Track history for create/update operations
1593+
activity_type = ActivityType.UPDATE if existing else ActivityType.CREATE
1594+
self.session.add(
1595+
History(
1596+
entity_type=EntityType.NODE,
1597+
entity_name=result.spec.rendered_name,
1598+
node=result.spec.rendered_name,
1599+
activity_type=activity_type,
1600+
details={
1601+
"version": new_node.current_version,
1602+
"deployment_id": self.deployment_id,
1603+
},
1604+
user=self.context.current_user.username,
1605+
),
1606+
)
1607+
15261608
deployment_result = DeploymentResult(
15271609
name=result.spec.rendered_name,
15281610
deploy_type=DeploymentResult.Type.NODE,

datajunction-server/datajunction_server/internal/deployment/utils.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from typing import Callable
21
from fastapi import Request, BackgroundTasks
32

43
from collections import defaultdict
@@ -115,7 +114,6 @@ class DeploymentContext:
115114
current_user: User
116115
request: Request
117116
query_service_client: QueryServiceClient
118-
save_history: Callable
119117
validate_access: access.ValidateAccessFn
120118
background_tasks: BackgroundTasks
121119
cache: Cache

datajunction-server/tests/api/deployments_test.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2853,3 +2853,194 @@ async def test_print_roads_spec(roads_nodes):
28532853
)
28542854
print("Roads Spec:", json.dumps(spec.model_dump()))
28552855
assert 1 == 2
2856+
2857+
2858+
@pytest.mark.xdist_group(name="deployments")
2859+
class TestDeploymentHistoryTracking:
2860+
"""Tests for history tracking during YAML deployments"""
2861+
2862+
@pytest.mark.asyncio
2863+
async def test_deployment_creates_history_for_nodes(
2864+
self,
2865+
client,
2866+
default_hard_hats,
2867+
default_us_states,
2868+
default_us_state,
2869+
):
2870+
"""
2871+
Test that deploying nodes creates history entries for each create/update operation
2872+
"""
2873+
namespace = "history_test"
2874+
dim_spec = DimensionSpec(
2875+
name="default.hard_hat",
2876+
description="""Hard hat dimension""",
2877+
query="""
2878+
SELECT
2879+
hard_hat_id,
2880+
last_name,
2881+
first_name,
2882+
state
2883+
FROM ${prefix}default.hard_hats
2884+
""",
2885+
primary_key=["hard_hat_id"],
2886+
owners=["dj"],
2887+
)
2888+
2889+
# Deploy nodes
2890+
data = await deploy_and_wait(
2891+
client,
2892+
DeploymentSpec(
2893+
namespace=namespace,
2894+
nodes=[
2895+
default_hard_hats,
2896+
default_us_states,
2897+
default_us_state,
2898+
dim_spec,
2899+
],
2900+
),
2901+
)
2902+
assert data["status"] == DeploymentStatus.SUCCESS.value
2903+
2904+
# Check history for source node
2905+
response = await client.get(
2906+
f"/history/node/{namespace}.default.hard_hats/",
2907+
)
2908+
assert response.status_code == 200
2909+
history = response.json()
2910+
assert len(history) >= 1
2911+
# Find the create event
2912+
create_events = [h for h in history if h["activity_type"] == "create"]
2913+
assert len(create_events) == 1
2914+
assert create_events[0]["entity_type"] == "node"
2915+
assert "deployment_id" in create_events[0]["details"]
2916+
2917+
# Check history for dimension node
2918+
response = await client.get(
2919+
f"/history/node/{namespace}.default.hard_hat/",
2920+
)
2921+
assert response.status_code == 200
2922+
history = response.json()
2923+
assert len(history) >= 1
2924+
create_events = [h for h in history if h["activity_type"] == "create"]
2925+
assert len(create_events) == 1
2926+
assert create_events[0]["entity_type"] == "node"
2927+
assert "deployment_id" in create_events[0]["details"]
2928+
2929+
@pytest.mark.asyncio
2930+
async def test_deployment_creates_history_for_updates(
2931+
self,
2932+
client,
2933+
default_hard_hats,
2934+
):
2935+
"""
2936+
Test that updating nodes via deployment creates history entries
2937+
"""
2938+
namespace = "history_update_test"
2939+
2940+
# First deployment - create
2941+
data = await deploy_and_wait(
2942+
client,
2943+
DeploymentSpec(
2944+
namespace=namespace,
2945+
nodes=[default_hard_hats],
2946+
),
2947+
)
2948+
assert data["status"] == DeploymentStatus.SUCCESS.value
2949+
2950+
# Second deployment - update description
2951+
updated_hard_hats = SourceSpec(
2952+
name=default_hard_hats.name,
2953+
description="Updated description for hard hats table",
2954+
catalog=default_hard_hats.catalog,
2955+
schema=default_hard_hats.schema_,
2956+
table=default_hard_hats.table,
2957+
columns=default_hard_hats.columns,
2958+
owners=default_hard_hats.owners,
2959+
)
2960+
data = await deploy_and_wait(
2961+
client,
2962+
DeploymentSpec(
2963+
namespace=namespace,
2964+
nodes=[updated_hard_hats],
2965+
),
2966+
)
2967+
assert data["status"] == DeploymentStatus.SUCCESS.value
2968+
2969+
# Check history shows both create and update
2970+
response = await client.get(
2971+
f"/history/node/{namespace}.default.hard_hats/",
2972+
)
2973+
assert response.status_code == 200
2974+
history = response.json()
2975+
create_events = [h for h in history if h["activity_type"] == "create"]
2976+
update_events = [h for h in history if h["activity_type"] == "update"]
2977+
assert len(create_events) >= 1
2978+
assert len(update_events) >= 1
2979+
# Verify deployment_id is tracked
2980+
assert "deployment_id" in create_events[0]["details"]
2981+
assert "deployment_id" in update_events[0]["details"]
2982+
2983+
@pytest.mark.asyncio
2984+
async def test_deployment_creates_history_for_dimension_links(
2985+
self,
2986+
client,
2987+
default_hard_hats,
2988+
default_us_states,
2989+
default_us_state,
2990+
):
2991+
"""
2992+
Test that deploying nodes with dimension links creates history entries for links
2993+
"""
2994+
namespace = "history_link_test"
2995+
dim_spec = DimensionSpec(
2996+
name="default.hard_hat",
2997+
description="""Hard hat dimension""",
2998+
query="""
2999+
SELECT
3000+
hard_hat_id,
3001+
last_name,
3002+
first_name,
3003+
state
3004+
FROM ${prefix}default.hard_hats
3005+
""",
3006+
primary_key=["hard_hat_id"],
3007+
dimension_links=[
3008+
DimensionJoinLinkSpec(
3009+
dimension_node="${prefix}default.us_state",
3010+
join_type="inner",
3011+
join_on="${prefix}default.hard_hat.state = ${prefix}default.us_state.state_short",
3012+
),
3013+
],
3014+
owners=["dj"],
3015+
)
3016+
3017+
# Deploy nodes with dimension links
3018+
data = await deploy_and_wait(
3019+
client,
3020+
DeploymentSpec(
3021+
namespace=namespace,
3022+
nodes=[
3023+
default_hard_hats,
3024+
default_us_states,
3025+
default_us_state,
3026+
dim_spec,
3027+
],
3028+
),
3029+
)
3030+
assert data["status"] == DeploymentStatus.SUCCESS.value
3031+
3032+
# Check history for dimension links
3033+
response = await client.get(
3034+
f"/history?node={namespace}.default.hard_hat",
3035+
)
3036+
assert response.status_code == 200
3037+
history = response.json()
3038+
3039+
# Should have link creation history
3040+
link_events = [h for h in history if h["entity_type"] == "link"]
3041+
assert len(link_events) >= 1
3042+
link_create_events = [h for h in link_events if h["activity_type"] == "create"]
3043+
assert len(link_create_events) >= 1
3044+
# Verify link details are tracked
3045+
assert "dimension_node" in link_create_events[0]["details"]
3046+
assert "deployment_id" in link_create_events[0]["details"]

0 commit comments

Comments
 (0)