Skip to content

Commit 04a0ff8

Browse files
committed
fix(flagd): adding events for rpc mode
Signed-off-by: Simon Schrottner <[email protected]>
1 parent 038a343 commit 04a0ff8

File tree

7 files changed

+107
-18
lines changed

7 files changed

+107
-18
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"""Client and server classes corresponding to protobuf-defined services."""
33
import grpc
44

5-
from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
5+
from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
66

77

88
class ServiceStub(object):

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,22 @@ def __init__( # noqa: PLR0913
6969

7070
def setup_resolver(self) -> AbstractResolver:
7171
if self.config.resolver_type == ResolverType.GRPC:
72-
return GrpcResolver(self.config)
72+
return GrpcResolver(
73+
self.config,
74+
self.emit_provider_ready,
75+
self.emit_provider_error,
76+
self.emit_provider_configuration_changed,
77+
)
7378
elif self.config.resolver_type == ResolverType.IN_PROCESS:
7479
return InProcessResolver(self.config, self)
7580
else:
7681
raise ValueError(
7782
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
7883
)
7984

85+
def initialize(self, evaluation_context: EvaluationContext) -> None:
86+
self.resolver.initialize(evaluation_context)
87+
8088
def shutdown(self) -> None:
8189
if self.resolver:
8290
self.resolver.shutdown()

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99

1010
class AbstractResolver(typing.Protocol):
11+
def initialize(self, evaluation_context: EvaluationContext) -> None:
12+
return
13+
1114
def shutdown(self) -> None: ...
1215

1316
def resolve_boolean_details(

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import logging
2+
import threading
3+
import time
14
import typing
25

36
import grpc
47
from google.protobuf.json_format import MessageToDict
58
from google.protobuf.struct_pb2 import Struct
69

710
from openfeature.evaluation_context import EvaluationContext
11+
from openfeature.event import ProviderEventDetails
812
from openfeature.exception import (
13+
ErrorCode,
914
FlagNotFoundError,
1015
GeneralError,
1116
InvalidContextError,
@@ -16,23 +21,98 @@
1621

1722
from ..config import Config
1823
from ..flag_type import FlagType
19-
from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc
24+
from ..proto.flagd.evaluation.v1 import evaluation_pb2, evaluation_pb2_grpc
2025

2126
T = typing.TypeVar("T")
2227

28+
logger = logging.getLogger("openfeature.contrib")
29+
2330

2431
class GrpcResolver:
25-
def __init__(self, config: Config):
32+
MAX_BACK_OFF = 120
33+
34+
def __init__(
35+
self,
36+
config: Config,
37+
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
38+
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
39+
emit_provider_configuration_changed: typing.Callable[
40+
[ProviderEventDetails], None
41+
],
42+
):
2643
self.config = config
44+
self.emit_provider_ready = emit_provider_ready
45+
self.emit_provider_error = emit_provider_error
46+
self.emit_provider_configuration_changed = emit_provider_configuration_changed
2747
channel_factory = (
2848
grpc.secure_channel if self.config.tls else grpc.insecure_channel
2949
)
3050
self.channel = channel_factory(f"{self.config.host}:{self.config.port}")
31-
self.stub = schema_pb2_grpc.ServiceStub(self.channel)
51+
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
52+
self.retry_backoff_seconds = 0.1
53+
self.connected = False
54+
55+
def initialize(self, evaluation_context: EvaluationContext) -> None:
56+
self.connect()
3257

3358
def shutdown(self) -> None:
3459
self.channel.close()
3560

61+
def connect(self) -> None:
62+
self.active = True
63+
self.thread = threading.Thread(
64+
target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
65+
)
66+
self.thread.start()
67+
68+
def listen(self) -> None:
69+
retry_delay = self.retry_backoff_seconds
70+
while self.active:
71+
request = evaluation_pb2.EventStreamRequest() # type:ignore[attr-defined]
72+
try:
73+
logger.debug("Setting up gRPC sync flags connection")
74+
for message in self.stub.EventStream(request):
75+
if message.type == "provider_ready":
76+
if not self.connected:
77+
self.emit_provider_ready(
78+
ProviderEventDetails(
79+
message="gRPC sync connection established"
80+
)
81+
)
82+
self.connected = True
83+
# reset retry delay after successsful read
84+
retry_delay = self.retry_backoff_seconds
85+
86+
elif message.type == "configuration_change":
87+
data = MessageToDict(message)["data"]
88+
self.handle_changed_flags(data)
89+
90+
if not self.active:
91+
logger.info("Terminating gRPC sync thread")
92+
return
93+
except grpc.RpcError as e:
94+
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
95+
except ParseError:
96+
logger.exception(
97+
f"Could not parse flag data using flagd syntax: {message=}"
98+
)
99+
100+
self.connected = False
101+
self.emit_provider_error(
102+
ProviderEventDetails(
103+
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
104+
error_code=ErrorCode.GENERAL,
105+
)
106+
)
107+
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
108+
time.sleep(retry_delay)
109+
retry_delay = min(2, self.MAX_BACK_OFF)
110+
111+
def handle_changed_flags(self, data: typing.Any) -> None:
112+
changed_flags = list(data["flags"].keys())
113+
114+
self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))
115+
36116
def resolve_boolean_details(
37117
self,
38118
key: str,
@@ -84,33 +164,33 @@ def _resolve( # noqa: PLR0915
84164
call_args = {"timeout": self.config.timeout}
85165
try:
86166
if flag_type == FlagType.BOOLEAN:
87-
request = schema_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
167+
request = evaluation_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
88168
flag_key=flag_key, context=context
89169
)
90170
response = self.stub.ResolveBoolean(request, **call_args)
91171
value = response.value
92172
elif flag_type == FlagType.STRING:
93-
request = schema_pb2.ResolveStringRequest( # type:ignore[attr-defined]
173+
request = evaluation_pb2.ResolveStringRequest( # type:ignore[attr-defined]
94174
flag_key=flag_key, context=context
95175
)
96176
response = self.stub.ResolveString(request, **call_args)
97177
value = response.value
98178
elif flag_type == FlagType.OBJECT:
99-
request = schema_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
179+
request = evaluation_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
100180
flag_key=flag_key, context=context
101181
)
102182
response = self.stub.ResolveObject(request, **call_args)
103183
value = MessageToDict(response, preserving_proto_field_name=True)[
104184
"value"
105185
]
106186
elif flag_type == FlagType.FLOAT:
107-
request = schema_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
187+
request = evaluation_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
108188
flag_key=flag_key, context=context
109189
)
110190
response = self.stub.ResolveFloat(request, **call_args)
111191
value = response.value
112192
elif flag_type == FlagType.INTEGER:
113-
request = schema_pb2.ResolveIntRequest( # type:ignore[attr-defined]
193+
request = evaluation_pb2.ResolveIntRequest( # type:ignore[attr-defined]
114194
flag_key=flag_key, context=context
115195
)
116196
response = self.stub.ResolveInt(request, **call_args)

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from openfeature.evaluation_context import EvaluationContext
44
from openfeature.exception import FlagNotFoundError, ParseError
55
from openfeature.flag_evaluation import FlagResolutionDetails, Reason
6-
from openfeature.provider.provider import AbstractProvider
6+
from openfeature.provider import AbstractProvider
77

88
from ..config import Config
99
from .process.file_watcher import FileWatcherFlagStore
@@ -26,6 +26,9 @@ def __init__(self, config: Config, provider: AbstractProvider):
2626
self.config.offline_poll_interval_seconds,
2727
)
2828

29+
def initialize(self, evaluation_context: EvaluationContext) -> None:
30+
pass
31+
2932
def shutdown(self) -> None:
3033
self.flag_store.shutdown()
3134

providers/openfeature-provider-flagd/tests/e2e/steps.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ def provider_ready_was_executed(client: OpenFeatureClient, context):
508508
def provider_changed_add(client: OpenFeatureClient, context):
509509
def provider_changed_handler(event_details: EventDetails):
510510
context["provider_changed_ran"] = True
511+
context["changed_flags"] = event_details.flags_changed
511512

512513
client.add_handler(
513514
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, provider_changed_handler

providers/openfeature-provider-flagd/tests/e2e/test_rpc.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import pytest
2-
from pytest_bdd import scenario, scenarios
2+
from pytest_bdd import scenarios
33

44
from openfeature.contrib.provider.flagd.config import ResolverType
55

@@ -19,12 +19,6 @@ def image():
1919
return "ghcr.io/open-feature/flagd-testbed:v0.5.13"
2020

2121

22-
@pytest.mark.skip(reason="Eventing not implemented")
23-
@scenario("../../test-harness/gherkin/flagd.feature", "Flag change event")
24-
def test_flag_change_event():
25-
"""not implemented"""
26-
27-
2822
scenarios(
2923
"../../test-harness/gherkin/flagd.feature",
3024
"../../test-harness/gherkin/flagd-json-evaluator.feature",

0 commit comments

Comments
 (0)