Skip to content

Commit 319934b

Browse files
authored
Integrate checkin worker (#466)
1 parent 80d0b1e commit 319934b

File tree

11 files changed

+348
-293
lines changed

11 files changed

+348
-293
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
project: ${COGNITE_PROJECT}
22
base_url: ${COGNITE_BASE_URL}
33
integration:
4-
external_id: "utils-test-keyvault-remote"
4+
external_id: ${COGNITE_INTEGRATION_ID}
55
authentication:
66
type: "client-credentials"
77
client_id: ${COGNITE_CLIENT_ID}
88
client_secret: ${COGNITE_CLIENT_SECRET}
99
token_url: ${COGNITE_TOKEN_URL}
10-
scopes: ${COGNITE_BASE_URL}/.default
10+
scopes: ${COGNITE_SCOPES}

cognite/examples/unstable/extractors/simple_extractor/main.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
An example extractor that logs messages at various levels.
33
"""
44

5-
from cognite.extractorutils.unstable.configuration.models import ExtractorConfig
5+
from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, IntervalConfig, TimeIntervalConfig
66
from cognite.extractorutils.unstable.core.base import Extractor, StartupTask, TaskContext
77
from cognite.extractorutils.unstable.core.runtime import Runtime
8+
from cognite.extractorutils.unstable.core.tasks import ScheduledTask
89

910

1011
class SimpleConfig(ExtractorConfig):
@@ -32,6 +33,13 @@ def __init_tasks__(self) -> None:
3233
Initializes and adds tasks to the extractor.
3334
"""
3435
self.add_task(StartupTask(name="main_task", target=self.run_my_task))
36+
self.add_task(
37+
ScheduledTask(
38+
name="scheduled_task",
39+
target=self.scheduled_task,
40+
schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("3s")),
41+
)
42+
)
3543

3644
# example task that logs messages at different levels
3745
def run_my_task(self, ctx: TaskContext) -> None:
@@ -46,6 +54,18 @@ def run_my_task(self, ctx: TaskContext) -> None:
4654
ctx.warning("This is a warning message.")
4755
ctx.info("Test finished.")
4856

57+
def scheduled_task(self, ctx: TaskContext) -> None:
58+
"""
59+
An example scheduled task that logs a message.
60+
61+
Args:
62+
ctx: The context for the task execution, used for logging.
63+
"""
64+
ctx.info("This is a scheduled task running.")
65+
ctx.warning("This is a warning from the scheduled task.")
66+
ctx.debug("Debugging the scheduled task execution.")
67+
ctx.error("This is an error message from the scheduled task.")
68+
4969
# add more tasks as needed
5070

5171

cognite/extractorutils/unstable/configuration/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ class ExtractorConfig(ConfigModel):
433433
"""
434434

435435
log_handlers: list[LogHandlerConfig] = Field(default_factory=_log_handler_default)
436+
retry_startup: bool = True
436437

437438

438439
ConfigType = TypeVar("ConfigType", bound=ExtractorConfig)

cognite/extractorutils/unstable/core/base.py

Lines changed: 12 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,12 @@ def my_task_function(self, task_context: TaskContext) -> None:
7474
ExtractorInfo,
7575
StartupRequest,
7676
TaskType,
77-
TaskUpdate,
78-
)
79-
from cognite.extractorutils.unstable.core._dto import (
80-
Error as DtoError,
8177
)
8278
from cognite.extractorutils.unstable.core._dto import (
8379
Task as DtoTask,
8480
)
8581
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
82+
from cognite.extractorutils.unstable.core.checkin_worker import CheckinWorker
8683
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
8784
from cognite.extractorutils.unstable.core.logger import CogniteLogger
8885
from cognite.extractorutils.unstable.core.restart_policy import WHEN_CONTINUOUS_TASKS_CRASHES, RestartPolicy
@@ -142,8 +139,11 @@ class Extractor(Generic[ConfigType], CogniteLogger):
142139

143140
RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES
144141

145-
def __init__(self, config: FullConfig[ConfigType]) -> None:
142+
cancellation_token: CancellationToken
143+
144+
def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> None:
146145
self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")
146+
self._checkin_worker = checkin_worker
147147

148148
self.cancellation_token = CancellationToken()
149149
self.cancellation_token.cancel_on_interrupt()
@@ -161,8 +161,6 @@ def __init__(self, config: FullConfig[ConfigType]) -> None:
161161
self._scheduler = TaskScheduler(self.cancellation_token.create_child_token())
162162

163163
self._tasks: list[Task] = []
164-
self._task_updates: list[TaskUpdate] = []
165-
self._errors: dict[str, Error] = {}
166164
self._start_time: datetime
167165

168166
self.__init_tasks__()
@@ -253,43 +251,6 @@ def _attach_runtime_controls(self, *, cancel_event: MpEvent, message_queue: Queu
253251
self._set_runtime_message_queue(message_queue)
254252
self._setup_cancellation_watcher(cancel_event)
255253

256-
def _checkin(self) -> None:
257-
with self._checkin_lock:
258-
task_updates = [t.model_dump(mode="json") for t in self._task_updates]
259-
self._task_updates.clear()
260-
261-
error_updates = [
262-
DtoError(
263-
external_id=e.external_id,
264-
level=e.level,
265-
description=e.description,
266-
details=e.details,
267-
start_time=e.start_time,
268-
end_time=e.end_time,
269-
task=e._task_name if e._task_name is not None else None,
270-
).model_dump(mode="json")
271-
for e in self._errors.values()
272-
]
273-
self._errors.clear()
274-
275-
res = self.cognite_client.post(
276-
f"/api/v1/projects/{self.cognite_client.config.project}/integrations/checkin",
277-
json={
278-
"externalId": self.connection_config.integration.external_id,
279-
"taskEvents": task_updates,
280-
"errors": error_updates,
281-
},
282-
headers={"cdf-version": "alpha"},
283-
)
284-
new_config_revision = res.json().get("lastConfigRevision")
285-
286-
if (
287-
new_config_revision
288-
and self.current_config_revision != "local"
289-
and new_config_revision > self.current_config_revision
290-
):
291-
self.restart()
292-
293254
def _get_startup_request(self) -> StartupRequest:
294255
return StartupRequest(
295256
external_id=self.connection_config.integration.external_id,
@@ -310,17 +271,10 @@ def _get_startup_request(self) -> StartupRequest:
310271
)
311272

312273
def _run_checkin(self) -> None:
313-
while not self.cancellation_token.is_cancelled:
314-
try:
315-
self._logger.debug("Running checkin")
316-
self._checkin()
317-
except Exception:
318-
self._logger.exception("Error during checkin")
319-
self.cancellation_token.wait(10)
274+
self._checkin_worker.run_periodic_checkin(self.cancellation_token, self._get_startup_request())
320275

321276
def _report_error(self, error: Error) -> None:
322-
with self._checkin_lock:
323-
self._errors[error.external_id] = error
277+
self._checkin_worker.report_error(error)
324278

325279
def _new_error(
326280
self,
@@ -348,8 +302,8 @@ def restart(self) -> None:
348302
self.cancellation_token.cancel()
349303

350304
@classmethod
351-
def _init_from_runtime(cls, config: FullConfig[ConfigType]) -> Self:
352-
return cls(config)
305+
def _init_from_runtime(cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> Self:
306+
return cls(config, checkin_worker)
353307

354308
def add_task(self, task: Task) -> None:
355309
"""
@@ -368,10 +322,7 @@ def run_task(task_context: TaskContext) -> None:
368322
A wrapped version of the task's target, with tracking and error handling.
369323
"""
370324
# Record a task start
371-
with self._checkin_lock:
372-
self._task_updates.append(
373-
TaskUpdate(type="started", name=task.name, timestamp=now()),
374-
)
325+
self._checkin_worker.report_task_start(name=task.name, timestamp=now())
375326

376327
try:
377328
# Run task
@@ -390,10 +341,7 @@ def run_task(task_context: TaskContext) -> None:
390341

391342
finally:
392343
# Record task end
393-
with self._checkin_lock:
394-
self._task_updates.append(
395-
TaskUpdate(type="ended", name=task.name, timestamp=now()),
396-
)
344+
self._checkin_worker.report_task_end(name=task.name, timestamp=now())
397345

398346
task.target = run_task
399347
self._tasks.append(task)
@@ -411,13 +359,6 @@ def run_task(task_context: TaskContext) -> None:
411359
),
412360
)
413361

414-
def _report_extractor_info(self) -> None:
415-
self.cognite_client.post(
416-
f"/api/v1/projects/{self.cognite_client.config.project}/integrations/extractorinfo",
417-
json=self._get_startup_request().model_dump(mode="json"),
418-
headers={"cdf-version": "alpha"},
419-
)
420-
421362
def start(self) -> None:
422363
"""
423364
Start the extractor.
@@ -427,7 +368,6 @@ def start(self) -> None:
427368
"""
428369
self._setup_logging()
429370
self._start_time = datetime.now(tz=timezone.utc)
430-
self._report_extractor_info()
431371
Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start()
432372

433373
def stop(self) -> None:
@@ -456,10 +396,7 @@ def __exit__(
456396
Stop the extractor when exiting the context manager.
457397
"""
458398
self.stop()
459-
with self._checkin_lock:
460-
self._checkin()
461-
462-
self._logger.info("Shutting down extractor")
399+
self._checkin_worker.flush(self.cancellation_token)
463400
return exc_val is None
464401

465402
def run(self) -> None:

cognite/extractorutils/unstable/core/checkin_worker.py

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def __init__(
8787
self._retry_startup: bool = retry_startup
8888
self._has_reported_startup: bool = False
8989
self._active_revision: ConfigRevision = active_revision
90-
self._errors: dict[str, DtoError] = {}
90+
self._errors: dict[str, Error] = {}
9191
self._task_updates: list[TaskUpdate] = []
9292

9393
@property
@@ -127,7 +127,7 @@ def run_periodic_checkin(
127127
self._logger.debug("Running periodic check-in with interval %.2f seconds", report_interval)
128128
self.flush(cancellation_token)
129129
self._logger.debug(f"Check-in worker finished check-in, sleeping for {report_interval:.2f} seconds")
130-
sleep(report_interval)
130+
cancellation_token.wait(report_interval)
131131

132132
def _run_startup_report(
133133
self, cancellation_token: CancellationToken, startup_request: StartupRequest, interval: float | None = None
@@ -168,9 +168,7 @@ def _handle_checkin_response(self, response: JSONType) -> None:
168168
"and configured to use remote config for the new config to take effect.",
169169
)
170170
elif self._active_revision < checkin_response.last_config_revision:
171-
self._logger.info(
172-
f"Remote config changed from {self._active_revision} to {checkin_response.last_config_revision}."
173-
)
171+
self._active_revision = checkin_response.last_config_revision
174172
self._on_revision_change(checkin_response.last_config_revision)
175173

176174
def flush(self, cancellation_token: CancellationToken) -> None:
@@ -197,7 +195,7 @@ def report_checkin(self, cancellation_token: CancellationToken) -> None:
197195
"""
198196
with self._lock:
199197
if not self._has_reported_startup:
200-
new_errors = [error for error in self._errors.values() if error.task is None]
198+
new_errors = [error for error in self._errors.values() if error._task_name is None]
201199
if len(new_errors) == 0:
202200
self._logger.info("No startup request has been reported yet, skipping check-in.")
203201
return
@@ -225,11 +223,8 @@ def report_checkin(self, cancellation_token: CancellationToken) -> None:
225223
task_updates_to_write = task_updates
226224
task_updates = []
227225
self.try_write_checkin(
228-
CheckinRequest(
229-
external_id=self._integration,
230-
errors=errors_to_write if len(errors_to_write) > 0 else None,
231-
task_events=task_updates_to_write if len(task_updates_to_write) > 0 else None,
232-
),
226+
errors_to_write,
227+
task_updates_to_write,
233228
)
234229
break
235230

@@ -264,11 +259,8 @@ def report_checkin(self, cancellation_token: CancellationToken) -> None:
264259
if tasks_idx > 0:
265260
task_updates = task_updates[tasks_idx:]
266261
self.try_write_checkin(
267-
CheckinRequest(
268-
external_id=self._integration,
269-
errors=errors_to_write if len(errors_to_write) > 0 else None,
270-
task_events=task_updates_to_write if len(task_updates_to_write) > 0 else None,
271-
)
262+
errors_to_write,
263+
task_updates_to_write,
272264
)
273265
if errs_idx == 0 and tasks_idx == 0:
274266
self._logger.debug("Check-in worker finished writing check-in.")
@@ -278,15 +270,21 @@ def report_checkin(self, cancellation_token: CancellationToken) -> None:
278270
self._logger.debug("Extractor was stopped during check-in, requeuing remaining errors and task updates.")
279271
self._requeue_checkin(new_errors, task_updates)
280272

281-
def try_write_checkin(self, checkin_request: CheckinRequest) -> None:
273+
def try_write_checkin(self, errors: list[Error], task_updates: list[TaskUpdate]) -> None:
282274
"""
283275
We try to write a check-in.
284276
285277
This will try to write a check in to integrations.
286278
287279
Arguments:
288-
checkin_request: The check-in request to write.
280+
errors(list[Error]): The errors to write.
281+
task_updates(list[TaskUpdate]): The task updates to write.
289282
"""
283+
checkin_request = CheckinRequest(
284+
external_id=self._integration,
285+
errors=list(map(DtoError.from_internal, errors)) if len(errors) > 0 else None,
286+
task_events=task_updates if len(task_updates) > 0 else None,
287+
)
290288
should_requeue = self._wrap_checkin_like_request(
291289
lambda: self._cognite_client.post(
292290
f"/api/v1/projects/{self._cognite_client.config.project}/integrations/checkin",
@@ -296,7 +294,7 @@ def try_write_checkin(self, checkin_request: CheckinRequest) -> None:
296294
)
297295

298296
if should_requeue:
299-
self._requeue_checkin(checkin_request.errors, checkin_request.task_events)
297+
self._requeue_checkin(errors, checkin_request.task_events)
300298

301299
def report_error(self, error: Error) -> None:
302300
"""
@@ -307,7 +305,7 @@ def report_error(self, error: Error) -> None:
307305
"""
308306
with self._lock:
309307
if error.external_id not in self._errors:
310-
self._errors[error.external_id] = DtoError.from_internal(error)
308+
self._errors[error.external_id] = error
311309
else:
312310
self._logger.warning(f"Error {error.external_id} already reported, skipping re-reporting.")
313311

@@ -335,7 +333,7 @@ def report_task_end(self, name: str, message: MessageType | None = None, timesta
335333
TaskUpdate(type="ended", name=name, timestamp=timestamp or (int(now() * 1000)), message=message)
336334
)
337335

338-
def _requeue_checkin(self, errors: list[DtoError] | None, task_updates: list[TaskUpdate] | None) -> None:
336+
def _requeue_checkin(self, errors: list[Error] | None, task_updates: list[TaskUpdate] | None) -> None:
339337
with self._lock:
340338
for error in errors or []:
341339
if error.external_id not in self._errors:
@@ -373,6 +371,7 @@ def _wrap_checkin_like_request(self, request: Callable[[], Response]) -> bool:
373371

374372
return True
375373
except Exception as e:
374+
self._logger.critical(f"Extractor could not connect to CDF {e!s}")
376375
self._on_fatal_error(e)
377376
return True
378377

0 commit comments

Comments
 (0)