Skip to content

Commit cf165f7

Browse files
wild-endeavorjeevb
andauthored
add accept grpc (#1841)
* add accept grpc Signed-off-by: Yee Hing Tong <[email protected]> Signed-off-by: Jeev B <[email protected]> * unpin setup.py grpc Signed-off-by: Yee Hing Tong <[email protected]> Signed-off-by: Jeev B <[email protected]> * Revert "add accept grpc" This reverts commit 2294592. Signed-off-by: Jeev B <[email protected]> * default headers interceptor Signed-off-by: Jeev B <[email protected]> * setup.py Signed-off-by: Jeev B <[email protected]> * fixes Signed-off-by: Jeev B <[email protected]> * fmt Signed-off-by: Jeev B <[email protected]> * move prometheus-client import Signed-off-by: Jeev B <[email protected]> --------- Signed-off-by: Yee Hing Tong <[email protected]> Signed-off-by: Jeev B <[email protected]> Co-authored-by: Jeev B <[email protected]>
1 parent a1e110e commit cf165f7

File tree

5 files changed

+58
-13
lines changed

5 files changed

+58
-13
lines changed

flytekit/clients/auth_helper.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
PKCEAuthenticator,
1717
)
1818
from flytekit.clients.grpc_utils.auth_interceptor import AuthUnaryInterceptor
19+
from flytekit.clients.grpc_utils.default_metadata_interceptor import DefaultMetadataInterceptor
1920
from flytekit.clients.grpc_utils.wrap_exception_interceptor import RetryExceptionWrapperInterceptor
2021
from flytekit.configuration import AuthType, PlatformConfig
2122

@@ -171,7 +172,7 @@ def get_channel(cfg: PlatformConfig, **kwargs) -> grpc.Channel:
171172
:return: grpc.Channel (secure / insecure)
172173
"""
173174
if cfg.insecure:
174-
return grpc.insecure_channel(cfg.endpoint, **kwargs)
175+
return grpc.intercept_channel(grpc.insecure_channel(cfg.endpoint, **kwargs), DefaultMetadataInterceptor())
175176

176177
credentials = None
177178
if "credentials" not in kwargs:
@@ -189,11 +190,14 @@ def get_channel(cfg: PlatformConfig, **kwargs) -> grpc.Channel:
189190
)
190191
else:
191192
credentials = kwargs["credentials"]
192-
return grpc.secure_channel(
193-
target=cfg.endpoint,
194-
credentials=credentials,
195-
options=kwargs.get("options", None),
196-
compression=kwargs.get("compression", None),
193+
return grpc.intercept_channel(
194+
grpc.secure_channel(
195+
target=cfg.endpoint,
196+
credentials=credentials,
197+
options=kwargs.get("options", None),
198+
compression=kwargs.get("compression", None),
199+
),
200+
DefaultMetadataInterceptor(),
197201
)
198202

199203

flytekit/clients/grpc_utils/auth_interceptor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def _call_details_with_auth_metadata(self, client_call_details: grpc.ClientCallD
3232
"""
3333
Returns new ClientCallDetails with metadata added.
3434
"""
35-
metadata = None
35+
metadata = client_call_details.metadata
3636
auth_metadata = self._authenticator.fetch_grpc_call_auth_metadata()
3737
if auth_metadata:
3838
metadata = []
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import typing
2+
3+
import grpc
4+
5+
from flytekit.clients.grpc_utils.auth_interceptor import _ClientCallDetails
6+
7+
8+
class DefaultMetadataInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor):
9+
def _inject_default_metadata(self, call_details: grpc.ClientCallDetails):
10+
metadata = [("accept", "application/grpc")]
11+
if call_details.metadata:
12+
metadata.extend(list(call_details.metadata))
13+
new_details = _ClientCallDetails(
14+
call_details.method,
15+
call_details.timeout,
16+
metadata,
17+
call_details.credentials,
18+
)
19+
return new_details
20+
21+
def intercept_unary_unary(
22+
self,
23+
continuation: typing.Callable,
24+
client_call_details: grpc.ClientCallDetails,
25+
request: typing.Any,
26+
):
27+
"""
28+
Intercepts unary calls and inject default metadata
29+
"""
30+
updated_call_details = self._inject_default_metadata(client_call_details)
31+
return continuation(updated_call_details, request)
32+
33+
def intercept_unary_stream(
34+
self,
35+
continuation: typing.Callable,
36+
client_call_details: grpc.ClientCallDetails,
37+
request: typing.Any,
38+
):
39+
"""
40+
Handles a stream call and inject default metadata
41+
"""
42+
updated_call_details = self._inject_default_metadata(client_call_details)
43+
return continuation(updated_call_details, request)

flytekit/clis/sdk_in_container/serve.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
from flyteidl.service.agent_pb2_grpc import add_AsyncAgentServiceServicer_to_server
55
from grpc import aio
66

7-
from flytekit.extend.backend.agent_service import AsyncAgentService
8-
97
_serve_help = """Start a grpc server for the agent service."""
108

119

@@ -47,6 +45,8 @@ async def _start_grpc_server(port: int, worker: int, timeout: int):
4745
try:
4846
from prometheus_client import start_http_server
4947

48+
from flytekit.extend.backend.agent_service import AsyncAgentService
49+
5050
start_http_server(9090)
5151
except ImportError as e:
5252
click.secho(f"Failed to start the prometheus server with error {e}", fg="red")

setup.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,8 @@
3838
"deprecated>=1.0,<2.0",
3939
"docker>=4.0.0,<7.0.0",
4040
"python-dateutil>=2.1",
41-
# Restrict grpcio and grpcio-status. Version 1.50.0 pulls in a version of protobuf that is not compatible
42-
# with the old protobuf library (as described in https://developers.google.com/protocol-buffers/docs/news/2022-05-06)
43-
"grpcio>=1.50.0,!=1.55.0,<1.53.1,<2.0",
44-
"grpcio-status>=1.50.0,!=1.55.0,<1.53.1,<2.0",
41+
"grpcio",
42+
"grpcio-status",
4543
"importlib-metadata",
4644
"fsspec>=2023.3.0",
4745
"adlfs",

0 commit comments

Comments
 (0)