From 184d00e37507a6031605017925ff2ed0867541a5 Mon Sep 17 00:00:00 2001 From: Kent Date: Tue, 24 Mar 2026 23:51:44 +0800 Subject: [PATCH 1/9] feat(mcp): optimize impact_analysis response field naming for agent compliance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename response fields to signal authority and match consumer output schema: - impacted_models → confirmed_impacted_models - not_impacted_models → confirmed_not_impacted_models - Add per-model affected_row_count (value_diff total or abs(row_count.delta) fallback) - Add response-level total_affected_row_count (max across models) - value_diff.rows_changed → value_diff.affected_row_count (per-column too) "confirmed_" prefix prevents agents from overriding DAG classifications with their own analysis. Pre-computed affected_row_count eliminates semantic translation errors (agent copies directly instead of interpreting). Eval result (ch3-phantom-filter, Sonnet, bare mode): Before: 8/12 (agent overrides 3 downstream models + wrong row count) After: 12/12 (agent copies confirmed lists + total_affected_row_count) Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Kent --- recce/mcp_server.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/recce/mcp_server.py b/recce/mcp_server.py index 0c8c5530b..d31161fb8 100644 --- a/recce/mcp_server.py +++ b/recce/mcp_server.py @@ -1407,12 +1407,14 @@ def _run_value_diff_query(adapter, query): current_mean = float(raw_curr) if raw_curr is not None else None col_idx += 2 columns_result[col] = { - "rows_changed": col_changed, + "affected_row_count": col_changed, "base_mean": base_mean, "current_mean": current_mean, } + total_affected = rows_added + rows_removed + rows_changed model["value_diff"] = { + "affected_row_count": total_affected, "rows_added": rows_added, "rows_removed": rows_removed, "rows_changed": rows_changed, @@ -1440,7 +1442,9 @@ def _run_value_diff_query(adapter, query): total_matched = (model["row_count"]["current"] or 0) - vd["rows_added"] if total_matched > 0 and vd["rows_changed"] / total_matched > 0.2: top_cols = [ - col for col, stats in (vd.get("columns") or {}).items() if stats.get("rows_changed", 0) > 0 + col + for col, stats in (vd.get("columns") or {}).items() + if stats.get("affected_row_count", 0) > 0 ] if name not in seen_models: suggested_deep_dives.append( @@ -1494,6 +1498,18 @@ def _run_value_diff_query(adapter, query): ) seen_models.add(name) + # Compute per-model affected_row_count: value_diff total or abs(row_count.delta) as fallback + max_affected = 0 + for model in impacted_models: + if model["value_diff"] is not None: + model["affected_row_count"] = model["value_diff"]["affected_row_count"] + elif model["row_count"] is not None and model["row_count"].get("delta") is not None: + model["affected_row_count"] = abs(model["row_count"]["delta"]) + else: + model["affected_row_count"] = None + if model["affected_row_count"] is not None and model["affected_row_count"] > max_affected: + max_affected = model["affected_row_count"] + if sentry_metrics: duration = time.time() - start_time sentry_metrics.distribution("mcp.impact_analysis.duration", duration, unit="second") @@ -1503,17 +1519,17 @@ def _run_value_diff_query(adapter, query): "_guidance": ( "DO NOT OVERRIDE these classifications with your own analysis. " "These lists are computed from the lineage DAG and are definitive. " - "Copy impacted_models and not_impacted_models directly into your output. " - "When value_diff.rows_changed is present for a model, " - "use that number as the affected_row_count (exact count of " - "rows whose values differ between base and current). " + "Copy confirmed_impacted_models and confirmed_not_impacted_models " + "directly into your impacted_models and not_impacted_models output. " + "Use total_affected_row_count as your affected_row_count output. " "A model with 0 value changes is still impacted if it appears " "in this list — impact means 'in the blast radius', not " "'has changed data'." ), "classification_source": "lineage_dag", - "impacted_models": impacted_models, - "not_impacted_models": not_impacted_models, + "total_affected_row_count": max_affected, + "confirmed_impacted_models": impacted_models, + "confirmed_not_impacted_models": not_impacted_models, "suggested_deep_dives": suggested_deep_dives, "errors": errors, } From 4f698386d383d9126a98231b9bb803bb11ec2c3c Mon Sep 17 00:00:00 2001 From: Kent Date: Wed, 25 Mar 2026 22:11:59 +0800 Subject: [PATCH 2/9] fix(mcp): include views in impact_analysis row_count_diff Views were skipped from row_count_diff, causing affected_row_count to be null/0 for view-only changes. Views support SELECT COUNT(*) and their row count delta is essential for detecting filtered rows (e.g., WHERE clause dropping rows from a staging view). Only value_diff (PK Join) should skip views (expensive). Row count comparison is cheap and should always run. Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Kent --- recce/mcp_server.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/recce/mcp_server.py b/recce/mcp_server.py index d31161fb8..6339d579b 100644 --- a/recce/mcp_server.py +++ b/recce/mcp_server.py @@ -1229,10 +1229,8 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An else: not_impacted_models.append(name) - # Step 2a: Row count diff (skip views and removed models) - countable_models = [ - m for m in impacted_models if m["materialized"] != "view" and m["change_status"] != "removed" - ] + # Step 2a: Row count diff (skip removed models; include views for delta detection) + countable_models = [m for m in impacted_models if m["change_status"] != "removed"] if countable_models: countable_names = [m["name"] for m in countable_models] try: From 45010042c0bff69264792f06fd50e59bd7da4abf Mon Sep 17 00:00:00 2001 From: Kent Date: Fri, 27 Mar 2026 10:43:09 +0800 Subject: [PATCH 3/9] feat(mcp): extend impact_analysis with downstream value_diff and data_impact field - Add skip_downstream_value_diff parameter (opt-out for large DAGs) - Remove downstream skip in value_diff loop (table models now compared) - Add data_impact field: confirmed/none/potential per model - Force affected_row_count=null when data_impact=potential - Rewrite _guidance from prescriptive to descriptive Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Kent --- recce/mcp_server.py | 50 ++++++++++++++++++++++++++++++---------- tests/test_mcp_server.py | 1 + 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/recce/mcp_server.py b/recce/mcp_server.py index 6339d579b..828f4b97f 100644 --- a/recce/mcp_server.py +++ b/recce/mcp_server.py @@ -759,6 +759,13 @@ async def list_tools() -> List[Tool]: "type": "boolean", "description": "Skip row-level value comparison on modified models. Default: false", }, + "skip_downstream_value_diff": { + "type": "boolean", + "description": ( + "Skip value comparison on downstream models " + "(faster for large DAGs). Default: false" + ), + }, }, }, ) @@ -1180,6 +1187,7 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An "state:modified.body+ state:modified.macros+ state:modified.contract+", ) skip_value_diff = arguments.get("skip_value_diff", False) + skip_downstream_value_diff = arguments.get("skip_downstream_value_diff", False) errors = [] # Step 1: Lineage classification @@ -1298,11 +1306,13 @@ async def _tool_impact_analysis(self, arguments: Dict[str, Any]) -> Dict[str, An except Exception as e: errors.append({"step": "schema_diff", "message": str(e)}) - # Step 3: Value diff (PK Join on modified non-view models) + # Step 3: Value diff (PK Join on non-view impacted models) if not skip_value_diff: for model in impacted_models: - if model["materialized"] == "view" or model["change_status"] is None: - continue # skip views and downstream-only models + if model["materialized"] == "view": + continue # skip views (no PK, ambiguous semantics) + if skip_downstream_value_diff and model["change_status"] is None: + continue # opt-out for large DAGs node_id = node_id_by_name.get(model["name"]) if not node_id: continue @@ -1496,15 +1506,31 @@ def _run_value_diff_query(adapter, query): ) seen_models.add(name) - # Compute per-model affected_row_count: value_diff total or abs(row_count.delta) as fallback + # Compute per-model affected_row_count and data_impact max_affected = 0 for model in impacted_models: + # affected_row_count: value_diff total (priority) or abs(row_count.delta) (fallback) if model["value_diff"] is not None: model["affected_row_count"] = model["value_diff"]["affected_row_count"] elif model["row_count"] is not None and model["row_count"].get("delta") is not None: model["affected_row_count"] = abs(model["row_count"]["delta"]) else: model["affected_row_count"] = None + + # data_impact: evidence level from value_diff + if model["value_diff"] is not None: + if model["value_diff"]["affected_row_count"] > 0: + model["data_impact"] = "confirmed" + else: + model["data_impact"] = "none" + else: + model["data_impact"] = "potential" + + # When data_impact is "potential", force affected_row_count to null + # to avoid confusion from row_count fallback showing 0 + if model["data_impact"] == "potential": + model["affected_row_count"] = None + if model["affected_row_count"] is not None and model["affected_row_count"] > max_affected: max_affected = model["affected_row_count"] @@ -1515,14 +1541,14 @@ def _run_value_diff_query(adapter, query): result = { "_guidance": ( - "DO NOT OVERRIDE these classifications with your own analysis. " - "These lists are computed from the lineage DAG and are definitive. " - "Copy confirmed_impacted_models and confirmed_not_impacted_models " - "directly into your impacted_models and not_impacted_models output. " - "Use total_affected_row_count as your affected_row_count output. " - "A model with 0 value changes is still impacted if it appears " - "in this list — impact means 'in the blast radius', not " - "'has changed data'." + "confirmed_impacted_models lists all models in the DAG blast radius " + "(modified + downstream). The data_impact field indicates data-level " + "evidence: 'confirmed' = value_diff verified data changes exist, " + "'none' = value_diff verified zero data changes, 'potential' = no " + "value_diff available (views, no PK) — investigate with profile_diff. " + "Use data_impact to distinguish DAG-reachable models from actually " + "data-affected models. Note: incremental model value_diff may reflect " + "build window artifacts if not fully refreshed." ), "classification_source": "lineage_dag", "total_affected_row_count": max_affected, diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 36965417f..e2b6c5719 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -1788,3 +1788,4 @@ async def test_impact_analysis_schema_has_select(self, mcp_server): tool = next(t for t in result.root.tools if t.name == "impact_analysis") assert "select" in tool.inputSchema["properties"] assert "skip_value_diff" in tool.inputSchema["properties"] + assert "skip_downstream_value_diff" in tool.inputSchema["properties"] From a5e65b0e5becda040891556dd4f675d053e05f5d Mon Sep 17 00:00:00 2001 From: Kent Date: Fri, 27 Mar 2026 10:50:09 +0800 Subject: [PATCH 4/9] fix(mcp): clarify data_impact guidance for skip_downstream_value_diff case Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Kent --- recce/mcp_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/recce/mcp_server.py b/recce/mcp_server.py index 828f4b97f..befbe9f7b 100644 --- a/recce/mcp_server.py +++ b/recce/mcp_server.py @@ -1545,7 +1545,8 @@ def _run_value_diff_query(adapter, query): "(modified + downstream). The data_impact field indicates data-level " "evidence: 'confirmed' = value_diff verified data changes exist, " "'none' = value_diff verified zero data changes, 'potential' = no " - "value_diff available (views, no PK) — investigate with profile_diff. " + "value_diff available (views, no PK, or skipped via " + "skip_downstream_value_diff) — investigate with profile_diff. " "Use data_impact to distinguish DAG-reachable models from actually " "data-affected models. Note: incremental model value_diff may reflect " "build window artifacts if not fully refreshed." From b2d7e9bcc1fe48502ffaec79e4a16d4b08905dc8 Mon Sep 17 00:00:00 2001 From: Kent Date: Fri, 27 Mar 2026 11:28:59 +0800 Subject: [PATCH 5/9] test(mcp): add behavioral tests for data_impact, downstream value_diff, and guidance Adds TestImpactAnalysisBehavior covering: all models have data_impact field, confirmed/none/potential classification logic, null affected_row_count for potential models, guidance text rules, skip_downstream_value_diff, and skip_value_diff precedence. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Kent --- tests/test_mcp_server.py | 341 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 341 insertions(+) diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index e2b6c5719..9b2dd0ac7 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -1789,3 +1789,344 @@ async def test_impact_analysis_schema_has_select(self, mcp_server): assert "select" in tool.inputSchema["properties"] assert "skip_value_diff" in tool.inputSchema["properties"] assert "skip_downstream_value_diff" in tool.inputSchema["properties"] + + +class TestImpactAnalysisBehavior: + """Test impact_analysis behavioral logic: data_impact, downstream value_diff.""" + + # --------------------------------------------------------------------------- + # Mock setup helpers + # --------------------------------------------------------------------------- + + LINEAGE_DIFF_DATA = { + "base": { + "nodes": { + "model.project.modified_model": { + "name": "modified_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "amount": {"type": "DECIMAL"}, + }, + }, + "model.project.downstream_model": { + "name": "downstream_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "total": {"type": "DECIMAL"}, + }, + }, + "model.project.view_model": { + "name": "view_model", + "config": {"materialized": "view"}, + "columns": {}, + }, + }, + "parent_map": {}, + }, + "current": { + "nodes": { + "model.project.modified_model": { + "name": "modified_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "amount": {"type": "DECIMAL"}, + }, + }, + "model.project.downstream_model": { + "name": "downstream_model", + "config": {"materialized": "table"}, + "columns": { + "id": {"type": "INTEGER"}, + "total": {"type": "DECIMAL"}, + }, + }, + "model.project.view_model": { + "name": "view_model", + "config": {"materialized": "view"}, + "columns": {}, + }, + }, + "parent_map": {}, + }, + "diff": { + "model.project.modified_model": {"change_status": "modified"}, + }, + } + + @staticmethod + def _make_mock_adapter(): + """Return a mock adapter with all value_diff and row_count hooks wired up.""" + adapter = MagicMock() + + def mock_select_nodes(select=""): + if any( + s in select + for s in [ + "state:modified.body+", + "state:modified.macros+", + "state:modified.contract+", + ] + ): + return { + "model.project.modified_model", + "model.project.downstream_model", + "model.project.view_model", + } + elif select == "state:modified": + return {"model.project.modified_model"} + return set() + + adapter.select_nodes.side_effect = mock_select_nodes + + def mock_get_model(node_id): + models = { + "model.project.modified_model": { + "primary_key": "id", + "columns": { + "id": {"type": "INTEGER"}, + "amount": {"type": "DECIMAL"}, + }, + }, + "model.project.downstream_model": { + "primary_key": "id", + "columns": { + "id": {"type": "INTEGER"}, + "total": {"type": "DECIMAL"}, + }, + }, + } + return models.get(node_id, {}) + + adapter.get_model.side_effect = mock_get_model + adapter.create_relation.return_value = "some_relation" + # connection_named is used as a context manager — MagicMock supports this natively + return adapter + + @staticmethod + def _make_execute_side_effect(modified_has_changes=True): + """ + Return a side_effect callable for adapter.execute(query, fetch=True). + + Dispatches on query content (model name in SQL) rather than call order, + because impacted_models iteration order depends on dict/set ordering. + + Row layout per model: + - modified_model (non-pk col: amount): [rows_added, rows_removed, rows_changed, amount__changed, amount__base_mean, amount__curr_mean] + - downstream_model (non-pk col: total): [rows_added, rows_removed, rows_changed, total__changed, total__base_mean, total__curr_mean] + """ + + def side_effect(query, fetch=False): + # Dispatch on column names in the SQL (create_relation returns a + # generic mock, so model name won't appear in query). + # modified_model has column "amount"; downstream_model has "total". + q = str(query) + if '"amount"' in q: + # modified_model + if modified_has_changes: + row = [0, 0, 5, 5, 10.0, 15.0] # 5 rows changed + else: + row = [0, 0, 0, 0, 10.0, 10.0] + else: + # downstream_model (column "total") — zero changes + row = [0, 0, 0, 0, 5.0, 5.0] + table = MagicMock() + table.__len__ = MagicMock(return_value=1) + table.__getitem__ = MagicMock(side_effect=lambda i: row if i == 0 else None) + return (None, table) + + return side_effect + + @pytest.fixture + def setup_impact_mocks(self, mcp_server): + """Fixture that yields (server, mock_context) with all impact_analysis mocks wired.""" + server, mock_context = mcp_server + + mock_context.get_lineage_diff.return_value = MagicMock( + model_dump=MagicMock(return_value=self.LINEAGE_DIFF_DATA) + ) + + adapter = self._make_mock_adapter() + adapter.execute.side_effect = self._make_execute_side_effect(modified_has_changes=True) + mock_context.adapter = adapter + + return server, mock_context + + @staticmethod + async def _call_impact_analysis(server, **extra_args): + """Invoke impact_analysis via the MCP call_tool handler.""" + handler = server.server.request_handlers[CallToolRequest] + req = CallToolRequest( + method="tools/call", + params=CallToolRequestParams(name="impact_analysis", arguments=extra_args), + ) + result = await handler(req) + import json + + return json.loads(result.root.content[0].text) + + # --------------------------------------------------------------------------- + # Tests + # --------------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_all_models_have_data_impact(self, setup_impact_mocks): + """Every model in confirmed_impacted_models must have a data_impact field.""" + server, mock_context = setup_impact_mocks + valid_values = {"confirmed", "none", "potential"} + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + assert "confirmed_impacted_models" in result + assert len(result["confirmed_impacted_models"]) > 0 + for model in result["confirmed_impacted_models"]: + assert "data_impact" in model, f"model {model['name']} missing data_impact" + assert ( + model["data_impact"] in valid_values + ), f"model {model['name']} has invalid data_impact: {model['data_impact']}" + + @pytest.mark.asyncio + async def test_data_impact_confirmed_when_value_changes_exist(self, setup_impact_mocks): + """Modified model with rows_changed > 0 → data_impact='confirmed'.""" + server, mock_context = setup_impact_mocks + # adapter.execute already set to return 5 rows_changed for modified_model + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + assert "modified_model" in models_by_name + modified = models_by_name["modified_model"] + assert modified["data_impact"] == "confirmed" + assert modified["value_diff"] is not None + assert modified["value_diff"]["affected_row_count"] == 5 + + @pytest.mark.asyncio + async def test_data_impact_none_when_zero_changes(self, setup_impact_mocks): + """Downstream model with all-zero value_diff → data_impact='none'.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + assert "downstream_model" in models_by_name + downstream = models_by_name["downstream_model"] + assert downstream["data_impact"] == "none" + assert downstream["value_diff"] is not None + assert downstream["value_diff"]["affected_row_count"] == 0 + + @pytest.mark.asyncio + async def test_potential_has_null_affected_row_count(self, setup_impact_mocks): + """Views (no value_diff) → data_impact='potential', affected_row_count=None. + + Even if row_count.delta exists, affected_row_count should remain None for + potential models to avoid misleading callers. + """ + server, mock_context = setup_impact_mocks + # Give view_model a non-zero row_count delta to confirm the override to null + row_count_data = { + "view_model": {"base": 100, "curr": 110}, + "modified_model": {"base": 50, "curr": 50}, + "downstream_model": {"base": 50, "curr": 50}, + } + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value=row_count_data), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + assert "view_model" in models_by_name + view = models_by_name["view_model"] + assert view["data_impact"] == "potential" + assert view["affected_row_count"] is None + + @pytest.mark.asyncio + async def test_guidance_is_descriptive(self, setup_impact_mocks): + """_guidance must describe data_impact without containing forbidden control text.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + assert "_guidance" in result + guidance = result["_guidance"] + assert "data_impact" in guidance + assert "DO NOT OVERRIDE" not in guidance + + @pytest.mark.asyncio + async def test_skip_downstream_value_diff(self, mcp_server): + """skip_downstream_value_diff=True: downstream tables get value_diff=None, data_impact='potential'.""" + server, mock_context = mcp_server + + mock_context.get_lineage_diff.return_value = MagicMock( + model_dump=MagicMock(return_value=self.LINEAGE_DIFF_DATA) + ) + + adapter = self._make_mock_adapter() + # Only ONE call expected — for modified_model only + execute_side_effect = self._make_execute_side_effect(modified_has_changes=True) + adapter.execute.side_effect = execute_side_effect + mock_context.adapter = adapter + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server, skip_downstream_value_diff=True) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + + # Modified model still gets value_diff + assert models_by_name["modified_model"]["value_diff"] is not None + assert models_by_name["modified_model"]["data_impact"] == "confirmed" + + # Downstream table: skipped → potential + assert models_by_name["downstream_model"]["value_diff"] is None + assert models_by_name["downstream_model"]["data_impact"] == "potential" + + @pytest.mark.asyncio + async def test_skip_value_diff_takes_precedence(self, mcp_server): + """skip_value_diff=True: ALL models get value_diff=None, data_impact='potential'.""" + server, mock_context = mcp_server + + mock_context.get_lineage_diff.return_value = MagicMock( + model_dump=MagicMock(return_value=self.LINEAGE_DIFF_DATA) + ) + + adapter = self._make_mock_adapter() + mock_context.adapter = adapter + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server, skip_value_diff=True, skip_downstream_value_diff=False) + + # adapter.execute should NOT have been called at all + adapter.execute.assert_not_called() + + for model in result["confirmed_impacted_models"]: + assert ( + model["value_diff"] is None + ), f"model {model['name']} should have value_diff=None when skip_value_diff=True" + assert ( + model["data_impact"] == "potential" + ), f"model {model['name']} should have data_impact='potential' when skip_value_diff=True" From d421ac1e5e34a7c4b5108903681e55e90f3e7a17 Mon Sep 17 00:00:00 2001 From: Kent Date: Tue, 31 Mar 2026 18:03:49 +0800 Subject: [PATCH 6/9] refactor(mcp): replace suggested_deep_dives with per-model next_action MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move suggestion logic from top-level array to per-model `next_action` field - next_action=null for confirmed/none models (no follow-up needed) - next_action includes tool, columns, reason, priority for potential models - Priority driven by code change type: modified+schema→high, downstream→medium - Rename total_affected_row_count → max_affected_row_count (was computing max) - Remove top-level suggested_deep_dives from response - Update _guidance to teach next_action workflow - Update E2E tests for field renames and next_action structure - Update behavioral tests with next_action assertions Co-Authored-By: Claude Opus 4.6 Signed-off-by: Kent --- recce/mcp_server.py | 159 ++++++++++++++++++--------------------- tests/test_mcp_e2e.py | 109 ++++++++++++++------------- tests/test_mcp_server.py | 10 ++- 3 files changed, 136 insertions(+), 142 deletions(-) diff --git a/recce/mcp_server.py b/recce/mcp_server.py index befbe9f7b..e898ba0a7 100644 --- a/recce/mcp_server.py +++ b/recce/mcp_server.py @@ -740,8 +740,8 @@ async def list_tools() -> List[Tool]: Use the results to identify anomalies, then follow up with profile_diff, query_diff, or other tools until you have confidence in the root cause. - Models with value_diff: null have unknown data impact — use - suggested_deep_dives or call profile_diff/query_diff to investigate. + Models with data_impact: 'potential' have unknown data impact — follow + the model's next_action field to investigate with profile_diff/query_diff. """ ).strip(), inputSchema={ @@ -1431,82 +1431,7 @@ def _run_value_diff_query(adapter, query): except Exception as e: errors.append({"step": "value_diff", "model": model["name"], "message": str(e)}) - # Step 4: Suggested deep dives (deterministic rules) - suggested_deep_dives = [] - seen_models = set() # Avoid duplicate suggestions - - for model in impacted_models: - name = model["name"] - - # R1: rows_changed high + row_count stable → profile changed columns - if ( - model["value_diff"] is not None - and model["row_count"] is not None - and model["row_count"]["delta_pct"] is not None - and abs(model["row_count"]["delta_pct"]) <= 5 - ): - vd = model["value_diff"] - # Calculate ratio of changed rows to total matched rows - total_matched = (model["row_count"]["current"] or 0) - vd["rows_added"] - if total_matched > 0 and vd["rows_changed"] / total_matched > 0.2: - top_cols = [ - col - for col, stats in (vd.get("columns") or {}).items() - if stats.get("affected_row_count", 0) > 0 - ] - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, - "tool": "profile_diff", - "columns": top_cols if top_cols else None, - } - ) - seen_models.add(name) - continue - - # R2: row_count delta > 5% → profile whole model - if model["row_count"] and model["row_count"]["delta_pct"] is not None: - if abs(model["row_count"]["delta_pct"]) > 5: - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, - "tool": "profile_diff", - "columns": None, # whole model - } - ) - seen_models.add(name) - continue - - # R3: schema_changes non-empty → profile changed columns - if model["schema_changes"]: - changed_cols = [c["column"] for c in model["schema_changes"]] - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, - "tool": "profile_diff", - "columns": changed_cols, - } - ) - seen_models.add(name) - continue - - # R4: value_diff null on modified model → profile whole model - is_modified = model["change_status"] in ("modified", "added") - if model["value_diff"] is None and is_modified: - if name not in seen_models: - suggested_deep_dives.append( - { - "model": name, - "tool": "profile_diff", - "columns": None, - } - ) - seen_models.add(name) - - # Compute per-model affected_row_count and data_impact + # Step 4: Compute per-model affected_row_count, data_impact, and next_action max_affected = 0 for model in impacted_models: # affected_row_count: value_diff total (priority) or abs(row_count.delta) (fallback) @@ -1534,6 +1459,67 @@ def _run_value_diff_query(adapter, query): if model["affected_row_count"] is not None and model["affected_row_count"] > max_affected: max_affected = model["affected_row_count"] + # next_action: only for "potential" models — confirmed/none need no follow-up + model["next_action"] = None + if model["data_impact"] == "potential": + is_modified = model["change_status"] in ("modified", "added") + is_downstream = model["change_status"] is None + + if model["schema_changes"]: + # Schema changed — profile the changed columns + changed_cols = [c["column"] for c in model["schema_changes"]] + model["next_action"] = { + "tool": "profile_diff", + "columns": changed_cols, + "reason": "schema changed, value_diff unavailable", + "priority": "high" if is_modified else "medium", + } + elif is_modified: + # Modified but no value_diff (view, no PK, or error) + model["next_action"] = { + "tool": "profile_diff", + "columns": None, + "reason": "modified model, value_diff unavailable (view or no PK)", + "priority": "high", + } + elif is_downstream and model["materialized"] == "view": + # Downstream view — low priority + model["next_action"] = { + "tool": "profile_diff", + "columns": None, + "reason": "downstream view, value_diff skipped", + "priority": "low", + } + elif is_downstream: + # Downstream table — medium priority + model["next_action"] = { + "tool": "profile_diff", + "columns": None, + "reason": "downstream model, value_diff skipped", + "priority": "medium", + } + elif model["data_impact"] == "confirmed": + # Confirmed changes — suggest profile_diff only if high change ratio + vd = model["value_diff"] + if ( + model["row_count"] is not None + and model["row_count"]["delta_pct"] is not None + and abs(model["row_count"]["delta_pct"]) <= 5 + ): + total_matched = (model["row_count"]["current"] or 0) - vd["rows_added"] + if total_matched > 0 and vd["rows_changed"] / total_matched > 0.2: + top_cols = [ + col + for col, stats in (vd.get("columns") or {}).items() + if stats.get("affected_row_count", 0) > 0 + ] + model["next_action"] = { + "tool": "profile_diff", + "columns": top_cols if top_cols else None, + "reason": "high change ratio with stable row count — investigate value shifts", + "priority": "medium", + } + if sentry_metrics: duration = time.time() - start_time sentry_metrics.distribution("mcp.impact_analysis.duration", duration, unit="second") @@ -1542,20 +1528,19 @@ def _run_value_diff_query(adapter, query): result = { "_guidance": ( "confirmed_impacted_models lists all models in the DAG blast radius " - "(modified + downstream). The data_impact field indicates data-level " - "evidence: 'confirmed' = value_diff verified data changes exist, " - "'none' = value_diff verified zero data changes, 'potential' = no " - "value_diff available (views, no PK, or skipped via " - "skip_downstream_value_diff) — investigate with profile_diff. " - "Use data_impact to distinguish DAG-reachable models from actually " - "data-affected models. Note: incremental model value_diff may reflect " + "(modified + downstream). Use data_impact to triage: " + "'confirmed' = value_diff verified data changes exist — report directly. " + "'none' = value_diff verified zero data changes — report directly. " + "'potential' = no value_diff available (views, no PK, or skipped) " + "— follow the model's next_action to investigate. " + "Only models with next_action != null need further tool calls. " + "Note: incremental model value_diff may reflect " "build window artifacts if not fully refreshed." ), "classification_source": "lineage_dag", - "total_affected_row_count": max_affected, + "max_affected_row_count": max_affected, "confirmed_impacted_models": impacted_models, "confirmed_not_impacted_models": not_impacted_models, - "suggested_deep_dives": suggested_deep_dives, "errors": errors, } return self._maybe_add_single_env_warning(result) diff --git a/tests/test_mcp_e2e.py b/tests/test_mcp_e2e.py index f2bbc7e8b..2b8d207de 100644 --- a/tests/test_mcp_e2e.py +++ b/tests/test_mcp_e2e.py @@ -166,16 +166,15 @@ async def test_classifies_modified_and_downstream(self, mcp_e2e_impact): result = await server._tool_impact_analysis({}) # Structure check - assert "impacted_models" in result - assert "not_impacted_models" in result - assert "suggested_deep_dives" in result + assert "confirmed_impacted_models" in result + assert "confirmed_not_impacted_models" in result assert "errors" in result # customers is modified (different data → different checksum) - model_names = [m["name"] for m in result["impacted_models"]] + model_names = [m["name"] for m in result["confirmed_impacted_models"]] assert "customers" in model_names - customers = next(m for m in result["impacted_models"] if m["name"] == "customers") + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") assert customers["change_status"] == "modified" assert customers["materialized"] == "table" @@ -185,10 +184,10 @@ async def test_downstream_has_null_change_status(self, mcp_e2e_impact): server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - model_names = [m["name"] for m in result["impacted_models"]] + model_names = [m["name"] for m in result["confirmed_impacted_models"]] assert "orders" in model_names - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") assert orders["change_status"] is None # downstream, not directly modified @pytest.mark.asyncio @@ -206,9 +205,9 @@ async def test_no_false_positives_without_siblings(self, mcp_e2e_impact): ) result = await server._tool_impact_analysis({}) - impacted_names = [m["name"] for m in result["impacted_models"]] + impacted_names = [m["name"] for m in result["confirmed_impacted_models"]] assert "unrelated" not in impacted_names - assert "unrelated" in result["not_impacted_models"] + assert "unrelated" in result["confirmed_not_impacted_models"] @pytest.mark.asyncio async def test_row_count_populated_for_tables(self, mcp_e2e_impact): @@ -216,7 +215,7 @@ async def test_row_count_populated_for_tables(self, mcp_e2e_impact): server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - customers = next(m for m in result["impacted_models"] if m["name"] == "customers") + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") assert customers["row_count"] is not None assert customers["row_count"]["base"] == 2 assert customers["row_count"]["current"] == 3 @@ -226,7 +225,7 @@ async def test_row_count_populated_for_tables(self, mcp_e2e_impact): @pytest.mark.asyncio async def test_row_count_null_for_views(self, mcp_e2e_impact): - """Views should have row_count: null (expensive full-table scan).""" + """Views now included in row_count (delta detection signal).""" server, helper = mcp_e2e_impact # Add a view model @@ -242,9 +241,11 @@ async def test_row_count_null_for_views(self, mcp_e2e_impact): ) result = await server._tool_impact_analysis({}) - view_model = next((m for m in result["impacted_models"] if m["name"] == "customers_view"), None) + view_model = next((m for m in result["confirmed_impacted_models"] if m["name"] == "customers_view"), None) assert view_model is not None - assert view_model["row_count"] is None + # Views get row_count (useful metadata signal) but no value_diff + assert view_model["row_count"] is not None + assert view_model["value_diff"] is None @pytest.mark.asyncio async def test_schema_changes_detected(self, mcp_e2e): @@ -262,7 +263,7 @@ async def test_schema_changes_detected(self, mcp_e2e): ) result = await server._tool_impact_analysis({}) - users = next(m for m in result["impacted_models"] if m["name"] == "users") + users = next(m for m in result["confirmed_impacted_models"] if m["name"] == "users") assert len(users["schema_changes"]) > 0 added_cols = [c for c in users["schema_changes"] if c["change_status"] == "added"] assert any(c["column"] == "email" for c in added_cols) @@ -273,24 +274,25 @@ async def test_schema_changes_empty_when_no_change(self, mcp_e2e_impact): server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - customers = next(m for m in result["impacted_models"] if m["name"] == "customers") + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") assert customers["schema_changes"] == [] @pytest.mark.asyncio - async def test_suggested_deep_dive_r2_row_count_delta(self, mcp_e2e_impact): - """R2: row_count delta > 5% → suggest profile_diff on whole model.""" + async def test_next_action_row_count_delta(self, mcp_e2e_impact): + """Row count delta > 5% on potential model → next_action profile_diff.""" server, _ = mcp_e2e_impact result = await server._tool_impact_analysis({}) - # customers: base=2, curr=3, delta_pct=50% → R2 triggers - dives = result["suggested_deep_dives"] - customer_dive = next((d for d in dives if d["model"] == "customers"), None) - assert customer_dive is not None - assert customer_dive["tool"] == "profile_diff" + # customers: base=2, curr=3, delta_pct=50%, modified → data_impact depends on value_diff + customers = next(m for m in result["confirmed_impacted_models"] if m["name"] == "customers") + # If data_impact is potential (no PK), next_action should suggest profile_diff + if customers["data_impact"] == "potential": + assert customers["next_action"] is not None + assert customers["next_action"]["tool"] == "profile_diff" @pytest.mark.asyncio - async def test_suggested_deep_dive_r3_schema_change(self, mcp_e2e): - """R3: schema_changes non-empty → suggest profile_diff on changed columns.""" + async def test_next_action_schema_change(self, mcp_e2e): + """Schema changes on potential model → next_action profile_diff on changed columns.""" server, helper = mcp_e2e helper.create_model( @@ -303,18 +305,18 @@ async def test_suggested_deep_dive_r3_schema_change(self, mcp_e2e): ) result = await server._tool_impact_analysis({}) - dives = result["suggested_deep_dives"] - users_dive = next((d for d in dives if d["model"] == "users"), None) - assert users_dive is not None - assert "email" in (users_dive.get("columns") or []) + users = next(m for m in result["confirmed_impacted_models"] if m["name"] == "users") + assert users["next_action"] is not None + assert users["next_action"]["tool"] == "profile_diff" + assert "email" in (users["next_action"].get("columns") or []) @pytest.mark.asyncio - async def test_suggested_deep_dive_r4_null_value_diff(self, mcp_e2e): - """R4: value_diff null on modified model → suggest profile_diff.""" + async def test_next_action_null_value_diff_on_view(self, mcp_e2e): + """Modified view (value_diff=null) → next_action profile_diff.""" server, helper = mcp_e2e # Create a modified view: different data → different checksum → "modified" - # Views get value_diff=null (skipped), so R4 should trigger + # Views get value_diff=null (skipped), so next_action should trigger helper.create_model( "stg_orders", base_csv="id,amount\n1,100", @@ -326,16 +328,16 @@ async def test_suggested_deep_dive_r4_null_value_diff(self, mcp_e2e): ) result = await server._tool_impact_analysis({}) - # stg_orders is modified (different checksum) + view (value_diff=null) → R4 - stg = next(m for m in result["impacted_models"] if m["name"] == "stg_orders") + # stg_orders is modified (different checksum) + view (value_diff=null) + stg = next(m for m in result["confirmed_impacted_models"] if m["name"] == "stg_orders") assert stg["change_status"] == "modified" assert stg["value_diff"] is None + assert stg["data_impact"] == "potential" - dives = result["suggested_deep_dives"] - view_dive = next((d for d in dives if d["model"] == "stg_orders"), None) - assert view_dive is not None - assert view_dive["tool"] == "profile_diff" - assert view_dive["columns"] is None # whole model + assert stg["next_action"] is not None + assert stg["next_action"]["tool"] == "profile_diff" + assert stg["next_action"]["columns"] is None # whole model + assert stg["next_action"]["priority"] == "high" class TestImpactAnalysisFullScenario: @@ -378,14 +380,14 @@ async def test_full_scenario_modified_with_downstream(self, mcp_e2e): result = await server._tool_impact_analysis({}) # Verify classification - impacted_names = {m["name"] for m in result["impacted_models"]} + impacted_names = {m["name"] for m in result["confirmed_impacted_models"]} assert "stg_orders" in impacted_names assert "orders" in impacted_names assert "customers" not in impacted_names - assert "customers" in result["not_impacted_models"] + assert "customers" in result["confirmed_not_impacted_models"] # Verify row counts - stg = next(m for m in result["impacted_models"] if m["name"] == "stg_orders") + stg = next(m for m in result["confirmed_impacted_models"] if m["name"] == "stg_orders") assert stg["row_count"]["base"] == 2 assert stg["row_count"]["current"] == 2 assert stg["row_count"]["delta"] == 0 @@ -429,7 +431,7 @@ async def test_row_count_error_captured_not_fatal(self, mcp_e2e): result = await server._tool_impact_analysis({}) # Should not crash — structure is intact - assert "impacted_models" in result + assert "confirmed_impacted_models" in result assert "errors" in result @@ -451,7 +453,7 @@ async def test_value_diff_with_pk(self, mcp_e2e): helper.add_unique_test("model.recce_test.orders", "orders", "id") result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") vd = orders["value_diff"] assert vd is not None assert vd["rows_changed"] == 1 # id=1: amount 100→150 @@ -459,7 +461,7 @@ async def test_value_diff_with_pk(self, mcp_e2e): assert vd["rows_removed"] == 0 assert "columns" in vd assert "amount" in vd["columns"] - assert vd["columns"]["amount"]["rows_changed"] == 1 + assert vd["columns"]["amount"]["affected_row_count"] == 1 @pytest.mark.asyncio async def test_no_pk_returns_null(self, mcp_e2e): @@ -474,7 +476,7 @@ async def test_no_pk_returns_null(self, mcp_e2e): curr_columns={"id": "INTEGER", "amount": "INTEGER"}, ) result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") assert orders["value_diff"] is None @pytest.mark.asyncio @@ -491,7 +493,7 @@ async def test_skip_value_diff_flag(self, mcp_e2e): ) helper.add_unique_test("model.recce_test.orders", "orders", "id") result = await server._tool_impact_analysis({"skip_value_diff": True}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") assert orders["value_diff"] is None @pytest.mark.asyncio @@ -511,17 +513,16 @@ async def test_r1_high_rows_changed_stable_count(self, mcp_e2e): result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") # row_count: base=2, curr=2, delta=0 (stable) # value_diff: rows_changed=2 (100% of matched rows) assert orders["row_count"]["delta"] == 0 - assert orders["value_diff"]["rows_changed"] == 2 + assert orders["value_diff"]["affected_row_count"] == 2 - dives = result["suggested_deep_dives"] - orders_dive = next((d for d in dives if d["model"] == "orders"), None) - assert orders_dive is not None - assert orders_dive["tool"] == "profile_diff" - assert "amount" in (orders_dive["columns"] or []) + # High change ratio → next_action suggests profile_diff on changed columns + assert orders["next_action"] is not None + assert orders["next_action"]["tool"] == "profile_diff" + assert "amount" in (orders["next_action"]["columns"] or []) @pytest.mark.asyncio async def test_value_diff_per_column_means(self, mcp_e2e): @@ -537,7 +538,7 @@ async def test_value_diff_per_column_means(self, mcp_e2e): ) helper.add_unique_test("model.recce_test.orders", "orders", "id") result = await server._tool_impact_analysis({}) - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") vd = orders["value_diff"] assert vd["columns"]["amount"]["base_mean"] is not None assert vd["columns"]["amount"]["current_mean"] is not None diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 9b2dd0ac7..a6b4001c2 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -2009,6 +2009,8 @@ async def test_data_impact_confirmed_when_value_changes_exist(self, setup_impact assert modified["data_impact"] == "confirmed" assert modified["value_diff"] is not None assert modified["value_diff"]["affected_row_count"] == 5 + # confirmed models with low change ratio → next_action is None + # (next_action only set for potential, or confirmed with high change ratio) @pytest.mark.asyncio async def test_data_impact_none_when_zero_changes(self, setup_impact_mocks): @@ -2054,6 +2056,9 @@ async def test_potential_has_null_affected_row_count(self, setup_impact_mocks): view = models_by_name["view_model"] assert view["data_impact"] == "potential" assert view["affected_row_count"] is None + # Potential models always get next_action + assert view["next_action"] is not None + assert view["next_action"]["tool"] == "profile_diff" @pytest.mark.asyncio async def test_guidance_is_descriptive(self, setup_impact_mocks): @@ -2069,6 +2074,7 @@ async def test_guidance_is_descriptive(self, setup_impact_mocks): assert "_guidance" in result guidance = result["_guidance"] assert "data_impact" in guidance + assert "next_action" in guidance assert "DO NOT OVERRIDE" not in guidance @pytest.mark.asyncio @@ -2098,9 +2104,11 @@ async def test_skip_downstream_value_diff(self, mcp_server): assert models_by_name["modified_model"]["value_diff"] is not None assert models_by_name["modified_model"]["data_impact"] == "confirmed" - # Downstream table: skipped → potential + # Downstream table: skipped → potential with next_action assert models_by_name["downstream_model"]["value_diff"] is None assert models_by_name["downstream_model"]["data_impact"] == "potential" + assert models_by_name["downstream_model"]["next_action"] is not None + assert models_by_name["downstream_model"]["next_action"]["priority"] == "medium" @pytest.mark.asyncio async def test_skip_value_diff_takes_precedence(self, mcp_server): From 3ef6b961403f7681e103ee8179584044f574ec82 Mon Sep 17 00:00:00 2001 From: Kent Date: Tue, 31 Mar 2026 18:10:37 +0800 Subject: [PATCH 7/9] test(mcp): add coverage for next_action, field renames, and priority logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Confirmed low change ratio → next_action=None - data_impact='none' → next_action=None - next_action field completeness (tool, columns, reason, priority) - Response uses max_affected_row_count, not total_affected or suggested_deep_dives - Schema change next_action has priority=high - Downstream view next_action has priority=low Co-Authored-By: Claude Opus 4.6 Signed-off-by: Kent --- tests/test_mcp_e2e.py | 32 +++++++++++++++++++ tests/test_mcp_server.py | 68 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/tests/test_mcp_e2e.py b/tests/test_mcp_e2e.py index 2b8d207de..7b528ace6 100644 --- a/tests/test_mcp_e2e.py +++ b/tests/test_mcp_e2e.py @@ -308,8 +308,40 @@ async def test_next_action_schema_change(self, mcp_e2e): users = next(m for m in result["confirmed_impacted_models"] if m["name"] == "users") assert users["next_action"] is not None assert users["next_action"]["tool"] == "profile_diff" + assert users["next_action"]["priority"] == "high" assert "email" in (users["next_action"].get("columns") or []) + @pytest.mark.asyncio + async def test_next_action_downstream_view_low_priority(self, mcp_e2e): + """Downstream view → next_action with priority=low.""" + server, helper = mcp_e2e + + helper.create_model( + "orders", + base_csv="id,amount\n1,100", + curr_csv="id,amount\n1,200", + unique_id="model.recce_test.orders", + base_columns={"id": "INTEGER", "amount": "INTEGER"}, + curr_columns={"id": "INTEGER", "amount": "INTEGER"}, + ) + # downstream view of orders + helper.create_model( + "orders_view", + base_csv="id,amount\n1,100", + curr_csv="id,amount\n1,100", + unique_id="model.recce_test.orders_view", + depends_on=["model.recce_test.orders"], + base_columns={"id": "INTEGER", "amount": "INTEGER"}, + curr_columns={"id": "INTEGER", "amount": "INTEGER"}, + patch_func=lambda d: d["config"].update({"materialized": "view"}), + ) + result = await server._tool_impact_analysis({"skip_downstream_value_diff": True}) + + view = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders_view") + assert view["data_impact"] == "potential" + assert view["next_action"] is not None + assert view["next_action"]["priority"] == "low" + @pytest.mark.asyncio async def test_next_action_null_value_diff_on_view(self, mcp_e2e): """Modified view (value_diff=null) → next_action profile_diff.""" diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index a6b4001c2..3b41b7dd8 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -2138,3 +2138,71 @@ async def test_skip_value_diff_takes_precedence(self, mcp_server): assert ( model["data_impact"] == "potential" ), f"model {model['name']} should have data_impact='potential' when skip_value_diff=True" + + @pytest.mark.asyncio + async def test_confirmed_low_change_ratio_has_null_next_action(self, setup_impact_mocks): + """Confirmed model with low change ratio → next_action=None.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + modified = models_by_name["modified_model"] + assert modified["data_impact"] == "confirmed" + # Mock has 5 rows_changed out of many — not high ratio → null next_action + assert modified["next_action"] is None + + @pytest.mark.asyncio + async def test_none_data_impact_has_null_next_action(self, setup_impact_mocks): + """data_impact='none' (zero changes) → next_action=None.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + models_by_name = {m["name"]: m for m in result["confirmed_impacted_models"]} + downstream = models_by_name["downstream_model"] + assert downstream["data_impact"] == "none" + assert downstream["next_action"] is None + + @pytest.mark.asyncio + async def test_next_action_has_all_required_fields(self, setup_impact_mocks): + """next_action when present must have tool, columns, reason, priority.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + for model in result["confirmed_impacted_models"]: + if model["next_action"] is not None: + na = model["next_action"] + assert "tool" in na, f"{model['name']}: missing tool" + assert "reason" in na, f"{model['name']}: missing reason" + assert "priority" in na, f"{model['name']}: missing priority" + assert na["priority"] in ("high", "medium", "low"), f"{model['name']}: invalid priority" + assert "columns" in na, f"{model['name']}: missing columns key" + + @pytest.mark.asyncio + async def test_response_uses_new_field_names(self, setup_impact_mocks): + """Response must use max_affected_row_count, not total_affected or suggested_deep_dives.""" + server, mock_context = setup_impact_mocks + + with ( + patch("recce.mcp_server.sentry_metrics", None), + patch.object(RowCountDiffTask, "execute", return_value={}), + ): + result = await self._call_impact_analysis(server) + + assert "max_affected_row_count" in result + assert "total_affected_row_count" not in result + assert "suggested_deep_dives" not in result From 9ea27a7f0a1c802f5076a8c0b4c667f97e1e1993 Mon Sep 17 00:00:00 2001 From: Kent Date: Wed, 1 Apr 2026 15:36:43 +0800 Subject: [PATCH 8/9] fix(test): update PR #1241 regression test for renamed fields The merge from main brought in test_schema_diff_error_does_not_block_value_diff which used old field names (impacted_models, rows_changed). Updated to confirmed_impacted_models and affected_row_count. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Kent --- tests/test_mcp_e2e.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_mcp_e2e.py b/tests/test_mcp_e2e.py index 87a4d71e3..ac1d92fd9 100644 --- a/tests/test_mcp_e2e.py +++ b/tests/test_mcp_e2e.py @@ -508,12 +508,12 @@ def patched_get_lineage_diff(): assert any(e["step"] == "schema_diff" for e in result["errors"]) # Function returned successfully (no UnboundLocalError) - assert "impacted_models" in result - orders = next(m for m in result["impacted_models"] if m["name"] == "orders") + assert "confirmed_impacted_models" in result + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") # Value-diff still ran (the whole point of the fix) assert orders["value_diff"] is not None - assert orders["value_diff"]["rows_changed"] >= 0 + assert orders["value_diff"]["affected_row_count"] >= 0 class TestImpactAnalysisValueDiff: From 9c3b8d2084ad9528b471c41f5145889df4169de6 Mon Sep 17 00:00:00 2001 From: Kent Date: Wed, 1 Apr 2026 17:19:22 +0800 Subject: [PATCH 9/9] test(mcp): add value_diff error resilience test for patch coverage Covers the value_diff except block (lines 1441-1442) by simulating a FULL OUTER JOIN failure and verifying the error is captured, the model still appears with data_impact='potential', and the function continues. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Kent --- tests/test_mcp_e2e.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/test_mcp_e2e.py b/tests/test_mcp_e2e.py index ac1d92fd9..033ee7db3 100644 --- a/tests/test_mcp_e2e.py +++ b/tests/test_mcp_e2e.py @@ -466,6 +466,42 @@ async def test_row_count_error_captured_not_fatal(self, mcp_e2e): assert "confirmed_impacted_models" in result assert "errors" in result + @pytest.mark.asyncio + async def test_value_diff_error_captured_not_fatal(self, mcp_e2e): + """If value_diff query fails, error is captured and model still appears.""" + server, helper = mcp_e2e + + helper.create_model( + "orders", + base_csv="id,amount\n1,100", + curr_csv="id,amount\n1,150", + unique_id="model.recce_test.orders", + base_columns={"id": "INTEGER", "amount": "INTEGER"}, + curr_columns={"id": "INTEGER", "amount": "INTEGER"}, + ) + helper.add_unique_test("model.recce_test.orders", "orders", "id") + + # Patch adapter.execute to raise during value_diff SQL execution + original_execute = server.context.adapter.execute + + def failing_execute(sql, fetch=False): + if "FULL OUTER JOIN" in sql: + raise RuntimeError("simulated value_diff failure") + return original_execute(sql, fetch=fetch) + + with patch.object(server.context.adapter, "execute", side_effect=failing_execute): + result = await server._tool_impact_analysis({}) + + # Model still appears + orders = next(m for m in result["confirmed_impacted_models"] if m["name"] == "orders") + assert orders["value_diff"] is None + assert orders["data_impact"] == "potential" + + # Error captured + vd_errors = [e for e in result["errors"] if e["step"] == "value_diff"] + assert len(vd_errors) == 1 + assert "simulated value_diff failure" in vd_errors[0]["message"] + @pytest.mark.asyncio async def test_schema_diff_error_does_not_block_value_diff(self, mcp_e2e): """Regression: schema-diff failure must not prevent value-diff from running.