Skip to content

Commit 934e867

Browse files
committed
fix(flagd): adding events for rpc mode
Signed-off-by: Simon Schrottner <[email protected]>
1 parent 038a343 commit 934e867

File tree

7 files changed

+110
-18
lines changed

7 files changed

+110
-18
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"""Client and server classes corresponding to protobuf-defined services."""
33
import grpc
44

5-
from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
5+
from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
66

77

88
class ServiceStub(object):

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,22 @@ def __init__( # noqa: PLR0913
6969

7070
def setup_resolver(self) -> AbstractResolver:
7171
if self.config.resolver_type == ResolverType.GRPC:
72-
return GrpcResolver(self.config)
72+
return GrpcResolver(
73+
self.config,
74+
self.emit_provider_ready,
75+
self.emit_provider_error,
76+
self.emit_provider_configuration_changed,
77+
)
7378
elif self.config.resolver_type == ResolverType.IN_PROCESS:
7479
return InProcessResolver(self.config, self)
7580
else:
7681
raise ValueError(
7782
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
7883
)
7984

85+
def initialize(self, evaluation_context: EvaluationContext) -> None:
86+
self.resolver.initialize(evaluation_context)
87+
8088
def shutdown(self) -> None:
8189
if self.resolver:
8290
self.resolver.shutdown()

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99

1010
class AbstractResolver(typing.Protocol):
11+
def initialize(self, evaluation_context: EvaluationContext) -> None:
12+
return
13+
1114
def shutdown(self) -> None: ...
1215

1316
def resolve_boolean_details(

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import logging
2+
import threading
3+
import time
14
import typing
25

36
import grpc
47
from google.protobuf.json_format import MessageToDict
58
from google.protobuf.struct_pb2 import Struct
69

710
from openfeature.evaluation_context import EvaluationContext
11+
from openfeature.event import ProviderEventDetails
812
from openfeature.exception import (
13+
ErrorCode,
914
FlagNotFoundError,
1015
GeneralError,
1116
InvalidContextError,
@@ -16,23 +21,101 @@
1621

1722
from ..config import Config
1823
from ..flag_type import FlagType
19-
from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc
24+
from ..proto.flagd.evaluation.v1 import evaluation_pb2, evaluation_pb2_grpc
2025

2126
T = typing.TypeVar("T")
2227

28+
logger = logging.getLogger("openfeature.contrib")
29+
2330

2431
class GrpcResolver:
25-
def __init__(self, config: Config):
32+
MAX_BACK_OFF = 120
33+
34+
def __init__(
35+
self,
36+
config: Config,
37+
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
38+
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
39+
emit_provider_configuration_changed: typing.Callable[
40+
[ProviderEventDetails], None
41+
],
42+
):
2643
self.config = config
44+
self.emit_provider_ready = emit_provider_ready
45+
self.emit_provider_error = emit_provider_error
46+
self.emit_provider_configuration_changed = emit_provider_configuration_changed
2747
channel_factory = (
2848
grpc.secure_channel if self.config.tls else grpc.insecure_channel
2949
)
3050
self.channel = channel_factory(f"{self.config.host}:{self.config.port}")
31-
self.stub = schema_pb2_grpc.ServiceStub(self.channel)
51+
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
52+
self.retry_backoff_seconds = 0.1
53+
self.connected = False
54+
55+
def initialize(self, evaluation_context: EvaluationContext) -> None:
56+
self.connect()
3257

3358
def shutdown(self) -> None:
3459
self.channel.close()
3560

61+
def connect(self) -> None:
62+
self.active = True
63+
self.thread = threading.Thread(
64+
target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
65+
)
66+
self.thread.start()
67+
68+
def listen(self) -> None:
69+
retry_delay = self.retry_backoff_seconds
70+
while self.active:
71+
request = evaluation_pb2.EventStreamRequest() # type:ignore[attr-defined]
72+
try:
73+
logger.debug("Setting up gRPC sync flags connection")
74+
for message in self.stub.EventStream(request):
75+
if message.type == "provider_ready":
76+
if not self.connected:
77+
self.emit_provider_ready(
78+
ProviderEventDetails(
79+
message="gRPC sync connection established"
80+
)
81+
)
82+
self.connected = True
83+
# reset retry delay after successsful read
84+
retry_delay = self.retry_backoff_seconds
85+
else:
86+
self.emit_provider_error(ProviderEventDetails())
87+
self.connected = False
88+
89+
elif message.type == "configuration_change":
90+
data = MessageToDict(message)["data"]
91+
self.handle_changed_flags(data)
92+
93+
if not self.active:
94+
logger.info("Terminating gRPC sync thread")
95+
return
96+
except grpc.RpcError as e:
97+
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
98+
except ParseError:
99+
logger.exception(
100+
f"Could not parse flag data using flagd syntax: {message=}"
101+
)
102+
103+
self.connected = False
104+
self.emit_provider_error(
105+
ProviderEventDetails(
106+
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
107+
error_code=ErrorCode.GENERAL,
108+
)
109+
)
110+
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
111+
time.sleep(retry_delay)
112+
retry_delay = min(2, self.MAX_BACK_OFF)
113+
114+
def handle_changed_flags(self, data: typing.Any) -> None:
115+
changed_flags = list(data["flags"].keys())
116+
117+
self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))
118+
36119
def resolve_boolean_details(
37120
self,
38121
key: str,
@@ -84,33 +167,33 @@ def _resolve( # noqa: PLR0915
84167
call_args = {"timeout": self.config.timeout}
85168
try:
86169
if flag_type == FlagType.BOOLEAN:
87-
request = schema_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
170+
request = evaluation_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
88171
flag_key=flag_key, context=context
89172
)
90173
response = self.stub.ResolveBoolean(request, **call_args)
91174
value = response.value
92175
elif flag_type == FlagType.STRING:
93-
request = schema_pb2.ResolveStringRequest( # type:ignore[attr-defined]
176+
request = evaluation_pb2.ResolveStringRequest( # type:ignore[attr-defined]
94177
flag_key=flag_key, context=context
95178
)
96179
response = self.stub.ResolveString(request, **call_args)
97180
value = response.value
98181
elif flag_type == FlagType.OBJECT:
99-
request = schema_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
182+
request = evaluation_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
100183
flag_key=flag_key, context=context
101184
)
102185
response = self.stub.ResolveObject(request, **call_args)
103186
value = MessageToDict(response, preserving_proto_field_name=True)[
104187
"value"
105188
]
106189
elif flag_type == FlagType.FLOAT:
107-
request = schema_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
190+
request = evaluation_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
108191
flag_key=flag_key, context=context
109192
)
110193
response = self.stub.ResolveFloat(request, **call_args)
111194
value = response.value
112195
elif flag_type == FlagType.INTEGER:
113-
request = schema_pb2.ResolveIntRequest( # type:ignore[attr-defined]
196+
request = evaluation_pb2.ResolveIntRequest( # type:ignore[attr-defined]
114197
flag_key=flag_key, context=context
115198
)
116199
response = self.stub.ResolveInt(request, **call_args)

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from openfeature.evaluation_context import EvaluationContext
44
from openfeature.exception import FlagNotFoundError, ParseError
55
from openfeature.flag_evaluation import FlagResolutionDetails, Reason
6-
from openfeature.provider.provider import AbstractProvider
6+
from openfeature.provider import AbstractProvider
77

88
from ..config import Config
99
from .process.file_watcher import FileWatcherFlagStore
@@ -26,6 +26,9 @@ def __init__(self, config: Config, provider: AbstractProvider):
2626
self.config.offline_poll_interval_seconds,
2727
)
2828

29+
def initialize(self, evaluation_context: EvaluationContext) -> None:
30+
pass
31+
2932
def shutdown(self) -> None:
3033
self.flag_store.shutdown()
3134

providers/openfeature-provider-flagd/tests/e2e/steps.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ def provider_ready_was_executed(client: OpenFeatureClient, context):
508508
def provider_changed_add(client: OpenFeatureClient, context):
509509
def provider_changed_handler(event_details: EventDetails):
510510
context["provider_changed_ran"] = True
511+
context["changed_flags"] = event_details.flags_changed
511512

512513
client.add_handler(
513514
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, provider_changed_handler

providers/openfeature-provider-flagd/tests/e2e/test_rpc.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import pytest
2-
from pytest_bdd import scenario, scenarios
2+
from pytest_bdd import scenarios
33

44
from openfeature.contrib.provider.flagd.config import ResolverType
55

@@ -19,12 +19,6 @@ def image():
1919
return "ghcr.io/open-feature/flagd-testbed:v0.5.13"
2020

2121

22-
@pytest.mark.skip(reason="Eventing not implemented")
23-
@scenario("../../test-harness/gherkin/flagd.feature", "Flag change event")
24-
def test_flag_change_event():
25-
"""not implemented"""
26-
27-
2822
scenarios(
2923
"../../test-harness/gherkin/flagd.feature",
3024
"../../test-harness/gherkin/flagd-json-evaluator.feature",

0 commit comments

Comments
 (0)