Skip to content

Commit 368fd21

Browse files
committed
Add test cases
1 parent 2cd5625 commit 368fd21

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import pytest
2+
import os
3+
4+
from smithy_aws_core.credentials_resolvers import CredentialsResolverChain, StaticCredentialsResolver
5+
from smithy_aws_core.identity import AWSCredentialsIdentity
6+
from smithy_core.exceptions import SmithyIdentityException
7+
from smithy_core.interfaces.identity import IdentityProperties
8+
9+
10+
async def test_no_sources_resolve():
11+
resolver_chain = CredentialsResolverChain(sources=[])
12+
with pytest.raises(SmithyIdentityException):
13+
await resolver_chain.get_identity(identity_properties=IdentityProperties())
14+
15+
16+
async def test_env_credentials_resolver_not_set(monkeypatch: pytest.MonkeyPatch):
17+
monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False)
18+
monkeypatch.delenv("AWS_SECRET_ACCESS_KEY", raising=False)
19+
resolver_chain = CredentialsResolverChain()
20+
21+
with pytest.raises(SmithyIdentityException):
22+
await resolver_chain.get_identity(identity_properties=IdentityProperties())
23+
24+
25+
async def test_env_credentials_resolver_partial(monkeypatch: pytest.MonkeyPatch):
26+
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "akid")
27+
monkeypatch.delenv("AWS_SECRET_ACCESS_KEY", raising=False)
28+
resolver_chain = CredentialsResolverChain()
29+
30+
with pytest.raises(SmithyIdentityException):
31+
await resolver_chain.get_identity(identity_properties=IdentityProperties())
32+
33+
34+
async def test_env_credentials_resolver_success(monkeypatch: pytest.MonkeyPatch):
35+
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "akid")
36+
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "secret")
37+
resolver_chain = CredentialsResolverChain()
38+
39+
credentials = await resolver_chain.get_identity(identity_properties=IdentityProperties())
40+
assert credentials.access_key_id == "akid"
41+
assert credentials.secret_access_key == "secret"
42+
43+
44+
async def test_custom_sources_with_static_credentials():
45+
static_credentials = AWSCredentialsIdentity(
46+
access_key_id="static_akid",
47+
secret_access_key="static_secret",
48+
)
49+
static_resolver = StaticCredentialsResolver(credentials=static_credentials)
50+
resolver_chain = CredentialsResolverChain(
51+
sources=[(lambda: False, lambda: None), (lambda: True, lambda: static_resolver)])
52+
53+
credentials = await resolver_chain.get_identity(identity_properties=IdentityProperties())
54+
assert credentials.access_key_id == "static_akid"
55+
assert credentials.secret_access_key == "static_secret"
56+

0 commit comments

Comments
 (0)