feat: Add conversation variable persistence layer #155
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path.
- `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`.
- `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes.
- `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected.

Tests not run. Next steps (optional):
1) `make lint`
2) `make type-check`
3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args.
- `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes.

No tests run.
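The dependency-injection shape those commits describe can be sketched as follows. The class, protocol, and factory names below are simplified placeholders, not the project's real types; the point is the kw-only factory argument with a default, which lets a node factory or a test substitute its own updater.

```python
from typing import Callable, Protocol


class ConversationVariableUpdater(Protocol):
    def update(self, conversation_id: str, variable: object) -> None: ...
    def flush(self) -> None: ...


class RecordingUpdater:
    """Test double that records update calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, object]] = []

    def update(self, conversation_id: str, variable: object) -> None:
        self.calls.append((conversation_id, variable))

    def flush(self) -> None:
        pass


def default_updater_factory() -> ConversationVariableUpdater:
    # Stand-in for the real conversation_variable_updater_factory.
    return RecordingUpdater()


class VariableAssignerNode:
    # The factory is a kw-only constructor argument with a default, so
    # the node factory (or a unit test) can inject an alternative.
    def __init__(
        self,
        *,
        conv_var_updater_factory: Callable[[], ConversationVariableUpdater] = default_updater_factory,
    ) -> None:
        self._conv_var_updater_factory = conv_var_updater_factory

    def run(self, conversation_id: str, variable: object) -> ConversationVariableUpdater:
        # Build the updater lazily at run time, as the commit describes for _run.
        updater = self._conv_var_updater_factory()
        updater.update(conversation_id, variable)
        updater.flush()
        return updater
```

A test can then assert the injection path by passing a lambda that returns a known instance and checking the node used it.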
Add a new command for GraphEngine that updates a group of variables. The command carries a group of variable selectors and their new values. When the engine receives the command, it updates each corresponding variable in the variable pool: if the variable does not exist it is added, and if it does exist it is overwritten. Both behaviors are treated the same and need not be distinguished.
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which it uses to persist the received conversation variables to the database. It retrieves the conversation_id currently being processed from the engine's variable pool. It captures each node's successful execution event and checks whether the node is a VariableAssigner (v1 or v2); if so, it reads the variable names and values to be updated from the node's outputs. This layer is used only in Advanced Chat and should be placed outside of the core.workflow package.
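The event-driven shape described above can be sketched as follows. The event class, node-type identifiers, and updater protocol here are simplified assumptions for illustration, not the engine's real API:

```python
from dataclasses import dataclass
from typing import Protocol

# Illustrative node-type ids for VariableAssigner v1 and v2.
ASSIGNER_TYPES = {"variable-assigner", "variable-assigner-v2"}


@dataclass
class NodeRunSucceededEvent:
    node_type: str
    # selector -> updated value, as surfaced by the assigner node's outputs.
    outputs: dict[tuple[str, ...], object]


class ConversationVariableUpdater(Protocol):
    def update(self, conversation_id: str, selector: tuple[str, ...], value: object) -> None: ...
    def flush(self) -> None: ...


class ConversationVariablePersistenceLayer:
    def __init__(self, updater: ConversationVariableUpdater, conversation_id: str) -> None:
        # In the real layer, conversation_id comes from the engine's variable pool.
        self._updater = updater
        self._conversation_id = conversation_id

    def on_event(self, event: NodeRunSucceededEvent) -> None:
        # Only VariableAssigner nodes can touch conversation variables.
        if event.node_type not in ASSIGNER_TYPES:
            return
        for selector, value in event.outputs.items():
            self._updater.update(self._conversation_id, selector, value)
        if event.outputs:
            self._updater.flush()
```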
…rs/conversation_variable_persist_layer.py` to satisfy SIM118
- chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall)
- chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting
- test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage
- refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage

Tests not run.
…n and remove value type validation
- test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances
- fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool

Tests not run.
…e handling for v1/v2 process_data
- refactor(app-layer): read updated variables from process_data in conversation variable persistence layer
- test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads

Tests not run.
…nce reads from process_data
…fter venv changes)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies)

Details:
- `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py`
- refactor(app): update advanced chat app runner and conversation service to import the new updater factory

Tests not run.
…-linter module missing)
- chore(type-check): run `make type-check` (fails: 1275 missing type stubs)

Details:
- `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`.
- `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
Review Summary by Qodo

Move conversation variable persistence to dedicated GraphEngine layer
Walkthrough

Description:
- Move conversation variable persistence to dedicated layer
- Remove database operations from variable assigner nodes
- Implement GraphEngineLayer for handling variable updates
- Update variable pool interface to use sequence-based selectors

Diagram:

```mermaid
flowchart LR
    VA["Variable Assigner Nodes<br/>v1 & v2"]
    VPool["Variable Pool"]
    Layer["ConversationVariablePersistenceLayer"]
    Updater["ConversationVariableUpdater"]
    DB["Database"]
    VA -- "updates variables" --> VPool
    VPool -- "read variables" --> Layer
    Layer -- "on NodeRunSucceededEvent" --> Updater
    Updater -- "persist to DB" --> DB
```
File Changes

1. api/core/app/layers/conversation_variable_persist_layer.py
Code Review by Qodo
1. Session() not context-managed
```python
session = Session(db.engine)
row = session.scalar(stmt)
if not row:
    raise ConversationVariableNotFoundError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()
```
1. Session() not context-managed 📘 Rule violation ⛯ Reliability
ConversationVariableUpdaterImpl.update() creates a SQLAlchemy session without a context manager, so the session may not be reliably closed. This violates the required session lifecycle pattern and risks connection/resource leaks.
Agent Prompt
## Issue description
`ConversationVariableUpdaterImpl.update()` creates a SQLAlchemy session without a context manager, which can leak connections/resources.
## Issue Context
Compliance requires the context-manager pattern `with Session(db.engine, expire_on_commit=False) as session:` for DB operations.
## Fix Focus Areas
- api/services/conversation_variable_updater.py[18-23]
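The lifecycle difference this comment is about can be shown without SQLAlchemy. `FakeSession` below is a stand-in that only records whether `close()` ran; it mirrors what the required `with Session(db.engine, expire_on_commit=False) as session:` pattern guarantees even when the body raises:

```python
class FakeSession:
    """Stand-in for a SQLAlchemy Session; only tracks close()."""

    def __init__(self) -> None:
        self.closed = False

    def __enter__(self) -> "FakeSession":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Context-manager exit always closes, raise or no raise.
        self.close()

    def close(self) -> None:
        self.closed = True


def update_without_context_manager(session: FakeSession) -> None:
    # Mirrors the flagged code path: an exception before an explicit
    # close() leaks the session.
    raise RuntimeError("conversation variable not found in the database")


def update_with_context_manager(session: FakeSession) -> None:
    # The `with` block closes the session even when the body raises.
    with session:
        raise RuntimeError("conversation variable not found in the database")
```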
```python
class ConversationVariableNotFoundError(Exception):
    pass


class ConversationVariableUpdaterImpl:
    def update(self, conversation_id: str, variable: Variable) -> None:
        stmt = select(ConversationVariable).where(
            ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
        )
        with Session(db.engine) as session:
            row = session.scalar(stmt)
            if not row:
                raise VariableOperatorNodeError("conversation variable not found in the database")
            row.data = variable.model_dump_json()
            session.commit()

    def flush(self):
        session = Session(db.engine)
        row = session.scalar(stmt)
        if not row:
            raise ConversationVariableNotFoundError("conversation variable not found in the database")
        row.data = variable.model_dump_json()
```
2. Service raises local exception 📘 Rule violation ✓ Correctness
The service code introduces and raises ConversationVariableNotFoundError defined inline instead of using a domain-specific exception from services/errors or core/errors. This breaks the required error taxonomy and makes consistent controller translation harder.
Agent Prompt
## Issue description
A new domain error (`ConversationVariableNotFoundError`) is defined and raised inline in a service module, instead of using the domain exception locations mandated by the architecture.
## Issue Context
Services should raise domain-specific exceptions sourced from `services/errors` or `core/errors`, so controllers can translate them consistently.
## Fix Focus Areas
- api/services/conversation_variable_updater.py[9-22]
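One way to follow that taxonomy is sketched below. The module paths in the comments and the dict-backed lookup are illustrative only; the point is that the error lives in a shared errors module with a common base class, so controllers can catch one family:

```python
# services/errors/conversation.py (illustrative location)
class ConversationServiceError(Exception):
    """Base class for conversation-related service errors."""


class ConversationVariableNotFoundError(ConversationServiceError):
    pass


# services/conversation_variable_updater.py (illustrative)
def load_variable_row(rows: dict[str, str], variable_id: str) -> str:
    # A real implementation would query the database; a dict stands in here.
    row = rows.get(variable_id)
    if row is None:
        # Raise the shared domain error rather than a module-local one.
        raise ConversationVariableNotFoundError(
            "conversation variable not found in the database"
        )
    return row
```

A controller can then translate any `ConversationServiceError` into a consistent HTTP response without knowing each concrete subclass.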
```python
def test_persists_conversation_variables_from_assigner_output():
    conversation_id = "conv-123"
    variable = StringVariable(
        id="var-1",
        name="name",
        value="updated",
        selector=[CONVERSATION_VARIABLE_NODE_ID, "name"],
    )
    process_data = common_helpers.set_updated_variables(
        {}, [common_helpers.variable_to_processed_data(variable.selector, variable)]
    )

    variable_pool = MockReadOnlyVariablePool({(CONVERSATION_VARIABLE_NODE_ID, "name"): variable})

    updater = Mock()
    layer = ConversationVariablePersistenceLayer(updater)
    layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id), Mock(spec=CommandChannel))

    event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER, process_data=process_data)
    layer.on_event(event)

    updater.update.assert_called_once_with(conversation_id=conversation_id, variable=variable)
    updater.flush.assert_called_once()


def test_skips_when_outputs_missing():
    conversation_id = "conv-456"
    variable = StringVariable(
        id="var-2",
        name="name",
        value="updated",
        selector=[CONVERSATION_VARIABLE_NODE_ID, "name"],
    )

    variable_pool = MockReadOnlyVariablePool({(CONVERSATION_VARIABLE_NODE_ID, "name"): variable})

    updater = Mock()
    layer = ConversationVariablePersistenceLayer(updater)
    layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id), Mock(spec=CommandChannel))

    event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER)
    layer.on_event(event)

    updater.update.assert_not_called()
    updater.flush.assert_not_called()


def test_skips_non_assigner_nodes():
    updater = Mock()
    layer = ConversationVariablePersistenceLayer(updater)
    layer.initialize(_build_graph_runtime_state(MockReadOnlyVariablePool()), Mock(spec=CommandChannel))

    event = _build_node_run_succeeded_event(node_type=NodeType.LLM)
    layer.on_event(event)

    updater.update.assert_not_called()
    updater.flush.assert_not_called()


def test_skips_non_conversation_variables():
    conversation_id = "conv-789"
```
3. Tests missing return types 📘 Rule violation ✓ Correctness
New pytest test functions are defined without return type annotations. This violates the requirement that all Python function definitions include parameter and return type hints.
Agent Prompt
## Issue description
New pytest test functions are missing return type annotations, violating the project's requirement for explicit type hints.
## Issue Context
These tests are newly introduced in this PR; add `-> None` to each test function definition.
## Fix Focus Areas
- api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py[63-123]
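The fix is mechanical: give each new test an explicit `-> None` return annotation. The test name below is a placeholder, not one of the tests in this PR:

```python
def test_example_behavior() -> None:
    # The explicit return annotation satisfies the type-hint requirement;
    # pytest collects and runs the function exactly as before.
    assert 1 + 1 == 2
```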
```python
        logger.warning("Conversation variable selector invalid. selector=%s", selector)
        continue
    if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
        continue
    variable = self.graph_runtime_state.variable_pool.get(selector)
    if not isinstance(variable, Variable):
        logger.warning(
            "Conversation variable not found in variable pool. selector=%s",
            selector,
        )
```
4. Warnings lack identifier context 📘 Rule violation ✧ Quality
New warning logs in ConversationVariablePersistenceLayer do not include identifiers like tenant_id/app_id/workflow_id/conversation_id, reducing observability when diagnosing issues. Adding contextual identifiers would improve traceability and align with the logging-context requirement.
Agent Prompt
## Issue description
Warning logs emitted by the new persistence layer lack key identifiers, making troubleshooting difficult in multi-tenant/production environments.
## Issue Context
Compliance expects contextual logging (tenant/app/workflow identifiers). At minimum, `conversation_id` is available in this layer when processing events.
## Fix Focus Areas
- api/core/app/layers/conversation_variable_persist_layer.py[41-50]
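A minimal sketch of the suggested logging shape, with `conversation_id` carried into every warning. The function name and message layout are illustrative; the real layer would use its own logger and whatever identifiers (tenant/app/workflow) are in scope:

```python
import logging

logger = logging.getLogger(__name__)


def warn_invalid_selector(selector: tuple[str, ...], conversation_id: str) -> str:
    # Include the conversation identifier so a single log line is enough
    # to correlate the warning with one chat session.
    message = (
        "Conversation variable selector invalid. "
        f"selector={selector} conversation_id={conversation_id}"
    )
    logger.warning(message)
    return message
```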
```python
# Over write the variable.
self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable)

# TODO: Move database operation to the pipeline.
# Update conversation variable.
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"])
if not conversation_id:
    raise VariableOperatorNodeError("conversation_id not found")
conv_var_updater = self._conv_var_updater_factory()
conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable)
conv_var_updater.flush()
updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)]

return NodeRunResult(
    status=WorkflowNodeExecutionStatus.SUCCEEDED,
    inputs={
```
5. Persistence layer not wired 🐞 Bug ✓ Correctness
VariableAssigner nodes no longer persist conversation variables to the database, but only AdvancedChatAppRunner registers ConversationVariablePersistenceLayer. WorkflowAppRunner and PipelineRunner still run the graph engine without this layer, so conversation variable updates will stop persisting in those execution paths.
Agent Prompt
### Issue description
Conversation variable DB persistence was removed from VariableAssigner nodes and moved to `ConversationVariablePersistenceLayer`, but the layer is only registered in `AdvancedChatAppRunner`. Other GraphEngine runners (workflow and pipeline) don't register it, causing conversation variable updates to remain in-memory only.
### Issue Context
- `VariableAssignerNode` now only emits updated variables via `process_data`.
- Persistence depends on a GraphEngine layer listening to `NodeRunSucceededEvent`.
### Fix Focus Areas
- api/core/app/apps/workflow/app_runner.py[135-151]
- api/core/app/apps/pipeline/pipeline_runner.py[170-190]
- api/core/app/apps/advanced_chat/app_runner.py[204-208]
- api/core/app/layers/conversation_variable_persist_layer.py[22-54]
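The wiring gap can be pictured with a small sketch. The classes below are simplified stand-ins, not the real runners or engine: the assigner node now only touches the in-memory pool, and persistence happens solely through layers a runner chooses to register.

```python
class InMemoryPool:
    """Stand-in variable pool: updates live only in memory."""

    def __init__(self) -> None:
        self.vars: dict[tuple[str, ...], object] = {}


class PersistenceLayer:
    """Stand-in for ConversationVariablePersistenceLayer."""

    def __init__(self, database: dict[tuple[str, ...], object]) -> None:
        self.database = database

    def on_assigner_success(self, selector: tuple[str, ...], value: object) -> None:
        self.database[selector] = value


def run_assigner_node(
    pool: InMemoryPool,
    layers: list[PersistenceLayer],
    selector: tuple[str, ...],
    value: object,
) -> None:
    # After this PR the node only updates the in-memory pool;
    # durable persistence happens only via registered layers.
    pool.vars[selector] = value
    for layer in layers:
        layer.on_assigner_success(selector, value)
```

A runner that registers the layer (the AdvancedChat path) persists updates; a runner that does not (the workflow/pipeline paths flagged here) leaves them in memory only.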
```python
for item in updated_variables:
    selector = item.selector
    if len(selector) < 2:
        logger.warning("Conversation variable selector invalid. selector=%s", selector)
        continue
    if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
        continue
    variable = self.graph_runtime_state.variable_pool.get(selector)
    if not isinstance(variable, Variable):
        logger.warning(
            "Conversation variable not found in variable pool. selector=%s",
            selector,
        )
        continue
    self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
self._conversation_variable_updater.flush()
```
6. Persistence failures swallowed 🐞 Bug ⛯ Reliability
If ConversationVariablePersistenceLayer raises during persistence (for example when a DB row is missing), the graph engine catches and logs the exception and the workflow can still succeed. This can cause silent conversation-variable persistence loss and can also skip remaining updates in the same event.
Agent Prompt
### Issue description
Persistence errors thrown inside `ConversationVariablePersistenceLayer.on_event()` are caught and only logged by the engine, allowing workflows to succeed while persistence fails. Additionally, an exception on one variable prevents processing remaining variables for that event.
### Issue Context
- Updater raises when DB row is missing.
- EventManager catches and logs all layer exceptions.
### Fix Focus Areas
- api/core/app/layers/conversation_variable_persist_layer.py[22-54]
- api/services/conversation_variable_updater.py[14-26]
- api/core/workflow/graph_engine/event_management/event_manager.py[173-186]
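One hedged way to address both halves of this comment is sketched below (the names are illustrative, not the engine's API): attempt every variable even if an earlier one fails, collect the failures, and then raise so the engine cannot treat the run as fully successful while persistence silently failed.

```python
from typing import Callable


class PersistenceError(Exception):
    """Raised after the event is processed if any variable failed to persist."""


def persist_all(
    updates: dict[str, object],
    persist_one: Callable[[str, object], None],
) -> None:
    # Try every variable even when an earlier one fails, so one bad row
    # does not skip the remaining updates in the same event...
    failures: list[tuple[str, Exception]] = []
    for name, value in updates.items():
        try:
            persist_one(name, value)
        except Exception as exc:  # collected and re-raised below
            failures.append((name, exc))
    # ...then fail loudly so the caller can surface the persistence loss
    # instead of swallowing it in a catch-and-log handler.
    if failures:
        failed_names = ", ".join(name for name, _ in failures)
        raise PersistenceError(f"failed to persist: {failed_names}")
```

Whether the engine should then fail the workflow or merely mark it degraded is a separate policy decision; the sketch only ensures the error is visible and the remaining variables are still written.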
Benchmark PR from agentic-review-benchmarks#4