Skip to content

Commit bb20a96

Browse files
authored
Merge branch 'master' into connection-config-schemas
2 parents af91747 + ae83c7c commit bb20a96

File tree

6 files changed

+85
-3
lines changed

6 files changed

+85
-3
lines changed

.github/workflows/test_and_build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ jobs:
4646
COGNITE_PROJECT: extractor-tests
4747
COGNITE_BASE_URL: https://greenfield.cognitedata.com
4848
COGNITE_DEV_PROJECT: extractor-aws-dub-dev-testing
49-
COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com/
49+
COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com
5050
COGNITE_DEV_TOKEN_SCOPES: https://aws-dub-dev.cognitedata.com/.default
5151
COGNITE_INTEGRATION: pythonutils-test
5252
run: |

cognite/extractorutils/unstable/core/checkin_worker.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ def active_revision(self, value: ConfigRevision) -> None:
9696
with self._lock:
9797
self._active_revision = value
9898

99+
def reset_startup(self) -> None:
100+
"""
101+
Reset startup.
102+
103+
This will reset startup if and when the extractor restarts either
104+
due to changes in the config or the extractor just starting for the first time.
105+
106+
"""
107+
with self._lock:
108+
self._is_running = False
109+
self._has_reported_startup = False
110+
99111
def set_on_revision_change_handler(self, on_revision_change: Callable[[int], None]) -> None:
100112
"""
101113
Set the handler for when the configuration revision changes.

cognite/extractorutils/unstable/core/runtime.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,8 @@ def _main_runtime(self, args: Namespace) -> None:
525525
message = self._message_queue.get_nowait()
526526
match message:
527527
case RuntimeMessage.RESTART:
528+
self.logger.info("Extractor restart detected. Restarting extractor.")
529+
checkin_worker.reset_startup()
528530
continue
529531

530532
case _:

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,5 +123,8 @@ only-include = ["cognite"]
123123
requires = ["hatchling"]
124124
build-backend = "hatchling.build"
125125

126+
[dependency-groups]
127+
dev = []
128+
126129
[project.scripts]
127130
simple-extractor = "cognite.examples.unstable.extractors.simple_extractor.main:main"

tests/test_unstable/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def json_callback(request: Any, context: Any) -> dict:
151151

152152
requests_mock.register_uri(
153153
method="POST",
154-
url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/startup",
154+
url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/startup",
155155
json=json_callback,
156156
status_code=status_code,
157157
)
@@ -177,7 +177,7 @@ def json_callback(request: Any, context: Any) -> dict:
177177

178178
requests_mock.register_uri(
179179
method="POST",
180-
url=f"{connection_config.base_url}api/v1/projects/{connection_config.project}/integrations/checkin",
180+
url=f"{connection_config.base_url}/api/v1/projects/{connection_config.project}/integrations/checkin",
181181
json=json_callback,
182182
status_code=status_code,
183183
)

tests/test_unstable/test_checkin_worker.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,71 @@ def test_run_report_periodic(
157157
assert "taskEvents" in checkin_bag[1]
158158

159159

160+
def test_run_report_periodic_reset_startup(
161+
connection_config: ConnectionConfig,
162+
application_config: TestConfig,
163+
requests_mock: requests_mock.Mocker,
164+
mock_checkin_request: Callable[[requests_mock.Mocker], None],
165+
mock_startup_request: Callable[[requests_mock.Mocker], None],
166+
faker: faker.Faker,
167+
checkin_bag: list,
168+
) -> None:
169+
requests_mock.real_http = True
170+
mock_startup_request(requests_mock)
171+
mock_checkin_request(requests_mock)
172+
cognite_client = connection_config.get_cognite_client("test_checkin")
173+
cancellation_token = CancellationToken()
174+
worker = CheckinWorker(
175+
cognite_client,
176+
connection_config.integration.external_id,
177+
logging.getLogger(__name__),
178+
)
179+
180+
test_extractor = TestExtractor(
181+
FullConfig(
182+
connection_config=connection_config, application_config=application_config, current_config_revision=1
183+
),
184+
worker,
185+
)
186+
test_extractor._start_time = datetime.fromtimestamp(int(now() / 1000), tz=timezone.utc)
187+
message_queue: Queue = Queue()
188+
mp_cancel_event = Event()
189+
test_extractor._attach_runtime_controls(cancel_event=mp_cancel_event, message_queue=message_queue)
190+
191+
worker.report_task_end("task1", faker.sentence())
192+
worker.report_task_start("task1", faker.sentence())
193+
worker.report_error(
194+
Error(
195+
level=ErrorLevel.error,
196+
description=faker.sentence(),
197+
task_name="task1",
198+
extractor=test_extractor,
199+
details=None,
200+
)
201+
)
202+
203+
process = Thread(
204+
target=worker.run_periodic_checkin,
205+
args=(cancellation_token, test_extractor._get_startup_request(), 2.0),
206+
)
207+
process.start()
208+
worker.reset_startup()
209+
process.join(timeout=3)
210+
cancellation_token.cancel()
211+
212+
cancellation_token = CancellationToken()
213+
process = Thread(
214+
target=worker.run_periodic_checkin,
215+
args=(cancellation_token, test_extractor._get_startup_request(), 2.0),
216+
)
217+
process.start()
218+
worker.reset_startup()
219+
process.join(timeout=3)
220+
cancellation_token.cancel()
221+
222+
assert len(checkin_bag) >= 3
223+
224+
160225
def test_run_report_periodic_ensure_reorder(
161226
connection_config: ConnectionConfig,
162227
application_config: TestConfig,

0 commit comments

Comments
 (0)