Skip to content

Commit 57b626b

Browse files
committed
fix(flagd): adding events for rpc mode
Signed-off-by: Simon Schrottner <[email protected]>
1 parent 038a343 commit 57b626b

File tree

10 files changed

+327
-73
lines changed

10 files changed

+327
-73
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"""Client and server classes corresponding to protobuf-defined services."""
33
import grpc
44

5-
from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
5+
from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
66

77

88
class ServiceStub(object):

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,22 @@ def __init__( # noqa: PLR0913
6969

7070
def setup_resolver(self) -> AbstractResolver:
7171
if self.config.resolver_type == ResolverType.GRPC:
72-
return GrpcResolver(self.config)
72+
return GrpcResolver(
73+
self.config,
74+
self.emit_provider_ready,
75+
self.emit_provider_error,
76+
self.emit_provider_configuration_changed,
77+
)
7378
elif self.config.resolver_type == ResolverType.IN_PROCESS:
7479
return InProcessResolver(self.config, self)
7580
else:
7681
raise ValueError(
7782
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
7883
)
7984

85+
def initialize(self, evaluation_context: EvaluationContext) -> None:
86+
self.resolver.initialize(evaluation_context)
87+
8088
def shutdown(self) -> None:
8189
if self.resolver:
8290
self.resolver.shutdown()

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99

1010
class AbstractResolver(typing.Protocol):
11+
def initialize(self, evaluation_context: EvaluationContext) -> None: ...
12+
1113
def shutdown(self) -> None: ...
1214

1315
def resolve_boolean_details(

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import logging
2+
import threading
3+
import time
14
import typing
25

36
import grpc
47
from google.protobuf.json_format import MessageToDict
58
from google.protobuf.struct_pb2 import Struct
69

710
from openfeature.evaluation_context import EvaluationContext
11+
from openfeature.event import ProviderEventDetails
812
from openfeature.exception import (
13+
ErrorCode,
914
FlagNotFoundError,
1015
GeneralError,
1116
InvalidContextError,
@@ -16,23 +21,99 @@
1621

1722
from ..config import Config
1823
from ..flag_type import FlagType
19-
from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc
24+
from ..proto.flagd.evaluation.v1 import evaluation_pb2, evaluation_pb2_grpc
2025

2126
T = typing.TypeVar("T")
2227

28+
logger = logging.getLogger("openfeature.contrib")
29+
2330

2431
class GrpcResolver:
25-
def __init__(self, config: Config):
32+
MAX_BACK_OFF = 120
33+
34+
def __init__(
35+
self,
36+
config: Config,
37+
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
38+
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
39+
emit_provider_configuration_changed: typing.Callable[
40+
[ProviderEventDetails], None
41+
],
42+
):
2643
self.config = config
44+
self.emit_provider_ready = emit_provider_ready
45+
self.emit_provider_error = emit_provider_error
46+
self.emit_provider_configuration_changed = emit_provider_configuration_changed
2747
channel_factory = (
2848
grpc.secure_channel if self.config.tls else grpc.insecure_channel
2949
)
3050
self.channel = channel_factory(f"{self.config.host}:{self.config.port}")
31-
self.stub = schema_pb2_grpc.ServiceStub(self.channel)
51+
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
52+
self.retry_backoff_seconds = 0.1
53+
self.connected = False
54+
55+
def initialize(self, evaluation_context: EvaluationContext) -> None:
56+
self.connect()
3257

3358
def shutdown(self) -> None:
59+
self.active = False
3460
self.channel.close()
3561

62+
def connect(self) -> None:
63+
self.active = True
64+
self.thread = threading.Thread(
65+
target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
66+
)
67+
self.thread.start()
68+
69+
def listen(self) -> None:
70+
retry_delay = self.retry_backoff_seconds
71+
while self.active:
72+
request = evaluation_pb2.EventStreamRequest() # type:ignore[attr-defined]
73+
try:
74+
logger.debug("Setting up gRPC sync flags connection")
75+
for message in self.stub.EventStream(request):
76+
if message.type == "provider_ready":
77+
if not self.connected:
78+
self.emit_provider_ready(
79+
ProviderEventDetails(
80+
message="gRPC sync connection established"
81+
)
82+
)
83+
self.connected = True
84+
# reset retry delay after successsful read
85+
retry_delay = self.retry_backoff_seconds
86+
87+
elif message.type == "configuration_change":
88+
data = MessageToDict(message)["data"]
89+
self.handle_changed_flags(data)
90+
91+
if not self.active:
92+
logger.info("Terminating gRPC sync thread")
93+
return
94+
except grpc.RpcError as e:
95+
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
96+
except ParseError:
97+
logger.exception(
98+
f"Could not parse flag data using flagd syntax: {message=}"
99+
)
100+
101+
self.connected = False
102+
self.emit_provider_error(
103+
ProviderEventDetails(
104+
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
105+
error_code=ErrorCode.GENERAL,
106+
)
107+
)
108+
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
109+
time.sleep(retry_delay)
110+
retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF)
111+
112+
def handle_changed_flags(self, data: typing.Any) -> None:
113+
changed_flags = list(data["flags"].keys())
114+
115+
self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))
116+
36117
def resolve_boolean_details(
37118
self,
38119
key: str,
@@ -84,33 +165,33 @@ def _resolve( # noqa: PLR0915
84165
call_args = {"timeout": self.config.timeout}
85166
try:
86167
if flag_type == FlagType.BOOLEAN:
87-
request = schema_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
168+
request = evaluation_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
88169
flag_key=flag_key, context=context
89170
)
90171
response = self.stub.ResolveBoolean(request, **call_args)
91172
value = response.value
92173
elif flag_type == FlagType.STRING:
93-
request = schema_pb2.ResolveStringRequest( # type:ignore[attr-defined]
174+
request = evaluation_pb2.ResolveStringRequest( # type:ignore[attr-defined]
94175
flag_key=flag_key, context=context
95176
)
96177
response = self.stub.ResolveString(request, **call_args)
97178
value = response.value
98179
elif flag_type == FlagType.OBJECT:
99-
request = schema_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
180+
request = evaluation_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
100181
flag_key=flag_key, context=context
101182
)
102183
response = self.stub.ResolveObject(request, **call_args)
103184
value = MessageToDict(response, preserving_proto_field_name=True)[
104185
"value"
105186
]
106187
elif flag_type == FlagType.FLOAT:
107-
request = schema_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
188+
request = evaluation_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
108189
flag_key=flag_key, context=context
109190
)
110191
response = self.stub.ResolveFloat(request, **call_args)
111192
value = response.value
112193
elif flag_type == FlagType.INTEGER:
113-
request = schema_pb2.ResolveIntRequest( # type:ignore[attr-defined]
194+
request = evaluation_pb2.ResolveIntRequest( # type:ignore[attr-defined]
114195
flag_key=flag_key, context=context
115196
)
116197
response = self.stub.ResolveInt(request, **call_args)

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from openfeature.evaluation_context import EvaluationContext
44
from openfeature.exception import FlagNotFoundError, ParseError
55
from openfeature.flag_evaluation import FlagResolutionDetails, Reason
6-
from openfeature.provider.provider import AbstractProvider
6+
from openfeature.provider import AbstractProvider
77

88
from ..config import Config
99
from .process.file_watcher import FileWatcherFlagStore
@@ -26,6 +26,9 @@ def __init__(self, config: Config, provider: AbstractProvider):
2626
self.config.offline_poll_interval_seconds,
2727
)
2828

29+
def initialize(self, evaluation_context: EvaluationContext) -> None:
30+
pass
31+
2932
def shutdown(self) -> None:
3033
self.flag_store.shutdown()
3134

providers/openfeature-provider-flagd/tests/e2e/conftest.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,22 @@
55
from tests.e2e.flagd_container import FlagdContainer
66
from tests.e2e.steps import * # noqa: F403
77

8-
from openfeature import api
9-
from openfeature.contrib.provider.flagd import FlagdProvider
10-
118
JsonPrimitive = typing.Union[str, bool, float, int]
129

1310

14-
@pytest.fixture(autouse=True, scope="package")
15-
def setup(request, port, image, resolver_type):
11+
@pytest.fixture(autouse=True, scope="module")
12+
def setup(request, port, image):
1613
container: DockerContainer = FlagdContainer(
1714
image=image,
1815
port=port,
1916
)
2017
# Setup code
2118
c = container.start()
22-
api.set_provider(
23-
FlagdProvider(
24-
resolver_type=resolver_type,
25-
port=int(container.get_exposed_port(port)),
26-
)
27-
)
2819

2920
def fin():
3021
c.stop()
3122

3223
# Teardown code
3324
request.addfinalizer(fin)
25+
26+
return c.get_exposed_port(port)

0 commit comments

Comments
 (0)