Skip to content
Merged
83 changes: 76 additions & 7 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ pip install openfeature-provider-flagd

## Configuration and Usage

The flagd provider can operate in two modes: [RPC](#remote-resolver-rpc) (evaluation takes place in flagd, via gRPC calls) or [in-process](#in-process-resolver) (evaluation takes place in-process, with the provider getting a ruleset from a compliant sync-source).

### Remote resolver (RPC)

This is the default mode of operation of the provider.
In this mode, `FlagdProvider` communicates with [flagd](https://github.com/open-feature/flagd) via the gRPC protocol.
Flag evaluations take place remotely at the connected flagd instance.

Instantiate a new FlagdProvider instance and configure the OpenFeature SDK to use it:

```python
Expand All @@ -19,7 +27,9 @@ from openfeature.contrib.provider.flagd import FlagdProvider
api.set_provider(FlagdProvider())
```

To use in-process evaluation in offline mode with a file as source:
### In-process resolver

This mode performs flag evaluations locally (in-process).

```python
from openfeature import api
Expand All @@ -36,12 +46,71 @@ api.set_provider(FlagdProvider(

The default options can be defined in the FlagdProvider constructor.

| Option name | Type & Values | Default |
|----------------|---------------|-----------|
| host | str | localhost |
| port | int | 8013 |
| schema | str | http |
| timeout | int | 2 |
| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
| ------------------------ | ------------------------------ | -------------------------- | ----------------------------- | ------------------- |
| resolver_type | FLAGD_RESOLVER | enum - `rpc`, `in-process` | rpc | |
| host | FLAGD_HOST | str | localhost | rpc & in-process |
| port | FLAGD_PORT | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| tls | FLAGD_TLS | bool | false | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process |
| stream_deadline_ms | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keep_alive_time | FLAGD_KEEP_ALIVE_TIME_MS | int | 0 | rpc & in-process |
| selector | FLAGD_SOURCE_SELECTOR | str | null | in-process |
| cache_type | FLAGD_CACHE | enum - `lru`, `disabled` | lru | rpc |
| max_cache_size | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
| retry_backoff_ms | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
| offline_flag_source_path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | str | null | in-process |

<!-- not implemented
| target_uri | FLAGD_TARGET_URI | alternative to host/port, supporting custom name resolution | string | null | rpc & in-process |
| socket_path | FLAGD_SOCKET_PATH | alternative to host port, unix socket | String | null | rpc & in-process |
| cert_path | FLAGD_SERVER_CERT_PATH | tls cert path | String | null | rpc & in-process |
| max_event_stream_retries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| context_enricher | - | sync-metadata to evaluation context mapping function | function | identity function | in-process |
| offline_pollIntervalMs | FLAGD_OFFLINE_POLL_MS | poll interval for reading offlineFlagSourcePath | int | 5000 | in-process |
-->

> [!NOTE]
> Some configurations are only applicable for RPC resolver.

<!--
### Unix socket support
Unix socket communication with flagd is facilitated by usaging of the linux-native `epoll` library on `linux-x86_64`
only (ARM support is pending the release of `netty-transport-native-epoll` v5).
Unix sockets are not supported on other platforms or architectures.
-->

### Reconnection

Reconnection is supported by the underlying gRPC connections.
If the connection to flagd is lost, it will reconnect automatically.
A failure to connect will result in an [error event](https://openfeature.dev/docs/reference/concepts/events#provider_error) from the provider, though it will attempt to reconnect indefinitely.

### Deadlines

Deadlines are used to define how long the provider waits to complete initialization or flag evaluations.
They behave differently based on the resolver type.

#### Deadlines with Remote resolver (RPC)

If the remote evaluation call is not completed within this deadline, the gRPC call is terminated with the error `DEADLINE_EXCEEDED`
and the evaluation will default.

### TLS

TLS is available in situations where flagd is running on another host.

<!--
You may optionally supply an X.509 certificate in PEM format. Otherwise, the default certificate store will be used.
```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.host("myflagdhost")
.tls(true) // use TLS
.certPath("etc/cert/ca.crt") // PEM cert
.build());
```
-->

## License

Expand Down
2 changes: 1 addition & 1 deletion providers/openfeature-provider-flagd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ classifiers = [
keywords = []
dependencies = [
"openfeature-sdk>=0.6.0",
"grpcio>=1.60.0",
"grpcio>=1.68.0",
"protobuf>=4.25.2",
"mmh3>=4.1.0",
"panzi-json-logic>=1.0.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,33 @@
import typing
from enum import Enum


class ResolverType(Enum):
RPC = "rpc"
IN_PROCESS = "in-process"


DEFAULT_DEADLINE = 500
DEFAULT_HOST = "localhost"
DEFAULT_KEEP_ALIVE = 0
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
DEFAULT_PORT_IN_PROCESS = 8015
DEFAULT_PORT_RPC = 8013
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
DEFAULT_RETRY_BACKOFF = 1000
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False

ENV_VAR_DEADLINE_MS = "FLAGD_DEADLINE_MS"
ENV_VAR_HOST = "FLAGD_HOST"
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
ENV_VAR_PORT = "FLAGD_PORT"
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER_TYPE"
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"

T = typing.TypeVar("T")


Expand All @@ -18,42 +45,83 @@ def env_or_default(
return val if cast is None else cast(val)


class ResolverType(Enum):
GRPC = "grpc"
IN_PROCESS = "in-process"


class Config:
def __init__( # noqa: PLR0913
self,
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
timeout: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
retry_backoff_ms: typing.Optional[int] = None,
deadline: typing.Optional[int] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
):
self.host = env_or_default("FLAGD_HOST", "localhost") if host is None else host
self.port = (
env_or_default("FLAGD_PORT", 8013, cast=int) if port is None else port
)
self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host

self.tls = (
env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls
env_or_default(ENV_VAR_TLS, DEFAULT_TLS, cast=str_to_bool)
if tls is None
else tls
)
self.timeout = 5 if timeout is None else timeout

self.retry_backoff_ms: int = (
int(
env_or_default(
ENV_VAR_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF, cast=int
)
)
if retry_backoff_ms is None
else retry_backoff_ms
)

self.resolver_type = (
ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc"))
ResolverType(env_or_default(ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE))
if resolver_type is None
else resolver_type
)

default_port = (
DEFAULT_PORT_RPC
if self.resolver_type is ResolverType.RPC
else DEFAULT_PORT_IN_PROCESS
)

self.port: int = (
int(env_or_default(ENV_VAR_PORT, default_port, cast=int))
if port is None
else port
)

self.offline_flag_source_path = (
env_or_default("FLAGD_OFFLINE_FLAG_SOURCE_PATH", None)
env_or_default(
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH, DEFAULT_OFFLINE_SOURCE_PATH
)
if offline_flag_source_path is None
else offline_flag_source_path
)
self.offline_poll_interval_seconds = (
float(env_or_default("FLAGD_OFFLINE_POLL_INTERVAL_SECONDS", 1.0))
if offline_poll_interval_seconds is None
else offline_poll_interval_seconds

self.deadline: int = (
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
if deadline is None
else deadline
)

self.stream_deadline_ms: int = (
int(
env_or_default(
ENV_VAR_STREAM_DEADLINE_MS, DEFAULT_STREAM_DEADLINE, cast=int
)
)
if stream_deadline_ms is None
else stream_deadline_ms
)

self.keep_alive_time: int = (
int(
env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int)
)
if keep_alive_time is None
else keep_alive_time
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""

import typing
import warnings

from openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagResolutionDetails
Expand All @@ -42,41 +43,68 @@ def __init__( # noqa: PLR0913
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
deadline: typing.Optional[int] = None,
timeout: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
):
"""
Create an instance of the FlagdProvider

:param host: the host to make requests to
:param port: the port the flagd service is available on
:param tls: enable/disable secure TLS connectivity
:param timeout: the maximum to wait before a request times out
:param deadline: the maximum to wait before a request times out
:param timeout: the maximum time to wait before a request times out
:param retry_backoff_ms: the number of milliseconds to backoff
:param offline_flag_source_path: the path to the flag source file
:param stream_deadline_ms: the maximum time to wait before a request times out
:param keep_alive_time: the number of milliseconds to keep alive
:param resolver_type: the type of resolver to use
"""
if deadline is None and timeout is not None:
deadline = timeout * 1000
warnings.warn(
"'timeout' property is deprecated, please use 'deadline' instead, be aware that 'deadline' is in milliseconds",
DeprecationWarning,
stacklevel=2,
)

self.config = Config(
host=host,
port=port,
tls=tls,
timeout=timeout,
deadline=deadline,
retry_backoff_ms=retry_backoff_ms,
resolver_type=resolver_type,
offline_flag_source_path=offline_flag_source_path,
offline_poll_interval_seconds=offline_poll_interval_seconds,
stream_deadline_ms=stream_deadline_ms,
keep_alive_time=keep_alive_time,
)

self.resolver = self.setup_resolver()

def setup_resolver(self) -> AbstractResolver:
if self.config.resolver_type == ResolverType.GRPC:
return GrpcResolver(self.config)
if self.config.resolver_type == ResolverType.RPC:
return GrpcResolver(
self.config,
self.emit_provider_ready,
self.emit_provider_error,
self.emit_provider_configuration_changed,
)
elif self.config.resolver_type == ResolverType.IN_PROCESS:
return InProcessResolver(self.config, self)
else:
raise ValueError(
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.resolver.initialize(evaluation_context)

def shutdown(self) -> None:
if self.resolver:
self.resolver.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@


class AbstractResolver(typing.Protocol):
def initialize(self, evaluation_context: EvaluationContext) -> None: ...

def shutdown(self) -> None: ...

def resolve_boolean_details(
Expand Down
Loading