24
24
from textwrap import dedent
25
25
from typing import Dict , List , Optional , Union
26
26
from copy import copy
27
+ import re
27
28
28
29
import attr
29
30
@@ -1658,6 +1659,7 @@ def run( # type: ignore[override]
1658
1659
job_name : Optional [str ] = None ,
1659
1660
experiment_config : Optional [Dict [str , str ]] = None ,
1660
1661
kms_key : Optional [str ] = None ,
1662
+ codeartifact_repo_arn : Optional [str ] = None ,
1661
1663
):
1662
1664
"""Runs a processing job.
1663
1665
@@ -1758,12 +1760,21 @@ def run( # type: ignore[override]
1758
1760
However, the value of `TrialComponentDisplayName` is honored for display in Studio.
1759
1761
kms_key (str): The ARN of the KMS key that is used to encrypt the
1760
1762
user code file (default: None).
1763
+ codeartifact_repo_arn (str): The ARN of the CodeArtifact repository that should be
1764
+ logged into before installing dependencies (default: None).
1761
1765
Returns:
1762
1766
None or pipeline step arguments in case the Processor instance is built with
1763
1767
:class:`~sagemaker.workflow.pipeline_context.PipelineSession`
1764
1768
"""
1765
1769
s3_runproc_sh , inputs , job_name = self ._pack_and_upload_code (
1766
- code , source_dir , dependencies , git_config , job_name , inputs , kms_key
1770
+ code ,
1771
+ source_dir ,
1772
+ dependencies ,
1773
+ git_config ,
1774
+ job_name ,
1775
+ inputs ,
1776
+ kms_key ,
1777
+ codeartifact_repo_arn ,
1767
1778
)
1768
1779
1769
1780
# Submit a processing job.
@@ -1780,7 +1791,15 @@ def run( # type: ignore[override]
1780
1791
)
1781
1792
1782
1793
def _pack_and_upload_code (
1783
- self , code , source_dir , dependencies , git_config , job_name , inputs , kms_key = None
1794
+ self ,
1795
+ code ,
1796
+ source_dir ,
1797
+ dependencies ,
1798
+ git_config ,
1799
+ job_name ,
1800
+ inputs ,
1801
+ kms_key = None ,
1802
+ codeartifact_repo_arn = None ,
1784
1803
):
1785
1804
"""Pack local code bundle and upload to Amazon S3."""
1786
1805
if code .startswith ("s3://" ):
@@ -1821,12 +1840,65 @@ def _pack_and_upload_code(
1821
1840
script = estimator .uploaded_code .script_name
1822
1841
evaluated_kms_key = kms_key if kms_key else self .output_kms_key
1823
1842
s3_runproc_sh = self ._create_and_upload_runproc (
1824
- script , evaluated_kms_key , entrypoint_s3_uri
1843
+ script , evaluated_kms_key , entrypoint_s3_uri , codeartifact_repo_arn
1825
1844
)
1826
1845
1827
1846
return s3_runproc_sh , inputs , job_name
1828
1847
1829
- def _generate_framework_script (self , user_script : str ) -> str :
1848
+ def _get_codeartifact_index (self , codeartifact_repo_arn : str ):
1849
+ """
1850
+ Build the authenticated codeartifact index url based on the arn provided
1851
+ via codeartifact_repo_arn property following the form
1852
+ # `arn:${Partition}:codeartifact:${Region}:${Account}:repository/${Domain}/${Repository}`
1853
+ https://docs.aws.amazon.com/codeartifact/latest/ug/python-configure-pip.html
1854
+ https://docs.aws.amazon.com/service-authorization/latest/reference/list_awscodeartifact.html#awscodeartifact-resources-for-iam-policies
1855
+ :return: authenticated codeartifact index url
1856
+ """
1857
+
1858
+ arn_regex = (
1859
+ "arn:(?P<partition>[^:]+):codeartifact:(?P<region>[^:]+):(?P<account>[^:]+)"
1860
+ ":repository/(?P<domain>[^/]+)/(?P<repository>.+)"
1861
+ )
1862
+ m = re .match (arn_regex , codeartifact_repo_arn )
1863
+ if not m :
1864
+ raise Exception ("invalid CodeArtifact repository arn {}" .format (codeartifact_repo_arn ))
1865
+ domain = m .group ("domain" )
1866
+ owner = m .group ("account" )
1867
+ repository = m .group ("repository" )
1868
+ region = m .group ("region" )
1869
+
1870
+ logger .info (
1871
+ "configuring pip to use codeartifact "
1872
+ "(domain: %s, domain owner: %s, repository: %s, region: %s)" ,
1873
+ domain ,
1874
+ owner ,
1875
+ repository ,
1876
+ region ,
1877
+ )
1878
+ try :
1879
+ client = self .sagemaker_session .boto_session .client ("codeartifact" , region_name = region )
1880
+ auth_token_response = client .get_authorization_token (domain = domain , domainOwner = owner )
1881
+ token = auth_token_response ["authorizationToken" ]
1882
+ endpoint_response = client .get_repository_endpoint (
1883
+ domain = domain , domainOwner = owner , repository = repository , format = "pypi"
1884
+ )
1885
+ unauthenticated_index = endpoint_response ["repositoryEndpoint" ]
1886
+ return re .sub (
1887
+ "https://" ,
1888
+ "https://aws:{}@" .format (token ),
1889
+ re .sub (
1890
+ "{}/?$" .format (repository ),
1891
+ "{}/simple/" .format (repository ),
1892
+ unauthenticated_index ,
1893
+ ),
1894
+ )
1895
+ except Exception :
1896
+ logger .error ("failed to configure pip to use codeartifact" )
1897
+ raise Exception ("failed to configure pip to use codeartifact" )
1898
+
1899
+ def _generate_framework_script (
1900
+ self , user_script : str , codeartifact_repo_arn : str = None
1901
+ ) -> str :
1830
1902
"""Generate the framework entrypoint file (as text) for a processing job.
1831
1903
1832
1904
This script implements the "framework" functionality for setting up your code:
@@ -1837,7 +1909,15 @@ def _generate_framework_script(self, user_script: str) -> str:
1837
1909
Args:
1838
1910
user_script (str): Relative path to ```code``` in the source bundle
1839
1911
- e.g. 'process.py'.
1912
+ codeartifact_repo_arn (str): The ARN of the CodeArtifact repository that should be
1913
+ logged into before installing dependencies (default: None).
1840
1914
"""
1915
+ if codeartifact_repo_arn :
1916
+ index = self ._get_codeartifact_index (codeartifact_repo_arn )
1917
+ index_option = "-i {}" .format (index )
1918
+ else :
1919
+ index_option = ""
1920
+
1841
1921
return dedent (
1842
1922
"""\
1843
1923
#!/bin/bash
@@ -1852,12 +1932,13 @@ def _generate_framework_script(self, user_script: str) -> str:
1852
1932
# Some py3 containers has typing, which may breaks pip install
1853
1933
pip uninstall --yes typing
1854
1934
1855
- pip install -r requirements.txt
1935
+ pip install -r requirements.txt {index_option}
1856
1936
fi
1857
1937
1858
1938
{entry_point_command} {entry_point} "$@"
1859
1939
"""
1860
1940
).format (
1941
+ index_option = index_option ,
1861
1942
entry_point_command = " " .join (self .command ),
1862
1943
entry_point = user_script ,
1863
1944
)
@@ -1933,7 +2014,9 @@ def _set_entrypoint(self, command, user_script_name):
1933
2014
)
1934
2015
self .entrypoint = self .framework_entrypoint_command + [user_script_location ]
1935
2016
1936
- def _create_and_upload_runproc (self , user_script , kms_key , entrypoint_s3_uri ):
2017
+ def _create_and_upload_runproc (
2018
+ self , user_script , kms_key , entrypoint_s3_uri , codeartifact_repo_arn = None
2019
+ ):
1937
2020
"""Create runproc shell script and upload to S3 bucket.
1938
2021
1939
2022
If leveraging a pipeline session with optimized S3 artifact paths,
@@ -1949,7 +2032,7 @@ def _create_and_upload_runproc(self, user_script, kms_key, entrypoint_s3_uri):
1949
2032
from sagemaker .workflow .utilities import _pipeline_config , hash_object
1950
2033
1951
2034
if _pipeline_config and _pipeline_config .pipeline_name :
1952
- runproc_file_str = self ._generate_framework_script (user_script )
2035
+ runproc_file_str = self ._generate_framework_script (user_script , codeartifact_repo_arn )
1953
2036
runproc_file_hash = hash_object (runproc_file_str )
1954
2037
s3_uri = s3 .s3_path_join (
1955
2038
"s3://" ,
@@ -1968,7 +2051,7 @@ def _create_and_upload_runproc(self, user_script, kms_key, entrypoint_s3_uri):
1968
2051
)
1969
2052
else :
1970
2053
s3_runproc_sh = S3Uploader .upload_string_as_file_body (
1971
- self ._generate_framework_script (user_script ),
2054
+ self ._generate_framework_script (user_script , codeartifact_repo_arn ),
1972
2055
desired_s3_uri = entrypoint_s3_uri ,
1973
2056
kms_key = kms_key ,
1974
2057
sagemaker_session = self .sagemaker_session ,
0 commit comments