Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 61 additions & 46 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ class RawStateStoreConfig(RawDestinationConfig):
Configuration of a state store based on CDF RAW.
"""

type: Literal["raw"]
upload_interval: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))


Expand All @@ -826,17 +827,70 @@ class LocalStateStoreConfig(ConfigModel):
Configuration of a state store using a local JSON file.
"""

type: Literal["local"]
path: Path
save_interval: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))


class StateStoreConfig(ConfigModel):
StateStoreConfig = Annotated[RawStateStoreConfig | LocalStateStoreConfig, Field(discriminator="type")]


def create_state_store(
    config: StateStoreConfig | None,
    cdf_client: CogniteClient | None = None,
    default_to_local: bool = True,
    cancellation_token: CancellationToken | None = None,
) -> AbstractStateStore:
    """
    Build the state store described by ``config``.

    Args:
        config: State store configuration, either a ``RawStateStoreConfig`` or a ``LocalStateStoreConfig``. When it
            is neither of those (e.g. None), ``default_to_local`` decides what is returned.
        cdf_client: CogniteClient handed to the RAW state store. Must be given for a RAW config, unused otherwise.
        default_to_local: When no state store is configured, return a LocalStateStore writing to ``states.json`` if
            true, and a NoStateStore if false.
        cancellation_token: Cancellation token forwarded to the created state store.

    Returns:
        An (uninitialized) state store

    Raises:
        TypeError: If ``config`` is a RAW config and ``cdf_client`` is None.
        IsADirectoryError: If ``config`` is a local config whose path is a directory.
    """
    if isinstance(config, RawStateStoreConfig):
        # A RAW-backed store cannot work without a client to talk to CDF.
        if cdf_client is None:
            raise TypeError("A cognite client object must be provided when state store is RAW")

        upload_seconds = config.upload_interval.seconds
        return RawStateStore(
            cdf_client=cdf_client,
            database=config.database,
            table=config.table,
            save_interval=upload_seconds,
            cancellation_token=cancellation_token,
        )

    if isinstance(config, LocalStateStoreConfig):
        state_file = config.path
        # The local store needs a file path, so refuse a directory up front.
        if state_file.is_dir():
            raise IsADirectoryError(state_file)

        save_seconds = config.save_interval.seconds
        return LocalStateStore(
            file_path=str(state_file),
            save_interval=save_seconds,
            cancellation_token=cancellation_token,
        )

    # Nothing (recognised) was configured: fall back according to the caller's preference.
    if not default_to_local:
        return NoStateStore()

    return LocalStateStore(file_path="states.json", cancellation_token=cancellation_token)


class ExtractorConfig(ConfigModel):
"""
Base class for application configuration for extractors.
"""

raw: RawStateStoreConfig | None = None
local: LocalStateStoreConfig | None = None
state_store: StateStoreConfig | None = None
metrics: MetricsConfig | None = None
log_handlers: list[LogHandlerConfig] = Field(default_factory=_log_handler_default)

def create_state_store(
self,
Expand All @@ -845,56 +899,17 @@ def create_state_store(
cancellation_token: CancellationToken | None = None,
) -> AbstractStateStore:
"""
Create a state store object based on the config.
Create a state store based on the configuration.

Args:
cdf_client: CogniteClient object to use in case of a RAW state store (ignored otherwise)
default_to_local: If true, return a LocalStateStore if no state store is configured. Otherwise return a
NoStateStore
cancellation_token: Cancellation token to pass to created state stores

Returns:
An (uninitialized) state store
An (uninitialized) state store based on the configuration.
"""
if self.raw and self.local:
raise ValueError("Only one state store can be used simultaneously")

if self.raw:
if cdf_client is None:
raise TypeError("A cognite client object must be provided when state store is RAW")

return RawStateStore(
cdf_client=cdf_client,
database=self.raw.database,
table=self.raw.table,
save_interval=self.raw.upload_interval.seconds,
cancellation_token=cancellation_token,
)

if self.local:
if self.local.path.is_dir():
raise IsADirectoryError(self.local.path)

return LocalStateStore(
file_path=str(self.local.path),
save_interval=self.local.save_interval.seconds,
cancellation_token=cancellation_token,
)

if default_to_local:
return LocalStateStore(file_path="states.json", cancellation_token=cancellation_token)

return NoStateStore()


class ExtractorConfig(ConfigModel):
"""
Base class for application configuration for extractors.
"""

state_store: StateStoreConfig | None = None
metrics: MetricsConfig | None = None
log_handlers: list[LogHandlerConfig] = Field(default_factory=_log_handler_default)
return create_state_store(self.state_store, cdf_client, default_to_local, cancellation_token)


ConfigType = TypeVar("ConfigType", bound=ExtractorConfig)
Expand Down
18 changes: 7 additions & 11 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def my_task_function(self, task_context: TaskContext) -> None:
from cognite.extractorutils.metrics import BaseMetrics, MetricsType, safe_get
from cognite.extractorutils.statestore import (
AbstractStateStore,
LocalStateStore,
NoStateStore,
)
from cognite.extractorutils.threading import CancellationToken
Expand All @@ -83,6 +82,7 @@ def my_task_function(self, task_context: TaskContext) -> None:
ExtractorConfig,
LogConsoleHandlerConfig,
LogFileHandlerConfig,
create_state_store,
)
from cognite.extractorutils.unstable.core._dto import (
CogniteModel,
Expand Down Expand Up @@ -295,16 +295,12 @@ def _load_state_store(self) -> None:
"""
state_store_config = self.application_config.state_store

if state_store_config:
self.state_store = state_store_config.create_state_store(
cdf_client=self.cognite_client,
default_to_local=self.USE_DEFAULT_STATE_STORE,
cancellation_token=self.cancellation_token,
)
elif self.USE_DEFAULT_STATE_STORE:
self.state_store = LocalStateStore("states.json", cancellation_token=self.cancellation_token)
else:
self.state_store = NoStateStore()
self.state_store = create_state_store(
config=state_store_config,
cdf_client=self.cognite_client,
default_to_local=self.USE_DEFAULT_STATE_STORE,
cancellation_token=self.cancellation_token,
)

try:
self.state_store.initialize()
Expand Down
17 changes: 17 additions & 0 deletions schema/unstable/either_id.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"$id": "either_id.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"unevaluatedProperties": false,
"oneOf": [{ "required": ["id"] }, { "required": ["external-id"] }],
"properties": {
"id": {
"type": "integer",
"description": "Resource internal ID"
},
"external-id": {
"type": "string",
"description": "Resource external ID"
}
}
}
22 changes: 22 additions & 0 deletions schema/unstable/extractor_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$id": "extractor_config.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"description": "Base extractor configuration object",
"unevaluatedProperties": false,
"properties": {
"state-store": {
"$ref": "state_store_config.schema.json"
},
"metrics": {
"$ref": "metrics_config.schema.json"
},
"log-handlers": {
"type": "array",
"description": "List of log handlers to use for logging from the extractor",
"items": {
"$ref": "log_handler_config.schema.json"
}
}
}
}
76 changes: 76 additions & 0 deletions schema/unstable/log_handler_config.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"$id": "log_handler_config.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"unevaluatedProperties": false,
"description": "Configuration of a log handler for an extractor.",
"discriminatorProp": "type",
"oneOf": [
{
"type": "object",
"description": "Configuration for a log handler that writes logs to a local file",
"unevaluatedProperties": false,
"title": "File",
"properties": {
"type": {
"type": "string",
"const": "file",
"description": "Type of log handler, must be 'file' for a log handler that writes logs to a local file"
},
"path": {
"type": "string",
"description": "File path to the log file to write logs to"
},
"retention": {
"type": "integer",
"description": "Number of log files to retain when rotating logs.",
"default": 7
},
"level": {
"type": "string",
"description": "Select the verbosity level for file logging. Valid options, in order of decreasing verbosity, are `DEBUG`, `INFO`, `WARNING`, `ERROR`, and `CRITICAL`.",
"enum": [
"DEBUG",
"INFO",
"WARNING",
"ERROR",
"CRITICAL"
]
}
},
"required": [
"type",
"path",
"level"
]
},
{
"type": "object",
"description": "Configuration for a log handler that writes logs to the console",
"unevaluatedProperties": false,
"title": "Console",
"properties": {
"type": {
"type": "string",
"const": "console",
"description": "Type of log handler, must be 'console' for a log handler that writes logs to the console"
},
"level": {
"type": "string",
"description": "Select the verbosity level for console logging. Valid options, in order of decreasing verbosity, are `DEBUG`, `INFO`, `WARNING`, `ERROR`, and `CRITICAL`.",
"enum": [
"DEBUG",
"INFO",
"WARNING",
"ERROR",
"CRITICAL"
]
}
},
"required": [
"type",
"level"
]
}
]
}
96 changes: 96 additions & 0 deletions schema/unstable/metrics_config.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
{
"$id": "metrics_config.schema.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"unevaluatedProperties": false,
"description": "Configuration of metrics collection for an extractor",
"properties": {
"push-gateways": {
"type": "array",
"description": "List of Prometheus Push Gateways to push metrics to",
"items": {
"type": "object",
"unevaluatedProperties": false,
"properties": {
"host": {
"type": "string",
"description": "Host of the Prometheus Push Gateway, e.g. 'pushgateway.example.com'"
},
"job-name": {
"type": "string",
"description": "Job name to use when pushing metrics to the Prometheus Push Gateway"
},
"username": {
"type": "string",
"description": "Username for authenticating with the Prometheus Push Gateway, if required"
},
"password": {
"type": "string",
"description": "Password for authenticating with the Prometheus Push Gateway, if required"
},
"clear-after": {
"type": "string",
"description": "Interval for clearing metrics from the Prometheus Push Gateway, in the form 12s, 15m, 1h, etc. If not specified, metrics will not be cleared from the Push Gateway."
},
"push-interval": {
"type": "string",
"description": "Interval for pushing metrics to the Prometheus Push Gateway, in the form 12s, 15m, 1h, etc.",
"default": "30s"
}
},
"required": [
"host",
"job-name"
]
}
},
"cognite": {
"type": "object",
"unevaluatedProperties": false,
"description": "Configuration for pushing metrics to CDF Classic time-series",
"properties": {
"external-id-prefix": {
"type": "string",
"description": "Prefix on external ID used when creating CDF time series to store metrics."
},
"asset-name": {
"type": "string",
"description": "Enter the name for a CDF asset that will have all the metrics time series attached to it."
},
"asset-external-id": {
"type": "string",
"description": "Enter the external ID for a CDF asset with all the metrics time series attached to it."
},
"push-interval": {
"type": "string",
"description": "Enter the interval between each push to CDF, in the form 12s, 15m, 1h, etc.",
"default": "30s"
},
"data-set": {
"description": "The data set where the metrics will be created.",
"$ref": "either_id.schema.json"
}
},
"required": [
"external-id-prefix"
]
},
"server": {
"type": "object",
"unevaluatedProperties": false,
"description": "Configuration for exposing an HTTP server with Prometheus metrics for scraping.",
"properties": {
"host": {
"type": "string",
"description": "Host to run the Prometheus server on.",
"default": "0.0.0.0"
},
"port": {
"type": "integer",
"description": "Port to run the Prometheus server on.",
"default": 8080
}
}
}
}
}
Loading
Loading