|
| 1 | +import logging |
| 2 | +import threading |
| 3 | +import time |
1 | 4 | import typing |
2 | 5 |
|
3 | 6 | import grpc |
4 | 7 | from google.protobuf.json_format import MessageToDict |
5 | 8 | from google.protobuf.struct_pb2 import Struct |
6 | 9 |
|
7 | 10 | from openfeature.evaluation_context import EvaluationContext |
| 11 | +from openfeature.event import ProviderEventDetails |
8 | 12 | from openfeature.exception import ( |
| 13 | + ErrorCode, |
9 | 14 | FlagNotFoundError, |
10 | 15 | GeneralError, |
11 | 16 | InvalidContextError, |
|
16 | 21 |
|
17 | 22 | from ..config import Config |
18 | 23 | from ..flag_type import FlagType |
19 | | -from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc |
| 24 | +from ..proto.flagd.evaluation.v1 import evaluation_pb2, evaluation_pb2_grpc |
20 | 25 |
|
21 | 26 | T = typing.TypeVar("T") |
22 | 27 |
|
| 28 | +logger = logging.getLogger("openfeature.contrib") |
| 29 | + |
23 | 30 |
|
24 | 31 | class GrpcResolver: |
25 | | - def __init__(self, config: Config): |
| 32 | + MAX_BACK_OFF = 120 |
| 33 | + |
| 34 | + def __init__( |
| 35 | + self, |
| 36 | + config: Config, |
| 37 | + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], |
| 38 | + emit_provider_error: typing.Callable[[ProviderEventDetails], None], |
| 39 | + emit_provider_configuration_changed: typing.Callable[ |
| 40 | + [ProviderEventDetails], None |
| 41 | + ], |
| 42 | + ): |
26 | 43 | self.config = config |
| 44 | + self.emit_provider_ready = emit_provider_ready |
| 45 | + self.emit_provider_error = emit_provider_error |
| 46 | + self.emit_provider_configuration_changed = emit_provider_configuration_changed |
27 | 47 | channel_factory = ( |
28 | 48 | grpc.secure_channel if self.config.tls else grpc.insecure_channel |
29 | 49 | ) |
30 | 50 | self.channel = channel_factory(f"{self.config.host}:{self.config.port}") |
31 | | - self.stub = schema_pb2_grpc.ServiceStub(self.channel) |
| 51 | + self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) |
| 52 | + self.retry_backoff_seconds = 0.1 |
| 53 | + self.connected = False |
| 54 | + |
| 55 | + def initialize(self, evaluation_context: EvaluationContext) -> None: |
| 56 | + self.connect() |
32 | 57 |
|
33 | 58 | def shutdown(self) -> None: |
34 | 59 | self.channel.close() |
35 | 60 |
|
| 61 | + def connect(self) -> None: |
| 62 | + self.active = True |
| 63 | + self.thread = threading.Thread( |
| 64 | + target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread" |
| 65 | + ) |
| 66 | + self.thread.start() |
| 67 | + |
| 68 | + def listen(self) -> None: |
| 69 | + retry_delay = self.retry_backoff_seconds |
| 70 | + while self.active: |
| 71 | + request = evaluation_pb2.EventStreamRequest() # type:ignore[attr-defined] |
| 72 | + try: |
| 73 | + logger.debug("Setting up gRPC sync flags connection") |
| 74 | + for message in self.stub.EventStream(request): |
| 75 | + if message.type == "provider_ready": |
| 76 | + if not self.connected: |
| 77 | + self.emit_provider_ready( |
| 78 | + ProviderEventDetails( |
| 79 | + message="gRPC sync connection established" |
| 80 | + ) |
| 81 | + ) |
| 82 | + self.connected = True |
| 83 | + # reset retry delay after successsful read |
| 84 | + retry_delay = self.retry_backoff_seconds |
| 85 | + |
| 86 | + elif message.type == "configuration_change": |
| 87 | + data = MessageToDict(message)["data"] |
| 88 | + self.handle_changed_flags(data) |
| 89 | + |
| 90 | + if not self.active: |
| 91 | + logger.info("Terminating gRPC sync thread") |
| 92 | + return |
| 93 | + except grpc.RpcError as e: |
| 94 | + logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") |
| 95 | + except ParseError: |
| 96 | + logger.exception( |
| 97 | + f"Could not parse flag data using flagd syntax: {message=}" |
| 98 | + ) |
| 99 | + |
| 100 | + self.connected = False |
| 101 | + self.emit_provider_error( |
| 102 | + ProviderEventDetails( |
| 103 | + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", |
| 104 | + error_code=ErrorCode.GENERAL, |
| 105 | + ) |
| 106 | + ) |
| 107 | + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") |
| 108 | + time.sleep(retry_delay) |
| 109 | + retry_delay = min(2, self.MAX_BACK_OFF) |
| 110 | + |
| 111 | + def handle_changed_flags(self, data: typing.Any) -> None: |
| 112 | + changed_flags = list(data["flags"].keys()) |
| 113 | + |
| 114 | + self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags)) |
| 115 | + |
36 | 116 | def resolve_boolean_details( |
37 | 117 | self, |
38 | 118 | key: str, |
@@ -84,33 +164,33 @@ def _resolve( # noqa: PLR0915 |
84 | 164 | call_args = {"timeout": self.config.timeout} |
85 | 165 | try: |
86 | 166 | if flag_type == FlagType.BOOLEAN: |
87 | | - request = schema_pb2.ResolveBooleanRequest( # type:ignore[attr-defined] |
| 167 | + request = evaluation_pb2.ResolveBooleanRequest( # type:ignore[attr-defined] |
88 | 168 | flag_key=flag_key, context=context |
89 | 169 | ) |
90 | 170 | response = self.stub.ResolveBoolean(request, **call_args) |
91 | 171 | value = response.value |
92 | 172 | elif flag_type == FlagType.STRING: |
93 | | - request = schema_pb2.ResolveStringRequest( # type:ignore[attr-defined] |
| 173 | + request = evaluation_pb2.ResolveStringRequest( # type:ignore[attr-defined] |
94 | 174 | flag_key=flag_key, context=context |
95 | 175 | ) |
96 | 176 | response = self.stub.ResolveString(request, **call_args) |
97 | 177 | value = response.value |
98 | 178 | elif flag_type == FlagType.OBJECT: |
99 | | - request = schema_pb2.ResolveObjectRequest( # type:ignore[attr-defined] |
| 179 | + request = evaluation_pb2.ResolveObjectRequest( # type:ignore[attr-defined] |
100 | 180 | flag_key=flag_key, context=context |
101 | 181 | ) |
102 | 182 | response = self.stub.ResolveObject(request, **call_args) |
103 | 183 | value = MessageToDict(response, preserving_proto_field_name=True)[ |
104 | 184 | "value" |
105 | 185 | ] |
106 | 186 | elif flag_type == FlagType.FLOAT: |
107 | | - request = schema_pb2.ResolveFloatRequest( # type:ignore[attr-defined] |
| 187 | + request = evaluation_pb2.ResolveFloatRequest( # type:ignore[attr-defined] |
108 | 188 | flag_key=flag_key, context=context |
109 | 189 | ) |
110 | 190 | response = self.stub.ResolveFloat(request, **call_args) |
111 | 191 | value = response.value |
112 | 192 | elif flag_type == FlagType.INTEGER: |
113 | | - request = schema_pb2.ResolveIntRequest( # type:ignore[attr-defined] |
| 193 | + request = evaluation_pb2.ResolveIntRequest( # type:ignore[attr-defined] |
114 | 194 | flag_key=flag_key, context=context |
115 | 195 | ) |
116 | 196 | response = self.stub.ResolveInt(request, **call_args) |
|
0 commit comments