@@ -342,7 +342,9 @@ def get_files(self) -> Iterable[RemoteFile]:
342342 )
343343
344344 def infer_schema (self , files : List [RemoteFile ]) -> Mapping [str , Any ]:
345- self .logger .info (f"Starting schema inference for stream { self .name } with { len (files )} files" )
345+ self .logger .info (
346+ f"Starting schema inference for stream { self .name } with { len (files )} files"
347+ )
346348 loop = asyncio .get_event_loop ()
347349 schema = loop .run_until_complete (self ._infer_schema (files ))
348350 # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
@@ -377,7 +379,9 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
377379 Each file type has a corresponding `infer_schema` handler.
378380 Dispatch on file type.
379381 """
380- self .logger .info (f"Starting concurrent schema inference for { len (files )} files with { self ._discovery_policy .n_concurrent_requests } concurrent requests" )
382+ self .logger .info (
383+ f"Starting concurrent schema inference for { len (files )} files with { self ._discovery_policy .n_concurrent_requests } concurrent requests"
384+ )
381385 base_schema : SchemaType = {}
382386 pending_tasks : Set [asyncio .tasks .Task [SchemaType ]] = set ()
383387
@@ -397,7 +401,9 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
397401 )
398402 for task in done :
399403 try :
400- self .logger .debug (f"Completed schema inference for a file, { len (pending_tasks )} files remaining" )
404+ self .logger .debug (
405+ f"Completed schema inference for a file, { len (pending_tasks )} files remaining"
406+ )
401407 base_schema = merge_schemas (base_schema , task .result ())
402408 except AirbyteTracedException as ate :
403409 raise ate
0 commit comments