@@ -143,33 +143,31 @@ async def aiter(list: List):
143143 elif isinstance (i , dict ):
144144 i = orjson .dumps (i ).decode ("utf-8" )
145145 if isinstance (i , str ):
146- yield "\n " .join (
146+ line = "\n " .join (
147147 [
148- i .rstrip ().replace (
149- r"\n" , r"\\n"
150- ).replace (
151- r"\t" , r"\\t"
152- ).replace (
153- "'" , "''"
154- )
155- ]).encode ("utf-8" )
148+ i .rstrip ()
149+ .replace (r"\n" , r"\\n" )
150+ .replace (r"\t" , r"\\t" )
151+ .replace ("'" , "'" )
152+ ]
153+ ).encode ("utf-8" )
154+ yield line
156155 else :
157156 raise Exception (f"Could not parse { i } " )
158157
159158
160159async def copy (iter , table : tables , conn : asyncpg .Connection ):
161160 logger .debug (f"copying to { table } directly" )
162- if isinstance (iter , List ):
163- logger .debug ("Converting List into Async Iterator" )
164- iter = aiter (iter )
161+ logger .debug (f"iter: { iter } " )
162+ iter = aiter (iter )
165163 async with conn .transaction ():
166164 logger .debug ("Copying data" )
167165 await conn .copy_to_table (
168166 table ,
169167 source = iter ,
170168 columns = ["content" ],
171169 format = "csv" ,
172- quote = "'" ,
170+ quote = chr ( 27 ) ,
173171 delimiter = chr (31 ),
174172 )
175173 logger .debug ("Backfilling partitions" )
@@ -185,9 +183,7 @@ async def copy_ignore_duplicates(
185183 iter , table : tables , conn : asyncpg .Connection
186184):
187185 logger .debug (f"inserting to { table } ignoring duplicates" )
188- if isinstance (iter , List ):
189- logger .debug ("Converting List into Async Iterator" )
190- iter = aiter (iter )
186+ iter = aiter (iter )
191187 async with conn .transaction ():
192188 await conn .execute (
193189 """
@@ -200,7 +196,7 @@ async def copy_ignore_duplicates(
200196 source = iter ,
201197 columns = ["content" ],
202198 format = "csv" ,
203- quote = "'" ,
199+ quote = chr ( 27 ) ,
204200 delimiter = chr (31 ),
205201 )
206202 logger .debug ("Data Copied" )
@@ -225,9 +221,7 @@ async def copy_ignore_duplicates(
225221
226222async def copy_upsert (iter , table : tables , conn : asyncpg .Connection ):
227223 logger .debug (f"upserting to { table } " )
228- if isinstance (iter , List ):
229- logger .debug ("Converting List into Async Iterator" )
230- iter = aiter (iter )
224+ iter = aiter (iter )
231225 async with conn .transaction ():
232226 await conn .execute (
233227 """
@@ -240,7 +234,7 @@ async def copy_upsert(iter, table: tables, conn: asyncpg.Connection):
240234 source = iter ,
241235 columns = ["content" ],
242236 format = "csv" ,
243- quote = "'" ,
237+ quote = chr ( 27 ) ,
244238 delimiter = chr (31 ),
245239 )
246240 logger .debug ("Data Copied" )
0 commit comments