Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test_and_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
COGNITE_PROJECT: extractor-tests
COGNITE_BASE_URL: https://greenfield.cognitedata.com
COGNITE_DEV_PROJECT: extractor-aws-dub-dev-testing
COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com/
COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com
COGNITE_DEV_TOKEN_SCOPES: https://aws-dub-dev.cognitedata.com/.default
COGNITE_INTEGRATION: pythonutils-test
run: |
Expand Down
10 changes: 10 additions & 0 deletions cognite/extractorutils/unstable/core/checkin_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ def active_revision(self, value: ConfigRevision) -> None:
with self._lock:
self._active_revision = value

def reset_startup(self) -> None:
"""
Reset startup.

This will reset startup if and when the extractor restarts either
due to changes in the config or the extractor just starting for the first time.

"""
self._has_reported_startup = False

def set_on_revision_change_handler(self, on_revision_change: Callable[[int], None]) -> None:
"""
Set the handler for when the configuration revision changes.
Expand Down
2 changes: 2 additions & 0 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ def _main_runtime(self, args: Namespace) -> None:
message = self._message_queue.get_nowait()
match message:
case RuntimeMessage.RESTART:
self.logger.info("Extractor restart detected. Restarting extractor.")
checkin_worker.reset_startup()
continue

case _:
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,8 @@ only-include = ["cognite"]
requires = ["hatchling"]
build-backend = "hatchling.build"

[dependency-groups]
dev = []

[project.scripts]
simple-extractor = "cognite.examples.unstable.extractors.simple_extractor.main:main"
4 changes: 2 additions & 2 deletions tests/test_unstable/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def json_callback(request: Any, context: Any) -> dict:

requests_mock.register_uri(
method="POST",
url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/startup",
url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/startup",
json=json_callback,
status_code=status_code,
)
Expand All @@ -177,7 +177,7 @@ def json_callback(request: Any, context: Any) -> dict:

requests_mock.register_uri(
method="POST",
url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/checkin",
url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/checkin",
json=json_callback,
status_code=status_code,
)
Expand Down
64 changes: 64 additions & 0 deletions tests/test_unstable/test_checkin_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,70 @@ def test_run_report_periodic(
assert "taskEvents" in checkin_bag[1]


def test_run_report_periodic_reset_startup(
connection_config: ConnectionConfig,
application_config: TestConfig,
requests_mock: requests_mock.Mocker,
mock_checkin_request: Callable[[requests_mock.Mocker], None],
mock_startup_request: Callable[[requests_mock.Mocker], None],
faker: faker.Faker,
checkin_bag: list,
) -> None:
requests_mock.real_http = True
mock_startup_request(requests_mock)
mock_checkin_request(requests_mock)
cognite_client = connection_config.get_cognite_client("test_checkin")
cancellation_token = CancellationToken()
worker = CheckinWorker(
cognite_client,
connection_config.integration.external_id,
logging.getLogger(__name__),
)

test_extractor = TestExtractor(
FullConfig(
connection_config=connection_config, application_config=application_config, current_config_revision=1
),
worker,
)
test_extractor._start_time = datetime.fromtimestamp(int(now() / 1000), tz=timezone.utc)
message_queue: Queue = Queue()
mp_cancel_event = Event()
test_extractor._attach_runtime_controls(cancel_event=mp_cancel_event, message_queue=message_queue)

worker.report_task_end("task1", faker.sentence())
worker.report_task_start("task1", faker.sentence())
worker.report_error(
Error(
level=ErrorLevel.error,
description=faker.sentence(),
task_name="task1",
extractor=test_extractor,
details=None,
)
)

process = Thread(
target=worker.run_periodic_checkin,
args=(cancellation_token, test_extractor._get_startup_request(), 2.0),
)
process.start()
worker.reset_startup()
process.join(timeout=3)
cancellation_token.cancel()

process = Thread(
target=worker.run_periodic_checkin,
args=(cancellation_token, test_extractor._get_startup_request(), 2.0),
)
process.start()
worker.reset_startup()
process.join(timeout=3)
cancellation_token.cancel()

assert len(checkin_bag) >= 3


def test_run_report_periodic_ensure_reorder(
connection_config: ConnectionConfig,
application_config: TestConfig,
Expand Down
Loading