|
10 | 10 | # See the License for the specific language governing permissions and
|
11 | 11 | # limitations under the License.
|
12 | 12 | import math
|
| 13 | +import time as t |
13 | 14 | import uuid
|
14 | 15 | from datetime import date, datetime, time, timedelta, timezone
|
15 | 16 | from decimal import Decimal
|
|
27 | 28 | import trino
|
28 | 29 | from tests.integration.conftest import trino_version
|
29 | 30 | from trino import constants
|
30 |
| -from trino.dbapi import Cursor, DescribeOutput |
| 31 | +from trino.dbapi import Cursor, DescribeOutput, TimeBoundLRUCache |
31 | 32 | from trino.exceptions import NotSupportedError, TrinoQueryError, TrinoUserError
|
32 | 33 | from trino.transaction import IsolationLevel
|
33 | 34 |
|
@@ -1782,6 +1783,41 @@ def test_rowcount_insert(trino_connection):
|
1782 | 1783 | assert cur.rowcount == 1
|
1783 | 1784 |
|
1784 | 1785 |
|
| 1786 | +@pytest.mark.parametrize( |
| 1787 | + "legacy_prepared_statements", |
| 1788 | + [ |
| 1789 | + True, |
| 1790 | + pytest.param(False, marks=pytest.mark.skipif( |
| 1791 | + trino_version() <= '417', |
| 1792 | + reason="EXECUTE IMMEDIATE was introduced in version 418")), |
| 1793 | + None |
| 1794 | + ] |
| 1795 | +) |
| 1796 | +def test_prepared_statement_capability_autodetection(legacy_prepared_statements, run_trino): |
| 1797 | + # start with an empty cache |
| 1798 | + trino.dbapi.must_use_legacy_prepared_statements = TimeBoundLRUCache(1024, 3600) |
| 1799 | + user_name = f"user_{t.monotonic_ns()}" |
| 1800 | + |
| 1801 | + _, host, port = run_trino |
| 1802 | + connection = trino.dbapi.Connection( |
| 1803 | + host=host, |
| 1804 | + port=port, |
| 1805 | + user=user_name, |
| 1806 | + legacy_prepared_statements=legacy_prepared_statements, |
| 1807 | + ) |
| 1808 | + cur = connection.cursor() |
| 1809 | + cur.execute("SELECT ?", [42]) |
| 1810 | + cur.fetchall() |
| 1811 | + another = connection.cursor() |
| 1812 | + another.execute("SELECT ?", [100]) |
| 1813 | + another.fetchall() |
| 1814 | + |
| 1815 | + verify = connection.cursor() |
| 1816 | + rows = verify.execute("SELECT query FROM system.runtime.queries WHERE user = ?", [user_name]) |
| 1817 | + statements = [stmt for row in rows for stmt in row] |
| 1818 | + assert statements.count("EXECUTE IMMEDIATE 'SELECT 1'") == (1 if legacy_prepared_statements is None else 0) |
| 1819 | + |
| 1820 | + |
1785 | 1821 | def get_cursor(legacy_prepared_statements, run_trino):
|
1786 | 1822 | _, host, port = run_trino
|
1787 | 1823 |
|
|
0 commit comments