Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Commit 0bf7cb7

Browse files
committed
Rough implementation of the raw driver call
1 parent 1e0f679 commit 0bf7cb7

File tree

5 files changed

+208
-3
lines changed

5 files changed

+208
-3
lines changed

databases/backends/mysql.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from sqlalchemy.sql import ClauseElement
1111
from sqlalchemy.types import TypeEngine
1212

13-
from databases.core import DatabaseURL
13+
from databases.core import DatabaseURL, NoBackendMethod
1414
from databases.interfaces import ConnectionBackend, DatabaseBackend, TransactionBackend
1515

1616
logger = logging.getLogger("databases")
@@ -135,6 +135,29 @@ async def execute_many(self, query: ClauseElement, values: list) -> None:
135135
finally:
136136
await cursor.close()
137137

138+
async def raw_api_call(self, method: str, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
139+
"""
140+
NOTE: highly experimental, seems to be a dead-end for generalized solution
141+
"""
142+
assert self._connection is not None, "Connection is not acquired"
143+
cursor = await self._connection.cursor()
144+
try:
145+
if 'execute' not in method:
146+
await cursor.execute(*args, **kwargs)
147+
api_method = getattr(cursor, method)
148+
return await api_method()
149+
else:
150+
api_method = getattr(cursor, method)
151+
return await api_method(*args, **kwargs)
152+
except AttributeError:
153+
raise NoBackendMethod(f'{self.database._backend._dialect.driver} has no "{method}" implemented.')
154+
finally:
155+
await cursor.close()
156+
157+
async def expose_backend_connection(self) -> aiomysql.connection.Connection:
158+
assert self._connection is not None, "Connection is not acquired"
159+
return self._connection
160+
138161
async def iterate(
139162
self, query: ClauseElement
140163
) -> typing.AsyncGenerator[typing.Any, None]:

databases/backends/postgres.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from sqlalchemy.engine.interfaces import Dialect
88
from sqlalchemy.sql import ClauseElement
99

10-
from databases.core import DatabaseURL
10+
from databases.core import DatabaseURL, NoBackendMethod
1111
from databases.interfaces import ConnectionBackend, DatabaseBackend, TransactionBackend
1212

1313
logger = logging.getLogger("databases")
@@ -93,6 +93,8 @@ def __iter__(self) -> typing.Iterator:
9393
def __len__(self) -> int:
9494
return len(self._column_map)
9595

96+
# TODO (?) Nice __repr__
97+
9698

9799
class PostgresConnection(ConnectionBackend):
98100
def __init__(self, database: PostgresBackend, dialect: Dialect):
@@ -142,6 +144,21 @@ async def execute_many(self, query: ClauseElement, values: list) -> None:
142144
single_query, args, result_columns = self._compile(single_query)
143145
await self._connection.execute(single_query, *args)
144146

147+
async def raw_api_call(self, method: str, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
148+
"""
149+
NOTE: highly experimental, seems to be a dead-end for generalized solution
150+
"""
151+
assert self._connection is not None, "Connection is not acquired"
152+
try:
153+
api_method = getattr(self._connection, method)
154+
return await api_method(*args, **kwargs)
155+
except AttributeError:
156+
raise NoBackendMethod(f'{self.database._backend._dialect.driver} has no "{method}" implemented.')
157+
158+
async def expose_backend_connection(self) -> asyncpg.connection.Connection:
159+
assert self._connection is not None, "Connection is not acquired"
160+
return self._connection
161+
145162
async def iterate(
146163
self, query: ClauseElement
147164
) -> typing.AsyncGenerator[typing.Any, None]:

databases/backends/sqlite.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlalchemy.sql import ClauseElement
1010
from sqlalchemy.types import TypeEngine
1111

12-
from databases.core import DatabaseURL
12+
from databases.core import DatabaseURL, NoBackendMethod
1313
from databases.interfaces import ConnectionBackend, DatabaseBackend, TransactionBackend
1414

1515
logger = logging.getLogger("databases")
@@ -116,6 +116,21 @@ async def execute_many(self, query: ClauseElement, values: list) -> None:
116116
for value in values:
117117
await self.execute(query, value)
118118

119+
async def raw_api_call(self, method: str, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
120+
"""
121+
NOTE: highly experimental, seems to be a dead-end for generalized solution
122+
"""
123+
assert self._connection is not None, "Connection is not acquired"
124+
try:
125+
api_method = getattr(self._connection, method)
126+
return await api_method(*args, **kwargs)
127+
except AttributeError:
128+
raise NoBackendMethod(f'{self.database._backend._dialect.driver} has no "{method}" implemented.')
129+
130+
async def expose_backend_connection(self) -> aiosqlite.core.Connection:
131+
assert self._connection is not None, "Connection is not acquired"
132+
return self._connection
133+
119134
async def iterate(
120135
self, query: ClauseElement
121136
) -> typing.AsyncGenerator[typing.Any, None]:

databases/core.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ async def execute_many(self, query: ClauseElement, values: list) -> None:
104104
async with self.connection() as connection:
105105
return await connection.execute_many(query=query, values=values)
106106

107+
async def raw_api_call(self, method: str, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
108+
async with self.connection() as connection:
109+
return await connection.raw_api_call(method, *args, **kwargs)
110+
111+
async def expose_backend_connection(self) -> typing.Any:
112+
async with self.connection() as connection:
113+
return await connection.expose_backend_connection()
114+
107115
async def iterate(
108116
self, query: ClauseElement
109117
) -> typing.AsyncGenerator[RowProxy, None]:
@@ -168,6 +176,12 @@ async def execute(self, query: ClauseElement, values: dict = None) -> typing.Any
168176
async def execute_many(self, query: ClauseElement, values: list) -> None:
169177
await self._connection.execute_many(query, values)
170178

179+
async def raw_api_call(self, method: str, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
180+
return await self._connection.raw_api_call(method, *args, **kwargs)
181+
182+
async def expose_backend_connection(self) -> typing.Any:
183+
return await self._connection.expose_backend_connection()
184+
171185
async def iterate(
172186
self, query: ClauseElement
173187
) -> typing.AsyncGenerator[typing.Any, None]:
@@ -348,3 +362,7 @@ def __repr__(self) -> str:
348362
if self.password:
349363
url = str(self.replace(password="********"))
350364
return f"{self.__class__.__name__}({repr(url)})"
365+
366+
367+
class NoBackendMethod(Exception):
368+
pass

tests/test_databases.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,3 +514,135 @@ async def get_connection_2():
514514
test_complete.set()
515515
await task_1
516516
await task_2
517+
518+
519+
@pytest.mark.parametrize("database_url", DATABASE_URLS)
520+
@async_adapter
521+
async def test_queries_with_raw_api_call(database_url):
522+
"""
523+
Test that the basic `execute()`, `execute_many()`, `fetch_all()``, and
524+
`fetch_one()` interfaces are working as expected being called as raw driver calls.
525+
"""
526+
async with Database(database_url) as database:
527+
async with database.transaction(force_rollback=True):
528+
# Insert query
529+
if str(database_url).startswith('mysql'):
530+
insert_query = "INSERT INTO notes (text, completed) VALUES (%s, %s)"
531+
else:
532+
insert_query = "INSERT INTO notes (text, completed) VALUES ($1, $2)"
533+
534+
# execute()
535+
values = ("example1", True)
536+
537+
if str(database_url).startswith('postgresql'):
538+
await database.raw_api_call('execute', insert_query, *values)
539+
else:
540+
await database.raw_api_call('execute', insert_query, values)
541+
542+
# execute_many()
543+
values = [("example2", False), ("example3", True)]
544+
await database.raw_api_call('executemany', insert_query, values)
545+
546+
# Select query
547+
select_query = "SELECT notes.id, notes.text, notes.completed FROM notes"
548+
549+
# fetch_all()
550+
if str(database_url).startswith('postgresql'):
551+
results = await database.raw_api_call('fetch', select_query)
552+
elif str(database_url).startswith('mysql'):
553+
results = await database.raw_api_call('fetchall', select_query)
554+
elif str(database_url).startswith('sqlite'):
555+
results = await database.raw_api_call('execute_fetchall', select_query)
556+
557+
assert len(results) == 3
558+
# Raw output for the raw request
559+
assert results[0][1] == "example1"
560+
assert results[0][2] == True
561+
assert results[1][1] == "example2"
562+
assert results[1][2] == False
563+
assert results[2][1] == "example3"
564+
assert results[2][2] == True
565+
566+
# # fetch_one()
567+
# import pdb; pdb.set_trace()
568+
# if str(database_url).startswith('postgresql'):
569+
# result = await database.raw_api_call('fetchrow', select_query)
570+
# else:
571+
# result = await database.raw_api_call('fetchone', select_query)
572+
# assert result[1] == "example1"
573+
# assert result[2] == True
574+
575+
576+
@pytest.mark.parametrize("database_url", DATABASE_URLS)
577+
@async_adapter
578+
async def test_queries_with_expose_backend_connection(database_url):
579+
"""
580+
Replication of `execute()`, `execute_many()`, `fetch_all()``, and
581+
`fetch_one()` using the raw driver interface.
582+
"""
583+
async with Database(database_url) as database:
584+
async with database.transaction(force_rollback=True):
585+
# Insert query
586+
if str(database_url).startswith('mysql'):
587+
insert_query = "INSERT INTO notes (text, completed) VALUES (%s, %s)"
588+
else:
589+
insert_query = "INSERT INTO notes (text, completed) VALUES ($1, $2)"
590+
591+
# execute()
592+
values = ("example1", True)
593+
594+
con = await database.expose_backend_connection()
595+
596+
if str(database_url).startswith('postgresql'):
597+
await con.execute(insert_query, *values)
598+
elif str(database_url).startswith('mysql'):
599+
cursor = await con.cursor()
600+
await cursor.execute(insert_query, values)
601+
elif str(database_url).startswith('sqlite'):
602+
await con.execute(insert_query, values)
603+
604+
# execute_many()
605+
values = [("example2", False), ("example3", True)]
606+
607+
if str(database_url).startswith('mysql'):
608+
cursor = await con.cursor()
609+
await cursor.executemany(insert_query, values)
610+
else:
611+
await con.executemany(insert_query, values)
612+
613+
# Select query
614+
select_query = "SELECT notes.id, notes.text, notes.completed FROM notes"
615+
616+
# fetch_all()
617+
if str(database_url).startswith('postgresql'):
618+
results = await con.fetch(select_query)
619+
elif str(database_url).startswith('mysql'):
620+
cursor = await con.cursor()
621+
await cursor.execute(select_query)
622+
results = await cursor.fetchall()
623+
elif str(database_url).startswith('sqlite'):
624+
results = await con.execute_fetchall(select_query)
625+
626+
assert len(results) == 3
627+
# Raw output for the raw request
628+
assert results[0][1] == "example1"
629+
assert results[0][2] == True
630+
assert results[1][1] == "example2"
631+
assert results[1][2] == False
632+
assert results[2][1] == "example3"
633+
assert results[2][2] == True
634+
635+
# fetch_one()
636+
if str(database_url).startswith('postgresql'):
637+
result = await con.fetchrow(select_query)
638+
else:
639+
cursor = await con.cursor()
640+
await cursor.execute(select_query)
641+
result = await cursor.fetchone()
642+
643+
# Raw output for the raw request
644+
assert result[1] == "example1"
645+
assert result[2] == True
646+
647+
# TODO unittests at the connection level
648+
# TODO (?) Double-check connections are released back to the pool

0 commit comments

Comments
 (0)