-
Notifications
You must be signed in to change notification settings - Fork 24
Implement a configurable credentials resolver chain #452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 3 commits
2cd5625
368fd21
87f73ba
c33518b
e5b55cc
4351de1
b325e70
d14e85e
086d9ce
ef5e169
b7412fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,12 @@ | ||
| # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| from .environment import EnvironmentCredentialsResolver | ||
| from .static import StaticCredentialsResolver | ||
| from .credentials_resolver_chain import CredentialsResolverChain | ||
|
|
||
| __all__ = ("EnvironmentCredentialsResolver", "StaticCredentialsResolver") | ||
| __all__ = ( | ||
| "EnvironmentCredentialsResolver", | ||
| "StaticCredentialsResolver", | ||
| "CredentialsResolverChain", | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| from typing import Callable, List | ||
|
|
||
| from smithy_aws_core.credentials_resolvers import EnvironmentCredentialsResolver | ||
| from smithy_aws_core.identity import AWSCredentialsIdentity, AWSCredentialsResolver | ||
| from smithy_core.aio.interfaces.identity import IdentityResolver | ||
| from smithy_core.exceptions import SmithyIdentityException | ||
| from smithy_core.interfaces.identity import IdentityProperties | ||
|
|
||
| import os | ||
|
|
||
|
|
||
| def _env_creds_available() -> bool: | ||
| return bool(os.getenv("AWS_ACCESS_KEY_ID")) and bool( | ||
| os.getenv("AWS_SECRET_ACCESS_KEY") | ||
| ) | ||
alextwoods marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| def _build_env_creds() -> AWSCredentialsResolver: | ||
| return EnvironmentCredentialsResolver() | ||
|
|
||
|
|
||
| type CredentialSource = tuple[Callable[[], bool], Callable[[], AWSCredentialsResolver]] | ||
| _DEFAULT_SOURCES: list[CredentialSource] = [(_env_creds_available, _build_env_creds)] | ||
|
|
||
|
|
||
| class CredentialsResolverChain( | ||
| IdentityResolver[AWSCredentialsIdentity, IdentityProperties] | ||
| ): | ||
| """Resolves AWS Credentials from system environment variables.""" | ||
|
|
||
| def __init__(self, *, sources: List[CredentialSource] | None = None): | ||
alextwoods marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if sources is None: | ||
| sources = _DEFAULT_SOURCES | ||
| self._sources: List[CredentialSource] = sources | ||
| self._credentials_resolver: AWSCredentialsResolver | None = None | ||
|
|
||
| async def get_identity( | ||
| self, *, identity_properties: IdentityProperties | ||
| ) -> AWSCredentialsIdentity: | ||
| if self._credentials_resolver is not None: | ||
| return await self._credentials_resolver.get_identity( | ||
| identity_properties=identity_properties | ||
| ) | ||
|
|
||
| for source in self._sources: | ||
| if source[0](): | ||
| self._credentials_resolver = source[1]() | ||
|
||
| return await self._credentials_resolver.get_identity( | ||
| identity_properties=identity_properties | ||
| ) | ||
|
|
||
| raise SmithyIdentityException( | ||
| "None of the configured credentials sources were able to resolve credentials." | ||
| ) | ||
jonathan343 marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| import pytest | ||
|
|
||
| from smithy_aws_core.credentials_resolvers import ( | ||
| CredentialsResolverChain, | ||
| StaticCredentialsResolver, | ||
| ) | ||
| from smithy_aws_core.identity import AWSCredentialsIdentity | ||
| from smithy_core.exceptions import SmithyIdentityException | ||
| from smithy_core.interfaces.identity import IdentityProperties | ||
|
|
||
|
|
||
| async def test_no_sources_resolve(): | ||
| resolver_chain = CredentialsResolverChain(sources=[]) | ||
| with pytest.raises(SmithyIdentityException): | ||
| await resolver_chain.get_identity(identity_properties=IdentityProperties()) | ||
|
|
||
|
|
||
| async def test_env_credentials_resolver_not_set(monkeypatch: pytest.MonkeyPatch): | ||
| monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) | ||
| monkeypatch.delenv("AWS_SECRET_ACCESS_KEY", raising=False) | ||
| resolver_chain = CredentialsResolverChain() | ||
|
|
||
| with pytest.raises(SmithyIdentityException): | ||
| await resolver_chain.get_identity(identity_properties=IdentityProperties()) | ||
|
|
||
|
|
||
| async def test_env_credentials_resolver_partial(monkeypatch: pytest.MonkeyPatch): | ||
| monkeypatch.setenv("AWS_ACCESS_KEY_ID", "akid") | ||
| monkeypatch.delenv("AWS_SECRET_ACCESS_KEY", raising=False) | ||
| resolver_chain = CredentialsResolverChain() | ||
|
|
||
| with pytest.raises(SmithyIdentityException): | ||
| await resolver_chain.get_identity(identity_properties=IdentityProperties()) | ||
|
|
||
|
|
||
| async def test_env_credentials_resolver_success(monkeypatch: pytest.MonkeyPatch): | ||
| monkeypatch.setenv("AWS_ACCESS_KEY_ID", "akid") | ||
| monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "secret") | ||
| resolver_chain = CredentialsResolverChain() | ||
|
|
||
| credentials = await resolver_chain.get_identity( | ||
| identity_properties=IdentityProperties() | ||
| ) | ||
| assert credentials.access_key_id == "akid" | ||
| assert credentials.secret_access_key == "secret" | ||
|
|
||
|
|
||
| async def test_custom_sources_with_static_credentials(): | ||
| static_credentials = AWSCredentialsIdentity( | ||
| access_key_id="static_akid", | ||
| secret_access_key="static_secret", | ||
| ) | ||
| static_resolver = StaticCredentialsResolver(credentials=static_credentials) | ||
| resolver_chain = CredentialsResolverChain( | ||
| sources=[(lambda: False, lambda: None), (lambda: True, lambda: static_resolver)] # type: ignore | ||
| ) | ||
|
|
||
| credentials = await resolver_chain.get_identity( | ||
| identity_properties=IdentityProperties() | ||
| ) | ||
| assert credentials.access_key_id == "static_akid" | ||
| assert credentials.secret_access_key == "static_secret" |
Uh oh!
There was an error while loading. Please reload this page.