-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Add conversation variable persistence layer #155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: qodo_claude_vs_qodo_base_feat_add_conversation_variable_persistence_layer__pr4
Are you sure you want to change the base?
Changes from all commits
8a4829f
d8867d0
448e05e
f2f684a
32b6e67
0319d50
f9e5edc
cb0055e
64b0a76
e8ab0c9
b0adbc6
1892335
5317356
0a0b1cf
bbda6ac
a3fb50f
908ec85
ae025a5
a69bddc
3a680ac
ec0fdf1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| import logging | ||
|
|
||
| from core.variables import Variable | ||
| from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID | ||
| from core.workflow.conversation_variable_updater import ConversationVariableUpdater | ||
| from core.workflow.enums import NodeType | ||
| from core.workflow.graph_engine.layers.base import GraphEngineLayer | ||
| from core.workflow.graph_events import GraphEngineEvent, NodeRunSucceededEvent | ||
| from core.workflow.nodes.variable_assigner.common import helpers as common_helpers | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class ConversationVariablePersistenceLayer(GraphEngineLayer): | ||
| def __init__(self, conversation_variable_updater: ConversationVariableUpdater) -> None: | ||
| super().__init__() | ||
| self._conversation_variable_updater = conversation_variable_updater | ||
|
|
||
| def on_graph_start(self) -> None: | ||
| pass | ||
|
|
||
| def on_event(self, event: GraphEngineEvent) -> None: | ||
| if not isinstance(event, NodeRunSucceededEvent): | ||
| return | ||
| if event.node_type != NodeType.VARIABLE_ASSIGNER: | ||
| return | ||
| if self.graph_runtime_state is None: | ||
| return | ||
|
|
||
| updated_variables = common_helpers.get_updated_variables(event.node_run_result.process_data) or [] | ||
| if not updated_variables: | ||
| return | ||
|
|
||
| conversation_id = self.graph_runtime_state.system_variable.conversation_id | ||
| if conversation_id is None: | ||
| return | ||
|
|
||
| for item in updated_variables: | ||
| selector = item.selector | ||
| if len(selector) < 2: | ||
| logger.warning("Conversation variable selector invalid. selector=%s", selector) | ||
| continue | ||
| if selector[0] != CONVERSATION_VARIABLE_NODE_ID: | ||
| continue | ||
| variable = self.graph_runtime_state.variable_pool.get(selector) | ||
| if not isinstance(variable, Variable): | ||
| logger.warning( | ||
| "Conversation variable not found in variable pool. selector=%s", | ||
| selector, | ||
| ) | ||
| continue | ||
| self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable) | ||
| self._conversation_variable_updater.flush() | ||
|
Comment on lines
+38
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 6. Persistence failures swallowed If ConversationVariablePersistenceLayer raises during persistence (for example when a DB row is missing), the graph engine catches and logs the exception and the workflow can still succeed. This can cause silent conversation-variable persistence loss and can also skip remaining updates in the same event. Agent Prompt
|
||
|
|
||
| def on_graph_end(self, error: Exception | None) -> None: | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,45 +1,37 @@ | ||
| from collections.abc import Callable, Mapping, Sequence | ||
| from typing import TYPE_CHECKING, Any, TypeAlias | ||
| from collections.abc import Mapping, Sequence | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| from core.variables import SegmentType, Variable | ||
| from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID | ||
| from core.workflow.conversation_variable_updater import ConversationVariableUpdater | ||
| from core.workflow.entities import GraphInitParams | ||
| from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus | ||
| from core.workflow.node_events import NodeRunResult | ||
| from core.workflow.nodes.base.node import Node | ||
| from core.workflow.nodes.variable_assigner.common import helpers as common_helpers | ||
| from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError | ||
|
|
||
| from ..common.impl import conversation_variable_updater_factory | ||
| from .node_data import VariableAssignerData, WriteMode | ||
|
|
||
| if TYPE_CHECKING: | ||
| from core.workflow.runtime import GraphRuntimeState | ||
|
|
||
|
|
||
| _CONV_VAR_UPDATER_FACTORY: TypeAlias = Callable[[], ConversationVariableUpdater] | ||
|
|
||
|
|
||
| class VariableAssignerNode(Node[VariableAssignerData]): | ||
| node_type = NodeType.VARIABLE_ASSIGNER | ||
| _conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY | ||
|
|
||
| def __init__( | ||
| self, | ||
| id: str, | ||
| config: Mapping[str, Any], | ||
| graph_init_params: "GraphInitParams", | ||
| graph_runtime_state: "GraphRuntimeState", | ||
| conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY = conversation_variable_updater_factory, | ||
| ): | ||
| super().__init__( | ||
| id=id, | ||
| config=config, | ||
| graph_init_params=graph_init_params, | ||
| graph_runtime_state=graph_runtime_state, | ||
| ) | ||
| self._conv_var_updater_factory = conv_var_updater_factory | ||
|
|
||
| @classmethod | ||
| def version(cls) -> str: | ||
|
|
@@ -96,16 +88,7 @@ def _run(self) -> NodeRunResult: | |
| # Over write the variable. | ||
| self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable) | ||
|
|
||
| # TODO: Move database operation to the pipeline. | ||
| # Update conversation variable. | ||
| conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"]) | ||
| if not conversation_id: | ||
| raise VariableOperatorNodeError("conversation_id not found") | ||
| conv_var_updater = self._conv_var_updater_factory() | ||
| conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable) | ||
| conv_var_updater.flush() | ||
| updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)] | ||
|
|
||
| return NodeRunResult( | ||
| status=WorkflowNodeExecutionStatus.SUCCEEDED, | ||
| inputs={ | ||
|
Comment on lines
88
to
94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 5. Persistence layer not wired VariableAssigner nodes no longer persist conversation variables to the database, but only AdvancedChatAppRunner registers ConversationVariablePersistenceLayer. WorkflowAppRunner and PipelineRunner still run the graph engine without this layer, so conversation variable updates will stop persisting in those execution paths. Agent Prompt
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,22 +5,24 @@ | |
| from extensions.ext_database import db | ||
| from models import ConversationVariable | ||
|
|
||
| from .exc import VariableOperatorNodeError | ||
|
|
||
| class ConversationVariableNotFoundError(Exception): | ||
| pass | ||
|
|
||
|
|
||
| class ConversationVariableUpdaterImpl: | ||
| def update(self, conversation_id: str, variable: Variable): | ||
| def update(self, conversation_id: str, variable: Variable) -> None: | ||
| stmt = select(ConversationVariable).where( | ||
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | ||
| ) | ||
| with Session(db.engine) as session: | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise VariableOperatorNodeError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() | ||
|
|
||
| def flush(self): | ||
| session = Session(db.engine) | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
|
Comment on lines
+9
to
+22
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2. Service raises local exception The service code introduces and raises ConversationVariableNotFoundError defined inline instead of using a domain-specific exception from services/errors or core/errors. This breaks the required error taxonomy and makes consistent controller translation harder. Agent Prompt
|
||
| session.commit() | ||
|
Comment on lines
+18
to
+23
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1. session() not context-managed ConversationVariableUpdaterImpl.update() creates a SQLAlchemy session without a context manager, so the session may not be reliably closed. This violates the required session lifecycle pattern and risks connection/resource leaks. Agent Prompt
|
||
|
|
||
| def flush(self) -> None: | ||
| pass | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4. Warnings lack identifier context
📘 Rule violation✧ QualityAgent Prompt
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools