Skip to content

Commit ce02140

Browse files
committed
Add files ingest method
1 parent e6fdb36 commit ce02140

File tree

4 files changed

+111
-60
lines changed

4 files changed

+111
-60
lines changed

src/citrine/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '2.9.0'
1+
__version__ = '2.10.0'

src/citrine/_rest/collection.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ def _put_resource_ref(self, subpath: str, uid: Union[UUID, str]):
3636
ref = ResourceRef(uid)
3737
return self.session.put_resource(url, ref.dump(), version=self._api_version)
3838

39-
def _get_path(self, uid: Optional[Union[UUID, str]] = None,
39+
def _get_path(self,
40+
uid: Optional[Union[UUID, str]] = None,
41+
*,
4042
ignore_dataset: Optional[bool] = False) -> str:
4143
"""Construct a url from __base_path__ and, optionally, id."""
4244
subpath = format_escaped_url('/{}', uid) if uid else ''

src/citrine/resources/file_link.py

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pathlib import Path
66
from enum import Enum
77
from logging import getLogger
8-
from typing import Optional, Tuple, Union, List, Dict
8+
from typing import Optional, Tuple, Union, List, Dict, Iterable
99
from urllib.parse import urlparse, quote
1010
from uuid import UUID
1111

@@ -182,8 +182,8 @@ def __init__(self, project_id: UUID, dataset_id: UUID, session: Session):
182182

183183
def _get_path(self,
184184
uid: Optional[Union[UUID, str]] = None,
185-
ignore_dataset: Optional[bool] = False,
186185
*,
186+
ignore_dataset: Optional[bool] = False,
187187
version: Union[str, UUID] = None,
188188
action: str = None) -> str:
189189
"""Build the path for taking an action with a particular file version."""
@@ -248,7 +248,7 @@ def get(self,
248248
*,
249249
version: Optional[Union[UUID, str, int]] = None) -> FileLink:
250250
"""
251-
Get an element of the collection by its id.
251+
Retrieve an on-platform FileLink from its filename or file uuid.
252252
253253
Parameters
254254
----------
@@ -359,7 +359,7 @@ def _make_upload_request(self, file_path: Path, dest_name: str):
359359
aws_session_token, bucket, object_key, & upload_id.
360360
361361
"""
362-
path = self._get_path() + "/uploads"
362+
path = self._get_path(action="uploads")
363363
mime_type = self._mime_type(file_path)
364364
file_size = file_path.stat().st_size
365365
assert isinstance(file_size, int)
@@ -423,7 +423,7 @@ def _search_by_file_name(self,
423423
All the data needed for a file.
424424
425425
"""
426-
path = self._get_path() + "/search"
426+
path = self._get_path(action="search")
427427

428428
search_json = {
429429
'fileSearchFilter':
@@ -456,7 +456,7 @@ def _search_by_file_version_id(self,
456456
All the data needed for a file.
457457
458458
"""
459-
path = self._get_path() + "/search"
459+
path = self._get_path(action="search")
460460

461461
search_json = {
462462
'fileSearchFilter': {
@@ -495,7 +495,7 @@ def _search_by_dataset_file_id(self,
495495
All the data needed for a file.
496496
497497
"""
498-
path = self._get_path() + "/search"
498+
path = self._get_path(action="search")
499499

500500
search_json = {
501501
'fileSearchFilter': {
@@ -644,14 +644,12 @@ def read(self, *, file_link: Union[str, UUID, FileLink]):
644644

645645
if self._is_external_url(file_link.url): # Pull it from where ever it lives
646646
final_url = file_link.url
647-
elif self._validate_local_url(file_link.url):
647+
else:
648648
# The "/content-link" route returns a pre-signed url to download the file.
649649
content_link = self._get_path_from_file_link(file_link, action='content-link')
650650
content_link_response = self.session.get_resource(content_link)
651651
pre_signed_url = content_link_response['pre_signed_read_link']
652652
final_url = rewrite_s3_links_locally(pre_signed_url, self.session.s3_endpoint_url)
653-
else: # Unrecognized
654-
raise ValueError(f"URL was malformed for a local file resource ({file_link.url}).")
655653

656654
download_response = requests.get(final_url)
657655
return download_response.content
@@ -690,10 +688,10 @@ def process(self, *, file_link: Union[FileLink, str, UUID],
690688
A JobSubmissionResponse which can be used to poll for the result.
691689
692690
"""
693-
file_link = self._resolve_file_link(file_link)
694-
if not self._validate_local_url(file_link.url):
691+
if self._is_external_url(file_link.url):
695692
raise ValueError(f"Only on-platform resources can be processed. "
696693
f"Passed URL {file_link.url}.")
694+
file_link = self._resolve_file_link(file_link)
697695

698696
params = {"processing_type": processing_type.value}
699697
response = self.session.put_resource(
@@ -797,6 +795,38 @@ def file_processing_result(self, *,
797795

798796
return results
799797

798+
def ingest(self, files: Iterable[FileLink]):
799+
"""
800+
[ALPHA] Ingest a set of CSVs and/or Excel Workbooks formatted per the gemd-ingest protocol.
801+
802+
Parameters
803+
----------
804+
files: List[FileLink]
805+
A list of files, already on platform, from which GEMD objects should be built
806+
807+
"""
808+
targets = [self._resolve_file_link(f) for f in files]
809+
if any(self._is_external_url(f.url) for f in targets):
810+
externals = [f.url for f in targets if self._is_external_url(f.url)]
811+
raise ValueError(f"All files must be on-platform to load them. "
812+
f"The following are not: {externals}")
813+
814+
file_infos = [
815+
{"dataset_file_id": str(f.uid),
816+
"file_version_uuid": str(f.version)
817+
}
818+
for f in targets]
819+
req = {
820+
"project_id": str(self.project_id),
821+
"dataset_id": str(self.dataset_id),
822+
"files": file_infos
823+
}
824+
base_url = format_escaped_url("/projects/{}/ingestions", self.project_id)
825+
create_ingestion_resp = self.session.post_resource(path=base_url, json=req)
826+
ingestion_id = create_ingestion_resp["ingestion_id"]
827+
job_url = base_url + format_escaped_url("/{}/gemd-objects", ingestion_id)
828+
return self.session.post_resource(path=job_url, json={})
829+
800830
def delete(self, file_link: FileLink):
801831
"""
802832
Delete the file associated with a given FileLink from the database.
@@ -817,8 +847,25 @@ def delete(self, file_link: FileLink):
817847

818848
def _resolve_file_link(self, identifier: Union[str, UUID, FileLink]) -> FileLink:
819849
"""Generate the FileLink object referenced by the passed argument."""
820-
if isinstance(identifier, FileLink): # Passthrough for convenience
821-
return identifier
850+
if isinstance(identifier, GEMDFileLink):
851+
if isinstance(identifier, FileLink) and identifier.uid is not None:
852+
# Passthrough since it's as full as it can get
853+
return identifier
854+
if self._is_external_url(identifier.url):
855+
# Up-convert type with existing info
856+
return FileLink(filename=identifier.filename, url=identifier.url)
857+
# Resolve on-platform uid and possibly up-convert
858+
file_id, version_id = self._get_ids_from_url(identifier.url)
859+
if file_id is None:
860+
raise ValueError(f"URL was malformed for local resources; "
861+
f"passed URL {identifier.url}")
862+
platform_link = self.get(uid=file_id, version=version_id)
863+
if platform_link.filename != identifier.filename:
864+
raise ValueError(
865+
f"Name mismatch between link ({identifier.filename}) "
866+
f"and platform ({platform_link.filename})"
867+
)
868+
return platform_link
822869
elif isinstance(identifier, str) and self._is_external_url(identifier):
823870
# Assume it's an absolute URL
824871
filename = urlparse(identifier).path.split('/')[-1]
@@ -848,10 +895,3 @@ def _is_external_url(self, url: str):
848895
return False
849896

850897
return urlparse(self._get_path()).netloc != parsed.netloc
851-
852-
def _validate_local_url(self, url):
853-
"""Verify link is well formed."""
854-
if self._is_external_url(url):
855-
return False
856-
857-
return self._get_ids_from_url(url)[1] is not None # Implies file_id is None, too

tests/resources/test_file_link.py

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,10 @@ def test_file_download(collection: FileCollection, session, tmpdir):
352352
"""
353353
# Given
354354
filename = 'diagram.pdf'
355-
url = f"projects/{collection.project_id}/datasets/{collection.dataset_id}/files/{uuid4()}/versions/{uuid4()}"
356-
file = FileLink.build(FileLinkDataFactory(url=url, filename=filename))
355+
file_uid = str(uuid4())
356+
version_uid = str(uuid4())
357+
url = f"projects/{collection.project_id}/datasets/{collection.dataset_id}/files/{file_uid}/versions/{version_uid}"
358+
file = FileLink.build(FileLinkDataFactory(url=url, filename=filename, id=file_uid, version=version_uid))
357359
pre_signed_url = "http://files.citrine.io/secret-codes/jiifema987pjfsda" # arbitrary
358360
session.set_response({
359361
'pre_signed_read_link': pre_signed_url,
@@ -403,8 +405,10 @@ def test_read(collection: FileCollection, session):
403405
"""
404406
# Given
405407
filename = 'diagram.pdf'
406-
url = f"projects/{collection.project_id}/datasets/{collection.dataset_id}/files/{uuid4()}/versions/{uuid4()}"
407-
file = FileLink.build(FileLinkDataFactory(url=url, filename=filename))
408+
file_uid = str(uuid4())
409+
version_uid = str(uuid4())
410+
url = f"projects/{collection.project_id}/datasets/{collection.dataset_id}/files/{file_uid}/versions/{version_uid}"
411+
file = FileLink.build(FileLinkDataFactory(url=url, filename=filename, id=file_uid, version=version_uid))
408412
pre_signed_url = "http://files.citrine.io/secret-codes/jiifema987pjfsda" # arbitrary
409413
session.set_response({
410414
'pre_signed_read_link': pre_signed_url,
@@ -499,8 +503,8 @@ def test_process_file(collection: FileCollection, session):
499503
"""Test processing an existing file."""
500504

501505
file_id, version_id = str(uuid4()), str(uuid4())
502-
full_url = 'www.citrine.io/develop/files/{}/versions/{}'.format(file_id, version_id)
503-
file_link = collection.build(FileLinkDataFactory(url=full_url))
506+
full_url = collection._get_path(uid=file_id, version=version_id)
507+
file_link = collection.build(FileLinkDataFactory(url=full_url, id=file_id, version=version_id))
504508

505509
job_id_resp = {
506510
'job_id': str(uuid4())
@@ -548,8 +552,8 @@ def test_process_file_no_waiting(collection: FileCollection, session):
548552
"""Test processing an existing file without waiting on the result."""
549553

550554
file_id, version_id = str(uuid4()), str(uuid4())
551-
full_url = 'www.citrine.io/develop/files/{}/versions/{}'.format(file_id, version_id)
552-
file_link = collection.build(FileLinkDataFactory(url=full_url))
555+
full_url = collection._get_path(uid=file_id, version=version_id)
556+
file_link = collection.build(FileLinkDataFactory(url=full_url, id=file_id, version=version_id))
553557

554558
job_id_resp = {
555559
'job_id': str(uuid4())
@@ -566,11 +570,9 @@ def test_process_file_no_waiting(collection: FileCollection, session):
566570

567571
def test_process_file_exceptions(collection: FileCollection, session):
568572
"""Test processing an existing file without waiting on the result."""
569-
570-
file_id, version_id = str(uuid4()), str(uuid4())
571-
full_url = 'https://www.citrine.io/develop/files/{}/versions/{}'.format(file_id, version_id)
573+
full_url = f'http://www.files.com/file.path'
572574
file_link = collection.build(FileLinkDataFactory(url=full_url))
573-
575+
collection._get_path()
574576
# First does a PUT on the /processed endpoint
575577
# then does a GET on the job executions endpoint
576578
with pytest.raises(ValueError, match="on-platform resources"):
@@ -580,6 +582,22 @@ def test_process_file_exceptions(collection: FileCollection, session):
580582
wait_for_response=False)
581583

582584

585+
def test_ingest(collection: FileCollection, session):
586+
"""Test the on-platform ingest route."""
587+
good_file1 = collection.build({"filename": "good.csv", "id": str(uuid4()), "version": str(uuid4())})
588+
good_file2 = collection.build({"filename": "also.csv", "id": str(uuid4()), "version": str(uuid4())})
589+
bad_file = FileLink(filename="bad.csv", url="http://files.com/input.csv")
590+
591+
job_id_resp = {
592+
'ingestion_id': str(uuid4())
593+
}
594+
session.set_responses(job_id_resp, job_id_resp)
595+
collection.ingest([good_file1, good_file2])
596+
597+
with pytest.raises(ValueError, match=bad_file.url):
598+
collection.ingest([good_file1, bad_file])
599+
600+
583601
def test_resolve_file_link(collection: FileCollection, session):
584602
# The actual response contains more fields, but these are the only ones we use.
585603
raw_files = [
@@ -626,17 +644,27 @@ def test_resolve_file_link(collection: FileCollection, session):
626644
session.set_response({
627645
'files': [raw_files[1]]
628646
})
629-
assert collection._resolve_file_link(UUID(raw_files[1]['id'])) == file1, "UUID didn't resolve"
647+
648+
unresolved = FileLink(filename=file1.filename, url=file1.url)
649+
assert collection._resolve_file_link(unresolved) == file1, "FileLink didn't resolve"
630650
assert session.num_calls == 1
631651

652+
unresolved.filename = "Wrong.file"
653+
with pytest.raises(ValueError):
654+
collection._resolve_file_link(unresolved)
655+
assert session.num_calls == 2
656+
657+
assert collection._resolve_file_link(UUID(raw_files[1]['id'])) == file1, "UUID didn't resolve"
658+
assert session.num_calls == 3
659+
632660
session.set_response({
633661
'files': [raw_files[1]]
634662
})
635663
assert collection._resolve_file_link(raw_files[1]['id']) == file1, "String UUID didn't resolve"
636-
assert session.num_calls == 2
664+
assert session.num_calls == 4
637665

638666
assert collection._resolve_file_link(raw_files[1]['version']) == file1, "Version UUID didn't resolve"
639-
assert session.num_calls == 3
667+
assert session.num_calls == 5
640668

641669
abs_link = "https://wwww.website.web/web.pdf"
642670
assert collection._resolve_file_link(abs_link).filename == "web.pdf"
@@ -646,36 +674,17 @@ def test_resolve_file_link(collection: FileCollection, session):
646674
'files': [raw_files[1]]
647675
})
648676
assert collection._resolve_file_link(file1.url) == file1, "Relative path didn't resolve"
649-
assert session.num_calls == 4
677+
assert session.num_calls == 6
650678

651679
session.set_response({
652680
'files': [raw_files[1]]
653681
})
654682
assert collection._resolve_file_link(file1.filename) == file1, "Filename didn't resolve"
655-
assert session.num_calls == 5
683+
assert session.num_calls == 7
656684

657685
with pytest.raises(TypeError):
658686
collection._resolve_file_link(12345)
659-
assert session.num_calls == 5
660-
661-
662-
def test_validate_filelink_url(collection: FileCollection):
663-
good = [
664-
f"projects/{uuid4()}/datasets/{uuid4()}/files/{uuid4()}/versions/{uuid4()}",
665-
f"/files/{uuid4()}/versions/{uuid4()}"
666-
]
667-
bad = [
668-
f"/projects/{uuid4()}/datasets/{uuid4()}/files/{uuid4()}/versions/{uuid4()}/action",
669-
f"/projects/{uuid4()}/datasets/{uuid4()}/{uuid4()}/versions/{uuid4()}",
670-
f"projects/{uuid4()}/datasets/{uuid4()}/files/{uuid4()}/versions/{uuid4()}?query=param",
671-
f"projects/{uuid4()}/datasets/{uuid4()}/files/{uuid4()}/versions/{uuid4()}?#fragment",
672-
"http://customer.com/data-lake/files/123/versions/456",
673-
"/files/uuid4/versions/uuid4",
674-
]
675-
for x in good:
676-
assert collection._validate_local_url(x)
677-
for x in bad:
678-
assert not collection._validate_local_url(x)
687+
assert session.num_calls == 7
679688

680689

681690
def test_get_ids_from_url(collection: FileCollection):

0 commit comments

Comments
 (0)