Skip to content

Commit 35c2788

Browse files
committed
feat(temporal): encrypt durable agent histories
1 parent fbb13ae commit 35c2788

File tree

3 files changed

+71
-18
lines changed

3 files changed

+71
-18
lines changed

packages/tracecat-ee/tracecat_ee/agent/approvals/service.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from tracecat.identifiers.workflow import exec_id_to_parts
3434
from tracecat.logger import logger
3535
from tracecat.service import BaseWorkspaceService
36+
from tracecat.temporal.visibility import tokenize_visibility_value
3637
from tracecat.workflow.executions.enums import TemporalSearchAttr
3738
from tracecat_ee.agent.activities import (
3839
ApplyApprovalResultsActivityInputs,
@@ -328,7 +329,9 @@ async def list_session_history(
328329
session_id=session_id,
329330
)
330331
return []
331-
alias = build_agent_alias(session.parent_workflow_id, session.action_ref)
332+
alias = tokenize_visibility_value(
333+
build_agent_alias(session.parent_workflow_id, session.action_ref)
334+
)
332335

333336
query = " AND ".join(
334337
[

tests/unit/test_approvals_service.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
"""Tests for ApprovalService CRUD operations."""
22

33
import uuid
4+
from collections.abc import AsyncIterator
5+
from types import SimpleNamespace
6+
from unittest.mock import AsyncMock, patch
47

58
import pytest
69
from sqlalchemy.exc import IntegrityError
710
from sqlalchemy.ext.asyncio import AsyncSession
811
from tracecat_ee.agent.approvals.schemas import ApprovalCreate, ApprovalUpdate
912
from tracecat_ee.agent.approvals.service import ApprovalService, SessionInfo
1013

14+
from tracecat.agent.aliases import build_agent_alias
1115
from tracecat.agent.approvals.enums import ApprovalStatus
1216
from tracecat.auth.types import Role
1317
from tracecat.db.models import AgentSession, User
18+
from tracecat.identifiers import WorkflowID
19+
from tracecat.temporal.visibility import tokenize_visibility_value
1420

1521
pytestmark = pytest.mark.usefixtures("db")
1622

@@ -264,6 +270,47 @@ async def test_update_approvals_empty_dict(
264270
updated = await approvals_service.update_approvals({})
265271
assert updated == []
266272

273+
async def test_list_session_history_uses_tokenized_alias_query(
274+
self,
275+
approvals_service: ApprovalService,
276+
) -> None:
277+
session_id = uuid.uuid4()
278+
parent_workflow_id = WorkflowID(str(uuid.uuid4()))
279+
action_ref = "tools.some_action"
280+
expected_alias = tokenize_visibility_value(
281+
build_agent_alias(parent_workflow_id, action_ref)
282+
)
283+
captured_query: str | None = None
284+
285+
class FakeTemporalClient:
286+
async def list_workflows(self, **kwargs: str) -> AsyncIterator[object]:
287+
nonlocal captured_query
288+
captured_query = kwargs["query"]
289+
if False:
290+
yield object()
291+
292+
with (
293+
patch.object(
294+
approvals_service,
295+
"get_session",
296+
AsyncMock(
297+
return_value=SimpleNamespace(
298+
parent_workflow_id=parent_workflow_id,
299+
action_ref=action_ref,
300+
)
301+
),
302+
),
303+
patch(
304+
"tracecat_ee.agent.approvals.service.get_temporal_client",
305+
AsyncMock(return_value=FakeTemporalClient()),
306+
),
307+
):
308+
history = await approvals_service.list_session_history(session_id)
309+
310+
assert history == []
311+
assert captured_query is not None
312+
assert expected_alias in captured_query
313+
267314
async def test_delete_approval(
268315
self,
269316
approvals_service: ApprovalService,

tracecat/agent/session/service.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
)
5656
from tracecat.chat.service import ChatService
5757
from tracecat.chat.tools import get_default_tools
58+
from tracecat.contexts import with_temporal_workspace_id
5859
from tracecat.db.models import AgentSession, AgentSessionHistory, Approval, Chat
5960
from tracecat.dsl.client import get_temporal_client
6061
from tracecat.dsl.common import RETRY_POLICIES
@@ -891,15 +892,16 @@ async def run_turn(
891892
task_queue=config.TRACECAT__AGENT_QUEUE,
892893
)
893894

894-
await client.start_workflow(
895-
DurableAgentWorkflow.run,
896-
workflow_args,
897-
id=str(workflow_id),
898-
task_queue=config.TRACECAT__AGENT_QUEUE,
899-
execution_timeout=timedelta(hours=1),
900-
retry_policy=RETRY_POLICIES["workflow:fail_fast"],
901-
search_attributes=self._build_direct_agent_search_attributes(),
902-
)
895+
with with_temporal_workspace_id(self.role.workspace_id):
896+
await client.start_workflow(
897+
DurableAgentWorkflow.run,
898+
workflow_args,
899+
id=str(workflow_id),
900+
task_queue=config.TRACECAT__AGENT_QUEUE,
901+
execution_timeout=timedelta(hours=1),
902+
retry_policy=RETRY_POLICIES["workflow:fail_fast"],
903+
search_attributes=self._build_direct_agent_search_attributes(),
904+
)
903905

904906
# Return ChatResponse with session_id for streaming
905907
stream_url = f"/api/agent/sessions/{session_id}/stream"
@@ -1029,14 +1031,15 @@ async def _continue_with_approvals(
10291031
)
10301032

10311033
try:
1032-
await handle.execute_update(
1033-
DurableAgentWorkflow.set_approvals,
1034-
WorkflowApprovalSubmission(
1035-
approvals=approval_map,
1036-
approved_by=self.role.user_id,
1037-
decision_metadata=decision_metadata or None,
1038-
),
1039-
)
1034+
with with_temporal_workspace_id(self.role.workspace_id):
1035+
await handle.execute_update(
1036+
DurableAgentWorkflow.set_approvals,
1037+
WorkflowApprovalSubmission(
1038+
approvals=approval_map,
1039+
approved_by=self.role.user_id,
1040+
decision_metadata=decision_metadata or None,
1041+
),
1042+
)
10401043
except Exception:
10411044
# Allow retriable failures to be resubmitted by clearing dedup marker.
10421045
if dedup_client is not None and dedup_key is not None:

0 commit comments

Comments
 (0)