Skip to content

Commit e3824ea

Browse files
committed
Modify compound ingest operation to wait for table build completion
1 parent 9c59fb6 commit e3824ea

File tree

9 files changed

+226
-71
lines changed

9 files changed

+226
-71
lines changed

src/citrine/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "3.10.0"
1+
__version__ = "3.11.0"

src/citrine/jobs/job.py

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from gemd.enumeration.base_enumeration import BaseEnumeration
12
from logging import getLogger
23
from time import time, sleep
34
from typing import Union
45
from uuid import UUID
6+
from warnings import warn
57

68
from citrine._rest.resource import Resource
79
from citrine._serialization.properties import Set as PropertySet, String, Object
@@ -23,6 +25,16 @@ class JobSubmissionResponse(Resource['JobSubmissionResponse']):
2325
""":UUID: job id of the job submission request"""
2426

2527

28+
class JobStatus(BaseEnumeration):
29+
"""The valid status codes for a job."""
30+
31+
SUBMITTED = "Submitted"
32+
PENDING = "Pending"
33+
RUNNING = "Running"
34+
SUCCESS = "Success"
35+
FAILURE = "Failure"
36+
37+
2638
class TaskNode(Resource['TaskNode']):
2739
"""Individual task status.
2840
@@ -33,14 +45,29 @@ class TaskNode(Resource['TaskNode']):
3345
""":str: unique identification number for the job task"""
3446
task_type = properties.String("task_type")
3547
""":str: the type of task running"""
36-
status = properties.String("status")
37-
""":str: The last reported status of this particular task.
38-
One of "Submitted", "Pending", "Running", "Success", or "Failure"."""
48+
_status = properties.String("status")
3949
dependencies = PropertySet(String(), "dependencies")
4050
""":Set[str]: all the tasks that this task is dependent on"""
4151
failure_reason = properties.Optional(String(), "failure_reason")
4252
""":str: if a task has failed, the failure reason will be in this parameter"""
4353

54+
@property
55+
def status(self) -> Union[JobStatus, str]:
56+
"""The last reported status of this particular task."""
57+
if resolved := JobStatus.from_str(self._status, exception=False):
58+
return resolved
59+
else:
60+
return self._status
61+
62+
@status.setter
63+
def status(self, value):
64+
if JobStatus.from_str(value, exception=False) is None:
65+
warn(
66+
f"{value} is not a recognized JobStatus; this will become an error as of v4.0.0.",
67+
DeprecationWarning
68+
)
69+
self._status = value
70+
4471

4572
class JobStatusResponse(Resource['JobStatusResponse']):
4673
"""A response to a job status check.
@@ -50,13 +77,37 @@ class JobStatusResponse(Resource['JobStatusResponse']):
5077

5178
job_type = properties.String("job_type")
5279
""":str: the type of job for this status report"""
53-
status = properties.String("status")
80+
_status = properties.String("status")
5481
""":str: The status of the job. One of "Running", "Success", or "Failure"."""
5582
tasks = properties.List(Object(TaskNode), "tasks")
5683
""":List[TaskNode]: all of the constituent task required to complete this job"""
5784
output = properties.Optional(properties.Mapping(String, String), 'output')
5885
""":Optional[dict[str, str]]: job output properties and results"""
5986

87+
@property
88+
def status(self) -> Union[JobStatus, str]:
89+
"""The last reported status of the job. One of "Running", "Success", or "Failure"."""
90+
if resolved := JobStatus.from_str(self._status, exception=False):
91+
return resolved
92+
else:
93+
return self._status
94+
95+
@status.setter
96+
def status(self, value):
97+
if resolved := JobStatus.from_str(value, exception=False):
98+
if resolved not in [JobStatus.RUNNING, JobStatus.SUCCESS, JobStatus.FAILURE]:
99+
warn(
100+
f"{value} is not a valid JobStatus for a JobStatusResponse; "
101+
f"this will become an error as of v4.0.0.",
102+
DeprecationWarning
103+
)
104+
else:
105+
warn(
106+
f"{value} is not a recognized JobStatus; this will become an error as of v4.0.0.",
107+
DeprecationWarning
108+
)
109+
self._status = value
110+
60111

61112
def _poll_for_job_completion(session: Session,
62113
job: Union[JobSubmissionResponse, UUID, str],
@@ -102,7 +153,7 @@ def _poll_for_job_completion(session: Session,
102153
while True:
103154
response = session.get_resource(path=path, params=params)
104155
status: JobStatusResponse = JobStatusResponse.build(response)
105-
if status.status in ['Success', 'Failure']:
156+
if status.status in [JobStatus.SUCCESS, JobStatus.FAILURE]:
106157
break
107158
elif time() - start_time < timeout:
108159
logger.info(
@@ -115,12 +166,12 @@ def _poll_for_job_completion(session: Session,
115166
f'Note job on server is unaffected by this timeout.')
116167
logger.debug('Last status: {}'.format(status.dump()))
117168
raise PollingTimeoutError('Job {} timed out.'.format(job_id))
118-
if status.status == 'Failure':
169+
if status.status == JobStatus.FAILURE:
119170
logger.debug(f'Job terminated with Failure status: {status.dump()}')
120171
if raise_errors:
121172
failure_reasons = []
122173
for task in status.tasks:
123-
if task.status == 'Failure':
174+
if task.status == JobStatus.FAILURE:
124175
logger.error(f'Task {task.id} failed with reason "{task.failure_reason}"')
125176
failure_reasons.append(task.failure_reason)
126177
raise JobFailureError(

src/citrine/resources/ingestion.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ class Ingestion(Resource['Ingestion']):
196196
raise_errors = properties.Optional(properties.Boolean(), 'raise_errors', default=True)
197197

198198
@property
199+
@deprecated(deprecated_in='3.11.0', removed_in='4.0.0',
200+
details="The project_id attribute is deprecated since "
201+
"dataset access is now controlled through teams.")
199202
def project_id(self) -> Optional[UUID]:
200203
"""[DEPRECATED] The project ID associated with this ingest."""
201204
return self._project_id
@@ -300,15 +303,15 @@ def build_objects_async(self,
300303
if not build_table:
301304
project_id = None
302305
elif project is None:
303-
if self.project_id is None:
306+
if self._project_id is None:
304307
raise ValueError("Building a table requires a target project.")
305308
else:
306309
warn(
307310
"Building a table with an implicit project is deprecated "
308311
"and will be removed in v4. Please pass a project explicitly.",
309312
DeprecationWarning
310313
)
311-
project_id = self.project_id
314+
project_id = self._project_id
312315
elif isinstance(project, Project):
313316
project_id = project.uid
314317
elif isinstance(project, UUID):
@@ -365,18 +368,26 @@ def poll_for_job_completion(self,
365368
if polling_delay is not None:
366369
kwargs["polling_delay"] = polling_delay
367370

368-
_poll_for_job_completion(
371+
build_job_status = _poll_for_job_completion(
369372
session=self.session,
370373
team_id=self.team_id,
371374
job=job,
372375
raise_errors=False, # JobFailureError doesn't contain the error
373376
**kwargs
374377
)
378+
if build_job_status.output is not None and "table_build_job_id" in build_job_status.output:
379+
_poll_for_job_completion(
380+
session=self.session,
381+
team_id=self.team_id,
382+
job=build_job_status.output["table_build_job_id"],
383+
raise_errors=False, # JobFailureError doesn't contain the error
384+
**kwargs
385+
)
375386
return self.status()
376387

377388
def status(self) -> IngestionStatus:
378389
"""
379-
[ALPHA] Retrieve the status of the ingestion from platform.
390+
[ALPHA] Retrieve the status of the ingestion from platform.
380391
381392
Returns
382393
----------
@@ -438,7 +449,7 @@ def poll_for_job_completion(self,
438449

439450
def status(self) -> IngestionStatus:
440451
"""
441-
[ALPHA] Retrieve the status of the ingestion from platform.
452+
[ALPHA] Retrieve the status of the ingestion from platform.
442453
443454
Returns
444455
----------

tests/jobs/test_deprecations.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from citrine.jobs.job import JobStatus, JobStatusResponse, TaskNode
2+
import pytest
3+
import warnings
4+
5+
from tests.utils.factories import TaskNodeDataFactory, JobStatusResponseDataFactory
6+
7+
def test_status_response_status():
8+
status_response = JobStatusResponse.build(JobStatusResponseDataFactory(failure=True))
9+
assert status_response.status == JobStatus.FAILURE
10+
11+
with pytest.deprecated_call():
12+
status_response.status = 'Failed'
13+
with warnings.catch_warnings():
14+
warnings.simplefilter("error")
15+
assert not isinstance(status_response.status, JobStatus)
16+
17+
with pytest.deprecated_call():
18+
status_response.status = JobStatus.PENDING
19+
with warnings.catch_warnings():
20+
warnings.simplefilter("error")
21+
assert status_response.status == JobStatus.PENDING
22+
23+
with warnings.catch_warnings():
24+
warnings.simplefilter("error")
25+
status_response.status = JobStatus.SUCCESS
26+
assert status_response.status == JobStatus.SUCCESS
27+
28+
def test_task_node_status():
29+
status_response = TaskNode.build(TaskNodeDataFactory(failure=True))
30+
assert status_response.status == JobStatus.FAILURE
31+
32+
with pytest.deprecated_call():
33+
status_response.status = 'Failed'
34+
assert not isinstance(status_response.status, JobStatus)
35+
36+
with warnings.catch_warnings():
37+
warnings.simplefilter("error")
38+
status_response.status = JobStatus.SUCCESS
39+
assert status_response.status == JobStatus.SUCCESS

tests/jobs/test_waiting.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import time
88

99
from citrine.informatics.executions.design_execution import DesignExecution
10-
from citrine.informatics.executions.predictor_evaluation_execution import (
11-
PredictorEvaluationExecution)
1210
from citrine.jobs.waiting import (
1311
wait_for_asynchronous_object,
1412
wait_while_executing,
@@ -53,7 +51,7 @@ def test_wait_while_validating_timeout(sleep_mock, time_mock):
5351
module.in_progress.return_value = True
5452
collection.get.return_value = module
5553

56-
with pytest.raises(ConditionTimeoutError) as exceptio:
54+
with pytest.raises(ConditionTimeoutError):
5755
wait_while_validating(collection=collection, module=module, timeout=1.0)
5856

5957
@mock.patch('time.sleep', return_value=None)

tests/resources/test_file_link.py

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
from citrine.resources.ingestion import Ingestion, IngestionCollection
1414
from citrine.exceptions import NotFound
1515

16-
from tests.utils.factories import FileLinkDataFactory, _UploaderFactory
16+
from tests.utils.factories import (
17+
FileLinkDataFactory, _UploaderFactory, JobStatusResponseDataFactory,
18+
IngestionStatusResponseDataFactory, IngestFilesResponseDataFactory, JobSubmissionResponseDataFactory
19+
)
1720
from tests.utils.session import FakeSession, FakeS3Client, FakeCall, FakeRequestResponseApiError
1821

1922

@@ -536,31 +539,15 @@ def test_ingest(collection: FileCollection, session):
536539
good_file2 = collection.build({"filename": "also.csv", "id": str(uuid4()), "version": str(uuid4())})
537540
bad_file = FileLink(filename="bad.csv", url="http://files.com/input.csv")
538541

539-
ingest_create_resp = {
540-
"team_id": str(uuid4()),
541-
"dataset_id": str(uuid4()),
542-
"ingestion_id": str(uuid4())
543-
}
544-
job_id_resp = {
545-
'job_id': str(uuid4())
546-
}
547-
job_status_resp = {
548-
'job_id': job_id_resp['job_id'],
549-
'job_type': 'create-gemd-objects',
550-
'status': 'Success',
551-
'tasks': [{'id': f'create-gemd-objects-{uuid4()}',
552-
'task_type': 'create-gemd-objects-task',
553-
'status': 'Success',
554-
'dependencies': [],
555-
'failure_reason': None}],
556-
'output': {}
557-
}
558-
ingest_status_resp = {
559-
"ingestion_id": ingest_create_resp["ingestion_id"],
560-
"status": "ingestion_created",
561-
"errors": [],
562-
}
563-
session.set_responses(ingest_create_resp, job_id_resp, job_status_resp, ingest_status_resp)
542+
ingest_files_resp = IngestFilesResponseDataFactory()
543+
job_id_resp = JobSubmissionResponseDataFactory()
544+
job_status_resp = JobStatusResponseDataFactory(
545+
job_id=job_id_resp['job_id'],
546+
job_type='create-gemd-objects',
547+
)
548+
ingest_status_resp = IngestionStatusResponseDataFactory()
549+
550+
session.set_responses(ingest_files_resp, job_id_resp, job_status_resp, ingest_status_resp)
564551
collection.ingest([good_file1, good_file2])
565552

566553
with pytest.raises(ValueError, match=bad_file.url):
@@ -572,7 +559,7 @@ def test_ingest(collection: FileCollection, session):
572559
with pytest.raises(ValueError):
573560
collection.ingest([good_file1], build_table=True)
574561

575-
session.set_responses(ingest_create_resp, job_id_resp, job_status_resp, ingest_status_resp)
562+
session.set_responses(ingest_files_resp, job_id_resp, job_status_resp, ingest_status_resp)
576563
coll_with_project_id = FileCollection(team_id=uuid4(), dataset_id=uuid4(), session=session)
577564
coll_with_project_id.project_id = uuid4()
578565
with pytest.deprecated_call():

tests/resources/test_gemd_resource.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
from citrine._utils.functions import format_escaped_url
4646

4747
from tests.utils.factories import MaterialRunDataFactory, MaterialSpecDataFactory
48-
from tests.utils.factories import JobSubmissionResponseFactory
48+
from tests.utils.factories import JobSubmissionResponseDataFactory
4949
from tests.utils.session import FakeSession, FakeCall
5050

5151

@@ -409,7 +409,7 @@ def test_async_update(gemd_collection, session):
409409
'output': {}
410410
}
411411

412-
session.set_responses(JobSubmissionResponseFactory(), fake_job_status_resp)
412+
session.set_responses(JobSubmissionResponseDataFactory(), fake_job_status_resp)
413413

414414
# This returns None on successful update with wait.
415415
gemd_collection.async_update(obj, wait_for_response=True)
@@ -423,7 +423,7 @@ def test_async_update_and_no_dataset_id(gemd_collection, session):
423423
uids={'id': str(uuid4())}
424424
)
425425

426-
session.set_response(JobSubmissionResponseFactory())
426+
session.set_response(JobSubmissionResponseDataFactory())
427427
gemd_collection.dataset_id = None
428428

429429
with pytest.raises(RuntimeError):
@@ -444,7 +444,7 @@ def test_async_update_timeout(gemd_collection, session):
444444
'output': {}
445445
}
446446

447-
session.set_responses(JobSubmissionResponseFactory(), fake_job_status_resp)
447+
session.set_responses(JobSubmissionResponseDataFactory(), fake_job_status_resp)
448448

449449
with pytest.raises(PollingTimeoutError):
450450
gemd_collection.async_update(obj, wait_for_response=True,
@@ -465,7 +465,7 @@ def test_async_update_and_wait(gemd_collection, session):
465465
'output': {}
466466
}
467467

468-
session.set_responses(JobSubmissionResponseFactory(), fake_job_status_resp)
468+
session.set_responses(JobSubmissionResponseDataFactory(), fake_job_status_resp)
469469

470470
# This returns None on successful update with wait.
471471
gemd_collection.async_update(obj, wait_for_response=True)
@@ -485,7 +485,7 @@ def test_async_update_and_wait_failure(gemd_collection, session):
485485
'output': {}
486486
}
487487

488-
session.set_responses(JobSubmissionResponseFactory(), fake_job_status_resp)
488+
session.set_responses(JobSubmissionResponseDataFactory(), fake_job_status_resp)
489489

490490
with pytest.raises(JobFailureError):
491491
gemd_collection.async_update(obj, wait_for_response=True)
@@ -499,7 +499,7 @@ def test_async_update_with_no_wait(gemd_collection, session):
499499
uids={'id': str(uuid4())}
500500
)
501501

502-
session.set_response(JobSubmissionResponseFactory())
502+
session.set_response(JobSubmissionResponseDataFactory())
503503
job_id = gemd_collection.async_update(obj, wait_for_response=False)
504504
assert job_id is not None
505505

0 commit comments

Comments
 (0)