Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/momento/auth/credential_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class CredentialProvider:
control_endpoint: str
cache_endpoint: str
token_endpoint: str
port: int

@staticmethod
def from_environment_variable(
Expand Down Expand Up @@ -72,7 +73,23 @@ def from_string(
cache_endpoint = cache_endpoint or token_and_endpoints.cache_endpoint
token_endpoint = token_endpoint or token_and_endpoints.token_endpoint
auth_token = token_and_endpoints.auth_token
return CredentialProvider(auth_token, control_endpoint, cache_endpoint, token_endpoint)
return CredentialProvider(auth_token, control_endpoint, cache_endpoint, token_endpoint, 443)

@staticmethod
def for_momento_local(
host: str = "127.0.0.1",
port: int = 8080,
) -> CredentialProvider:
"""Creates a credential provider for use with Momento Local.

Args:
host (str): the Momento Local host.
port (int): the Momento Local port.

Returns:
CredentialProvider
"""
return CredentialProvider("", host, host, host, port)

def __repr__(self) -> str:
attributes: Dict[str, str] = copy.copy(vars(self)) # type: ignore[misc]
Expand Down
199 changes: 125 additions & 74 deletions src/momento/internal/aio/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,53 +38,75 @@ class _ControlGrpcManager:
"""Internal gRPC control manager."""

def __init__(self, configuration: Configuration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.control_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
interceptors=_interceptors(
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
),
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
),
)
if credential_provider.port == 443:
self._channel = grpc.aio.secure_channel(
target=credential_provider.control_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
interceptors=_interceptors(
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
),
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
),
)
else:
self._channel = grpc.aio.insecure_channel(
target=f"{credential_provider.control_endpoint}:{credential_provider.port}",
interceptors=_interceptors(
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
),
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
),
)

async def close(self) -> None:
await self._secure_channel.close()
await self._channel.close()

def async_stub(self) -> control_client.ScsControlStub:
return control_client.ScsControlStub(self._secure_channel) # type: ignore[no-untyped-call]
return control_client.ScsControlStub(self._channel) # type: ignore[no-untyped-call]


class _DataGrpcManager:
"""Internal gRPC data manager."""

def __init__(self, configuration: Configuration, credential_provider: CredentialProvider):
self._logger = logs.logger
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
interceptors=_interceptors(
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
),
# Here is where you would pass override configuration to the underlying C gRPC layer.
# However, I have tried several different tuning options here and did not see any
# performance improvements, so sticking with the defaults for now.
# For more info on the performance investigations:
# https://github.com/momentohq/client-sdk-python/issues/120
# For more info on available gRPC config options:
# https://grpc.github.io/grpc/python/grpc.html
# https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
# https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h#L140
# options=[
# ('grpc.max_concurrent_streams', 1000),
# ('grpc.use_local_subchannel_pool', 1),
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
# ],
options=grpc_data_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
if credential_provider.port == 443:
self._channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=channel_credentials_from_root_certs_or_default(configuration),
interceptors=_interceptors(
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
),
# Here is where you would pass override configuration to the underlying C gRPC layer.
# However, I have tried several different tuning options here and did not see any
# performance improvements, so sticking with the defaults for now.
# For more info on the performance investigations:
# https://github.com/momentohq/client-sdk-python/issues/120
# For more info on available gRPC config options:
# https://grpc.github.io/grpc/python/grpc.html
# https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
# https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h#L140
# options=[
# ('grpc.max_concurrent_streams', 1000),
# ('grpc.use_local_subchannel_pool', 1),
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
# ],
options=grpc_data_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
else:
self._channel = grpc.aio.insecure_channel(
target=f"{credential_provider.cache_endpoint}:{credential_provider.port}",
interceptors=_interceptors(
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
),
options=grpc_data_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)

async def eagerly_connect(self, timeout_seconds: float) -> None:
self._logger.debug(
Expand All @@ -93,15 +115,15 @@ async def eagerly_connect(self, timeout_seconds: float) -> None:
try:
await asyncio.wait_for(self.wait_for_ready(), timeout_seconds)
except Exception as error:
self._secure_channel.close()
self._channel.close()
self._logger.debug(f"Failed to connect to the server within the given timeout. {error}")
raise ConnectionException(
message=f"Failed to connect to Momento's server within given eager connection timeout: {error}",
service=Service.CACHE,
) from error

async def wait_for_ready(self) -> None:
latest_state = self._secure_channel.get_state(True) # try_to_connect
latest_state = self._channel.get_state(True) # try_to_connect
ready: grpc.ChannelConnectivity = grpc.ChannelConnectivity.READY
connecting: grpc.ChannelConnectivity = grpc.ChannelConnectivity.CONNECTING
idle: grpc.ChannelConnectivity = grpc.ChannelConnectivity.IDLE
Expand All @@ -117,80 +139,109 @@ async def wait_for_ready(self) -> None:

# This is a gRPC callback helper that prevents us from repeatedly polling on the state
# which is highly inefficient.
await self._secure_channel.wait_for_state_change(latest_state)
latest_state = self._secure_channel.get_state(False) # no need to reconnect
await self._channel.wait_for_state_change(latest_state)
latest_state = self._channel.get_state(False) # no need to reconnect

if latest_state == ready:
self._logger.debug("Connected to Momento's server! Happy Caching!")

async def close(self) -> None:
self._logger.debug("Closing and tearing down gRPC channel")
await self._secure_channel.close()
await self._channel.close()

def async_stub(self) -> cache_client.ScsStub:
return cache_client.ScsStub(self._secure_channel) # type: ignore[no-untyped-call]
return cache_client.ScsStub(self._channel) # type: ignore[no-untyped-call]


class _PubsubGrpcManager:
"""Internal gRPC pubsub manager."""

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
if credential_provider.port == 443:
self._channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
else:
self._channel = grpc.aio.insecure_channel(
target=f"{credential_provider.cache_endpoint}:{credential_provider.port}",
interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)

async def close(self) -> None:
await self._secure_channel.close()
await self._channel.close()

def async_stub(self) -> pubsub_client.PubsubStub:
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]
return pubsub_client.PubsubStub(self._channel) # type: ignore[no-untyped-call]


class _PubsubGrpcStreamManager:
"""Internal gRPC pubsub stream manager."""

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
if credential_provider.port == 443:
self._channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)
else:
self._channel = grpc.aio.insecure_channel(
target=f"{credential_provider.cache_endpoint}:{credential_provider.port}",
interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)

async def close(self) -> None:
await self._secure_channel.close()
await self._channel.close()

def async_stub(self) -> pubsub_client.PubsubStub:
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]
return pubsub_client.PubsubStub(self._channel) # type: ignore[no-untyped-call]


class _TokenGrpcManager:
"""Internal gRPC token manager."""

def __init__(self, configuration: AuthConfiguration, credential_provider: CredentialProvider):
self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.token_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(
credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy()
),
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
),
)
if credential_provider.port == 443:
self._channel = grpc.aio.secure_channel(
target=credential_provider.token_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(
credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy()
),
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
),
)
else:
self._channel = grpc.aio.insecure_channel(
target=f"{credential_provider.token_endpoint}:{credential_provider.port}",
interceptors=_interceptors(
credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy()
),
options=grpc_control_channel_options_from_grpc_config(
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
),
)

async def close(self) -> None:
await self._secure_channel.close()
await self._channel.close()

def async_stub(self) -> token_client.TokenStub:
return token_client.TokenStub(self._secure_channel) # type: ignore[no-untyped-call]
return token_client.TokenStub(self._channel) # type: ignore[no-untyped-call]


def _interceptors(
Expand Down
Loading
Loading