Skip to content

Commit db81de1

Browse files
committed
fix(flagd): adding events for rpc mode
Signed-off-by: Simon Schrottner <[email protected]>
1 parent 3477153 commit db81de1

File tree

10 files changed

+314
-66
lines changed

10 files changed

+314
-66
lines changed

providers/openfeature-provider-flagd/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ classifiers = [
1818
keywords = []
1919
dependencies = [
2020
"openfeature-sdk>=0.4.0",
21-
"grpcio>=1.60.0",
21+
"grpcio>=1.68.0",
2222
"protobuf>=4.25.2",
2323
"mmh3>=4.1.0",
2424
"panzi-json-logic>=1.0.1",

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,22 @@ def __init__( # noqa: PLR0913
6969

7070
def setup_resolver(self) -> AbstractResolver:
7171
if self.config.resolver_type == ResolverType.GRPC:
72-
return GrpcResolver(self.config)
72+
return GrpcResolver(
73+
self.config,
74+
self.emit_provider_ready,
75+
self.emit_provider_error,
76+
self.emit_provider_configuration_changed,
77+
)
7378
elif self.config.resolver_type == ResolverType.IN_PROCESS:
7479
return InProcessResolver(self.config, self)
7580
else:
7681
raise ValueError(
7782
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
7883
)
7984

85+
def initialize(self, evaluation_context: EvaluationContext) -> None:
86+
self.resolver.initialize(evaluation_context)
87+
8088
def shutdown(self) -> None:
8189
if self.resolver:
8290
self.resolver.shutdown()

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99

1010
class AbstractResolver(typing.Protocol):
11+
def initialize(self, evaluation_context: EvaluationContext) -> None: ...
12+
1113
def shutdown(self) -> None: ...
1214

1315
def resolve_boolean_details(

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import logging
2+
import threading
3+
import time
14
import typing
25

36
import grpc
@@ -9,7 +12,9 @@
912
)
1013

1114
from openfeature.evaluation_context import EvaluationContext
15+
from openfeature.event import ProviderEventDetails
1216
from openfeature.exception import (
17+
ErrorCode,
1318
FlagNotFoundError,
1419
GeneralError,
1520
InvalidContextError,
@@ -23,19 +28,95 @@
2328

2429
T = typing.TypeVar("T")
2530

31+
logger = logging.getLogger("openfeature.contrib")
32+
2633

2734
class GrpcResolver:
28-
def __init__(self, config: Config):
35+
MAX_BACK_OFF = 120
36+
37+
def __init__(
38+
self,
39+
config: Config,
40+
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
41+
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
42+
emit_provider_configuration_changed: typing.Callable[
43+
[ProviderEventDetails], None
44+
],
45+
):
2946
self.config = config
47+
self.emit_provider_ready = emit_provider_ready
48+
self.emit_provider_error = emit_provider_error
49+
self.emit_provider_configuration_changed = emit_provider_configuration_changed
3050
channel_factory = (
3151
grpc.secure_channel if self.config.tls else grpc.insecure_channel
3252
)
3353
self.channel = channel_factory(f"{self.config.host}:{self.config.port}")
3454
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
55+
self.retry_backoff_seconds = 0.1
56+
self.connected = False
57+
58+
def initialize(self, evaluation_context: EvaluationContext) -> None:
59+
self.connect()
3560

3661
def shutdown(self) -> None:
62+
self.active = False
3763
self.channel.close()
3864

65+
def connect(self) -> None:
66+
self.active = True
67+
self.thread = threading.Thread(
68+
target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
69+
)
70+
self.thread.start()
71+
72+
def listen(self) -> None:
73+
retry_delay = self.retry_backoff_seconds
74+
while self.active:
75+
request = evaluation_pb2.EventStreamRequest() # type:ignore[attr-defined]
76+
try:
77+
logger.debug("Setting up gRPC sync flags connection")
78+
for message in self.stub.EventStream(request):
79+
if message.type == "provider_ready":
80+
if not self.connected:
81+
self.emit_provider_ready(
82+
ProviderEventDetails(
83+
message="gRPC sync connection established"
84+
)
85+
)
86+
self.connected = True
87+
# reset retry delay after successsful read
88+
retry_delay = self.retry_backoff_seconds
89+
90+
elif message.type == "configuration_change":
91+
data = MessageToDict(message)["data"]
92+
self.handle_changed_flags(data)
93+
94+
if not self.active:
95+
logger.info("Terminating gRPC sync thread")
96+
return
97+
except grpc.RpcError as e:
98+
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
99+
except ParseError:
100+
logger.exception(
101+
f"Could not parse flag data using flagd syntax: {message=}"
102+
)
103+
104+
self.connected = False
105+
self.emit_provider_error(
106+
ProviderEventDetails(
107+
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
108+
error_code=ErrorCode.GENERAL,
109+
)
110+
)
111+
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
112+
time.sleep(retry_delay)
113+
retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF)
114+
115+
def handle_changed_flags(self, data: typing.Any) -> None:
116+
changed_flags = list(data["flags"].keys())
117+
118+
self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))
119+
39120
def resolve_boolean_details(
40121
self,
41122
key: str,

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from openfeature.evaluation_context import EvaluationContext
44
from openfeature.exception import FlagNotFoundError, ParseError
55
from openfeature.flag_evaluation import FlagResolutionDetails, Reason
6-
from openfeature.provider.provider import AbstractProvider
6+
from openfeature.provider import AbstractProvider
77

88
from ..config import Config
99
from .process.file_watcher import FileWatcherFlagStore
@@ -26,6 +26,9 @@ def __init__(self, config: Config, provider: AbstractProvider):
2626
self.config.offline_poll_interval_seconds,
2727
)
2828

29+
def initialize(self, evaluation_context: EvaluationContext) -> None:
30+
pass
31+
2932
def shutdown(self) -> None:
3033
self.flag_store.shutdown()
3134

providers/openfeature-provider-flagd/tests/e2e/conftest.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,22 @@
55
from tests.e2e.flagd_container import FlagdContainer
66
from tests.e2e.steps import * # noqa: F403
77

8-
from openfeature import api
9-
from openfeature.contrib.provider.flagd import FlagdProvider
10-
118
JsonPrimitive = typing.Union[str, bool, float, int]
129

1310

14-
@pytest.fixture(autouse=True, scope="package")
15-
def setup(request, port, image, resolver_type):
11+
@pytest.fixture(autouse=True, scope="module")
12+
def setup(request, port, image):
1613
container: DockerContainer = FlagdContainer(
1714
image=image,
1815
port=port,
1916
)
2017
# Setup code
2118
c = container.start()
22-
api.set_provider(
23-
FlagdProvider(
24-
resolver_type=resolver_type,
25-
port=int(container.get_exposed_port(port)),
26-
)
27-
)
2819

2920
def fin():
3021
c.stop()
3122

3223
# Teardown code
3324
request.addfinalizer(fin)
25+
26+
return c.get_exposed_port(port)

0 commit comments

Comments
 (0)