@@ -143,33 +143,30 @@ async def aiter(list: List):
143143 elif isinstance (i , dict ):
144144 i = orjson .dumps (i ).decode ("utf-8" )
145145 if isinstance (i , str ):
146- yield "\n " .join (
146+ line = "\n " .join (
147147 [
148- i .rstrip ().replace (
149- r"\n" , r"\\n"
150- ).replace (
151- r"\t" , r"\\t"
152- ).replace (
153- "'" , "''"
154- )
155- ]).encode ("utf-8" )
148+ i .rstrip ()
149+ .replace (r"\n" , r"\\n" )
150+ .replace (r"\t" , r"\\t" )
151+ ]
152+ ).encode ("utf-8" )
153+ yield line
156154 else :
157155 raise Exception (f"Could not parse { i } " )
158156
159157
160158async def copy (iter , table : tables , conn : asyncpg .Connection ):
161159 logger .debug (f"copying to { table } directly" )
162- if isinstance (iter , List ):
163- logger .debug ("Converting List into Async Iterator" )
164- iter = aiter (iter )
160+ logger .debug (f"iter: { iter } " )
161+ iter = aiter (iter )
165162 async with conn .transaction ():
166163 logger .debug ("Copying data" )
167164 await conn .copy_to_table (
168165 table ,
169166 source = iter ,
170167 columns = ["content" ],
171168 format = "csv" ,
172- quote = "'" ,
169+ quote = chr ( 27 ) ,
173170 delimiter = chr (31 ),
174171 )
175172 logger .debug ("Backfilling partitions" )
@@ -185,9 +182,7 @@ async def copy_ignore_duplicates(
185182 iter , table : tables , conn : asyncpg .Connection
186183):
187184 logger .debug (f"inserting to { table } ignoring duplicates" )
188- if isinstance (iter , List ):
189- logger .debug ("Converting List into Async Iterator" )
190- iter = aiter (iter )
185+ iter = aiter (iter )
191186 async with conn .transaction ():
192187 await conn .execute (
193188 """
@@ -200,7 +195,7 @@ async def copy_ignore_duplicates(
200195 source = iter ,
201196 columns = ["content" ],
202197 format = "csv" ,
203- quote = "'" ,
198+ quote = chr ( 27 ) ,
204199 delimiter = chr (31 ),
205200 )
206201 logger .debug ("Data Copied" )
@@ -225,9 +220,7 @@ async def copy_ignore_duplicates(
225220
226221async def copy_upsert (iter , table : tables , conn : asyncpg .Connection ):
227222 logger .debug (f"upserting to { table } " )
228- if isinstance (iter , List ):
229- logger .debug ("Converting List into Async Iterator" )
230- iter = aiter (iter )
223+ iter = aiter (iter )
231224 async with conn .transaction ():
232225 await conn .execute (
233226 """
@@ -240,7 +233,7 @@ async def copy_upsert(iter, table: tables, conn: asyncpg.Connection):
240233 source = iter ,
241234 columns = ["content" ],
242235 format = "csv" ,
243- quote = "'" ,
236+ quote = chr ( 27 ) ,
244237 delimiter = chr (31 ),
245238 )
246239 logger .debug ("Data Copied" )
0 commit comments