diff --git a/src/momento/auth/credential_provider.py b/src/momento/auth/credential_provider.py index 7238690a..eb16000f 100644 --- a/src/momento/auth/credential_provider.py +++ b/src/momento/auth/credential_provider.py @@ -16,6 +16,7 @@ class CredentialProvider: control_endpoint: str cache_endpoint: str token_endpoint: str + port: int @staticmethod def from_environment_variable( @@ -72,7 +73,23 @@ def from_string( cache_endpoint = cache_endpoint or token_and_endpoints.cache_endpoint token_endpoint = token_endpoint or token_and_endpoints.token_endpoint auth_token = token_and_endpoints.auth_token - return CredentialProvider(auth_token, control_endpoint, cache_endpoint, token_endpoint) + return CredentialProvider(auth_token, control_endpoint, cache_endpoint, token_endpoint, 443) + + @staticmethod + def for_momento_local( + host: str = "127.0.0.1", + port: int = 8080, + ) -> CredentialProvider: + """Creates a credential provider for use with Momento Local. + + Args: + host (str): the Momento Local host. + port (int): the Momento Local port. + + Returns: + CredentialProvider + """ + return CredentialProvider("", host, host, host, port) def __repr__(self) -> str: attributes: Dict[str, str] = copy.copy(vars(self)) # type: ignore[misc] diff --git a/src/momento/internal/aio/_scs_grpc_manager.py b/src/momento/internal/aio/_scs_grpc_manager.py index 77cf1eb3..8a73ec61 100644 --- a/src/momento/internal/aio/_scs_grpc_manager.py +++ b/src/momento/internal/aio/_scs_grpc_manager.py @@ -38,22 +38,33 @@ class _ControlGrpcManager: """Internal gRPC control manager.""" def __init__(self, configuration: Configuration, credential_provider: CredentialProvider): - self._secure_channel = grpc.aio.secure_channel( - target=credential_provider.control_endpoint, - credentials=channel_credentials_from_root_certs_or_default(configuration), - interceptors=_interceptors( - credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy() - ), - options=grpc_control_channel_options_from_grpc_config( - grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), - ), - ) + if credential_provider.port == 443: + self._channel = grpc.aio.secure_channel( + target=credential_provider.control_endpoint, + credentials=channel_credentials_from_root_certs_or_default(configuration), + interceptors=_interceptors( + credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy() + ), + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) + else: + self._channel = grpc.aio.insecure_channel( + target=f"{credential_provider.control_endpoint}:{credential_provider.port}", + interceptors=_interceptors( + credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy() + ), + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) async def close(self) -> None: - await self._secure_channel.close() + await self._channel.close() def async_stub(self) -> control_client.ScsControlStub: - return control_client.ScsControlStub(self._secure_channel) # type: ignore[no-untyped-call] + return control_client.ScsControlStub(self._channel) # type: ignore[no-untyped-call] class _DataGrpcManager: @@ -61,30 +72,41 @@ class _DataGrpcManager: def __init__(self, configuration: Configuration, credential_provider: CredentialProvider): self._logger = logs.logger - self._secure_channel = grpc.aio.secure_channel( - target=credential_provider.cache_endpoint, - credentials=channel_credentials_from_root_certs_or_default(configuration), - interceptors=_interceptors( - credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy() - ), - # Here is where you would pass override configuration to the underlying C gRPC layer. - # However, I have tried several different tuning options here and did not see any - # performance improvements, so sticking with the defaults for now. - # For more info on the performance investigations: - # https://github.com/momentohq/client-sdk-python/issues/120 - # For more info on available gRPC config options: - # https://grpc.github.io/grpc/python/grpc.html - # https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments - # https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h#L140 - # options=[ - # ('grpc.max_concurrent_streams', 1000), - # ('grpc.use_local_subchannel_pool', 1), - # (experimental.ChannelOptions.SingleThreadedUnaryStream, 1) - # ], - options=grpc_data_channel_options_from_grpc_config( - configuration.get_transport_strategy().get_grpc_configuration() - ), - ) + if credential_provider.port == 443: + self._channel = grpc.aio.secure_channel( + target=credential_provider.cache_endpoint, + credentials=channel_credentials_from_root_certs_or_default(configuration), + interceptors=_interceptors( + credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy() + ), + # Here is where you would pass override configuration to the underlying C gRPC layer. + # However, I have tried several different tuning options here and did not see any + # performance improvements, so sticking with the defaults for now. + # For more info on the performance investigations: + # https://github.com/momentohq/client-sdk-python/issues/120 + # For more info on available gRPC config options: + # https://grpc.github.io/grpc/python/grpc.html + # https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments + # https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h#L140 + # options=[ + # ('grpc.max_concurrent_streams', 1000), + # ('grpc.use_local_subchannel_pool', 1), + # (experimental.ChannelOptions.SingleThreadedUnaryStream, 1) + # ], + options=grpc_data_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) + else: + self._channel = grpc.aio.insecure_channel( + target=f"{credential_provider.cache_endpoint}:{credential_provider.port}", + interceptors=_interceptors( + credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy() + ), + options=grpc_data_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) async def eagerly_connect(self, timeout_seconds: float) -> None: self._logger.debug( @@ -93,7 +115,7 @@ async def eagerly_connect(self, timeout_seconds: float) -> None: try: await asyncio.wait_for(self.wait_for_ready(), timeout_seconds) except Exception as error: - self._secure_channel.close() + self._channel.close() self._logger.debug(f"Failed to connect to the server within the given timeout. {error}") raise ConnectionException( message=f"Failed to connect to Momento's server within given eager connection timeout: {error}", @@ -101,7 +123,7 @@ async def eagerly_connect(self, timeout_seconds: float) -> None: ) from error async def wait_for_ready(self) -> None: - latest_state = self._secure_channel.get_state(True) # try_to_connect + latest_state = self._channel.get_state(True) # try_to_connect ready: grpc.ChannelConnectivity = grpc.ChannelConnectivity.READY connecting: grpc.ChannelConnectivity = grpc.ChannelConnectivity.CONNECTING idle: grpc.ChannelConnectivity = grpc.ChannelConnectivity.IDLE @@ -117,80 +139,109 @@ async def wait_for_ready(self) -> None: # This is a gRPC callback helper that prevents us from repeatedly polling on the state # which is highly inefficient. - await self._secure_channel.wait_for_state_change(latest_state) - latest_state = self._secure_channel.get_state(False) # no need to reconnect + await self._channel.wait_for_state_change(latest_state) + latest_state = self._channel.get_state(False) # no need to reconnect if latest_state == ready: self._logger.debug("Connected to Momento's server! Happy Caching!") async def close(self) -> None: self._logger.debug("Closing and tearing down gRPC channel") - await self._secure_channel.close() + await self._channel.close() def async_stub(self) -> cache_client.ScsStub: - return cache_client.ScsStub(self._secure_channel) # type: ignore[no-untyped-call] + return cache_client.ScsStub(self._channel) # type: ignore[no-untyped-call] class _PubsubGrpcManager: """Internal gRPC pubsub manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - self._secure_channel = grpc.aio.secure_channel( - target=credential_provider.cache_endpoint, - credentials=grpc.ssl_channel_credentials(), - interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None), - options=grpc_topic_channel_options_from_grpc_config( - configuration.get_transport_strategy().get_grpc_configuration() - ), - ) + if credential_provider.port == 443: + self._channel = grpc.aio.secure_channel( + target=credential_provider.cache_endpoint, + credentials=grpc.ssl_channel_credentials(), + interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) + else: + self._channel = grpc.aio.insecure_channel( + target=f"{credential_provider.cache_endpoint}:{credential_provider.port}", + interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) async def close(self) -> None: - await self._secure_channel.close() + await self._channel.close() def async_stub(self) -> pubsub_client.PubsubStub: - return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call] + return pubsub_client.PubsubStub(self._channel) # type: ignore[no-untyped-call] class _PubsubGrpcStreamManager: """Internal gRPC pubsub stream manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - self._secure_channel = grpc.aio.secure_channel( - target=credential_provider.cache_endpoint, - credentials=grpc.ssl_channel_credentials(), - interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC), - options=grpc_topic_channel_options_from_grpc_config( - configuration.get_transport_strategy().get_grpc_configuration() - ), - ) + if credential_provider.port == 443: + self._channel = grpc.aio.secure_channel( + target=credential_provider.cache_endpoint, + credentials=grpc.ssl_channel_credentials(), + interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) + else: + self._channel = grpc.aio.insecure_channel( + target=f"{credential_provider.cache_endpoint}:{credential_provider.port}", + interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) async def close(self) -> None: - await self._secure_channel.close() + await self._channel.close() def async_stub(self) -> pubsub_client.PubsubStub: - return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call] + return pubsub_client.PubsubStub(self._channel) # type: ignore[no-untyped-call] class _TokenGrpcManager: """Internal gRPC token manager.""" def __init__(self, configuration: AuthConfiguration, credential_provider: CredentialProvider): - self._secure_channel = grpc.aio.secure_channel( - target=credential_provider.token_endpoint, - credentials=grpc.ssl_channel_credentials(), - interceptors=_interceptors( - credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy() - ), - options=grpc_control_channel_options_from_grpc_config( - grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), - ), - ) + if credential_provider.port == 443: + self._channel = grpc.aio.secure_channel( + target=credential_provider.token_endpoint, + credentials=grpc.ssl_channel_credentials(), + interceptors=_interceptors( + credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy() + ), + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) + else: + self._channel = grpc.aio.insecure_channel( + target=f"{credential_provider.token_endpoint}:{credential_provider.port}", + interceptors=_interceptors( + credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy() + ), + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) async def close(self) -> None: - await self._secure_channel.close() + await self._channel.close() def async_stub(self) -> token_client.TokenStub: - return token_client.TokenStub(self._secure_channel) # type: ignore[no-untyped-call] + return token_client.TokenStub(self._channel) # type: ignore[no-untyped-call] def _interceptors( diff --git a/src/momento/internal/synchronous/_scs_grpc_manager.py b/src/momento/internal/synchronous/_scs_grpc_manager.py index 7240d30f..2686c5c0 100644 --- a/src/momento/internal/synchronous/_scs_grpc_manager.py +++ b/src/momento/internal/synchronous/_scs_grpc_manager.py @@ -34,44 +34,60 @@ class _ControlGrpcManager: - """Internal gRPC control mananger.""" + """Internal gRPC control manager.""" def __init__(self, configuration: Configuration, credential_provider: CredentialProvider): - self._secure_channel = grpc.secure_channel( - target=credential_provider.control_endpoint, - credentials=channel_credentials_from_root_certs_or_default(configuration), - options=grpc_control_channel_options_from_grpc_config( - grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), - ), - ) + if credential_provider.port == 443: + self._channel = grpc.secure_channel( + target=credential_provider.control_endpoint, + credentials=channel_credentials_from_root_certs_or_default(configuration), + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) + else: + self._channel = grpc.insecure_channel( + target=f"{credential_provider.control_endpoint}:{credential_provider.port}", + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) intercept_channel = grpc.intercept_channel( - self._secure_channel, + self._channel, *_interceptors(credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()), ) self._stub = control_client.ScsControlStub(intercept_channel) # type: ignore[no-untyped-call] def close(self) -> None: - self._secure_channel.close() + self._channel.close() def stub(self) -> control_client.ScsControlStub: return self._stub class _DataGrpcManager: - """Internal gRPC data mananger.""" + """Internal gRPC data manager.""" def __init__(self, configuration: Configuration, credential_provider: CredentialProvider): self._logger = logs.logger - self._secure_channel = grpc.secure_channel( - target=credential_provider.cache_endpoint, - credentials=channel_credentials_from_root_certs_or_default(configuration), - options=grpc_data_channel_options_from_grpc_config( - configuration.get_transport_strategy().get_grpc_configuration() - ), - ) + if credential_provider.port == 443: + self._channel = grpc.secure_channel( + target=credential_provider.cache_endpoint, + credentials=channel_credentials_from_root_certs_or_default(configuration), + options=grpc_data_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) + else: + self._channel = grpc.insecure_channel( + target=f"{credential_provider.cache_endpoint}:{credential_provider.port}", + options=grpc_data_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) intercept_channel = grpc.intercept_channel( - self._secure_channel, + self._channel, *_interceptors(credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()), ) self._stub = cache_client.ScsStub(intercept_channel) # type: ignore[no-untyped-call] @@ -97,8 +113,8 @@ def on_timeout() -> None: timeout_seconds, ) # the subscription is no longer needed; it was only meant to watch if we could connect eagerly - self._secure_channel.unsubscribe(on_state_change) - self._secure_channel.close() + self._channel.unsubscribe(on_state_change) + self._channel.close() raise ConnectionException( message="Failed to connect to Momento's server within given eager connection timeout", service=Service.CACHE, @@ -118,7 +134,7 @@ def on_state_change(state: grpc.ChannelConnectivity) -> None: if state == ready: self._logger.debug("Connected to Momento's server! Happy Caching!") # we successfully connected within the timeout and we no longer need this subscription - self._secure_channel.unsubscribe(on_state_change) + self._channel.unsubscribe(on_state_change) # this indicates to the connection event that we were successful in establishing an eager connection connection_event.set() @@ -129,12 +145,12 @@ def on_state_change(state: grpc.ChannelConnectivity) -> None: else: self._logger.warn(f"Unexpected connection state while trying to eagerly connect: {state}") # we could not connect within the timeout and we no longer need this subscription - self._secure_channel.unsubscribe(on_state_change) + self._channel.unsubscribe(on_state_change) connection_event.set() # we subscribe to the channel that notifies us of state transitions, and the connection event above will take # care of unsubscribing from the channel incase the timeout has elapsed. - self._secure_channel.subscribe(on_state_change, try_to_connect=True) + self._channel.subscribe(on_state_change, try_to_connect=True) connection_established = connection_event.wait(timeout_seconds) if not connection_established: @@ -142,7 +158,7 @@ def on_state_change(state: grpc.ChannelConnectivity) -> None: def close(self) -> None: self._logger.debug("Closing and tearing down gRPC channel") - self._secure_channel.close() + self._channel.close() def stub(self) -> cache_client.ScsStub: return self._stub @@ -152,20 +168,28 @@ class _PubsubGrpcManager: """Internal gRPC pubsub manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - self._secure_channel = grpc.secure_channel( - target=credential_provider.cache_endpoint, - credentials=grpc.ssl_channel_credentials(), - options=grpc_topic_channel_options_from_grpc_config( - configuration.get_transport_strategy().get_grpc_configuration() - ), - ) + if credential_provider.port == 443: + self._channel = grpc.secure_channel( + target=credential_provider.cache_endpoint, + credentials=grpc.ssl_channel_credentials(), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) + else: + self._channel = grpc.insecure_channel( + target=f"{credential_provider.cache_endpoint}:{credential_provider.port}", + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) intercept_channel = grpc.intercept_channel( - self._secure_channel, *_interceptors(credential_provider.auth_token, ClientType.TOPIC, None) + self._channel, *_interceptors(credential_provider.auth_token, ClientType.TOPIC, None) ) self._stub = pubsub_client.PubsubStub(intercept_channel) # type: ignore[no-untyped-call] def close(self) -> None: - self._secure_channel.close() + self._channel.close() def stub(self) -> pubsub_client.PubsubStub: return self._stub @@ -175,13 +199,21 @@ class _PubsubGrpcStreamManager: """Internal gRPC pubsub stream manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - self._secure_channel = grpc.secure_channel( - target=credential_provider.cache_endpoint, - credentials=grpc.ssl_channel_credentials(), - options=grpc_topic_channel_options_from_grpc_config( - configuration.get_transport_strategy().get_grpc_configuration() - ), - ) + if credential_provider.port == 443: + self._secure_channel = grpc.secure_channel( + target=credential_provider.cache_endpoint, + credentials=grpc.ssl_channel_credentials(), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) + else: + self._secure_channel = grpc.insecure_channel( + target=f"{credential_provider.cache_endpoint}:{credential_provider.port}", + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), + ) intercept_channel = grpc.intercept_channel( self._secure_channel, *_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC) ) @@ -198,21 +230,29 @@ class _TokenGrpcManager: """Internal gRPC token manager.""" def __init__(self, configuration: AuthConfiguration, credential_provider: CredentialProvider): - self._secure_channel = grpc.secure_channel( - target=credential_provider.token_endpoint, - credentials=grpc.ssl_channel_credentials(), - options=grpc_control_channel_options_from_grpc_config( - grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), - ), - ) + if credential_provider.port == 443: + self._channel = grpc.secure_channel( + target=credential_provider.token_endpoint, + credentials=grpc.ssl_channel_credentials(), + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) + else: + self._channel = grpc.insecure_channel( + target=f"{credential_provider.token_endpoint}:{credential_provider.port}", + options=grpc_control_channel_options_from_grpc_config( + grpc_config=configuration.get_transport_strategy().get_grpc_configuration(), + ), + ) intercept_channel = grpc.intercept_channel( - self._secure_channel, + self._channel, *_interceptors(credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy()), ) self._stub = token_client.TokenStub(intercept_channel) # type: ignore[no-untyped-call] def close(self) -> None: - self._secure_channel.close() + self._channel.close() def stub(self) -> token_client.TokenStub: return self._stub