Skip to content
This repository was archived by the owner on Mar 19, 2026. It is now read-only.

Commit 6a61600

Browse files
authored
Merge pull request #49 from PrefectHQ/fix_fetch
Fix fetch flow
2 parents 70f7faa + 462340c commit 6a61600

File tree

3 files changed

+84
-11
lines changed

3 files changed

+84
-11
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919

2020
### Security
2121

22+
## 0.2.4
23+
24+
Released on February 15th, 2022.
25+
26+
### Fixed
27+
28+
- Initializing `_unique_results` and `_exit_stack` so `fetch_*` is able to run - [#49](https://github.com/PrefectHQ/prefect-sqlalchemy/pull/49)
29+
2230
## 0.2.3
2331

2432
Released on February 10th, 2022.

prefect_sqlalchemy/database.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,12 @@ def block_initialization(self):
301301
except ValueError:
302302
self._driver_is_async = False
303303

304+
if self._unique_results is None:
305+
self._unique_results = {}
306+
307+
if self._exit_stack is None:
308+
self._start_exit_stack()
309+
304310
def _start_exit_stack(self):
305311
"""
306312
Starts an AsyncExitStack or ExitStack depending on whether driver is async.
@@ -378,10 +384,6 @@ def sqlalchemy_credentials_flow():
378384

379385
if self._engine is None:
380386
self._engine = engine
381-
if self._exit_stack is None:
382-
self._start_exit_stack()
383-
if self._unique_results is None:
384-
self._unique_results = {}
385387

386388
return engine
387389

@@ -938,3 +940,9 @@ def __getstate__(self):
938940
def __setstate__(self, data: dict):
939941
"""Upon loading back, restart the engine and results."""
940942
self.__dict__.update(data)
943+
944+
if self._unique_results is None:
945+
self._unique_results = {}
946+
947+
if self._exit_stack is None:
948+
self._start_exit_stack()

tests/test_database.py

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import cloudpickle
55
import pytest
6-
from prefect import flow
6+
from prefect import flow, task
77
from sqlalchemy.engine import Connection, Engine
88
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
99

@@ -261,22 +261,30 @@ async def test_connector_init(self):
261261
credentials_url = SqlAlchemyConnector(connection_info=connection_url)
262262
assert credentials_components._rendered_url == credentials_url._rendered_url
263263

264-
def test_delay_start(self, caplog):
264+
@pytest.mark.parametrize("method", ["fetch_all", "execute"])
265+
def test_delay_start(self, caplog, method):
265266
with SqlAlchemyConnector(
266267
connection_info=ConnectionComponents(
267268
driver=SyncDriver.SQLITE_PYSQLITE,
268269
database=":memory:",
269270
),
270271
) as connector:
272+
assert connector._unique_results == {}
273+
assert isinstance(connector._exit_stack, ExitStack)
271274
connector.reset_connections()
272-
assert caplog.records[0].msg == "There were no connections to reset."
275+
assert (
276+
caplog.records[0].msg == "Reset opened connections and their results."
277+
)
273278
assert connector._engine is None
274-
assert connector._unique_results is None
275-
assert connector._exit_stack is None
276-
connector.execute("SELECT 1")
277-
assert isinstance(connector._engine, Engine)
278279
assert connector._unique_results == {}
279280
assert isinstance(connector._exit_stack, ExitStack)
281+
getattr(connector, method)("SELECT 1")
282+
assert isinstance(connector._engine, Engine)
283+
if method == "execute":
284+
assert connector._unique_results == {}
285+
else:
286+
assert len(connector._unique_results) == 1
287+
assert isinstance(connector._exit_stack, ExitStack)
280288

281289
@pytest.fixture(params=[SyncDriver.SQLITE_PYSQLITE, AsyncDriver.SQLITE_AIOSQLITE])
282290
async def connector_with_data(self, tmp_path, request):
@@ -547,3 +555,52 @@ def test_sync_compatible_reset_connections(self, tmp_path):
547555
assert len(conn._unique_results) == 1
548556
conn.reset_connections()
549557
assert len(conn._unique_results) == 0
558+
559+
def test_flow_without_initialized_engine(self, tmp_path):
560+
@task
561+
def setup_table(block_name: str) -> None:
562+
with SqlAlchemyConnector.load(block_name) as connector:
563+
connector.execute(
564+
"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);" # noqa
565+
)
566+
connector.execute(
567+
"INSERT INTO customers (name, address) VALUES (:name, :address);",
568+
parameters={"name": "Marvin", "address": "Highway 42"},
569+
)
570+
connector.execute_many(
571+
"INSERT INTO customers (name, address) VALUES (:name, :address);",
572+
seq_of_parameters=[
573+
{"name": "Ford", "address": "Highway 42"},
574+
{"name": "Unknown", "address": "Highway 42"},
575+
],
576+
)
577+
578+
@task
579+
def fetch_data(block_name: str) -> list:
580+
all_rows = []
581+
with SqlAlchemyConnector.load(block_name) as connector:
582+
while True:
583+
# Repeated fetch* calls using the same operation will
584+
# skip re-executing and instead return the next set of results
585+
new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
586+
if len(new_rows) == 0:
587+
break
588+
all_rows.append(new_rows)
589+
return all_rows
590+
591+
@flow
592+
def sqlalchemy_flow(block_name: str) -> list:
593+
SqlAlchemyConnector(
594+
connection_info=ConnectionComponents(
595+
driver=SyncDriver.SQLITE_PYSQLITE,
596+
database=str(tmp_path / "test.db"),
597+
)
598+
).save(block_name)
599+
setup_table(block_name)
600+
all_rows = fetch_data(block_name)
601+
return all_rows
602+
603+
assert sqlalchemy_flow("connector") == [
604+
[("Marvin", "Highway 42"), ("Ford", "Highway 42")],
605+
[("Unknown", "Highway 42")],
606+
]

0 commit comments

Comments
 (0)