@@ -37,7 +37,8 @@ async def connect(self) -> None:
37
37
return
38
38
39
39
self ._pool = psycopg_pool .AsyncConnectionPool (
40
- self ._database_url .url , open = False , ** self ._options )
40
+ self ._database_url ._url , open = False , ** self ._options
41
+ )
41
42
42
43
# TODO: Add configurable timeouts
43
44
await self ._pool .open ()
@@ -86,13 +87,33 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]:
86
87
raise RuntimeError ("Connection is not acquired" )
87
88
88
89
query_str , args , result_columns = compile_query (query , self ._dialect )
89
- rows = await self ._connection .fetch (query_str , * args )
90
+
91
+ async with self ._connection .cursor () as cursor :
92
+ await cursor .execute (query_str , args )
93
+ rows = await cursor .fetchall ()
94
+
90
95
column_maps = create_column_maps (result_columns )
91
96
return [Record (row , result_columns , self ._dialect , column_maps ) for row in rows ]
92
- raise NotImplementedError () # pragma: no cover
93
97
94
98
async def fetch_one (self , query : ClauseElement ) -> typing .Optional [RecordInterface ]:
95
- raise NotImplementedError () # pragma: no cover
99
+ if self ._connection is None :
100
+ raise RuntimeError ("Connection is not acquired" )
101
+
102
+ query_str , args , result_columns = compile_query (query , self ._dialect )
103
+
104
+ async with self ._connection .cursor () as cursor :
105
+ await cursor .execute (query_str , args )
106
+ row = await cursor .fetchone ()
107
+
108
+ if row is None :
109
+ return None
110
+
111
+ return Record (
112
+ row ,
113
+ result_columns ,
114
+ self ._dialect ,
115
+ create_column_maps (result_columns ),
116
+ )
96
117
97
118
async def fetch_val (
98
119
self , query : ClauseElement , column : typing .Any = 0
@@ -101,25 +122,47 @@ async def fetch_val(
101
122
return None if row is None else row [column ]
102
123
103
124
async def execute (self , query : ClauseElement ) -> typing .Any :
104
- raise NotImplementedError () # pragma: no cover
125
+ if self ._connection is None :
126
+ raise RuntimeError ("Connection is not acquired" )
127
+
128
+ query_str , args , _ = compile_query (query , self ._dialect )
129
+
130
+ async with self ._connection .cursor () as cursor :
131
+ await cursor .execute (query_str , args )
105
132
106
133
async def execute_many (self , queries : typing .List [ClauseElement ]) -> None :
107
- raise NotImplementedError () # pragma: no cover
134
+ # TODO: Find a way to use psycopg's executemany
135
+ for query in queries :
136
+ await self .execute (query )
108
137
109
138
async def iterate (
110
139
self , query : ClauseElement
111
140
) -> typing .AsyncGenerator [typing .Mapping , None ]:
112
- raise NotImplementedError () # pragma: no cover
113
- # mypy needs async iterators to contain a `yield`
114
- # https://github.com/python/mypy/issues/5385#issuecomment-407281656
115
- yield True # pragma: no cover
141
+ if self ._connection is None :
142
+ raise RuntimeError ("Connection is not acquired" )
143
+
144
+ query_str , args , result_columns = compile_query (query , self ._dialect )
145
+ column_maps = create_column_maps (result_columns )
146
+
147
+ async with self ._connection .cursor () as cursor :
148
+ await cursor .execute (query_str , args )
149
+
150
+ while True :
151
+ row = await cursor .fetchone ()
152
+
153
+ if row is None :
154
+ break
155
+
156
+ yield Record (row , result_columns , self ._dialect , column_maps )
116
157
117
158
def transaction (self ) -> "TransactionBackend" :
118
159
raise NotImplementedError () # pragma: no cover
119
160
120
161
@property
121
162
def raw_connection (self ) -> typing .Any :
122
- raise NotImplementedError () # pragma: no cover
163
+ if self ._connection is None :
164
+ raise RuntimeError ("Connection is not acquired" )
165
+ return self ._connection
123
166
124
167
125
168
class PsycopgTransaction (TransactionBackend ):
0 commit comments