-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Add conversation variable persistence layer #129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: qodo_combined_20260121_qodo_grep_cursor_copilot_1_base_feat_add_conversation_variable_persistence_layer__pr429
Are you sure you want to change the base?
Conversation
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path. - `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`. - `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes. - `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected. Tests not run. Next steps (optional): 1) `make lint` 2) `make type-check` 3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args. - `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes. No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing ConversationId from the engine's variable pool. It captures the successful execution event of each node and determines whether the type of this node is VariableAssigner(v1 and v2). If so, it retrieves the variable name and value that need to be updated from the node's outputs. This layer is only used in the Advanced Chat. It should be placed outside of Core.Workflow package.
…rs/conversation_variable_persist_layer.py` to satisfy SIM118 - chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall) - chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting - test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage - refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage Tests not run.
…n and remove value type validation - test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances - fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool Tests not run.
…e handling for v1/v2 process_data - refactor(app-layer): read updated variables from process_data in conversation variable persistence layer - test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads Tests not run.
…nce reads from process_data
…fter venv changes) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies) Details: - `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py` - refactor(app): update advanced chat app runner and conversation service to import the new updater factory Tests not run.
…-linter module missing) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs) Details: - `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
Code Review by Qodo
1. Session(db.engine) not context-managed
|
| def update(self, conversation_id: str, variable: Variable) -> None: | ||
| stmt = select(ConversationVariable).where( | ||
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | ||
| ) | ||
| with Session(db.engine) as session: | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise VariableOperatorNodeError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() | ||
|
|
||
| def flush(self): | ||
| session = Session(db.engine) | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1. session(db.engine) not context-managed 📘 Rule violation ⛯ Reliability
• ConversationVariableUpdaterImpl.update() creates a SQLAlchemy Session without a context manager, so the session may not be closed on normal completion or exceptions. • This violates the session lifecycle requirement and can lead to connection/resource leaks under load. • The code should use a context-managed session pattern to guarantee cleanup.
Agent prompt
## Issue description
`ConversationVariableUpdaterImpl.update()` creates a SQLAlchemy `Session` without a context manager, which can leak connections and violates the required session lifecycle pattern.
## Issue Context
The compliance checklist requires context-managed session usage (e.g., `with Session(...) as session:`).
## Fix Focus Areas
- api/services/conversation_variable_updater.py[14-23]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| def __init__( | ||
| self, | ||
| id: str, | ||
| config: Mapping[str, Any], | ||
| graph_init_params: "GraphInitParams", | ||
| graph_runtime_state: "GraphRuntimeState", | ||
| ): | ||
| super().__init__( | ||
| id=id, | ||
| config=config, | ||
| graph_init_params=graph_init_params, | ||
| graph_runtime_state=graph_runtime_state, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2. init missing -> none 📘 Rule violation ✓ Correctness
• New/modified __init__ methods are missing explicit return type annotations, which violates the requirement that all Python functions include full type annotations. • This reduces type-safety and can fail basedpyright/static analysis expectations. • Add -> None to these __init__ definitions.
Agent prompt
## Issue description
`__init__` methods were introduced/modified without explicit return type annotations.
## Issue Context
Compliance requires all Python functions to have full type annotations (including return types). For `__init__`, the return type must be `None`.
## Fix Focus Areas
- api/core/workflow/nodes/variable_assigner/v1/node.py[22-34]
- api/core/workflow/nodes/variable_assigner/v2/node.py[56-68]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| def test_persists_conversation_variables_from_assigner_output(): | ||
| conversation_id = "conv-123" | ||
| variable = StringVariable( | ||
| id="var-1", | ||
| name="name", | ||
| value="updated", | ||
| selector=[CONVERSATION_VARIABLE_NODE_ID, "name"], | ||
| ) | ||
| process_data = common_helpers.set_updated_variables( | ||
| {}, [common_helpers.variable_to_processed_data(variable.selector, variable)] | ||
| ) | ||
|
|
||
| variable_pool = MockReadOnlyVariablePool({(CONVERSATION_VARIABLE_NODE_ID, "name"): variable}) | ||
|
|
||
| updater = Mock() | ||
| layer = ConversationVariablePersistenceLayer(updater) | ||
| layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id), Mock(spec=CommandChannel)) | ||
|
|
||
| event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER, process_data=process_data) | ||
| layer.on_event(event) | ||
|
|
||
| updater.update.assert_called_once_with(conversation_id=conversation_id, variable=variable) | ||
| updater.flush.assert_called_once() | ||
|
|
||
|
|
||
| def test_skips_when_outputs_missing(): | ||
| conversation_id = "conv-456" | ||
| variable = StringVariable( | ||
| id="var-2", | ||
| name="name", | ||
| value="updated", | ||
| selector=[CONVERSATION_VARIABLE_NODE_ID, "name"], | ||
| ) | ||
|
|
||
| variable_pool = MockReadOnlyVariablePool({(CONVERSATION_VARIABLE_NODE_ID, "name"): variable}) | ||
|
|
||
| updater = Mock() | ||
| layer = ConversationVariablePersistenceLayer(updater) | ||
| layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id), Mock(spec=CommandChannel)) | ||
|
|
||
| event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER) | ||
| layer.on_event(event) | ||
|
|
||
| updater.update.assert_not_called() | ||
| updater.flush.assert_not_called() | ||
|
|
||
|
|
||
| def test_skips_non_assigner_nodes(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3. Tests missing return annotations 📘 Rule violation ✓ Correctness
• New pytest test functions are missing -> None return type annotations, violating the requirement that all Python functions include return type hints. • This can cause repository-wide type checking to fail if tests are included in static analysis. • Add -> None to the affected test function definitions.
Agent prompt
## Issue description
New pytest test functions do not include return type annotations.
## Issue Context
The repository compliance requires full type annotations for all Python function parameters and return values.
## Fix Focus Areas
- api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py[63-120]
- api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py[395-431]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| class ConversationVariableNotFoundError(Exception): | ||
| pass | ||
|
|
||
|
|
||
| class ConversationVariableUpdaterImpl: | ||
| def update(self, conversation_id: str, variable: Variable): | ||
| def update(self, conversation_id: str, variable: Variable) -> None: | ||
| stmt = select(ConversationVariable).where( | ||
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | ||
| ) | ||
| with Session(db.engine) as session: | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise VariableOperatorNodeError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() | ||
|
|
||
| def flush(self): | ||
| session = Session(db.engine) | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4. conversationvariablenotfounderror lacks context 📘 Rule violation ⛯ Reliability
• The raised error message conversation variable not found in the database does not include actionable context (e.g., conversation_id, variable.id) required for debugging and incident response. • The updater also introduces an ad-hoc exception type in the service module rather than using domain-specific exceptions from services/errors or core/errors, reducing consistency in controller translation. • Improve the exception type/source and include non-sensitive identifiers in the message and/or internal logs.
Agent prompt
## Issue description
The updater raises a generic, locally-defined exception with an error message that lacks actionable context.
## Issue Context
Compliance requires robust error messages (with context) and domain-specific exceptions suitable for controller translation.
## Fix Focus Areas
- api/services/conversation_variable_updater.py[9-23]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| for item in updated_variables: | ||
| selector = item.selector | ||
| if len(selector) < 2: | ||
| logger.warning("Conversation variable selector invalid. selector=%s", selector) | ||
| continue | ||
| if selector[0] != CONVERSATION_VARIABLE_NODE_ID: | ||
| continue | ||
| variable = self.graph_runtime_state.variable_pool.get(selector) | ||
| if not isinstance(variable, Variable): | ||
| logger.warning( | ||
| "Conversation variable not found in variable pool. selector=%s", | ||
| selector, | ||
| ) | ||
| continue | ||
| self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable) | ||
| self._conversation_variable_updater.flush() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5. No audit log on updates 📘 Rule violation ✧ Quality
• Conversation variable persistence performs database updates but does not emit any audit/trace log indicating the write, its outcome, or relevant identifiers. • This makes it difficult to reconstruct who/what changed conversation variables and diagnose production issues when persistence fails. • Add appropriately-leveled structured logs (without sensitive values) including identifiers like conversation_id and the variable selector/name, and log failures at error level.
Agent prompt
## Issue description
Conversation variable persistence updates the database without emitting an audit/trace log for the write and its outcome.
## Issue Context
Compliance requires audit trails for critical actions and operational logs with relevant identifiers and correct severity.
## Fix Focus Areas
- api/core/app/layers/conversation_variable_persist_layer.py[30-55]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
| conversation_id = self.graph_runtime_state.system_variable.conversation_id | ||
| if conversation_id is None: | ||
| return | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
6. Missing conversation_id handling 🐞 Bug ✓ Correctness
• ConversationVariablePersistenceLayer silently skips persistence when conversation_id is missing. • Since VariableAssigner nodes no longer persist directly, conversation-variable writes can now “succeed” in-memory during a run but never persist, with no warning. • This is especially risky for workflow executions that do not populate SystemVariable.conversation_id but still use conversation selectors in configs.
Agent prompt
## Issue description
Conversation variable persistence is silently skipped when `conversation_id` is missing, which can lead to non-persisted conversation state with no signal.
## Issue Context
Variable assigner nodes now only emit `__updated_variables` in `process_data`; persistence is delegated to `ConversationVariablePersistenceLayer`. If `conversation_id` is absent, persistence is skipped entirely.
## Fix Focus Areas
- api/core/app/layers/conversation_variable_persist_layer.py[30-37]
- api/core/app/layers/conversation_variable_persist_layer.py[38-54]
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools
Benchmark PR from qodo-benchmark#429