|
25 | 25 |
|
26 | 26 | class GrpcWatcher(FlagStateConnector): |
27 | 27 | def __init__( |
28 | | - self, |
29 | | - config: Config, |
30 | | - flag_store: FlagStore, |
31 | | - emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None], |
32 | | - emit_provider_error: typing.Callable[[ProviderEventDetails], None], |
33 | | - emit_provider_stale: typing.Callable[[ProviderEventDetails], None], |
| 28 | + self, |
| 29 | + config: Config, |
| 30 | + flag_store: FlagStore, |
| 31 | + emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None], |
| 32 | + emit_provider_error: typing.Callable[[ProviderEventDetails], None], |
| 33 | + emit_provider_stale: typing.Callable[[ProviderEventDetails], None], |
34 | 34 | ): |
35 | 35 | self.flag_store = flag_store |
36 | 36 | self.config = config |
@@ -62,6 +62,43 @@ def _generate_channel(self, config: Config) -> grpc.Channel: |
62 | 62 | ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms), |
63 | 63 | ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms), |
64 | 64 | ("grpc.min_reconnect_backoff_ms", config.stream_deadline_ms), |
| 65 | + ("grpc.server_config", { |
| 66 | + "methodConfig": [ |
| 67 | + { |
| 68 | + "name": [ |
| 69 | + { |
| 70 | + "service": "flagd.sync.v1.FlagSyncService" |
| 71 | + }, |
| 72 | + { |
| 73 | + "service": "flagd.evaluation.v1.Service" |
| 74 | + } |
| 75 | + ], |
| 76 | + "retryPolicy": { |
| 77 | + "maxAttempts": 3.0, |
| 78 | + "initialBackoff": "1s", |
| 79 | + "maxBackoff": "5s", |
| 80 | + "backoffMultiplier": 2.0, |
| 81 | + "retryableStatusCodes": [ |
| 82 | + "CANCELLED", |
| 83 | + "UNKNOWN", |
| 84 | + "INVALID_ARGUMENT", |
| 85 | + "NOT_FOUND", |
| 86 | + "ALREADY_EXISTS", |
| 87 | + "PERMISSION_DENIED", |
| 88 | + "RESOURCE_EXHAUSTED", |
| 89 | + "FAILED_PRECONDITION", |
| 90 | + "ABORTED", |
| 91 | + "OUT_OF_RANGE", |
| 92 | + "UNIMPLEMENTED", |
| 93 | + "INTERNAL", |
| 94 | + "UNAVAILABLE", |
| 95 | + "DATA_LOSS", |
| 96 | + "UNAUTHENTICATED" |
| 97 | + ] |
| 98 | + } |
| 99 | + } |
| 100 | + ] |
| 101 | + }) |
65 | 102 | ] |
66 | 103 | if config.default_authority is not None: |
67 | 104 | options.append(("grpc.default_authority", config.default_authority)) |
@@ -120,8 +157,8 @@ def monitor(self) -> None: |
120 | 157 | def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None: |
121 | 158 | logger.debug(f"gRPC state change: {new_state}") |
122 | 159 | if ( |
123 | | - new_state == grpc.ChannelConnectivity.READY |
124 | | - or new_state == grpc.ChannelConnectivity.IDLE |
| 160 | + new_state == grpc.ChannelConnectivity.READY |
| 161 | + or new_state == grpc.ChannelConnectivity.IDLE |
125 | 162 | ): |
126 | 163 | if not self.thread or not self.thread.is_alive(): |
127 | 164 | self.thread = threading.Thread( |
@@ -199,11 +236,11 @@ def listen(self) -> None: |
199 | 236 |
|
200 | 237 | logger.debug("Setting up gRPC sync flags connection") |
201 | 238 | for flag_rsp in self.stub.SyncFlags( |
202 | | - request, wait_for_ready=True, **call_args |
| 239 | + request, wait_for_ready=True, **call_args |
203 | 240 | ): |
204 | 241 | flag_str = flag_rsp.flag_configuration |
205 | 242 | logger.debug( |
206 | | - f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}" |
| 243 | + f"Received flag configuration - {abs(hash(flag_str)) % (10 ** 8)}" |
207 | 244 | ) |
208 | 245 | self.flag_store.update(json.loads(flag_str)) |
209 | 246 |
|
|
0 commit comments