1+ """
2+ Module containing the base class for extractors.
3+ """
4+
15# Copyright 2021 Cognite AS
26#
37# Licensed under the Apache License, Version 2.0 (the "License");
2933from cognite .client import CogniteClient
3034from cognite .client .data_classes import ExtractionPipeline , ExtractionPipelineRun
3135from cognite .client .exceptions import CogniteAPIError
32- from cognite .extractorutils .configtools import BaseConfig , ConfigResolver , StateStoreConfig
36+ from cognite .extractorutils .configtools import (
37+ BaseConfig ,
38+ ConfigResolver ,
39+ StateStoreConfig ,
40+ )
3341from cognite .extractorutils .exceptions import InvalidConfigError
3442from cognite .extractorutils .metrics import BaseMetrics
35- from cognite .extractorutils .statestore import AbstractStateStore , LocalStateStore , NoStateStore
43+ from cognite .extractorutils .statestore import (
44+ AbstractStateStore ,
45+ LocalStateStore ,
46+ NoStateStore ,
47+ )
3648from cognite .extractorutils .threading import CancellationToken
3749
3850ReportStatus = Literal ["success" , "failure" , "seen" ]
3951
4052
4153class ReloadConfigAction (Enum ):
54+ """
55+ Enum for actions to take when a config file is reloaded.
56+ """
57+
4258 DO_NOTHING = 1
4359 REPLACE_ATTRIBUTE = 2
4460 SHUTDOWN = 3
@@ -130,8 +146,10 @@ def __init__(
130146
131147 def _initial_load_config (self , override_path : str | None = None ) -> None :
132148 """
133- Load a configuration file, either from the specified path, or by a path specified by the user in a command line
134- arg. Will quit further execution of no path is given.
149+ Load a configuration file.
150+
151+ Either from the specified path, or from a path specified by the user in a command line arg. Will quit further
152+ execution if no path is specified.
135153
136154 Args:
137155 override_path: Optional override for file path, ie don't parse command line arguments
@@ -154,6 +172,11 @@ def config_refresher() -> None:
154172 Thread (target = config_refresher , name = "ConfigReloader" , daemon = True ).start ()
155173
156174 def reload_config_callback (self ) -> None :
175+ """
176+ If the reload_config_action was set to CALLBACK, this method will be called when the config file is reloaded.
177+
178+ This method should be overridden in subclasses to provide custom behavior when the config file is reloaded.
179+ """
157180 self .logger .error ("Method for reloading configs has not been overridden in subclass" )
158181
159182 def _reload_config (self ) -> None :
@@ -177,9 +200,11 @@ def _reload_config(self) -> None:
177200
178201 def _load_state_store (self ) -> None :
179202 """
180- Searches through the config object for a StateStoreConfig. If found, it will use that configuration to generate
181- a state store, if no such config is found it will either create a LocalStateStore or a NoStateStore depending
182- on whether the ``use_default_state_store`` argument to the constructor was true or false.
203+ Searches through the config object for a StateStoreConfig.
204+
205+ If found, it will use that configuration to generate a state store, if no such config is found it will either
206+ create a LocalStateStore or a NoStateStore depending on whether the ``use_default_state_store`` argument to the
207+ constructor was true or false.
183208
184209 Either way, the state_store attribute is guaranteed to be set after calling this method.
185210 """
@@ -218,11 +243,11 @@ def recursive_find_state_store(d: dict[str, Any]) -> StateStoreConfig | None:
218243 def _report_run (self , status : ReportStatus , message : str ) -> None :
219244 """
220245 Report the status of the extractor run to the extraction pipeline.
246+
221247 Args:
222248 status: Status of the run, either success or failure or seen
223249 message: Message to report to the extraction pipeline
224250 """
225-
226251 MAX_MESSAGE_LENGTH_FOR_EXTRACTION_PIPELINE_RUN = 1000
227252 if self .extraction_pipeline :
228253 try :
@@ -246,7 +271,7 @@ def _report_run(self, status: ReportStatus, message: str) -> None:
246271
247272 def _report_success (self , message : str | None = None ) -> None :
248273 """
249- Called on a successful exit of the extractor
274+ Called on a successful exit of the extractor.
250275
251276 Args:
252277 message: Message to report to the extraction pipeline. If not provided, Extractor.success_message is taken.
@@ -256,7 +281,7 @@ def _report_success(self, message: str | None = None) -> None:
256281
257282 def _report_failure (self , message : str ) -> None :
258283 """
259- Called on an unsuccessful exit of the extractor
284+ Called on an unsuccessful exit of the extractor.
260285
261286 Args:
262287 message: Message to report to the extraction pipeline
@@ -265,7 +290,7 @@ def _report_failure(self, message: str) -> None:
265290
266291 def _report_error (self , exception : BaseException ) -> None :
267292 """
268- Called on an unsuccessful exit of the extractor
293+ Called on an unsuccessful exit of the extractor.
269294
270295 Args:
271296 exception: Exception object that caused the extractor to fail
@@ -280,7 +305,6 @@ def __enter__(self) -> "Extractor":
280305 Returns:
281306 self
282307 """
283-
284308 if str (os .getenv ("COGNITE_FUNCTION_RUNTIME" , False )).lower () != "true" :
285309 # Environment Variables
286310 env_file_found = load_dotenv (dotenv_path = "./.env" , override = True )
@@ -294,7 +318,9 @@ def __enter__(self) -> "Extractor":
294318 try :
295319 self ._initial_load_config (override_path = self .config_file_path )
296320 except InvalidConfigError as e :
297- print ("Critical error: Could not read config file" , file = sys .stderr ) # noqa: T201
321+ print ( # noqa: T201
322+ "Critical error: Could not read config file" , file = sys .stderr
323+ )
298324 print (str (e ), file = sys .stderr ) # noqa: T201
299325 sys .exit (1 )
300326
@@ -353,7 +379,10 @@ def heartbeat_loop() -> None:
353379 return self
354380
355381 def __exit__ (
356- self , exc_type : type [BaseException ] | None , exc_val : BaseException | None , exc_tb : TracebackType | None
382+ self ,
383+ exc_type : type [BaseException ] | None ,
384+ exc_val : BaseException | None ,
385+ exc_tb : TracebackType | None ,
357386 ) -> bool :
358387 """
359388 Shuts down the extractor. Makes sure states are preserved, that all uploads of data and metrics are done, etc.
@@ -387,26 +416,50 @@ def __exit__(
387416
388417 def run (self ) -> None :
389418 """
390- Run the extractor. Ensures that the Extractor is set up correctly (``run`` called within a ``with``) and calls
391- the ``run_handle``.
419+ Run the extractor.
420+
421+ Ensures that the Extractor is set up correctly (``run`` called within a ``with``) and calls the ``run_handle``.
392422
393- Can be overrided in subclasses.
423+ Can be overriden in subclasses.
394424 """
395425 if not self .started :
396426 raise ValueError ("You must run the extractor in a context manager" )
397427 if self .run_handle :
398- self .run_handle (self .cognite_client , self .state_store , self .config , self .cancellation_token )
428+ self .run_handle (
429+ self .cognite_client ,
430+ self .state_store ,
431+ self .config ,
432+ self .cancellation_token ,
433+ )
399434 else :
400435 raise ValueError ("No run_handle defined" )
401436
402437 @classmethod
403438 def get_current_config (cls ) -> CustomConfigClass :
439+ """
440+ Get the current configuration singleton.
441+
442+ Returns:
443+ The current configuration singleton
444+
445+ Raises:
446+ ValueError: If no configuration singleton has been created, meaning no config file has been loaded.
447+ """
404448 if Extractor ._config_singleton is None : # type: ignore
405449 raise ValueError ("No config singleton created. Have a config file been loaded?" )
406450 return Extractor ._config_singleton # type: ignore
407451
408452 @classmethod
409453 def get_current_statestore (cls ) -> AbstractStateStore :
454+ """
455+ Get the current state store singleton.
456+
457+ Returns:
458+ The current state store singleton
459+
460+ Raises:
461+ ValueError: If no state store singleton has been created, meaning no state store has been loaded.
462+ """
410463 if Extractor ._statestore_singleton is None :
411464 raise ValueError ("No state store singleton created. Have a state store been loaded?" )
412465 return Extractor ._statestore_singleton
0 commit comments