4141)
4242
4343
44- class BatchExtractionResult : # noqa: WPS338, WPS214
44+ class BatchExtractionResult :
4545 """Track results of batch extraction.
4646
4747 Calling any ``add_*`` method will add DTO item to the result, including nested DTOs,
@@ -74,7 +74,7 @@ def __init__(self):
7474
7575 def __repr__ (self ):
7676 return (
77- "ExtractionResult(" # noqa: WPS237
77+ "ExtractionResult("
7878 f"locations={ len (self ._locations )} , "
7979 f"datasets={ len (self ._datasets )} , "
8080 f"dataset_symlinks={ len (self ._dataset_symlinks )} , "
@@ -90,7 +90,7 @@ def __repr__(self):
9090 )
9191
9292 @staticmethod
93- def _add (context : dict [tuple , T ], new_item : T ) -> dict [tuple , T ]: # noqa: WPS602
93+ def _add (context : dict [tuple , T ], new_item : T ) -> dict [tuple , T ]:
9494 key = new_item .unique_key
9595 if key in context :
9696 old_item = context [key ]
@@ -130,12 +130,12 @@ def add_operation(self, operation: OperationDTO):
130130 self ._add (self ._operations , operation )
131131 self .add_run (operation .run )
132132
133- def add_input (self , input : InputDTO ):
134- self ._add (self ._inputs , input )
135- self .add_operation (input .operation )
136- self .add_dataset (input .dataset )
137- if input .schema :
138- self .add_schema (input .schema )
133+ def add_input (self , input_ : InputDTO ):
134+ self ._add (self ._inputs , input_ )
135+ self .add_operation (input_ .operation )
136+ self .add_dataset (input_ .dataset )
137+ if input_ .schema :
138+ self .add_schema (input_ .schema )
139139
140140 def add_output (self , output : OutputDTO ):
141141 self ._add (self ._outputs , output )
@@ -196,12 +196,12 @@ def _get_operation(self, operation_key: tuple) -> OperationDTO:
196196 return operation
197197
198198 def _get_input (self , input_key : tuple ) -> InputDTO :
199- input = self ._inputs [input_key ]
200- input .operation = self ._get_operation (input .operation .unique_key )
201- input .dataset = self ._get_dataset (input .dataset .unique_key )
202- if input .schema :
203- input .schema = self ._get_schema (input .schema .unique_key )
204- return input
199+ input_ = self ._inputs [input_key ]
200+ input_ .operation = self ._get_operation (input_ .operation .unique_key )
201+ input_ .dataset = self ._get_dataset (input_ .dataset .unique_key )
202+ if input_ .schema :
203+ input_ .schema = self ._get_schema (input_ .schema .unique_key )
204+ return input_
205205
206206 def _get_output (self , output_key : tuple ) -> OutputDTO :
207207 output = self ._outputs [output_key ]
@@ -252,23 +252,23 @@ def users(self) -> list[UserDTO]:
252252 return list (map (self ._get_user , self ._users ))
253253
254254
255- def extract_batch (events : list [OpenLineageRunEvent ]) -> BatchExtractionResult : # noqa: WPS231
255+ def extract_batch (events : list [OpenLineageRunEvent ]) -> BatchExtractionResult :
256256 result = BatchExtractionResult ()
257257
258258 for event in events :
259259 if event .job .facets .jobType and event .job .facets .jobType .jobType == OpenLineageJobType .JOB :
260260 operation = extract_operation (event )
261261 result .add_operation (operation )
262262 for input_dataset in event .inputs :
263- input , symlinks = extract_input (operation , input_dataset )
264- result .add_input (input )
263+ input_ , symlinks = extract_input (operation , input_dataset )
264+ result .add_input (input_ )
265265 for symlink in symlinks :
266266 result .add_dataset_symlink (symlink )
267267
268268 for output_dataset in event .outputs :
269269 output , symlinks = extract_output (operation , output_dataset )
270270 result .add_output (output )
271- for symlink in symlinks : # noqa: WPS440
271+ for symlink in symlinks :
272272 result .add_dataset_symlink (symlink )
273273
274274 for dataset in event .inputs + event .outputs :
0 commit comments