From b84d4c202c414af7cf39fd3e3c275c786c2fc581 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Mon, 3 Nov 2025 09:19:10 +0100 Subject: [PATCH 01/14] SNOW-295953: rows_affected_proposal --- src/snowflake/connector/cursor.py | 42 ++++++++++++ test/integ/test_connection.py | 109 ++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index c13ab242c7..b63da7d6ac 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -418,6 +418,10 @@ def __init__( self._log_max_query_length = connection.log_max_query_length self._inner_cursor: SnowflakeCursorBase | None = None self._prefetch_hook = None + self._stats_data: dict[str, int] | None = ( + None # Stores stats from response for DML operations + ) + self._rownumber: int | None = None self.reset() @@ -454,6 +458,26 @@ def _description_internal(self) -> list[ResultMetadataV2]: def rowcount(self) -> int | None: return self._total_rowcount if self._total_rowcount >= 0 else None + @property + def rows_affected(self) -> RowsAffected | None: + """Returns detailed rows affected statistics for DML operations. + + Returns a NamedTuple with fields: + - num_rows_inserted: Number of rows inserted + - num_rows_deleted: Number of rows deleted + - num_rows_updated: Number of rows updated + + Returns None if no DML stats are available. 
+ """ + if self._stats_data is None: + return RowsAffected(None, None, None, None) + return RowsAffected( + num_rows_inserted=self._stats_data.get("numRowsInserted", None), + num_rows_deleted=self._stats_data.get("numRowsDeleted", None), + num_rows_updated=self._stats_data.get("numRowsUpdated", None), + num_dml_duplicates=self._stats_data.get("numDmlDuplicates", None), + ) + @property def rownumber(self) -> int | None: return self._rownumber if self._rownumber >= 0 else None @@ -1201,6 +1225,10 @@ def _init_result_and_meta(self, data: dict[Any, Any]) -> None: self._rownumber = -1 self._result_state = ResultState.VALID + # Extract rows_affected from stats object if available (for DML operations like CTAS, INSERT, UPDATE, DELETE) + self._stats_data = data.get("stats", None) + logger.debug(f"Execution stats: {self.rows_affected}") + # don't update the row count when the result is returned from `describe` method if is_dml and "rowset" in data and len(data["rowset"]) > 0: updated_rows = 0 @@ -2007,3 +2035,17 @@ def __getattr__(name): ) return None raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + +class RowsAffected(NamedTuple): + """ + Statistics for rows affected by a DML operation. + None value expresses particular statistic being unknown - not returned by the backend service. 
+ + Added in the first place to expose DML data of CTAS statements - SNOW-295953 + """ + + num_rows_inserted: int | None = None + num_rows_deleted: int | None = None + num_rows_updated: int | None = None + num_dml_duplicates: int | None = None diff --git a/test/integ/test_connection.py b/test/integ/test_connection.py index 1ff1d11c99..2c7762777d 100644 --- a/test/integ/test_connection.py +++ b/test/integ/test_connection.py @@ -1892,3 +1892,112 @@ def test_snowflake_version(): assert re.match( version_pattern, conn.snowflake_version ), f"snowflake_version should match pattern 'x.y.z', but got '{conn.snowflake_version}'" + + +@pytest.mark.skipolddriver +def test_ctas_rows_affected_from_stats(conn_cnx): + """Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Create temp table with data - should have rowcount from stats.numRowsInserted + cur.execute( + "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" + ) + # For CTAS, rowcount should be the number of rows inserted (3) + assert ( + cur.rowcount == 3 + ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" + # rows_affected should contain the detailed stats as a NamedTuple + assert ( + cur.rows_affected is not None + ), "rows_affected should not be None for CTAS" + assert ( + cur.rows_affected.num_rows_inserted == 3 + ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" + assert cur.rows_affected.num_rows_deleted == 0 + assert cur.rows_affected.num_rows_updated == 0 + assert cur.rows_affected.num_dml_duplicates == 0 + + +@pytest.mark.skipolddriver +def test_create_view_rows_affected_from_stats(conn_cnx): + """Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Create temp table with data - should have rowcount from stats.numRowsInserted + cur.execute( + 
"create temp view test_ctas_stats as select col1 from values (1), (2), (3) as t(col1)" + ) + # For CTAS, rowcount should be the number of rows inserted (3) + assert ( + cur.rowcount == 1 + ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" + # rows_affected should contain the detailed stats as a NamedTuple + assert ( + cur.rows_affected is not None + ), "rows_affected should not be None for CTAS" + assert ( + cur.rows_affected.num_rows_inserted is None + ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" + assert cur.rows_affected.num_rows_deleted is None + assert cur.rows_affected.num_rows_updated is None + assert cur.rows_affected.num_dml_duplicates is None + + +@pytest.mark.skipolddriver +def test_cvas_separate_cursors_rows_affected_from_stats(conn_cnx): + """Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Create temp table with data - should have rowcount from stats.numRowsInserted + cur.execute( + "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" + ) + + with conn.cursor() as cur: + cur.execute( + "create temp view test_cvas_stats as select col1 from test_ctas_stats" + ) + # For CTAS, rowcount should be the number of rows inserted (3) + assert ( + cur.rowcount == 1 + ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" + # rows_affected should contain the detailed stats as a NamedTuple + assert ( + cur.rows_affected is not None + ), "rows_affected should not be None for CTAS" + assert ( + cur.rows_affected.num_rows_inserted is None + ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" + assert cur.rows_affected.num_rows_deleted is None + assert cur.rows_affected.num_rows_updated is None + assert cur.rows_affected.num_dml_duplicates is None + + +@pytest.mark.skipolddriver +def test_cvas_one_cursor_rows_affected_from_stats(conn_cnx): + 
"""Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Create temp table with data - should have rowcount from stats.numRowsInserted + cur.execute( + "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" + ) + + cur.execute( + "create temp view test_cvas_stats as select col1 from test_ctas_stats" + ) + # For CTAS, rowcount should be the number of rows inserted (3) + assert ( + cur.rowcount == 1 + ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" + # rows_affected should contain the detailed stats as a NamedTuple + assert ( + cur.rows_affected is not None + ), "rows_affected should not be None for CTAS" + assert ( + cur.rows_affected.num_rows_inserted is None + ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" + assert cur.rows_affected.num_rows_deleted is None + assert cur.rows_affected.num_rows_updated is None + assert cur.rows_affected.num_dml_duplicates is None From d2a0f19e1f73f25964c8b8536bd0bff95c7c6cc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Mon, 3 Nov 2025 10:26:18 +0100 Subject: [PATCH 02/14] SNOW-295953: fixed bug with wrong property assumption --- test/integ/test_connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integ/test_connection.py b/test/integ/test_connection.py index 2c7762777d..9bf7bcf6d4 100644 --- a/test/integ/test_connection.py +++ b/test/integ/test_connection.py @@ -1905,7 +1905,7 @@ def test_ctas_rows_affected_from_stats(conn_cnx): ) # For CTAS, rowcount should be the number of rows inserted (3) assert ( - cur.rowcount == 3 + cur.rowcount == 1 ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" # rows_affected should contain the detailed stats as a NamedTuple assert ( @@ -1932,7 +1932,7 @@ def test_create_view_rows_affected_from_stats(conn_cnx): assert ( cur.rowcount == 1 ), 
f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" - # rows_affected should contain the detailed stats as a NamedTuple + # rowcount should stay compliant with old approach - show only the amount of rows returned by Backend. assert ( cur.rows_affected is not None ), "rows_affected should not be None for CTAS" From f90b72e3b1cef6e154ded0bab817956ef0112844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Mon, 3 Nov 2025 10:56:36 +0100 Subject: [PATCH 03/14] SNOW-295953: Cleanup docs and comments --- src/snowflake/connector/cursor.py | 2 +- test/integ/test_connection.py | 68 ++++++++++--------------------- 2 files changed, 23 insertions(+), 47 deletions(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index b63da7d6ac..7a810419c9 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -467,7 +467,7 @@ def rows_affected(self) -> RowsAffected | None: - num_rows_deleted: Number of rows deleted - num_rows_updated: Number of rows updated - Returns None if no DML stats are available. + Returns None on each position if no DML stats are available. 
""" if self._stats_data is None: return RowsAffected(None, None, None, None) diff --git a/test/integ/test_connection.py b/test/integ/test_connection.py index 9bf7bcf6d4..51b3516f8f 100644 --- a/test/integ/test_connection.py +++ b/test/integ/test_connection.py @@ -1063,7 +1063,7 @@ def test_client_fetch_threads_setting(conn_cnx): @pytest.mark.skipolddriver @pytest.mark.parametrize("disable_request_pooling", [True, False]) def test_ocsp_and_rest_pool_isolation(conn_cnx, disable_request_pooling): - """Each connection’s SessionManager is isolated; OCSP picks the right one.""" + """Each connection's SessionManager is isolated; OCSP picks the right one.""" from snowflake.connector.ssl_wrap_socket import get_current_session_manager # @@ -1896,17 +1896,15 @@ def test_snowflake_version(): @pytest.mark.skipolddriver def test_ctas_rows_affected_from_stats(conn_cnx): - """Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + """Test that cursor.rowcount and cursor.rows_affected work for CTAS operations.""" with conn_cnx() as conn: with conn.cursor() as cur: - # Create temp table with data - should have rowcount from stats.numRowsInserted cur.execute( "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" ) - # For CTAS, rowcount should be the number of rows inserted (3) assert ( cur.rowcount == 1 - ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" + ), f"Expected rowcount 1 for CTAS, got {cur.rowcount}" # rows_affected should contain the detailed stats as a NamedTuple assert ( cur.rows_affected is not None @@ -1921,24 +1919,18 @@ def test_ctas_rows_affected_from_stats(conn_cnx): @pytest.mark.skipolddriver def test_create_view_rows_affected_from_stats(conn_cnx): - """Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + """Test that cursor.rows_affected returns None fields for VIEW operations.""" with conn_cnx() as conn: with conn.cursor() as cur: - # 
Create temp table with data - should have rowcount from stats.numRowsInserted cur.execute( - "create temp view test_ctas_stats as select col1 from values (1), (2), (3) as t(col1)" + "create temp view test_view_stats as select col1 from values (1), (2), (3) as t(col1)" ) - # For CTAS, rowcount should be the number of rows inserted (3) assert ( cur.rowcount == 1 - ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" - # rowcount should stay compliant with old approach - show only the amount of rows returned by Backend. - assert ( - cur.rows_affected is not None - ), "rows_affected should not be None for CTAS" - assert ( - cur.rows_affected.num_rows_inserted is None - ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" + ), f"Expected rowcount 1 for VIEW, got {cur.rowcount}" + # VIEW operations don't return DML stats, all fields should be None + assert cur.rows_affected is not None + assert cur.rows_affected.num_rows_inserted is None assert cur.rows_affected.num_rows_deleted is None assert cur.rows_affected.num_rows_updated is None assert cur.rows_affected.num_dml_duplicates is None @@ -1946,29 +1938,20 @@ def test_create_view_rows_affected_from_stats(conn_cnx): @pytest.mark.skipolddriver def test_cvas_separate_cursors_rows_affected_from_stats(conn_cnx): - """Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + """Test cursor.rows_affected with CVAS in separate cursor from the one used for CTAS of the table.""" with conn_cnx() as conn: with conn.cursor() as cur: - # Create temp table with data - should have rowcount from stats.numRowsInserted cur.execute( - "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" + "create temp table test_table (col1 int) as select col1 from values (1), (2), (3) as t(col1)" ) - with conn.cursor() as cur: - cur.execute( - "create temp view test_cvas_stats as select col1 from test_ctas_stats" - ) - # For CTAS, rowcount 
should be the number of rows inserted (3) + cur.execute("create temp view test_view as select col1 from test_table") assert ( cur.rowcount == 1 - ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" - # rows_affected should contain the detailed stats as a NamedTuple - assert ( - cur.rows_affected is not None - ), "rows_affected should not be None for CTAS" - assert ( - cur.rows_affected.num_rows_inserted is None - ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" + ), "Due to old behaviour we should keep rowcount equal to 1 - as the number of rows returned by the backend" + # VIEW operations don't return DML stats + assert cur.rows_affected is not None + assert cur.rows_affected.num_rows_inserted is None assert cur.rows_affected.num_rows_deleted is None assert cur.rows_affected.num_rows_updated is None assert cur.rows_affected.num_dml_duplicates is None @@ -1976,28 +1959,21 @@ def test_cvas_separate_cursors_rows_affected_from_stats(conn_cnx): @pytest.mark.skipolddriver def test_cvas_one_cursor_rows_affected_from_stats(conn_cnx): - """Test that cursor.rowcount is correctly extracted from stats for CTAS operations.""" + """Test cursor.rows_affected with CVAS in the same cursor - make sure it's cleaned up after usage.""" with conn_cnx() as conn: with conn.cursor() as cur: - # Create temp table with data - should have rowcount from stats.numRowsInserted cur.execute( "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" ) - cur.execute( - "create temp view test_cvas_stats as select col1 from test_ctas_stats" + "create temp view test_view as select col1 from test_ctas_stats" ) - # For CTAS, rowcount should be the number of rows inserted (3) assert ( cur.rowcount == 1 - ), f"Expected rowcount 3 for CTAS with 3 rows, got {cur.rowcount}" - # rows_affected should contain the detailed stats as a NamedTuple - assert ( - cur.rows_affected is not None - ), "rows_affected should not be None for 
CTAS" - assert ( - cur.rows_affected.num_rows_inserted is None - ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" + ), "Due to old behaviour we should keep rowcount equal to 1 - as the number of rows returned by the backend" + # VIEW operations don't return DML stats + assert cur.rows_affected is not None + assert cur.rows_affected.num_rows_inserted is None assert cur.rows_affected.num_rows_deleted is None assert cur.rows_affected.num_rows_updated is None assert cur.rows_affected.num_dml_duplicates is None From 70ba3d0d44fdd1970c6d4789c608d455b6145569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Tue, 4 Nov 2025 21:26:44 +0100 Subject: [PATCH 04/14] SNOW-295953: renamed rows_Affected to stats --- src/snowflake/connector/cursor.py | 6 +-- test/integ/test_connection.py | 62 +++++++++++++++---------------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index 7a810419c9..689a42fee7 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -459,7 +459,7 @@ def rowcount(self) -> int | None: return self._total_rowcount if self._total_rowcount >= 0 else None @property - def rows_affected(self) -> RowsAffected | None: + def stats(self) -> RowsAffected | None: """Returns detailed rows affected statistics for DML operations. 
Returns a NamedTuple with fields: @@ -1225,9 +1225,9 @@ def _init_result_and_meta(self, data: dict[Any, Any]) -> None: self._rownumber = -1 self._result_state = ResultState.VALID - # Extract rows_affected from stats object if available (for DML operations like CTAS, INSERT, UPDATE, DELETE) + # Extract stats object if available (for DML operations like CTAS, INSERT, UPDATE, DELETE) self._stats_data = data.get("stats", None) - logger.debug(f"Execution stats: {self.rows_affected}") + logger.debug(f"Execution DML stats: {self.stats}") # don't update the row count when the result is returned from `describe` method if is_dml and "rowset" in data and len(data["rowset"]) > 0: diff --git a/test/integ/test_connection.py b/test/integ/test_connection.py index 51b3516f8f..3ee94209c2 100644 --- a/test/integ/test_connection.py +++ b/test/integ/test_connection.py @@ -1895,8 +1895,8 @@ def test_snowflake_version(): @pytest.mark.skipolddriver -def test_ctas_rows_affected_from_stats(conn_cnx): - """Test that cursor.rowcount and cursor.rows_affected work for CTAS operations.""" +def test_ctas_stats(conn_cnx): + """Test that cursor.rowcount and cursor.stats work for CTAS operations.""" with conn_cnx() as conn: with conn.cursor() as cur: cur.execute( @@ -1905,21 +1905,19 @@ def test_ctas_rows_affected_from_stats(conn_cnx): assert ( cur.rowcount == 1 ), f"Expected rowcount 1 for CTAS, got {cur.rowcount}" - # rows_affected should contain the detailed stats as a NamedTuple + # stats should contain the details as a NamedTuple + assert cur.stats is not None, "stats should not be None for CTAS" assert ( - cur.rows_affected is not None - ), "rows_affected should not be None for CTAS" - assert ( - cur.rows_affected.num_rows_inserted == 3 - ), f"Expected num_rows_inserted=3, got {cur.rows_affected.num_rows_inserted}" - assert cur.rows_affected.num_rows_deleted == 0 - assert cur.rows_affected.num_rows_updated == 0 - assert cur.rows_affected.num_dml_duplicates == 0 + cur.stats.num_rows_inserted == 
3 + ), f"Expected num_rows_inserted=3, got {cur.stats.num_rows_inserted}" + assert cur.stats.num_rows_deleted == 0 + assert cur.stats.num_rows_updated == 0 + assert cur.stats.num_dml_duplicates == 0 @pytest.mark.skipolddriver -def test_create_view_rows_affected_from_stats(conn_cnx): - """Test that cursor.rows_affected returns None fields for VIEW operations.""" +def test_create_view_stats(conn_cnx): + """Test that cursor.stats returns None fields for VIEW operations.""" with conn_cnx() as conn: with conn.cursor() as cur: cur.execute( @@ -1929,16 +1927,16 @@ def test_create_view_rows_affected_from_stats(conn_cnx): cur.rowcount == 1 ), f"Expected rowcount 1 for VIEW, got {cur.rowcount}" # VIEW operations don't return DML stats, all fields should be None - assert cur.rows_affected is not None - assert cur.rows_affected.num_rows_inserted is None - assert cur.rows_affected.num_rows_deleted is None - assert cur.rows_affected.num_rows_updated is None - assert cur.rows_affected.num_dml_duplicates is None + assert cur.stats is not None + assert cur.stats.num_rows_inserted is None + assert cur.stats.num_rows_deleted is None + assert cur.stats.num_rows_updated is None + assert cur.stats.num_dml_duplicates is None @pytest.mark.skipolddriver -def test_cvas_separate_cursors_rows_affected_from_stats(conn_cnx): - """Test cursor.rows_affected with CVAS in separate cursor from the one used for CTAS of the table.""" +def test_cvas_separate_cursors_stats(conn_cnx): + """Test cursor.stats with CVAS in separate cursor from the one used for CTAS of the table.""" with conn_cnx() as conn: with conn.cursor() as cur: cur.execute( @@ -1950,16 +1948,16 @@ def test_cvas_separate_cursors_rows_affected_from_stats(conn_cnx): cur.rowcount == 1 ), "Due to old behaviour we should keep rowcount equal to 1 - as the number of rows returned by the backend" # VIEW operations don't return DML stats - assert cur.rows_affected is not None - assert cur.rows_affected.num_rows_inserted is None - assert 
cur.rows_affected.num_rows_deleted is None - assert cur.rows_affected.num_rows_updated is None - assert cur.rows_affected.num_dml_duplicates is None + assert cur.stats is not None + assert cur.stats.num_rows_inserted is None + assert cur.stats.num_rows_deleted is None + assert cur.stats.num_rows_updated is None + assert cur.stats.num_dml_duplicates is None @pytest.mark.skipolddriver -def test_cvas_one_cursor_rows_affected_from_stats(conn_cnx): - """Test cursor.rows_affected with CVAS in the same cursor - make sure it's cleaned up after usage.""" +def test_cvas_one_cursor_stats(conn_cnx): + """Test cursor.stats with CVAS in the same cursor - make sure it's cleaned up after usage.""" with conn_cnx() as conn: with conn.cursor() as cur: cur.execute( @@ -1972,8 +1970,8 @@ def test_cvas_one_cursor_rows_affected_from_stats(conn_cnx): cur.rowcount == 1 ), "Due to old behaviour we should keep rowcount equal to 1 - as the number of rows returned by the backend" # VIEW operations don't return DML stats - assert cur.rows_affected is not None - assert cur.rows_affected.num_rows_inserted is None - assert cur.rows_affected.num_rows_deleted is None - assert cur.rows_affected.num_rows_updated is None - assert cur.rows_affected.num_dml_duplicates is None + assert cur.stats is not None + assert cur.stats.num_rows_inserted is None + assert cur.stats.num_rows_deleted is None + assert cur.stats.num_rows_updated is None + assert cur.stats.num_dml_duplicates is None From b413d5d305dcbabd8db0318718d859856ce2eafd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Tue, 4 Nov 2025 21:45:08 +0100 Subject: [PATCH 05/14] SNOW-295953: renamed RowsAffected to QueryResultStats --- src/snowflake/connector/cursor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index 689a42fee7..b388de2c24 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ 
-459,7 +459,7 @@ def rowcount(self) -> int | None: return self._total_rowcount if self._total_rowcount >= 0 else None @property - def stats(self) -> RowsAffected | None: + def stats(self) -> QueryResultStats | None: """Returns detailed rows affected statistics for DML operations. Returns a NamedTuple with fields: @@ -470,8 +470,8 @@ def stats(self) -> RowsAffected | None: Returns None on each position if no DML stats are available. """ if self._stats_data is None: - return RowsAffected(None, None, None, None) - return RowsAffected( + return QueryResultStats(None, None, None, None) + return QueryResultStats( num_rows_inserted=self._stats_data.get("numRowsInserted", None), num_rows_deleted=self._stats_data.get("numRowsDeleted", None), num_rows_updated=self._stats_data.get("numRowsUpdated", None), @@ -2037,7 +2037,7 @@ def __getattr__(name): raise AttributeError(f"module {__name__!r} has no attribute {name!r}") -class RowsAffected(NamedTuple): +class QueryResultStats(NamedTuple): """ Statistics for rows affected by a DML operation. None value expresses particular statistic being unknown - not returned by the backend service. From b72e123cedee328a8cc049d73daf299e7a9b150b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Mon, 8 Dec 2025 10:33:08 +0100 Subject: [PATCH 06/14] SNOW-295953: DESCRIPTION.md update --- DESCRIPTION.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/DESCRIPTION.md b/DESCRIPTION.md index 56b93c9e06..d44eea1f22 100644 --- a/DESCRIPTION.md +++ b/DESCRIPTION.md @@ -9,6 +9,8 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne # Release Notes - v4.2.0(TBD) - Added support for async I/O. Asynchronous version of connector is available via `snowflake.connector.aio` module. + - Added `SnowflakeCursor.stats` property - to expose DML queries data when `SnowflakeCursor.rowcount` is not sufficient. + - v4.1.1(TBD) - Relaxed pandas dependency requirements for Python below 3.12. 
- Changed CRL cache cleanup background task to daemon to avoid blocking main thread. From 03b35802c365e7e8a69c4551a7dfce428b256aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Mon, 8 Dec 2025 10:43:38 +0100 Subject: [PATCH 07/14] SNOW-295953: Docs fix --- src/snowflake/connector/cursor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index b388de2c24..3e68302b48 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -466,6 +466,7 @@ def stats(self) -> QueryResultStats | None: - num_rows_inserted: Number of rows inserted - num_rows_deleted: Number of rows deleted - num_rows_updated: Number of rows updated + - num_dml_duplicates: Number of duplicates in DML statement Returns None on each position if no DML stats are available. """ From 99be9aafa6331e2673ec3e6d6ad1f2e948a27f7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Mon, 8 Dec 2025 17:38:18 +0100 Subject: [PATCH 08/14] Update src/snowflake/connector/cursor.py logger Co-authored-by: Tomasz Urbaszek --- src/snowflake/connector/cursor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index 3e68302b48..fb6a607ac1 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -1228,7 +1228,7 @@ def _init_result_and_meta(self, data: dict[Any, Any]) -> None: # Extract stats object if available (for DML operations like CTAS, INSERT, UPDATE, DELETE) self._stats_data = data.get("stats", None) - logger.debug(f"Execution DML stats: {self.stats}") + logger.debug(f"Execution DML stats: %s", self.stats) # don't update the row count when the result is returned from `describe` method if is_dml and "rowset" in data and len(data["rowset"]) > 0: From 5dfc8b6dca986d658d94994c6a10487593fd9e3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= 
Date: Mon, 8 Dec 2025 18:09:20 +0100 Subject: [PATCH 09/14] SNOW-295953: Docs fix --- DESCRIPTION.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION.md b/DESCRIPTION.md index d44eea1f22..acd254db04 100644 --- a/DESCRIPTION.md +++ b/DESCRIPTION.md @@ -9,7 +9,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne # Release Notes - v4.2.0(TBD) - Added support for async I/O. Asynchronous version of connector is available via `snowflake.connector.aio` module. - - Added `SnowflakeCursor.stats` property - to expose DML queries data when `SnowflakeCursor.rowcount` is not sufficient. + - Added `SnowflakeCursor.stats` property to expose granular DML statistics (rows inserted, deleted, updated, and duplicates) for operations like CTAS where `rowcount` is insufficient. - v4.1.1(TBD) - Relaxed pandas dependency requirements for Python below 3.12. From 14a54c58c6e5c94f979aa4eec859a51d1b47a988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Tue, 9 Dec 2025 12:42:01 +0100 Subject: [PATCH 10/14] SNOW-295953: asyncio support + base for more tests --- src/snowflake/connector/aio/_cursor.py | 4 + test/integ/aio_it/test_connection_async.py | 628 +++++++++++++++++++++ test/integ/test_connection.py | 519 +++++++++++++++++ 3 files changed, 1151 insertions(+) diff --git a/src/snowflake/connector/aio/_cursor.py b/src/snowflake/connector/aio/_cursor.py index a0160dc4c8..5466d70456 100644 --- a/src/snowflake/connector/aio/_cursor.py +++ b/src/snowflake/connector/aio/_cursor.py @@ -378,6 +378,10 @@ async def _init_result_and_meta(self, data: dict[Any, Any]) -> None: self._rownumber = -1 self._result_state = ResultState.VALID + # Extract stats object if available (for DML operations like CTAS, INSERT, UPDATE, DELETE) + self._stats_data = data.get("stats", None) + logger.debug("Execution DML stats: %s", self.stats) + # don't update the row count when the result is returned from `describe` method if is_dml and 
"rowset" in data and len(data["rowset"]) > 0: updated_rows = 0 diff --git a/test/integ/aio_it/test_connection_async.py b/test/integ/aio_it/test_connection_async.py index dd5ef70a53..1a0d0981fc 100644 --- a/test/integ/aio_it/test_connection_async.py +++ b/test/integ/aio_it/test_connection_async.py @@ -28,6 +28,7 @@ from snowflake.connector.aio._description import CLIENT_NAME from snowflake.connector.compat import IS_WINDOWS from snowflake.connector.connection import DEFAULT_CLIENT_PREFETCH_THREADS +from snowflake.connector.cursor import QueryResultStats from snowflake.connector.errorcode import ( ER_CONNECTION_IS_CLOSED, ER_FAILED_PROCESSING_PYFORMAT, @@ -1838,3 +1839,630 @@ async def test_no_new_warnings_or_errors_on_successful_basic_select(conn_cnx, ca f"Error count increased from {baseline_error_count} to {test_error_count}. " f"New errors: {[r.getMessage() for r in caplog.records if r.levelno >= logging.ERROR]}" ) + + +@pytest.mark.skipolddriver +async def test_ctas_stats(conn_cnx): + """Test that cursor.rowcount and cursor.stats work for CTAS operations.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + await cur.execute( + "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" + ) + assert ( + cur.rowcount == 1 + ), f"Expected rowcount 1 for CTAS, got {cur.rowcount}" + # stats should contain the details as a NamedTuple + assert cur.stats is not None, "stats should not be None for CTAS" + assert ( + cur.stats.num_rows_inserted == 3 + ), f"Expected num_rows_inserted=3, got {cur.stats.num_rows_inserted}" + assert cur.stats.num_rows_deleted == 0 + assert cur.stats.num_rows_updated == 0 + assert cur.stats.num_dml_duplicates == 0 + + +@pytest.mark.skipolddriver +async def test_create_view_stats(conn_cnx): + """Test that cursor.stats returns None fields for VIEW operations.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + await cur.execute( + "create temp view test_view_stats 
as select col1 from values (1), (2), (3) as t(col1)" + ) + assert ( + cur.rowcount == 1 + ), f"Expected rowcount 1 for VIEW, got {cur.rowcount}" + # VIEW operations don't return DML stats, all fields should be None + assert cur.stats is not None + assert cur.stats.num_rows_inserted is None + assert cur.stats.num_rows_deleted is None + assert cur.stats.num_rows_updated is None + assert cur.stats.num_dml_duplicates is None + + +@pytest.mark.skipolddriver +async def test_cvas_separate_cursors_stats(conn_cnx): + """Test cursor.stats with CVAS in separate cursor from the one used for CTAS of the table.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + await cur.execute( + "create temp table test_table (col1 int) as select col1 from values (1), (2), (3) as t(col1)" + ) + async with conn.cursor() as cur: + await cur.execute( + "create temp view test_view as select col1 from test_table" + ) + assert ( + cur.rowcount == 1 + ), "Due to old behaviour we should keep rowcount equal to 1 - as the number of rows returned by the backend" + # VIEW operations don't return DML stats + assert cur.stats is not None + assert cur.stats.num_rows_inserted is None + assert cur.stats.num_rows_deleted is None + assert cur.stats.num_rows_updated is None + assert cur.stats.num_dml_duplicates is None + + +@pytest.mark.skipolddriver +async def test_cvas_one_cursor_stats(conn_cnx): + """Test cursor.stats with CVAS in the same cursor - make sure it's cleaned up after usage.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + await cur.execute( + "create temp table test_ctas_stats (col1 int) as select col1 from values (1), (2), (3) as t(col1)" + ) + await cur.execute( + "create temp view test_view as select col1 from test_ctas_stats" + ) + assert ( + cur.rowcount == 1 + ), "Due to old behaviour we should keep rowcount equal to 1 - as the number of rows returned by the backend" + # VIEW operations don't return DML stats + assert cur.stats is not None + 
assert cur.stats.num_rows_inserted is None + assert cur.stats.num_rows_deleted is None + assert cur.stats.num_rows_updated is None + assert cur.stats.num_dml_duplicates is None + + +def _assert_stats(actual_stats, expected_stats): + """Helper function to assert stats values. + + Args: + actual_stats: The actual QueryResultStats from cursor.stats + expected_stats: Expected QueryResultStats to compare against + """ + assert actual_stats is not None, "stats should not be None" + assert isinstance( + expected_stats, QueryResultStats + ), "expected_stats must be a QueryResultStats instance" + + assert ( + actual_stats.num_rows_inserted == expected_stats.num_rows_inserted + ), f"Expected num_rows_inserted={expected_stats.num_rows_inserted}, got {actual_stats.num_rows_inserted}" + + assert ( + actual_stats.num_rows_deleted == expected_stats.num_rows_deleted + ), f"Expected num_rows_deleted={expected_stats.num_rows_deleted}, got {actual_stats.num_rows_deleted}" + + assert ( + actual_stats.num_rows_updated == expected_stats.num_rows_updated + ), f"Expected num_rows_updated={expected_stats.num_rows_updated}, got {actual_stats.num_rows_updated}" + + assert ( + actual_stats.num_dml_duplicates == expected_stats.num_dml_duplicates + ), f"Expected num_dml_duplicates={expected_stats.num_dml_duplicates}, got {actual_stats.num_dml_duplicates}" + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize( + "operation,setup_sql,test_sql,expected_stats", + [ + pytest.param( + "insert_simple", + "create temp table test_stats_table (id int, name varchar(50))", + "insert into test_stats_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')", + QueryResultStats( + num_rows_inserted=3, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="insert_simple", + ), + pytest.param( + "update", + """ + create temp table test_stats_table (id int, name varchar(50)); + insert into test_stats_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "update test_stats_table 
set name = 'Updated' where id in (1, 2)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=2, + num_dml_duplicates=0, + ), + id="update", + ), + pytest.param( + "delete", + """ + create temp table test_stats_table (id int, name varchar(50)); + insert into test_stats_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "delete from test_stats_table where id in (1, 3)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=2, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="delete", + ), + pytest.param( + "merge_insert", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'); + """, + """ + merge into test_stats_target t + using (select * from values (2, 'Bob'), (3, 'Charlie') as v(id, name)) s + on t.id = s.id + when not matched then insert (id, name) values (s.id, s.name) + """, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="merge_insert", + ), + pytest.param( + "merge_update", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'), (2, 'Bob'); + """, + """ + merge into test_stats_target t + using (select * from values (1, 'Alice Updated'), (2, 'Bob Updated') as v(id, name)) s + on t.id = s.id + when matched then update set t.name = s.name + """, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=2, + num_dml_duplicates=0, + ), + id="merge_update", + ), + pytest.param( + "merge_insert_update", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'); + """, + """ + merge into test_stats_target t + using (select * from values (1, 'Alice Updated'), (2, 'Bob'), (3, 'Charlie') as v(id, name)) s + on t.id = s.id + when matched then update set t.name = s.name + when not matched then insert (id, name) 
values (s.id, s.name) + """, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + num_rows_updated=1, + num_dml_duplicates=0, + ), + id="merge_insert_update", + ), + ], +) +async def test_dml_stats_operations( + conn_cnx, operation, setup_sql, test_sql, expected_stats +): + """Test cursor.stats for various DML operations.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Setup + for sql in setup_sql.strip().split(";"): + sql = sql.strip() + if sql: + await cur.execute(sql) + + # Execute test operation + await cur.execute(test_sql) + + # Verify stats + _assert_stats(cur.stats, expected_stats) + + +@pytest.mark.skipolddriver +async def test_merge_with_duplicates_in_source(conn_cnx): + """Test cursor.stats for MERGE operations with duplicates in source data.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Create target table + await cur.execute( + "create temp table test_merge_target (id int, name varchar(50))" + ) + await cur.execute( + "insert into test_merge_target values (1, 'Alice'), (2, 'Bob')" + ) + + # Create source with duplicates + # When source has duplicates, Snowflake may report them in num_dml_duplicates + await cur.execute( + """ + merge into test_merge_target t + using ( + select * from values + (1, 'Alice_Updated'), + (1, 'Alice_Updated_Again'), -- Duplicate key in source + (3, 'Charlie'), + (3, 'Charlie_Dup') -- Another duplicate in source + as s(id, name) + ) s + on t.id = s.id + when matched then update set t.name = s.name + when not matched then insert (id, name) values (s.id, s.name) + """ + ) + + # Check stats - duplicates in source may be reported + assert cur.stats is not None + # The exact duplicate count depends on Snowflake's reporting + # but we verify the field is accessible + assert hasattr(cur.stats, "num_dml_duplicates") + assert ( + cur.stats.num_dml_duplicates is not None + or cur.stats.num_dml_duplicates == 0 + ) + + +@pytest.mark.skipolddriver +async def 
test_insert_all_with_duplicates_stats(conn_cnx): + """Test cursor.stats for INSERT ALL with duplicate handling.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Create tables + await cur.execute( + "create temp table test_dup_table1 (id int primary key, name varchar(50))" + ) + await cur.execute( + "create temp table test_dup_table2 (id int primary key, name varchar(50))" + ) + + # Pre-populate with some data + await cur.execute("insert into test_dup_table1 values (1, 'Alice')") + await cur.execute("insert into test_dup_table2 values (2, 'Bob')") + + # Try INSERT ALL - some inserts will conflict with existing keys + # This tests if duplicates are properly reported + try: + await cur.execute( + """ + insert all + into test_dup_table1 values (id, name) + into test_dup_table2 values (id, name) + select * from values + (1, 'Alice_New'), -- Duplicate in table1 + (2, 'Bob_New'), -- Duplicate in table2 + (3, 'Charlie') -- New in both + as t(id, name) + """ + ) + # If this succeeds, check stats + assert cur.stats is not None + assert hasattr(cur.stats, "num_dml_duplicates") + # TODO: bad + except Exception: + # INSERT ALL with duplicates might fail depending on constraints + # This is expected behavior - just verify we can handle it + pass + + +@pytest.mark.skipolddriver +async def test_stats_duplicates_field_exists(conn_cnx): + """Test that num_dml_duplicates field always exists in stats, even if zero.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Simple insert with no duplicates + await cur.execute( + "create temp table test_dup_exists (id int, name varchar(50))" + ) + await cur.execute( + "insert into test_dup_exists values (1, 'Alice'), (2, 'Bob')" + ) + + # Verify the field exists and is 0 + assert cur.stats is not None + assert hasattr(cur.stats, "num_dml_duplicates") + assert ( + cur.stats.num_dml_duplicates == 0 + or cur.stats.num_dml_duplicates is None + ) + + +@pytest.mark.skipolddriver +async def 
test_multi_statement_stats(conn_cnx): + """Test that stats are updated correctly for multi-statement queries.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Setup + await cur.execute( + "create temp table test_multi_stats (id int, name varchar(50))" + ) + + # First insert + await cur.execute( + "insert into test_multi_stats values (1, 'Alice'), (2, 'Bob')" + ) + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Second insert in same cursor + await cur.execute("insert into test_multi_stats values (3, 'Charlie')") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=1, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Update + await cur.execute( + "update test_multi_stats set name = 'Updated' where id = 1" + ) + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=1, + num_dml_duplicates=0, + ), + ) + + # Delete + await cur.execute("delete from test_multi_stats where id = 2") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=1, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + +@pytest.mark.skipolddriver +async def test_stats_reset_on_select(conn_cnx): + """Test that stats are reset to None when executing SELECT queries.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Setup and insert + await cur.execute("create temp table test_stats_reset (id int)") + await cur.execute("insert into test_stats_reset values (1), (2), (3)") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=3, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Execute a SELECT - stats should have all None values + await cur.execute("select * from test_stats_reset") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=None, 
+ num_rows_deleted=None, + num_rows_updated=None, + num_dml_duplicates=None, + ), + ) + + +@pytest.mark.skipolddriver +async def test_truncate_stats(conn_cnx): + """Test cursor.stats for TRUNCATE operations.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Setup + await cur.execute("create temp table test_truncate_stats (id int)") + await cur.execute("insert into test_truncate_stats values (1), (2), (3)") + + # Truncate doesn't provide detailed stats + await cur.execute("truncate table test_truncate_stats") + # Truncate typically doesn't populate DML stats + assert cur.stats is not None + + +@pytest.mark.skipolddriver +async def test_empty_result_stats(conn_cnx): + """Test cursor.stats for DML operations that affect zero rows.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Setup + await cur.execute( + "create temp table test_empty_stats (id int, name varchar(50))" + ) + await cur.execute("insert into test_empty_stats values (1, 'Alice')") + + # Update with no matching rows + await cur.execute( + "update test_empty_stats set name = 'Updated' where id = 999" + ) + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Delete with no matching rows + await cur.execute("delete from test_empty_stats where id = 999") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize( + "operation,setup_sql,test_sql,expected_stats", + [ + pytest.param( + "insert_async", + "create temp table test_async_stats (id int, name varchar(50))", + "insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')", + QueryResultStats( + num_rows_inserted=3, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="insert_async", + ), + pytest.param( + 
"update_async", + """ + create temp table test_async_stats (id int, name varchar(50)); + insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "update test_async_stats set name = 'Updated' where id in (1, 2)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=2, + num_dml_duplicates=0, + ), + id="update_async", + ), + pytest.param( + "delete_async", + """ + create temp table test_async_stats (id int, name varchar(50)); + insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "delete from test_async_stats where id in (1, 3)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=2, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="delete_async", + ), + pytest.param( + "ctas_async", + "", + "create temp table test_async_ctas (id int) as select * from values (1), (2), (3), (4), (5) as t(id)", + QueryResultStats( + num_rows_inserted=5, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="ctas_async", + ), + ], +) +async def test_execute_async_stats( + conn_cnx, operation, setup_sql, test_sql, expected_stats +): + """Test cursor.stats for DML operations executed asynchronously.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Setup + if setup_sql: + for sql in setup_sql.strip().split(";"): + sql = sql.strip() + if sql: + await cur.execute(sql) + + # Execute async + await cur.execute_async(test_sql) + query_id = cur.sfqid + + # Get results + await cur.get_results_from_sfqid(query_id) + + # Verify stats are available after getting results + _assert_stats(cur.stats, expected_stats) + + +@pytest.mark.skipolddriver +async def test_execute_async_stats_multiple_queries(conn_cnx): + """Test cursor.stats with multiple async queries.""" + async with conn_cnx() as conn: + async with conn.cursor() as cur: + # Setup + await cur.execute( + "create temp table test_multi_async (id int, name varchar(50))" + ) + + # Execute 
first async query + await cur.execute_async( + "insert into test_multi_async values (1, 'Alice'), (2, 'Bob')" + ) + qid1 = cur.sfqid + + # Execute second async query + await cur.execute_async( + "insert into test_multi_async values (3, 'Charlie')" + ) + qid2 = cur.sfqid + + # Get results for first query + await cur.get_results_from_sfqid(qid1) + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Get results for second query + await cur.get_results_from_sfqid(qid2) + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=1, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) diff --git a/test/integ/test_connection.py b/test/integ/test_connection.py index 0fed8aa9e6..af2f6701e8 100644 --- a/test/integ/test_connection.py +++ b/test/integ/test_connection.py @@ -24,6 +24,7 @@ DEFAULT_CLIENT_PREFETCH_THREADS, SnowflakeConnection, ) +from snowflake.connector.cursor import QueryResultStats from snowflake.connector.description import CLIENT_NAME from snowflake.connector.errorcode import ( ER_CONNECTION_IS_CLOSED, @@ -1979,3 +1980,521 @@ def test_cvas_one_cursor_stats(conn_cnx): assert cur.stats.num_rows_deleted is None assert cur.stats.num_rows_updated is None assert cur.stats.num_dml_duplicates is None + + +def _assert_stats(actual_stats, expected_stats): + """Helper function to assert stats values. 
+ + Args: + actual_stats: The actual QueryResultStats from cursor.stats + expected_stats: Expected QueryResultStats to compare against + """ + assert actual_stats is not None, "stats should not be None" + assert isinstance( + expected_stats, QueryResultStats + ), "expected_stats must be a QueryResultStats instance" + + assert ( + actual_stats.num_rows_inserted == expected_stats.num_rows_inserted + ), f"Expected num_rows_inserted={expected_stats.num_rows_inserted}, got {actual_stats.num_rows_inserted}" + + assert ( + actual_stats.num_rows_deleted == expected_stats.num_rows_deleted + ), f"Expected num_rows_deleted={expected_stats.num_rows_deleted}, got {actual_stats.num_rows_deleted}" + + assert ( + actual_stats.num_rows_updated == expected_stats.num_rows_updated + ), f"Expected num_rows_updated={expected_stats.num_rows_updated}, got {actual_stats.num_rows_updated}" + + assert ( + actual_stats.num_dml_duplicates == expected_stats.num_dml_duplicates + ), f"Expected num_dml_duplicates={expected_stats.num_dml_duplicates}, got {actual_stats.num_dml_duplicates}" + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize( + "operation,setup_sql,test_sql,expected_stats", + [ + pytest.param( + "insert_simple", + "create temp table test_stats_table (id int, name varchar(50))", + "insert into test_stats_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')", + QueryResultStats( + num_rows_inserted=3, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="insert_simple", + ), + pytest.param( + "update", + """ + create temp table test_stats_table (id int, name varchar(50)); + insert into test_stats_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "update test_stats_table set name = 'Updated' where id in (1, 2)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=2, + num_dml_duplicates=0, + ), + id="update", + ), + pytest.param( + "delete", + """ + create temp table test_stats_table (id int, name 
varchar(50)); + insert into test_stats_table values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "delete from test_stats_table where id in (1, 3)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=2, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="delete", + ), + pytest.param( + "merge_insert", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'); + """, + """ + merge into test_stats_target t + using (select * from values (2, 'Bob'), (3, 'Charlie') as v(id, name)) s + on t.id = s.id + when not matched then insert (id, name) values (s.id, s.name) + """, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="merge_insert", + ), + pytest.param( + "merge_update", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'), (2, 'Bob'); + """, + """ + merge into test_stats_target t + using (select * from values (1, 'Alice Updated'), (2, 'Bob Updated') as v(id, name)) s + on t.id = s.id + when matched then update set t.name = s.name + """, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=2, + num_dml_duplicates=0, + ), + id="merge_update", + ), + pytest.param( + "merge_insert_update", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'); + """, + """ + merge into test_stats_target t + using (select * from values (1, 'Alice Updated'), (2, 'Bob'), (3, 'Charlie') as v(id, name)) s + on t.id = s.id + when matched then update set t.name = s.name + when not matched then insert (id, name) values (s.id, s.name) + """, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + num_rows_updated=1, + num_dml_duplicates=0, + ), + id="merge_insert_update", + ), + ], +) +def test_dml_stats_operations(conn_cnx, operation, setup_sql, test_sql, 
expected_stats): + """Test cursor.stats for various DML operations.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + for sql in setup_sql.strip().split(";"): + sql = sql.strip() + if sql: + cur.execute(sql) + + # Execute test operation + cur.execute(test_sql) + + # Verify stats + _assert_stats(cur.stats, expected_stats) + + +@pytest.mark.skipolddriver +def test_merge_with_duplicates_in_source(conn_cnx): + """Test cursor.stats for MERGE operations with duplicates in source data.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Create target table + cur.execute( + "create temp table test_merge_target (id int, name varchar(50))" + ) + cur.execute("insert into test_merge_target values (1, 'Alice'), (2, 'Bob')") + + # Create source with duplicates + # When source has duplicates, Snowflake may report them in num_dml_duplicates + cur.execute( + """ + merge into test_merge_target t + using ( + select * from values + (1, 'Alice_Updated'), + (1, 'Alice_Updated_Again'), -- Duplicate key in source + (3, 'Charlie'), + (3, 'Charlie_Dup') -- Another duplicate in source + as s(id, name) + ) s + on t.id = s.id + when matched then update set t.name = s.name + when not matched then insert (id, name) values (s.id, s.name) + """ + ) + + # Check stats - duplicates in source may be reported + assert cur.stats is not None + # The exact duplicate count depends on Snowflake's reporting + # but we verify the field is accessible + assert hasattr(cur.stats, "num_dml_duplicates") + assert ( + cur.stats.num_dml_duplicates is not None + or cur.stats.num_dml_duplicates == 0 + ) + + +@pytest.mark.skipolddriver +def test_insert_all_with_duplicates_stats(conn_cnx): + """Test cursor.stats for INSERT ALL with duplicate handling.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Create tables + cur.execute( + "create temp table test_dup_table1 (id int primary key, name varchar(50))" + ) + cur.execute( + "create temp table test_dup_table2 (id int primary 
key, name varchar(50))" + ) + + # Pre-populate with some data + cur.execute("insert into test_dup_table1 values (1, 'Alice')") + cur.execute("insert into test_dup_table2 values (2, 'Bob')") + + # Try INSERT ALL - some inserts will conflict with existing keys + # This tests if duplicates are properly reported + try: + cur.execute( + """ + insert all + into test_dup_table1 values (id, name) + into test_dup_table2 values (id, name) + select * from values + (1, 'Alice_New'), -- Duplicate in table1 + (2, 'Bob_New'), -- Duplicate in table2 + (3, 'Charlie') -- New in both + as t(id, name) + """ + ) + # If this succeeds, check stats + assert cur.stats is not None + # assert hasattr(cur.stats, "num_dml_duplicates") + # TODO: bad + except Exception: + # INSERT ALL with duplicates might fail depending on constraints + # This is expected behavior - just verify we can handle it + pass + + +@pytest.mark.skipolddriver +def test_stats_duplicates_field_exists(conn_cnx): + """Test that num_dml_duplicates field always exists in stats, even if zero.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Simple insert with no duplicates + cur.execute("create temp table test_dup_exists (id int, name varchar(50))") + cur.execute("insert into test_dup_exists values (1, 'Alice'), (2, 'Bob')") + + # Verify the field exists and is 0 + assert cur.stats is not None + assert hasattr(cur.stats, "num_dml_duplicates") + assert ( + cur.stats.num_dml_duplicates == 0 + or cur.stats.num_dml_duplicates is None + ) + + +@pytest.mark.skipolddriver +def test_multi_statement_stats(conn_cnx): + """Test that stats are updated correctly for multi-statement queries.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + cur.execute("create temp table test_multi_stats (id int, name varchar(50))") + + # First insert + cur.execute("insert into test_multi_stats values (1, 'Alice'), (2, 'Bob')") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + 
num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Second insert in same cursor + cur.execute("insert into test_multi_stats values (3, 'Charlie')") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=1, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Update + cur.execute("update test_multi_stats set name = 'Updated' where id = 1") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=1, + num_dml_duplicates=0, + ), + ) + + # Delete + cur.execute("delete from test_multi_stats where id = 2") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=1, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + +@pytest.mark.skipolddriver +def test_stats_reset_on_select(conn_cnx): + """Test that stats are reset to None when executing SELECT queries.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup and insert + cur.execute("create temp table test_stats_reset (id int)") + cur.execute("insert into test_stats_reset values (1), (2), (3)") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=3, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Execute a SELECT - stats should have all None values + cur.execute("select * from test_stats_reset") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=None, + num_rows_deleted=None, + num_rows_updated=None, + num_dml_duplicates=None, + ), + ) + + +@pytest.mark.skipolddriver +def test_truncate_stats(conn_cnx): + """Test cursor.stats for TRUNCATE operations.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + cur.execute("create temp table test_truncate_stats (id int)") + cur.execute("insert into test_truncate_stats values (1), (2), (3)") + + # Truncate doesn't provide detailed stats + cur.execute("truncate table test_truncate_stats") + # Truncate typically doesn't populate DML 
stats + assert cur.stats is not None + + +@pytest.mark.skipolddriver +def test_empty_result_stats(conn_cnx): + """Test cursor.stats for DML operations that affect zero rows.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + cur.execute("create temp table test_empty_stats (id int, name varchar(50))") + cur.execute("insert into test_empty_stats values (1, 'Alice')") + + # Update with no matching rows + cur.execute("update test_empty_stats set name = 'Updated' where id = 999") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Delete with no matching rows + cur.execute("delete from test_empty_stats where id = 999") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + +@pytest.mark.skipolddriver +@pytest.mark.parametrize( + "operation,setup_sql,test_sql,expected_stats", + [ + pytest.param( + "insert_async", + "create temp table test_async_stats (id int, name varchar(50))", + "insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')", + QueryResultStats( + num_rows_inserted=3, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="insert_async", + ), + pytest.param( + "update_async", + """ + create temp table test_async_stats (id int, name varchar(50)); + insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "update test_async_stats set name = 'Updated' where id in (1, 2)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=2, + num_dml_duplicates=0, + ), + id="update_async", + ), + pytest.param( + "delete_async", + """ + create temp table test_async_stats (id int, name varchar(50)); + insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "delete from test_async_stats where id in (1, 3)", + 
QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=2, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="delete_async", + ), + pytest.param( + "ctas_async", + "", + "create temp table test_async_ctas (id int) as select * from values (1), (2), (3), (4), (5) as t(id)", + QueryResultStats( + num_rows_inserted=5, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="ctas_async", + ), + ], +) +def test_execute_async_stats(conn_cnx, operation, setup_sql, test_sql, expected_stats): + """Test cursor.stats for DML operations executed asynchronously.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + if setup_sql: + for sql in setup_sql.strip().split(";"): + sql = sql.strip() + if sql: + cur.execute(sql) + + # Execute async + cur.execute_async(test_sql) + query_id = cur.sfqid + + # Get results + cur.get_results_from_sfqid(query_id) + + # Verify stats are available after getting results + _assert_stats(cur.stats, expected_stats) + + +@pytest.mark.skipolddriver +def test_execute_async_stats_multiple_queries(conn_cnx): + """Test cursor.stats with multiple async queries.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + cur.execute("create temp table test_multi_async (id int, name varchar(50))") + + # Execute first async query + cur.execute_async( + "insert into test_multi_async values (1, 'Alice'), (2, 'Bob')" + ) + qid1 = cur.sfqid + + # Execute second async query + cur.execute_async("insert into test_multi_async values (3, 'Charlie')") + qid2 = cur.sfqid + + # Get results for first query + cur.get_results_from_sfqid(qid1) + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=2, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) + + # Get results for second query + cur.get_results_from_sfqid(qid2) + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=1, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) From 
f4a9a24d1102e2f1f5e96da58e57d6fd5998913e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Tue, 9 Dec 2025 21:40:20 +0100 Subject: [PATCH 11/14] SNOW-295953: better tests --- src/snowflake/connector/cursor.py | 18 +- test/integ/test_connection.py | 370 ++++++++++++------------------ 2 files changed, 159 insertions(+), 229 deletions(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index fb6a607ac1..7688d54a59 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -472,12 +472,7 @@ def stats(self) -> QueryResultStats | None: """ if self._stats_data is None: return QueryResultStats(None, None, None, None) - return QueryResultStats( - num_rows_inserted=self._stats_data.get("numRowsInserted", None), - num_rows_deleted=self._stats_data.get("numRowsDeleted", None), - num_rows_updated=self._stats_data.get("numRowsUpdated", None), - num_dml_duplicates=self._stats_data.get("numDmlDuplicates", None), - ) + return QueryResultStats.from_dict(self._stats_data) @property def rownumber(self) -> int | None: @@ -1228,7 +1223,7 @@ def _init_result_and_meta(self, data: dict[Any, Any]) -> None: # Extract stats object if available (for DML operations like CTAS, INSERT, UPDATE, DELETE) self._stats_data = data.get("stats", None) - logger.debug(f"Execution DML stats: %s", self.stats) + logger.debug("Execution DML stats: %s", self.stats) # don't update the row count when the result is returned from `describe` method if is_dml and "rowset" in data and len(data["rowset"]) > 0: @@ -2050,3 +2045,12 @@ class QueryResultStats(NamedTuple): num_rows_deleted: int | None = None num_rows_updated: int | None = None num_dml_duplicates: int | None = None + + @classmethod + def from_dict(cls, stats_dict: dict[str, int]) -> QueryResultStats: + return cls( + num_rows_inserted=stats_dict.get("numRowsInserted", None), + num_rows_deleted=stats_dict.get("numRowsDeleted", None), + 
num_rows_updated=stats_dict.get("numRowsUpdated", None), + num_dml_duplicates=stats_dict.get("numDmlDuplicates", None), + ) diff --git a/test/integ/test_connection.py b/test/integ/test_connection.py index af2f6701e8..c533f8fb0f 100644 --- a/test/integ/test_connection.py +++ b/test/integ/test_connection.py @@ -2118,6 +2118,52 @@ def _assert_stats(actual_stats, expected_stats): ), id="merge_insert_update", ), + pytest.param( + "merge_delete", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + """ + merge into test_stats_target t + using (select * from values (1, 'Delete Me'), (2, 'Delete Me Too') as v(id, name)) s + on t.id = s.id + when matched then delete + """, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=2, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="merge_delete", + ), + pytest.param( + "merge_insert_update_delete", + """ + create temp table test_stats_target (id int, name varchar(50), status varchar(20)); + insert into test_stats_target values (1, 'Alice', 'active'), (2, 'Bob', 'inactive'), (3, 'Charlie', 'active'); + """, + """ + merge into test_stats_target t + using (select * from values + (1, 'Alice Updated', 'active'), + (2, 'Bob', 'inactive'), + (4, 'David', 'active') + as v(id, name, status)) s + on t.id = s.id + when matched and s.status = 'active' then update set t.name = s.name + when matched and s.status = 'inactive' then delete + when not matched then insert (id, name, status) values (s.id, s.name, s.status) + """, + QueryResultStats( + num_rows_inserted=1, + num_rows_deleted=1, + num_rows_updated=1, + num_dml_duplicates=0, + ), + id="merge_insert_update_delete", + ), ], ) def test_dml_stats_operations(conn_cnx, operation, setup_sql, test_sql, expected_stats): @@ -2138,104 +2184,135 @@ def test_dml_stats_operations(conn_cnx, operation, setup_sql, test_sql, expected @pytest.mark.skipolddriver -def 
test_merge_with_duplicates_in_source(conn_cnx): - """Test cursor.stats for MERGE operations with duplicates in source data.""" +def test_copy_into_stats_with_stage(conn_cnx, tmp_path): + """Test cursor.stats for COPY INTO operations with actual stage files.""" + import csv + + # Create a CSV file + csv_file = tmp_path / "test_data.csv" + with open(csv_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow([1, "Alice"]) + writer.writerow([2, "Bob"]) + writer.writerow([3, "Charlie"]) + with conn_cnx() as conn: with conn.cursor() as cur: - # Create target table - cur.execute( - "create temp table test_merge_target (id int, name varchar(50))" - ) - cur.execute("insert into test_merge_target values (1, 'Alice'), (2, 'Bob')") + # Create table and stage + cur.execute("create temp table test_copy_stats (id int, name varchar(50))") + cur.execute("create temp stage test_copy_stage") - # Create source with duplicates - # When source has duplicates, Snowflake may report them in num_dml_duplicates + # PUT file to stage + cur.execute(f"put file://{csv_file} @test_copy_stage") + + # COPY INTO from stage cur.execute( """ - merge into test_merge_target t - using ( - select * from values - (1, 'Alice_Updated'), - (1, 'Alice_Updated_Again'), -- Duplicate key in source - (3, 'Charlie'), - (3, 'Charlie_Dup') -- Another duplicate in source - as s(id, name) - ) s - on t.id = s.id - when matched then update set t.name = s.name - when not matched then insert (id, name) values (s.id, s.name) + copy into test_copy_stats + from @test_copy_stage/test_data.csv.gz + file_format = (type = csv) """ ) - # Check stats - duplicates in source may be reported + # Verify stats assert cur.stats is not None - # The exact duplicate count depends on Snowflake's reporting - # but we verify the field is accessible - assert hasattr(cur.stats, "num_dml_duplicates") - assert ( - cur.stats.num_dml_duplicates is not None - or cur.stats.num_dml_duplicates == 0 - ) + assert 
cur.stats.num_rows_inserted == 3 + assert cur.stats.num_rows_deleted == 0 + assert cur.stats.num_rows_updated == 0 + assert cur.stats.num_dml_duplicates == 0 @pytest.mark.skipolddriver -def test_insert_all_with_duplicates_stats(conn_cnx): - """Test cursor.stats for INSERT ALL with duplicate handling.""" +def test_update_with_dml_duplicates(conn_cnx): + """Test cursor.stats for UPDATE operations that generate numDmlDuplicates. + + When an UPDATE with a FROM clause matches the same row multiple times, + Snowflake reports those as duplicates in numDmlDuplicates. + """ with conn_cnx() as conn: with conn.cursor() as cur: - # Create tables + # Create source table with some data + cur.execute("create temp table test_src (c1 int, c2 int)") cur.execute( - "create temp table test_dup_table1 (id int primary key, name varchar(50))" + "insert into test_src select seq4() % 10, seq4() % 5 from table(generator(rowcount => 50))" ) + + # Create target table with overlapping values + cur.execute("create temp table test_target (c int)") cur.execute( - "create temp table test_dup_table2 (id int primary key, name varchar(50))" + "insert into test_target select seq4() % 5 from table(generator(rowcount => 10))" ) - # Pre-populate with some data - cur.execute("insert into test_dup_table1 values (1, 'Alice')") - cur.execute("insert into test_dup_table2 values (2, 'Bob')") + # Update with FROM clause that causes duplicates + # This will match the same target rows multiple times + cur.execute( + """ + update test_src set c2 = test_target.c + from test_target + where test_src.c1 = test_target.c + """ + ) - # Try INSERT ALL - some inserts will conflict with existing keys - # This tests if duplicates are properly reported - try: - cur.execute( - """ - insert all - into test_dup_table1 values (id, name) - into test_dup_table2 values (id, name) - select * from values - (1, 'Alice_New'), -- Duplicate in table1 - (2, 'Bob_New'), -- Duplicate in table2 - (3, 'Charlie') -- New in both - as t(id, name) 
- """ - ) - # If this succeeds, check stats - assert cur.stats is not None - # assert hasattr(cur.stats, "num_dml_duplicates") - # TODO: bad - except Exception: - # INSERT ALL with duplicates might fail depending on constraints - # This is expected behavior - just verify we can handle it - pass + # Verify stats show duplicates + assert cur.stats is not None + assert hasattr(cur.stats, "num_dml_duplicates") + # We should have some duplicates due to multiple matches + assert cur.stats.num_dml_duplicates is not None + # The exact count depends on the random data, but with this setup we expect > 0 + assert cur.stats.num_dml_duplicates > 0 + # If we got duplicates, also verify other stats are present + assert cur.stats.num_rows_updated >= 0 + assert cur.stats.num_rows_inserted == 0 + assert cur.stats.num_rows_deleted == 0 @pytest.mark.skipolddriver -def test_stats_duplicates_field_exists(conn_cnx): - """Test that num_dml_duplicates field always exists in stats, even if zero.""" +def test_multi_table_insert_overwrite_stats(conn_cnx): + """Test cursor.stats for multi-table INSERT OVERWRITE operations.""" with conn_cnx() as conn: with conn.cursor() as cur: - # Simple insert with no duplicates - cur.execute("create temp table test_dup_exists (id int, name varchar(50))") - cur.execute("insert into test_dup_exists values (1, 'Alice'), (2, 'Bob')") + # Create source table with data + cur.execute("create temp table test_src_multi (c1 int)") + cur.execute( + "insert into test_src_multi select seq4() % 30 from table(generator(rowcount => 100))" + ) + + # Create target tables with some existing data + cur.execute("create temp table test_tgt1 (c int)") + cur.execute("create temp table test_tgt2 (c int)") + cur.execute( + "insert into test_tgt1 select seq4() from table(generator(rowcount => 20))" + ) + cur.execute( + "insert into test_tgt2 select seq4() from table(generator(rowcount => 15))" + ) + + # Multi-table INSERT OVERWRITE ALL + # This should delete existing rows and insert 
new ones + cur.execute( + """ + insert overwrite all + when c1 > 20 then + into test_tgt1 values (c1) + when c1 > 10 then + into test_tgt1 values (c1) + into test_tgt2 values (c1) + else + into test_tgt2 values (c1) + select c1 from test_src_multi + """ + ) - # Verify the field exists and is 0 + # Verify stats assert cur.stats is not None - assert hasattr(cur.stats, "num_dml_duplicates") + # OVERWRITE will delete existing rows and insert new ones + assert cur.stats.num_rows_inserted > 0, "Should have inserted rows" assert ( - cur.stats.num_dml_duplicates == 0 - or cur.stats.num_dml_duplicates is None - ) + cur.stats.num_rows_deleted > 0 + ), "Should have deleted rows from OVERWRITE" + # Verify fields exist + assert hasattr(cur.stats, "num_rows_updated") + assert hasattr(cur.stats, "num_dml_duplicates") @pytest.mark.skipolddriver @@ -2338,162 +2415,11 @@ def test_truncate_stats(conn_cnx): # Truncate doesn't provide detailed stats cur.execute("truncate table test_truncate_stats") # Truncate typically doesn't populate DML stats - assert cur.stats is not None - - -@pytest.mark.skipolddriver -def test_empty_result_stats(conn_cnx): - """Test cursor.stats for DML operations that affect zero rows.""" - with conn_cnx() as conn: - with conn.cursor() as cur: - # Setup - cur.execute("create temp table test_empty_stats (id int, name varchar(50))") - cur.execute("insert into test_empty_stats values (1, 'Alice')") - - # Update with no matching rows - cur.execute("update test_empty_stats set name = 'Updated' where id = 999") _assert_stats( cur.stats, QueryResultStats( num_rows_inserted=0, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, - ), - ) - - # Delete with no matching rows - cur.execute("delete from test_empty_stats where id = 999") - _assert_stats( - cur.stats, - QueryResultStats( - num_rows_inserted=0, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, - ), - ) - - -@pytest.mark.skipolddriver -@pytest.mark.parametrize( - 
"operation,setup_sql,test_sql,expected_stats", - [ - pytest.param( - "insert_async", - "create temp table test_async_stats (id int, name varchar(50))", - "insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')", - QueryResultStats( - num_rows_inserted=3, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, - ), - id="insert_async", - ), - pytest.param( - "update_async", - """ - create temp table test_async_stats (id int, name varchar(50)); - insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); - """, - "update test_async_stats set name = 'Updated' where id in (1, 2)", - QueryResultStats( - num_rows_inserted=0, - num_rows_deleted=0, - num_rows_updated=2, - num_dml_duplicates=0, - ), - id="update_async", - ), - pytest.param( - "delete_async", - """ - create temp table test_async_stats (id int, name varchar(50)); - insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); - """, - "delete from test_async_stats where id in (1, 3)", - QueryResultStats( - num_rows_inserted=0, - num_rows_deleted=2, - num_rows_updated=0, - num_dml_duplicates=0, - ), - id="delete_async", - ), - pytest.param( - "ctas_async", - "", - "create temp table test_async_ctas (id int) as select * from values (1), (2), (3), (4), (5) as t(id)", - QueryResultStats( - num_rows_inserted=5, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, - ), - id="ctas_async", - ), - ], -) -def test_execute_async_stats(conn_cnx, operation, setup_sql, test_sql, expected_stats): - """Test cursor.stats for DML operations executed asynchronously.""" - with conn_cnx() as conn: - with conn.cursor() as cur: - # Setup - if setup_sql: - for sql in setup_sql.strip().split(";"): - sql = sql.strip() - if sql: - cur.execute(sql) - - # Execute async - cur.execute_async(test_sql) - query_id = cur.sfqid - - # Get results - cur.get_results_from_sfqid(query_id) - - # Verify stats are available after getting results - 
_assert_stats(cur.stats, expected_stats) - - -@pytest.mark.skipolddriver -def test_execute_async_stats_multiple_queries(conn_cnx): - """Test cursor.stats with multiple async queries.""" - with conn_cnx() as conn: - with conn.cursor() as cur: - # Setup - cur.execute("create temp table test_multi_async (id int, name varchar(50))") - - # Execute first async query - cur.execute_async( - "insert into test_multi_async values (1, 'Alice'), (2, 'Bob')" - ) - qid1 = cur.sfqid - - # Execute second async query - cur.execute_async("insert into test_multi_async values (3, 'Charlie')") - qid2 = cur.sfqid - - # Get results for first query - cur.get_results_from_sfqid(qid1) - _assert_stats( - cur.stats, - QueryResultStats( - num_rows_inserted=2, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, - ), - ) - - # Get results for second query - cur.get_results_from_sfqid(qid2) - _assert_stats( - cur.stats, - QueryResultStats( - num_rows_inserted=1, - num_rows_deleted=0, + num_rows_deleted=3, num_rows_updated=0, num_dml_duplicates=0, ), From 7f51ffd4b2f4cf229731da0cf60402302e712ba2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Tue, 9 Dec 2025 22:42:35 +0100 Subject: [PATCH 12/14] SNOW-295953: sync tests done --- test/integ/test_connection.py | 278 +++++++++++++++++++++++++--------- 1 file changed, 207 insertions(+), 71 deletions(-) diff --git a/test/integ/test_connection.py b/test/integ/test_connection.py index c533f8fb0f..7e61272a89 100644 --- a/test/integ/test_connection.py +++ b/test/integ/test_connection.py @@ -24,6 +24,7 @@ DEFAULT_CLIENT_PREFETCH_THREADS, SnowflakeConnection, ) +from snowflake.connector.constants import PARAMETER_MULTI_STATEMENT_COUNT from snowflake.connector.cursor import QueryResultStats from snowflake.connector.description import CLIENT_NAME from snowflake.connector.errorcode import ( @@ -2226,25 +2227,28 @@ def test_copy_into_stats_with_stage(conn_cnx, tmp_path): def test_update_with_dml_duplicates(conn_cnx): """Test 
cursor.stats for UPDATE operations that generate numDmlDuplicates. - When an UPDATE with a FROM clause matches the same row multiple times, - Snowflake reports those as duplicates in numDmlDuplicates. + When a row in the updated table is matched by multiple rows in the FROM clause, + Snowflake reports the extra matches as duplicates in numDmlDuplicates. """ with conn_cnx() as conn: with conn.cursor() as cur: - # Create source table with some data + # test_src has 15 rows: five 0's, five 1's, five 2's cur.execute("create temp table test_src (c1 int, c2 int)") cur.execute( - "insert into test_src select seq4() % 10, seq4() % 5 from table(generator(rowcount => 50))" + "insert into test_src values (0, 100), (1, 100), (2, 100), (0, 100), (1, 100), " + "(2, 100), (0, 100), (1, 100), (2, 100), (0, 100), (1, 100), (2, 100), " + "(0, 100), (1, 100), (2, 100)" ) - # Create target table with overlapping values + # test_target has 4 rows: two 0's, one 1, one 2 cur.execute("create temp table test_target (c int)") - cur.execute( - "insert into test_target select seq4() % 5 from table(generator(rowcount => 10))" - ) + cur.execute("insert into test_target values (0), (1), (2), (0)") - # Update with FROM clause that causes duplicates - # This will match the same target rows multiple times + # UPDATE with FROM clause: + # - Each of 5 rows with c1=0 matches 2 rows in test_target (duplicate count: 5 × 1 = 5) + # - Each of 5 rows with c1=1 matches 1 row in test_target (duplicate count: 0) + # - Each of 5 rows with c1=2 matches 1 row in test_target (duplicate count: 0) + # Total duplicates: 5 cur.execute( """ update test_src set c2 = test_target.c @@ -2253,17 +2257,15 @@ def test_update_with_dml_duplicates(conn_cnx): """ ) - # Verify stats show duplicates - assert cur.stats is not None - assert hasattr(cur.stats, "num_dml_duplicates") - # We should have some duplicates due to multiple matches - assert cur.stats.num_dml_duplicates is not None - # The exact count depends on the random 
data, but with this setup we expect > 0 - assert cur.stats.num_dml_duplicates > 0 - # If we got duplicates, also verify other stats are present - assert cur.stats.num_rows_updated >= 0 - assert cur.stats.num_rows_inserted == 0 - assert cur.stats.num_rows_deleted == 0 + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=15, + num_dml_duplicates=5, + ), + ) @pytest.mark.skipolddriver @@ -2271,24 +2273,22 @@ def test_multi_table_insert_overwrite_stats(conn_cnx): """Test cursor.stats for multi-table INSERT OVERWRITE operations.""" with conn_cnx() as conn: with conn.cursor() as cur: - # Create source table with data + # Source has 3 values: 5, 15, 25 cur.execute("create temp table test_src_multi (c1 int)") - cur.execute( - "insert into test_src_multi select seq4() % 30 from table(generator(rowcount => 100))" - ) + cur.execute("insert into test_src_multi values (5), (15), (25)") - # Create target tables with some existing data + # Target tables with existing data cur.execute("create temp table test_tgt1 (c int)") cur.execute("create temp table test_tgt2 (c int)") - cur.execute( - "insert into test_tgt1 select seq4() from table(generator(rowcount => 20))" - ) - cur.execute( - "insert into test_tgt2 select seq4() from table(generator(rowcount => 15))" - ) - - # Multi-table INSERT OVERWRITE ALL - # This should delete existing rows and insert new ones + cur.execute("insert into test_tgt1 values (100), (101)") + cur.execute("insert into test_tgt2 values (200), (201), (202)") + + # INSERT OVERWRITE ALL evaluates ALL matching WHEN clauses per row: + # - c1=5: no WHENs match → else clause → 1 insert (5 to tgt2) + # - c1=15: second WHEN matches → 2 inserts (15 to tgt1, 15 to tgt2) + # - c1=25: both WHENs match → 3 inserts (25 to tgt1, then 25 to tgt1 and 25 to tgt2) + # Result: tgt1=[25,15,25], tgt2=[15,25,5] + # Total: 6 inserts, 5 deletes (2+3 existing rows cleared by OVERWRITE) cur.execute( """ insert overwrite all 
@@ -2303,69 +2303,171 @@ def test_multi_table_insert_overwrite_stats(conn_cnx): """ ) - # Verify stats - assert cur.stats is not None - # OVERWRITE will delete existing rows and insert new ones - assert cur.stats.num_rows_inserted > 0, "Should have inserted rows" - assert ( - cur.stats.num_rows_deleted > 0 - ), "Should have deleted rows from OVERWRITE" - # Verify fields exist - assert hasattr(cur.stats, "num_rows_updated") - assert hasattr(cur.stats, "num_dml_duplicates") - - -@pytest.mark.skipolddriver -def test_multi_statement_stats(conn_cnx): - """Test that stats are updated correctly for multi-statement queries.""" - with conn_cnx() as conn: - with conn.cursor() as cur: - # Setup - cur.execute("create temp table test_multi_stats (id int, name varchar(50))") - - # First insert - cur.execute("insert into test_multi_stats values (1, 'Alice'), (2, 'Bob')") _assert_stats( cur.stats, QueryResultStats( - num_rows_inserted=2, - num_rows_deleted=0, + num_rows_inserted=6, + num_rows_deleted=5, num_rows_updated=0, num_dml_duplicates=0, ), ) - # Second insert in same cursor - cur.execute("insert into test_multi_stats values (3, 'Charlie')") + +@pytest.mark.xfail(reason="Multi-statements does not return stats field") +@pytest.mark.skipolddriver +def test_multi_statement_in_one_execute(conn_cnx): + """Test that stats reflect the last statement when multiple statements are in one execute.""" + with conn_cnx(session_parameters={PARAMETER_MULTI_STATEMENT_COUNT: 0}) as conn: + with conn.cursor() as cur: + # Execute multiple statements separated by semicolons in one execute call + # The stats should reflect ONLY the last statement + cur.execute( + """ + create temp table test_multiexec (id int, name varchar(50)); + insert into test_multiexec values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + update test_multiexec set name = 'Updated' where id = 1; + delete from test_multiexec where id = 2; + """ + ) + + # Stats reflect only the last statement (DELETE of 1 row) _assert_stats( 
cur.stats, QueryResultStats( - num_rows_inserted=1, - num_rows_deleted=0, + num_rows_inserted=0, + num_rows_deleted=1, num_rows_updated=0, num_dml_duplicates=0, ), ) - # Update - cur.execute("update test_multi_stats set name = 'Updated' where id = 1") + +@pytest.mark.skipolddriver +@pytest.mark.xfail( + reason="execute_async stats are not returned from monitoring endpoint yet" +) +@pytest.mark.parametrize( + "operation,setup_sql,test_sql,expected_stats", + [ + pytest.param( + "insert_async", + "create temp table test_async_stats (id int, name varchar(50))", + "insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')", + QueryResultStats( + num_rows_inserted=3, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="insert_async", + ), + pytest.param( + "update_async", + """ + create temp table test_async_stats (id int, name varchar(50)); + insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "update test_async_stats set name = 'Updated' where id in (1, 2)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=2, + num_dml_duplicates=0, + ), + id="update_async", + ), + pytest.param( + "delete_async", + """ + create temp table test_async_stats (id int, name varchar(50)); + insert into test_async_stats values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + "delete from test_async_stats where id in (1, 3)", + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=2, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="delete_async", + ), + pytest.param( + "ctas_async", + "", + "create temp table test_async_ctas (id int) as select * from values (1), (2), (3), (4), (5) as t(id)", + QueryResultStats( + num_rows_inserted=5, + num_rows_deleted=0, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="ctas_async", + ), + ], +) +def test_execute_async_stats(conn_cnx, operation, setup_sql, test_sql, expected_stats): + """Test cursor.stats for DML operations 
executed asynchronously.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + if setup_sql: + for sql in setup_sql.strip().split(";"): + sql = sql.strip() + if sql: + cur.execute(sql) + + # Execute async + cur.execute_async(test_sql) + query_id = cur.sfqid + + # Get results + cur.get_results_from_sfqid(query_id) + + # Verify stats are available after getting results + _assert_stats(cur.stats, expected_stats) + + +@pytest.mark.skipolddriver +@pytest.mark.xfail( + reason="execute_async stats are not returned from monitoring endpoint yet" +) +def test_execute_async_stats_multiple_queries(conn_cnx): + """Test cursor.stats with multiple async queries.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + cur.execute("create temp table test_multi_async (id int, name varchar(50))") + + # Execute first async query + cur.execute_async( + "insert into test_multi_async values (1, 'Alice'), (2, 'Bob')" + ) + qid1 = cur.sfqid + + # Execute second async query + cur.execute_async("insert into test_multi_async values (3, 'Charlie')") + qid2 = cur.sfqid + + # Get results for first query + cur.get_results_from_sfqid(qid1) _assert_stats( cur.stats, QueryResultStats( - num_rows_inserted=0, + num_rows_inserted=2, num_rows_deleted=0, - num_rows_updated=1, + num_rows_updated=0, num_dml_duplicates=0, ), ) - # Delete - cur.execute("delete from test_multi_stats where id = 2") + # Get results for second query + cur.get_results_from_sfqid(qid2) _assert_stats( cur.stats, QueryResultStats( - num_rows_inserted=0, - num_rows_deleted=1, + num_rows_inserted=1, + num_rows_deleted=0, num_rows_updated=0, num_dml_duplicates=0, ), @@ -2424,3 +2526,37 @@ def test_truncate_stats(conn_cnx): num_dml_duplicates=0, ), ) + + +@pytest.mark.skipolddriver +def test_empty_result_stats(conn_cnx): + """Test cursor.stats for DML operations that affect zero rows.""" + with conn_cnx() as conn: + with conn.cursor() as cur: + # Setup + cur.execute("create temp table test_empty_stats (id 
int, name varchar(50))") + cur.execute("insert into test_empty_stats values (1, 'Alice')") + + # Update with no matching rows + cur.execute("update test_empty_stats set name = 'Updated' where id = 999") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=None, + num_rows_deleted=None, + num_rows_updated=None, + num_dml_duplicates=None, + ), + ) + + # Delete with no matching rows + cur.execute("delete from test_empty_stats where id = 999") + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=None, + num_rows_deleted=None, + num_rows_updated=None, + num_dml_duplicates=None, + ), + ) From bf3c4c32a811258da3dd77ddb1fd291d703458c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Tue, 9 Dec 2025 22:47:45 +0100 Subject: [PATCH 13/14] SNOW-295953: async tests done --- test/integ/aio_it/test_connection_async.py | 301 ++++++++++++--------- 1 file changed, 177 insertions(+), 124 deletions(-) diff --git a/test/integ/aio_it/test_connection_async.py b/test/integ/aio_it/test_connection_async.py index 1a0d0981fc..049ade5478 100644 --- a/test/integ/aio_it/test_connection_async.py +++ b/test/integ/aio_it/test_connection_async.py @@ -28,6 +28,7 @@ from snowflake.connector.aio._description import CLIENT_NAME from snowflake.connector.compat import IS_WINDOWS from snowflake.connector.connection import DEFAULT_CLIENT_PREFETCH_THREADS +from snowflake.connector.constants import PARAMETER_MULTI_STATEMENT_COUNT from snowflake.connector.cursor import QueryResultStats from snowflake.connector.errorcode import ( ER_CONNECTION_IS_CLOSED, @@ -2062,6 +2063,52 @@ def _assert_stats(actual_stats, expected_stats): ), id="merge_insert_update", ), + pytest.param( + "merge_delete", + """ + create temp table test_stats_target (id int, name varchar(50)); + insert into test_stats_target values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + """, + """ + merge into test_stats_target t + using (select * from values (1, 'Delete Me'), (2, 'Delete Me Too') as 
v(id, name)) s + on t.id = s.id + when matched then delete + """, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=2, + num_rows_updated=0, + num_dml_duplicates=0, + ), + id="merge_delete", + ), + pytest.param( + "merge_insert_update_delete", + """ + create temp table test_stats_target (id int, name varchar(50), status varchar(20)); + insert into test_stats_target values (1, 'Alice', 'active'), (2, 'Bob', 'inactive'), (3, 'Charlie', 'active'); + """, + """ + merge into test_stats_target t + using (select * from values + (1, 'Alice Updated', 'active'), + (2, 'Bob', 'inactive'), + (4, 'David', 'active') + as v(id, name, status)) s + on t.id = s.id + when matched and s.status = 'active' then update set t.name = s.name + when matched and s.status = 'inactive' then delete + when not matched then insert (id, name, status) values (s.id, s.name, s.status) + """, + QueryResultStats( + num_rows_inserted=1, + num_rows_deleted=1, + num_rows_updated=1, + num_dml_duplicates=0, + ), + id="merge_insert_update_delete", + ), ], ) async def test_dml_stats_operations( @@ -2084,164 +2131,157 @@ async def test_dml_stats_operations( @pytest.mark.skipolddriver -async def test_merge_with_duplicates_in_source(conn_cnx): - """Test cursor.stats for MERGE operations with duplicates in source data.""" +async def test_copy_into_stats_with_stage(conn_cnx, tmp_path): + """Test cursor.stats for COPY INTO operations with actual stage files.""" + import csv + + # Create a CSV file + csv_file = tmp_path / "test_data.csv" + with open(csv_file, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow([1, "Alice"]) + writer.writerow([2, "Bob"]) + writer.writerow([3, "Charlie"]) + async with conn_cnx() as conn: async with conn.cursor() as cur: - # Create target table - await cur.execute( - "create temp table test_merge_target (id int, name varchar(50))" - ) + # Create table and stage await cur.execute( - "insert into test_merge_target values (1, 'Alice'), (2, 'Bob')" + "create temp 
table test_copy_stats (id int, name varchar(50))" ) + await cur.execute("create temp stage test_copy_stage") - # Create source with duplicates - # When source has duplicates, Snowflake may report them in num_dml_duplicates + # PUT file to stage + await cur.execute(f"put file://{csv_file} @test_copy_stage") + + # COPY INTO from stage await cur.execute( """ - merge into test_merge_target t - using ( - select * from values - (1, 'Alice_Updated'), - (1, 'Alice_Updated_Again'), -- Duplicate key in source - (3, 'Charlie'), - (3, 'Charlie_Dup') -- Another duplicate in source - as s(id, name) - ) s - on t.id = s.id - when matched then update set t.name = s.name - when not matched then insert (id, name) values (s.id, s.name) + copy into test_copy_stats + from @test_copy_stage/test_data.csv.gz + file_format = (type = csv) """ ) - # Check stats - duplicates in source may be reported + # Verify stats assert cur.stats is not None - # The exact duplicate count depends on Snowflake's reporting - # but we verify the field is accessible - assert hasattr(cur.stats, "num_dml_duplicates") - assert ( - cur.stats.num_dml_duplicates is not None - or cur.stats.num_dml_duplicates == 0 - ) + assert cur.stats.num_rows_inserted == 3 + assert cur.stats.num_rows_deleted == 0 + assert cur.stats.num_rows_updated == 0 + assert cur.stats.num_dml_duplicates == 0 @pytest.mark.skipolddriver -async def test_insert_all_with_duplicates_stats(conn_cnx): - """Test cursor.stats for INSERT ALL with duplicate handling.""" +async def test_update_with_dml_duplicates(conn_cnx): + """Test cursor.stats for UPDATE operations that generate numDmlDuplicates. + + When a row in the updated table is matched by multiple rows in the FROM clause, + Snowflake reports the extra matches as duplicates in numDmlDuplicates. 
+ """ async with conn_cnx() as conn: async with conn.cursor() as cur: - # Create tables - await cur.execute( - "create temp table test_dup_table1 (id int primary key, name varchar(50))" - ) + # test_src has 15 rows: five 0's, five 1's, five 2's + await cur.execute("create temp table test_src (c1 int, c2 int)") await cur.execute( - "create temp table test_dup_table2 (id int primary key, name varchar(50))" + "insert into test_src values (0, 100), (1, 100), (2, 100), (0, 100), (1, 100), " + "(2, 100), (0, 100), (1, 100), (2, 100), (0, 100), (1, 100), (2, 100), " + "(0, 100), (1, 100), (2, 100)" ) - # Pre-populate with some data - await cur.execute("insert into test_dup_table1 values (1, 'Alice')") - await cur.execute("insert into test_dup_table2 values (2, 'Bob')") - - # Try INSERT ALL - some inserts will conflict with existing keys - # This tests if duplicates are properly reported - try: - await cur.execute( - """ - insert all - into test_dup_table1 values (id, name) - into test_dup_table2 values (id, name) - select * from values - (1, 'Alice_New'), -- Duplicate in table1 - (2, 'Bob_New'), -- Duplicate in table2 - (3, 'Charlie') -- New in both - as t(id, name) - """ - ) - # If this succeeds, check stats - assert cur.stats is not None - assert hasattr(cur.stats, "num_dml_duplicates") - # TODO: bad - except Exception: - # INSERT ALL with duplicates might fail depending on constraints - # This is expected behavior - just verify we can handle it - pass + # test_target has 4 rows: two 0's, one 1, one 2 + await cur.execute("create temp table test_target (c int)") + await cur.execute("insert into test_target values (0), (1), (2), (0)") - -@pytest.mark.skipolddriver -async def test_stats_duplicates_field_exists(conn_cnx): - """Test that num_dml_duplicates field always exists in stats, even if zero.""" - async with conn_cnx() as conn: - async with conn.cursor() as cur: - # Simple insert with no duplicates + # UPDATE with FROM clause: + # - Each of 5 rows with c1=0 matches 2 
rows in test_target (duplicate count: 5 × 1 = 5) + # - Each of 5 rows with c1=1 matches 1 row in test_target (duplicate count: 0) + # - Each of 5 rows with c1=2 matches 1 row in test_target (duplicate count: 0) + # Total duplicates: 5 await cur.execute( - "create temp table test_dup_exists (id int, name varchar(50))" - ) - await cur.execute( - "insert into test_dup_exists values (1, 'Alice'), (2, 'Bob')" + """ + update test_src set c2 = test_target.c + from test_target + where test_src.c1 = test_target.c + """ ) - # Verify the field exists and is 0 - assert cur.stats is not None - assert hasattr(cur.stats, "num_dml_duplicates") - assert ( - cur.stats.num_dml_duplicates == 0 - or cur.stats.num_dml_duplicates is None + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=0, + num_rows_updated=15, + num_dml_duplicates=5, + ), ) @pytest.mark.skipolddriver -async def test_multi_statement_stats(conn_cnx): - """Test that stats are updated correctly for multi-statement queries.""" +async def test_multi_table_insert_overwrite_stats(conn_cnx): + """Test cursor.stats for multi-table INSERT OVERWRITE operations.""" async with conn_cnx() as conn: async with conn.cursor() as cur: - # Setup + # Source has 3 values: 5, 15, 25 + await cur.execute("create temp table test_src_multi (c1 int)") + await cur.execute("insert into test_src_multi values (5), (15), (25)") + + # Target tables with existing data + await cur.execute("create temp table test_tgt1 (c int)") + await cur.execute("create temp table test_tgt2 (c int)") + await cur.execute("insert into test_tgt1 values (100), (101)") + await cur.execute("insert into test_tgt2 values (200), (201), (202)") + + # INSERT OVERWRITE ALL evaluates ALL matching WHEN clauses per row: + # - c1=5: no WHENs match → else clause → 1 insert (5 to tgt2) + # - c1=15: second WHEN matches → 2 inserts (15 to tgt1, 15 to tgt2) + # - c1=25: both WHENs match → 3 inserts (25 to tgt1, then 25 to tgt1 and 25 to tgt2) + # 
Result: tgt1=[25,15,25], tgt2=[15,25,5] + # Total: 6 inserts, 5 deletes (2+3 existing rows cleared by OVERWRITE) await cur.execute( - "create temp table test_multi_stats (id int, name varchar(50))" + """ + insert overwrite all + when c1 > 20 then + into test_tgt1 values (c1) + when c1 > 10 then + into test_tgt1 values (c1) + into test_tgt2 values (c1) + else + into test_tgt2 values (c1) + select c1 from test_src_multi + """ ) - # First insert - await cur.execute( - "insert into test_multi_stats values (1, 'Alice'), (2, 'Bob')" - ) _assert_stats( cur.stats, QueryResultStats( - num_rows_inserted=2, - num_rows_deleted=0, + num_rows_inserted=6, + num_rows_deleted=5, num_rows_updated=0, num_dml_duplicates=0, ), ) - # Second insert in same cursor - await cur.execute("insert into test_multi_stats values (3, 'Charlie')") - _assert_stats( - cur.stats, - QueryResultStats( - num_rows_inserted=1, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, - ), - ) - # Update +@pytest.mark.xfail(reason="Multi-statements does not return stats field") +@pytest.mark.skipolddriver +async def test_multi_statement_in_one_execute(conn_cnx): + """Test that stats reflect the last statement when multiple statements are in one execute.""" + async with conn_cnx( + session_parameters={PARAMETER_MULTI_STATEMENT_COUNT: 0} + ) as conn: + async with conn.cursor() as cur: + # Execute multiple statements separated by semicolons in one execute call + # Stats should reflect ONLY the last statement await cur.execute( - "update test_multi_stats set name = 'Updated' where id = 1" - ) - _assert_stats( - cur.stats, - QueryResultStats( - num_rows_inserted=0, - num_rows_deleted=0, - num_rows_updated=1, - num_dml_duplicates=0, - ), + """ + create temp table test_multiexec (id int, name varchar(50)); + insert into test_multiexec values (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); + update test_multiexec set name = 'Updated' where id = 1; + delete from test_multiexec where id = 2; + """ ) - # Delete - 
await cur.execute("delete from test_multi_stats where id = 2") + # Stats reflect only the last statement (DELETE of 1 row) _assert_stats( cur.stats, QueryResultStats( @@ -2295,8 +2335,15 @@ async def test_truncate_stats(conn_cnx): # Truncate doesn't provide detailed stats await cur.execute("truncate table test_truncate_stats") - # Truncate typically doesn't populate DML stats - assert cur.stats is not None + _assert_stats( + cur.stats, + QueryResultStats( + num_rows_inserted=0, + num_rows_deleted=3, + num_rows_updated=0, + num_dml_duplicates=0, + ), + ) @pytest.mark.skipolddriver @@ -2317,10 +2364,10 @@ async def test_empty_result_stats(conn_cnx): _assert_stats( cur.stats, QueryResultStats( - num_rows_inserted=0, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, + num_rows_inserted=None, + num_rows_deleted=None, + num_rows_updated=None, + num_dml_duplicates=None, ), ) @@ -2329,15 +2376,18 @@ async def test_empty_result_stats(conn_cnx): _assert_stats( cur.stats, QueryResultStats( - num_rows_inserted=0, - num_rows_deleted=0, - num_rows_updated=0, - num_dml_duplicates=0, + num_rows_inserted=None, + num_rows_deleted=None, + num_rows_updated=None, + num_dml_duplicates=None, ), ) @pytest.mark.skipolddriver +@pytest.mark.xfail( + reason="execute_async stats are not returned from monitoring endpoint yet" +) @pytest.mark.parametrize( "operation,setup_sql,test_sql,expected_stats", [ @@ -2422,6 +2472,9 @@ async def test_execute_async_stats( @pytest.mark.skipolddriver +@pytest.mark.xfail( + reason="execute_async stats are not returned from monitoring endpoint yet" +) async def test_execute_async_stats_multiple_queries(conn_cnx): """Test cursor.stats with multiple async queries.""" async with conn_cnx() as conn: From 4c3d20cce6690a01c0c62a95c2455d4b5336d1f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Wed, 10 Dec 2025 17:35:58 +0100 Subject: [PATCH 14/14] SNOW-295953: docs updated --- src/snowflake/connector/cursor.py | 3 ++- 1 file 
changed, 2 insertions(+), 1 deletion(-) diff --git a/src/snowflake/connector/cursor.py b/src/snowflake/connector/cursor.py index 7688d54a59..e408df4a75 100644 --- a/src/snowflake/connector/cursor.py +++ b/src/snowflake/connector/cursor.py @@ -468,7 +468,8 @@ def stats(self) -> QueryResultStats | None: - num_rows_updated: Number of rows updated - num_dml_duplicates: Number of duplicates in DML statement - Returns None on each position if no DML stats are available. + Returns None on each position if no DML stats are available - this includes DML operations where no rows were + affected as well as other types of SQL statements (e.g. DDL, DQL). """ if self._stats_data is None: return QueryResultStats(None, None, None, None)