diff --git a/providers/openfeature-provider-flagd/README.md b/providers/openfeature-provider-flagd/README.md index aa63c55c..6833da0c 100644 --- a/providers/openfeature-provider-flagd/README.md +++ b/providers/openfeature-provider-flagd/README.md @@ -10,6 +10,14 @@ pip install openfeature-provider-flagd ## Configuration and Usage +The flagd provider can operate in two modes: [RPC](#remote-resolver-rpc) (evaluation takes place in flagd, via gRPC calls) or [in-process](#in-process-resolver) (evaluation takes place in-process, with the provider getting a ruleset from a compliant sync-source). + +### Remote resolver (RPC) + +This is the default mode of operation of the provider. +In this mode, `FlagdProvider` communicates with [flagd](https://github.com/open-feature/flagd) via the gRPC protocol. +Flag evaluations take place remotely at the connected flagd instance. + Instantiate a new FlagdProvider instance and configure the OpenFeature SDK to use it: ```python @@ -19,7 +27,9 @@ from openfeature.contrib.provider.flagd import FlagdProvider api.set_provider(FlagdProvider()) ``` -To use in-process evaluation in offline mode with a file as source: +### In-process resolver + +This mode performs flag evaluations locally (in-process). ```python from openfeature import api @@ -36,12 +46,71 @@ api.set_provider(FlagdProvider( The default options can be defined in the FlagdProvider constructor. -| Option name | Type & Values | Default | -|----------------|---------------|-----------| -| host | str | localhost | -| port | int | 8013 | -| schema | str | http | -| timeout | int | 2 | +| Option name | Environment variable name | Type & Values | Default | Compatible resolver | +| ------------------------ | ------------------------------ | -------------------------- | ----------------------------- | ------------------- | +| resolver_type | FLAGD_RESOLVER | enum - `rpc`, `in-process` | rpc | | +| host | FLAGD_HOST | str | localhost | rpc & in-process | +| port | FLAGD_PORT | int | 8013 (rpc), 8015 (in-process) | rpc & in-process | +| tls | FLAGD_TLS | bool | false | rpc & in-process | +| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process | +| stream_deadline_ms | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process | +| keep_alive_time | FLAGD_KEEP_ALIVE_TIME_MS | int | 0 | rpc & in-process | +| selector | FLAGD_SOURCE_SELECTOR | str | null | in-process | +| cache_type | FLAGD_CACHE | enum - `lru`, `disabled` | lru | rpc | +| max_cache_size | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc | +| retry_backoff_ms | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc | +| offline_flag_source_path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | str | null | in-process | + + + +> [!NOTE] +> Some configurations are only applicable for RPC resolver. + + + +### Reconnection + +Reconnection is supported by the underlying gRPC connections. +If the connection to flagd is lost, it will reconnect automatically. +A failure to connect will result in an [error event](https://openfeature.dev/docs/reference/concepts/events#provider_error) from the provider, though it will attempt to reconnect indefinitely. + +### Deadlines + +Deadlines are used to define how long the provider waits to complete initialization or flag evaluations. +They behave differently based on the resolver type. + +#### Deadlines with Remote resolver (RPC) + +If the remote evaluation call is not completed within this deadline, the gRPC call is terminated with the error `DEADLINE_EXCEEDED` +and the evaluation will default. + +### TLS + +TLS is available in situations where flagd is running on another host. + + ## License diff --git a/providers/openfeature-provider-flagd/pyproject.toml b/providers/openfeature-provider-flagd/pyproject.toml index 696dfd00..7e74e810 100644 --- a/providers/openfeature-provider-flagd/pyproject.toml +++ b/providers/openfeature-provider-flagd/pyproject.toml @@ -18,7 +18,7 @@ classifiers = [ keywords = [] dependencies = [ "openfeature-sdk>=0.6.0", - "grpcio>=1.60.0", + "grpcio>=1.68.0", "protobuf>=4.25.2", "mmh3>=4.1.0", "panzi-json-logic>=1.0.1", diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index a95c3153..a393d270 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -2,6 +2,33 @@ import typing from enum import Enum + +class ResolverType(Enum): + RPC = "rpc" + IN_PROCESS = "in-process" + + +DEFAULT_DEADLINE = 500 +DEFAULT_HOST = "localhost" +DEFAULT_KEEP_ALIVE = 0 +DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None +DEFAULT_PORT_IN_PROCESS = 8015 +DEFAULT_PORT_RPC = 8013 +DEFAULT_RESOLVER_TYPE = ResolverType.RPC +DEFAULT_RETRY_BACKOFF = 1000 +DEFAULT_STREAM_DEADLINE = 600000 +DEFAULT_TLS = False + +ENV_VAR_DEADLINE_MS = "FLAGD_DEADLINE_MS" +ENV_VAR_HOST = "FLAGD_HOST" +ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS" +ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" +ENV_VAR_PORT = "FLAGD_PORT" +ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER_TYPE" +ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS" +ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" +ENV_VAR_TLS = "FLAGD_TLS" + T = typing.TypeVar("T") @@ -18,42 +45,83 @@ def env_or_default( return val if cast is None else cast(val) -class ResolverType(Enum): - GRPC = "grpc" - IN_PROCESS = "in-process" - - class Config: def __init__( # noqa: PLR0913 self, host: typing.Optional[str] = None, port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, - timeout: typing.Optional[int] = None, resolver_type: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, - offline_poll_interval_seconds: typing.Optional[float] = None, + retry_backoff_ms: typing.Optional[int] = None, + deadline: typing.Optional[int] = None, + stream_deadline_ms: typing.Optional[int] = None, + keep_alive_time: typing.Optional[int] = None, ): - self.host = env_or_default("FLAGD_HOST", "localhost") if host is None else host - self.port = ( - env_or_default("FLAGD_PORT", 8013, cast=int) if port is None else port - ) + self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host + self.tls = ( - env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls + env_or_default(ENV_VAR_TLS, DEFAULT_TLS, cast=str_to_bool) + if tls is None + else tls ) - self.timeout = 5 if timeout is None else timeout + + self.retry_backoff_ms: int = ( + int( + env_or_default( + ENV_VAR_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF, cast=int + ) + ) + if retry_backoff_ms is None + else retry_backoff_ms + ) + self.resolver_type = ( - ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc")) + ResolverType(env_or_default(ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE)) if resolver_type is None else resolver_type ) + + default_port = ( + DEFAULT_PORT_RPC + if self.resolver_type is ResolverType.RPC + else DEFAULT_PORT_IN_PROCESS + ) + + self.port: int = ( + int(env_or_default(ENV_VAR_PORT, default_port, cast=int)) + if port is None + else port + ) + self.offline_flag_source_path = ( - env_or_default("FLAGD_OFFLINE_FLAG_SOURCE_PATH", None) + env_or_default( + ENV_VAR_OFFLINE_FLAG_SOURCE_PATH, DEFAULT_OFFLINE_SOURCE_PATH + ) if offline_flag_source_path is None else offline_flag_source_path ) - self.offline_poll_interval_seconds = ( - float(env_or_default("FLAGD_OFFLINE_POLL_INTERVAL_SECONDS", 1.0)) - if offline_poll_interval_seconds is None - else offline_poll_interval_seconds + + self.deadline: int = ( + int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int)) + if deadline is None + else deadline + ) + + self.stream_deadline_ms: int = ( + int( + env_or_default( + ENV_VAR_STREAM_DEADLINE_MS, DEFAULT_STREAM_DEADLINE, cast=int + ) + ) + if stream_deadline_ms is None + else stream_deadline_ms + ) + + self.keep_alive_time: int = ( + int( + env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int) + ) + if keep_alive_time is None + else keep_alive_time ) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 76307475..c45b4a86 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -22,6 +22,7 @@ """ import typing +import warnings from openfeature.evaluation_context import EvaluationContext from openfeature.flag_evaluation import FlagResolutionDetails @@ -42,10 +43,13 @@ def __init__( # noqa: PLR0913 host: typing.Optional[str] = None, port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, + deadline: typing.Optional[int] = None, timeout: typing.Optional[int] = None, + retry_backoff_ms: typing.Optional[int] = None, resolver_type: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, - offline_poll_interval_seconds: typing.Optional[float] = None, + stream_deadline_ms: typing.Optional[int] = None, + keep_alive_time: typing.Optional[int] = None, ): """ Create an instance of the FlagdProvider @@ -53,23 +57,44 @@ def __init__( # noqa: PLR0913 :param host: the host to make requests to :param port: the port the flagd service is available on :param tls: enable/disable secure TLS connectivity - :param timeout: the maximum to wait before a request times out + :param deadline: the maximum to wait before a request times out + :param timeout: the maximum time to wait before a request times out + :param retry_backoff_ms: the number of milliseconds to backoff + :param offline_flag_source_path: the path to the flag source file + :param stream_deadline_ms: the maximum time to wait before a request times out + :param keep_alive_time: the number of milliseconds to keep alive + :param resolver_type: the type of resolver to use """ + if deadline is None and timeout is not None: + deadline = timeout * 1000 + warnings.warn( + "'timeout' property is deprecated, please use 'deadline' instead, be aware that 'deadline' is in milliseconds", + DeprecationWarning, + stacklevel=2, + ) + self.config = Config( host=host, port=port, tls=tls, - timeout=timeout, + deadline=deadline, + retry_backoff_ms=retry_backoff_ms, resolver_type=resolver_type, offline_flag_source_path=offline_flag_source_path, - offline_poll_interval_seconds=offline_poll_interval_seconds, + stream_deadline_ms=stream_deadline_ms, + keep_alive_time=keep_alive_time, ) self.resolver = self.setup_resolver() def setup_resolver(self) -> AbstractResolver: - if self.config.resolver_type == ResolverType.GRPC: - return GrpcResolver(self.config) + if self.config.resolver_type == ResolverType.RPC: + return GrpcResolver( + self.config, + self.emit_provider_ready, + self.emit_provider_error, + self.emit_provider_configuration_changed, + ) elif self.config.resolver_type == ResolverType.IN_PROCESS: return InProcessResolver(self.config, self) else: @@ -77,6 +102,9 @@ def setup_resolver(self) -> AbstractResolver: f"`resolver_type` parameter invalid: {self.config.resolver_type}" ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.resolver.initialize(evaluation_context) + def shutdown(self) -> None: if self.resolver: self.resolver.shutdown() diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py index d0d46f59..73923abb 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py @@ -8,6 +8,8 @@ class AbstractResolver(typing.Protocol): + def initialize(self, evaluation_context: EvaluationContext) -> None: ... + def shutdown(self) -> None: ... def resolve_boolean_details( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index f57eef4e..9026e4b2 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -1,3 +1,6 @@ +import logging +import threading +import time import typing import grpc @@ -5,11 +8,14 @@ from google.protobuf.struct_pb2 import Struct from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails from openfeature.exception import ( + ErrorCode, FlagNotFoundError, GeneralError, InvalidContextError, ParseError, + ProviderNotReadyError, TypeMismatchError, ) from openfeature.flag_evaluation import FlagResolutionDetails @@ -26,19 +32,124 @@ T = typing.TypeVar("T") +logger = logging.getLogger("openfeature.contrib") + class GrpcResolver: - def __init__(self, config: Config): + MAX_BACK_OFF = 120 + + def __init__( + self, + config: Config, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_configuration_changed: typing.Callable[ + [ProviderEventDetails], None + ], + ): self.config = config - channel_factory = ( - grpc.secure_channel if self.config.tls else grpc.insecure_channel + self.emit_provider_ready = emit_provider_ready + self.emit_provider_error = emit_provider_error + self.emit_provider_configuration_changed = emit_provider_configuration_changed + self.stub, self.channel = self._create_stub() + self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 + self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 + self.deadline = config.deadline * 0.001 + self.connected = False + + def _create_stub( + self, + ) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]: + config = self.config + channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel + channel = channel_factory( + f"{config.host}:{config.port}", + options=(("grpc.keepalive_time_ms", config.keep_alive_time),), ) - self.channel = channel_factory(f"{self.config.host}:{self.config.port}") - self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) + stub = evaluation_pb2_grpc.ServiceStub(channel) + return stub, channel + + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.connect() def shutdown(self) -> None: + self.active = False self.channel.close() + def connect(self) -> None: + self.active = True + self.thread = threading.Thread( + target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread" + ) + self.thread.start() + + ## block until ready or deadline reached + timeout = self.deadline + time.time() + while not self.connected and time.time() < timeout: + time.sleep(0.05) + logger.debug("Finished blocking gRPC state initialization") + + if not self.connected: + raise ProviderNotReadyError( + "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." + ) + + def listen(self) -> None: + retry_delay = self.retry_backoff_seconds + + call_args = ( + {"timeout": self.streamline_deadline_seconds} + if self.streamline_deadline_seconds > 0 + else {} + ) + while self.active: + request = evaluation_pb2.EventStreamRequest() + try: + logger.debug("Setting up gRPC sync flags connection") + for message in self.stub.EventStream(request, **call_args): + if message.type == "provider_ready": + if not self.connected: + self.emit_provider_ready( + ProviderEventDetails( + message="gRPC sync connection established" + ) + ) + self.connected = True + # reset retry delay after successsful read + retry_delay = self.retry_backoff_seconds + + elif message.type == "configuration_change": + data = MessageToDict(message)["data"] + self.handle_changed_flags(data) + + if not self.active: + logger.info("Terminating gRPC sync thread") + return + except grpc.RpcError as e: + logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") + # re-create the stub if there's a connection issue - otherwise reconnect does not work as expected + self.stub, self.channel = self._create_stub() + except ParseError: + logger.exception( + f"Could not parse flag data using flagd syntax: {message=}" + ) + + self.connected = False + self.emit_provider_error( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + error_code=ErrorCode.GENERAL, + ) + ) + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF) + + def handle_changed_flags(self, data: typing.Any) -> None: + changed_flags = list(data["flags"].keys()) + + self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags)) + def resolve_boolean_details( self, key: str, @@ -87,7 +198,7 @@ def _resolve( # noqa: PLR0915 evaluation_context: typing.Optional[EvaluationContext], ) -> FlagResolutionDetails[T]: context = self._convert_context(evaluation_context) - call_args = {"timeout": self.config.timeout} + call_args = {"timeout": self.deadline} try: request: Message if flag_type == FlagType.BOOLEAN: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index 69b4989a..a14dbb8c 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -3,7 +3,7 @@ from openfeature.evaluation_context import EvaluationContext from openfeature.exception import FlagNotFoundError, ParseError from openfeature.flag_evaluation import FlagResolutionDetails, Reason -from openfeature.provider.provider import AbstractProvider +from openfeature.provider import AbstractProvider from ..config import Config from .process.file_watcher import FileWatcherFlagStore @@ -23,9 +23,12 @@ def __init__(self, config: Config, provider: AbstractProvider): self.flag_store = FileWatcherFlagStore( self.config.offline_flag_source_path, self.provider, - self.config.offline_poll_interval_seconds, + self.config.retry_backoff_ms * 0.001, ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + pass + def shutdown(self) -> None: self.flag_store.shutdown() diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index af14e299..142ec7f0 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -5,32 +5,25 @@ from tests.e2e.flagd_container import FlagdContainer from tests.e2e.steps import * # noqa: F403 -from openfeature import api -from openfeature.contrib.provider.flagd import FlagdProvider - JsonPrimitive = typing.Union[str, bool, float, int] TEST_HARNESS_PATH = "../../openfeature/test-harness" SPEC_PATH = "../../openfeature/spec" -@pytest.fixture(autouse=True, scope="package") -def setup(request, port, image, resolver_type): +@pytest.fixture(autouse=True, scope="module") +def setup(request, port, image): container: DockerContainer = FlagdContainer( image=image, port=port, ) # Setup code c = container.start() - api.set_provider( - FlagdProvider( - resolver_type=resolver_type, - port=int(container.get_exposed_port(port)), - ) - ) def fin(): c.stop() # Teardown code request.addfinalizer(fin) + + return c.get_exposed_port(port) diff --git a/providers/openfeature-provider-flagd/tests/e2e/steps.py b/providers/openfeature-provider-flagd/tests/e2e/steps.py index fe490c5f..477d8c25 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/steps.py @@ -1,3 +1,4 @@ +import logging import time import typing @@ -8,8 +9,9 @@ from openfeature import api from openfeature.client import OpenFeatureClient +from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.evaluation_context import EvaluationContext -from openfeature.event import EventDetails, ProviderEvent +from openfeature.event import ProviderEvent from openfeature.flag_evaluation import ErrorCode, FlagEvaluationDetails, Reason from openfeature.provider import ProviderStatus @@ -24,8 +26,12 @@ def evaluation_context() -> EvaluationContext: @given("a flagd provider is set", target_fixture="client") @given("a provider is registered", target_fixture="client") -def setup_provider() -> OpenFeatureClient: - client = api.get_client() +def setup_provider(setup, resolver_type, client_name) -> OpenFeatureClient: + api.set_provider( + FlagdProvider(resolver_type=resolver_type, port=setup, timeout=1), + client_name, + ) + client = api.get_client(client_name) wait_for(lambda: client.get_provider_status() == ProviderStatus.READY) return client @@ -491,27 +497,35 @@ def assert_reason( assert_equal(evaluation_result.reason, reason) -@when(parsers.cfparse("a PROVIDER_READY handler is added")) -def provider_ready_add(client: OpenFeatureClient, context): - def provider_ready_handler(event_details: EventDetails): - context["provider_ready_ran"] = True +@pytest.fixture() +def event_handles() -> list: + return [] - client.add_handler(ProviderEvent.PROVIDER_READY, provider_ready_handler) +@pytest.fixture() +def error_handles() -> list: + return [] -@then(parsers.cfparse("the PROVIDER_READY handler must run")) -def provider_ready_was_executed(client: OpenFeatureClient, context): - assert_true(context["provider_ready_ran"]) +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler is added", + extra_types={"ProviderEvent": ProviderEvent}, + ), +) +def add_event_handler( + client: OpenFeatureClient, event_type: ProviderEvent, event_handles: list +): + def handler(event): + logging.debug((event_type, event)) + event_handles.append( + { + "type": event_type, + "event": event, + } + ) -@when(parsers.cfparse("a PROVIDER_CONFIGURATION_CHANGED handler is added")) -def provider_changed_add(client: OpenFeatureClient, context): - def provider_changed_handler(event_details: EventDetails): - context["provider_changed_ran"] = True - - client.add_handler( - ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, provider_changed_handler - ) + client.add_handler(event_type, handler) @pytest.fixture(scope="function") @@ -519,30 +533,100 @@ def context(): return {} -@when(parsers.cfparse('a flag with key "{flag_key}" is modified')) -def assert_reason2( +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler and a {event_type2:ProviderEvent} handler are added", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def add_event_handlers( client: OpenFeatureClient, - context, - flag_key: str, + event_type: ProviderEvent, + event_type2: ProviderEvent, + event_handles, + error_handles, ): - context["flag_key"] = flag_key + add_event_handler(client, event_type, event_handles) + add_event_handler(client, event_type2, error_handles) +def assert_handlers( + handles, event_type: ProviderEvent, max_wait: int = 2, num_events: int = 1 +): + poll_interval = 1 + while max_wait > 0: + if sum([h["type"] == event_type for h in handles]) < num_events: + max_wait -= poll_interval + time.sleep(poll_interval) + continue + break + + logging.info(f"asserting num({event_type}) >= {num_events}: {handles}") + actual_num_events = sum([h["type"] == event_type for h in handles]) + assert ( + num_events <= actual_num_events + ), f"Expected {num_events} but got {actual_num_events}: {handles}" + + +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) @then( - parsers.cfparse("the PROVIDER_CONFIGURATION_CHANGED handler must run"), + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider connects", + extra_types={"ProviderEvent": ProviderEvent}, + ) ) -def provider_changed_was_executed(client: OpenFeatureClient, context): - wait_for(lambda: context.get("provider_changed_ran")) - assert_equal(context["provider_changed_ran"], True) +def assert_handler_run(event_type: ProviderEvent, event_handles): + assert_handlers(event_handles, event_type, max_wait=6) -@then(parsers.cfparse('the event details must indicate "{flag_name}" was altered')) -def flag_was_changed( - flag_name: str, - context, +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider's connection is lost", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_handler(error_handles, event_type: ProviderEvent): + # docker sync upstream restarts every 5s, waiting 2 cycles reduces test noise + assert_handlers(error_handles, event_type, max_wait=30) + + +@when( + parsers.cfparse('a flag with key "{flag_key}" is modified'), + target_fixture="changed_flag", +) +def changed_flag( + flag_key: str, +): + return flag_key + + +@then( + parsers.cfparse( + "when the connection is reestablished the {event_type:ProviderEvent} handler must run again", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_error( + client: OpenFeatureClient, event_type: ProviderEvent, event_handles: list ): - wait_for(lambda: flag_name in context.get("changed_flags")) - assert_in(flag_name, context.get("changed_flags")) + assert_handlers(event_handles, event_type, max_wait=30, num_events=2) + + +@then(parsers.cfparse('the event details must indicate "{key}" was altered')) +def assert_flag_changed(event_handles, key): + handle = None + for h in event_handles: + if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: + handle = h + break + + assert handle is not None + assert key in handle["event"].flags_changed def wait_for(pred, poll_sec=2, timeout_sec=10): @@ -551,3 +635,26 @@ def wait_for(pred, poll_sec=2, timeout_sec=10): time.sleep(poll_sec) assert_true(pred()) return ok + + +@given("flagd is unavailable", target_fixture="client") +def flagd_unavailable(resolver_type): + api.set_provider( + FlagdProvider( + resolver_type=resolver_type, + port=99999, + ), + "unavailable", + ) + return api.get_client("unavailable") + + +@when("a flagd provider is set and initialization is awaited") +def flagd_init(client: OpenFeatureClient, event_handles, error_handles): + add_event_handler(client, ProviderEvent.PROVIDER_ERROR, error_handles) + add_event_handler(client, ProviderEvent.PROVIDER_READY, event_handles) + + +@then("an error should be indicated within the configured deadline") +def flagd_error(error_handles): + assert_handlers(error_handles, ProviderEvent.PROVIDER_ERROR) diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py b/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py index 9f1568a4..86be937d 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py @@ -5,12 +5,15 @@ import pytest import yaml -from pytest_bdd import scenario, scenarios +from pytest_bdd import given, scenario, scenarios from tests.e2e.conftest import SPEC_PATH, TEST_HARNESS_PATH +from tests.e2e.steps import wait_for from openfeature import api +from openfeature.client import OpenFeatureClient from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.config import ResolverType +from openfeature.provider import ProviderStatus KEY_EVALUATORS = "$evaluators" @@ -19,7 +22,7 @@ MERGED_FILE = "merged_file" -@pytest.fixture(params=["json", "yaml"], scope="package") +@pytest.fixture(params=["json", "yaml"], scope="module") def file_name(request): extension = request.param result = {KEY_FLAGS: {}, KEY_EVALUATORS: {}} @@ -49,17 +52,36 @@ def file_name(request): return outfile -@pytest.fixture(autouse=True, scope="package") -def setup(request, file_name): - """`file_name` tests""" +@pytest.fixture(autouse=True, scope="module") +def client_name() -> str: + return "in-process" + + +@pytest.fixture(autouse=True, scope="module") +def resolver_type() -> ResolverType: + return ResolverType.IN_PROCESS + + +@pytest.fixture(autouse=True, scope="module") +def setup(request, client_name, file_name, resolver_type): + """nothing to boot""" api.set_provider( FlagdProvider( - resolver_type=ResolverType.IN_PROCESS, + resolver_type=resolver_type, offline_flag_source_path=file_name.name, - ) + ), + client_name, ) +@given("a flagd provider is set", target_fixture="client") +@given("a provider is registered", target_fixture="client") +def setup_provider(client_name) -> OpenFeatureClient: + client = api.get_client(client_name) + wait_for(lambda: client.get_provider_status() == ProviderStatus.READY) + return client + + @pytest.mark.skip(reason="Eventing not implemented") @scenario(f"{TEST_HARNESS_PATH}/gherkin/flagd.feature", "Flag change event") def test_flag_change_event(): diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py index 2a5d1c15..0a939d5a 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py @@ -1,31 +1,30 @@ import pytest -from pytest_bdd import scenario, scenarios +from pytest_bdd import scenarios from tests.e2e.conftest import SPEC_PATH, TEST_HARNESS_PATH from openfeature.contrib.provider.flagd.config import ResolverType -@pytest.fixture(autouse=True, scope="package") +@pytest.fixture(autouse=True, scope="module") +def client_name() -> str: + return "rpc" + + +@pytest.fixture(autouse=True, scope="module") def resolver_type() -> ResolverType: - return ResolverType.GRPC + return ResolverType.RPC -@pytest.fixture(autouse=True, scope="package") +@pytest.fixture(autouse=True, scope="module") def port(): return 8013 -@pytest.fixture(autouse=True, scope="package") +@pytest.fixture(autouse=True, scope="module") def image(): return "ghcr.io/open-feature/flagd-testbed:v0.5.13" -@pytest.mark.skip(reason="Eventing not implemented") -@scenario(f"{TEST_HARNESS_PATH}/gherkin/flagd.feature", "Flag change event") -def test_flag_change_event(): - """not implemented""" - - scenarios( f"{TEST_HARNESS_PATH}/gherkin/flagd.feature", f"{TEST_HARNESS_PATH}/gherkin/flagd-json-evaluator.feature", diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py new file mode 100644 index 00000000..b99df2be --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py @@ -0,0 +1,30 @@ +import pytest +from pytest_bdd import scenarios +from tests.e2e.conftest import TEST_HARNESS_PATH + +from openfeature.contrib.provider.flagd.config import ResolverType + + +@pytest.fixture(autouse=True, scope="module") +def client_name() -> str: + return "rpc-reconnect" + + +@pytest.fixture(autouse=True, scope="module") +def resolver_type() -> ResolverType: + return ResolverType.RPC + + +@pytest.fixture(autouse=True, scope="module") +def port(): + return 8013 + + +@pytest.fixture(autouse=True, scope="module") +def image(): + return "ghcr.io/open-feature/flagd-testbed-unstable:v0.5.13" + + +scenarios( + f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature", +) diff --git a/providers/openfeature-provider-flagd/tests/test_config.py b/providers/openfeature-provider-flagd/tests/test_config.py index 1fb0c720..54d7ee3a 100644 --- a/providers/openfeature-provider-flagd/tests/test_config.py +++ b/providers/openfeature-provider-flagd/tests/test_config.py @@ -1,29 +1,130 @@ -from openfeature.contrib.provider.flagd.config import Config +import pytest +from openfeature.contrib.provider.flagd.config import ( + DEFAULT_DEADLINE, + DEFAULT_HOST, + DEFAULT_KEEP_ALIVE, + DEFAULT_OFFLINE_SOURCE_PATH, + DEFAULT_PORT_IN_PROCESS, + DEFAULT_PORT_RPC, + DEFAULT_RESOLVER_TYPE, + DEFAULT_RETRY_BACKOFF, + DEFAULT_STREAM_DEADLINE, + DEFAULT_TLS, + ENV_VAR_DEADLINE_MS, + ENV_VAR_HOST, + ENV_VAR_KEEP_ALIVE_TIME_MS, + ENV_VAR_OFFLINE_FLAG_SOURCE_PATH, + ENV_VAR_PORT, + ENV_VAR_RESOLVER_TYPE, + ENV_VAR_RETRY_BACKOFF_MS, + ENV_VAR_STREAM_DEADLINE_MS, + ENV_VAR_TLS, + Config, + ResolverType, +) -def test_return_default_values(): + +def test_return_default_values_rpc(): config = Config() - assert config.host == "localhost" - assert config.port == 8013 - assert config.tls is False - assert config.timeout == 5 + assert config.deadline == DEFAULT_DEADLINE + assert config.host == DEFAULT_HOST + assert config.keep_alive_time == DEFAULT_KEEP_ALIVE + assert config.offline_flag_source_path == DEFAULT_OFFLINE_SOURCE_PATH + assert config.port == DEFAULT_PORT_RPC + assert config.resolver_type == DEFAULT_RESOLVER_TYPE + assert config.retry_backoff_ms == DEFAULT_RETRY_BACKOFF + assert config.stream_deadline_ms == DEFAULT_STREAM_DEADLINE + assert config.tls is DEFAULT_TLS + + +def test_return_default_values_in_process(): + config = Config(resolver_type=ResolverType.IN_PROCESS) + assert config.deadline == DEFAULT_DEADLINE + assert config.host == DEFAULT_HOST + assert config.keep_alive_time == DEFAULT_KEEP_ALIVE + assert config.offline_flag_source_path == DEFAULT_OFFLINE_SOURCE_PATH + assert config.port == DEFAULT_PORT_IN_PROCESS + assert config.resolver_type == ResolverType.IN_PROCESS + assert config.retry_backoff_ms == DEFAULT_RETRY_BACKOFF + assert config.stream_deadline_ms == DEFAULT_STREAM_DEADLINE + assert config.tls is DEFAULT_TLS + +@pytest.fixture(params=ResolverType, scope="module") +def resolver_type(request): + return request.param -def test_overrides_defaults_with_environment(monkeypatch): - monkeypatch.setenv("FLAGD_HOST", "flagd") - monkeypatch.setenv("FLAGD_PORT", "1234") - monkeypatch.setenv("FLAGD_TLS", "true") + +def test_overrides_defaults_with_environment(monkeypatch, resolver_type): + deadline = 1 + host = "flagd" + keep_alive = 2 + offline_path = "path" + port = 1234 + retry_backoff = 3 + stream_deadline = 4 + tls = True + + monkeypatch.setenv(ENV_VAR_DEADLINE_MS, str(deadline)) + monkeypatch.setenv(ENV_VAR_HOST, host) + monkeypatch.setenv(ENV_VAR_KEEP_ALIVE_TIME_MS, str(keep_alive)) + monkeypatch.setenv(ENV_VAR_OFFLINE_FLAG_SOURCE_PATH, offline_path) + monkeypatch.setenv(ENV_VAR_PORT, str(port)) + monkeypatch.setenv(ENV_VAR_RESOLVER_TYPE, str(resolver_type.value)) + monkeypatch.setenv(ENV_VAR_RETRY_BACKOFF_MS, str(retry_backoff)) + monkeypatch.setenv(ENV_VAR_STREAM_DEADLINE_MS, str(stream_deadline)) + monkeypatch.setenv(ENV_VAR_TLS, str(tls)) config = Config() - assert config.host == "flagd" - assert config.port == 1234 - assert config.tls is True + assert config.deadline == deadline + assert config.host == host + assert config.keep_alive_time == keep_alive + assert config.offline_flag_source_path == offline_path + assert config.port == port + assert config.resolver_type == resolver_type + assert config.retry_backoff_ms == retry_backoff + assert config.stream_deadline_ms == stream_deadline + assert config.tls is tls + +def test_uses_arguments_over_environments_and_defaults(monkeypatch, resolver_type): + deadline = 1 + host = "flagd" + keep_alive = 2 + offline_path = "path" + port = 1234 + retry_backoff = 3 + stream_deadline = 4 + tls = True -def test_uses_arguments_over_environments_and_defaults(monkeypatch): - monkeypatch.setenv("FLAGD_HOST", "flagd") + monkeypatch.setenv(ENV_VAR_DEADLINE_MS, str(deadline) + "value") + monkeypatch.setenv(ENV_VAR_HOST, host + "value") + monkeypatch.setenv(ENV_VAR_KEEP_ALIVE_TIME_MS, str(keep_alive) + "value") + monkeypatch.setenv(ENV_VAR_OFFLINE_FLAG_SOURCE_PATH, offline_path + "value") + monkeypatch.setenv(ENV_VAR_PORT, str(port) + "value") + monkeypatch.setenv(ENV_VAR_RESOLVER_TYPE, str(resolver_type) + "value") + monkeypatch.setenv(ENV_VAR_RETRY_BACKOFF_MS, str(retry_backoff) + "value") + monkeypatch.setenv(ENV_VAR_STREAM_DEADLINE_MS, str(stream_deadline) + "value") + monkeypatch.setenv(ENV_VAR_TLS, str(tls) + "value") - config = Config(host="flagd2", port=12345, tls=True) - assert config.host == "flagd2" - assert config.port == 12345 - assert config.tls is True + config = Config( + deadline=deadline, + host=host, + port=port, + resolver_type=resolver_type, + retry_backoff_ms=retry_backoff, + stream_deadline_ms=stream_deadline, + tls=tls, + keep_alive_time=keep_alive, + offline_flag_source_path=offline_path, + ) + assert config.deadline == deadline + assert config.host == host + assert config.keep_alive_time == keep_alive + assert config.offline_flag_source_path == offline_path + assert config.port == port + assert config.resolver_type == resolver_type + assert config.retry_backoff_ms == retry_backoff + assert config.stream_deadline_ms == stream_deadline + assert config.tls is tls