Auth Interceptor not working for sync client #34

@maaft

According to the docs, I can set up an auth interceptor like this:

class AuthInterceptorSync:
    """Synchronous interceptor to add authentication headers to all requests"""

    def __init__(self, token: str):
        self.token = token

    def on_start(self, ctx: RequestContext) -> None:
        """Add authorization header when RPC starts"""
        ctx.request_headers()["authorization"] = self.token

    def on_end(self, token: None, ctx: RequestContext) -> None:
        """Called when RPC ends"""
        pass

But on_start is never called when I use the sync client.

When I instead defined intercept_unary_sync, it worked:

class AuthInterceptorSync:
    """Synchronous interceptor to add authentication headers to all requests"""

    def __init__(self, token: str):
        self.token = token

    def intercept_unary_sync(
        self,
        call_next: Callable[[REQ, RequestContext], RES],
        request: REQ,
        ctx: RequestContext,
    ) -> RES:
        """Intercepts a unary RPC.

        Args:
            call_next: A callable to invoke to continue processing, either to another
                interceptor or the actual RPC. Generally will be called with the same
                request the interceptor received but the request can be replaced as
                needed. Can be skipped if returning a response from the interceptor
                directly.
            request: The request message.
            ctx: The request context.

        Returns:
            The response message.
        """
        ctx.request_headers()["authorization"] = self.token
        return call_next(request, ctx)
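
For anyone who wants to check the workaround without a server, here is a minimal sketch that exercises the intercept_unary_sync shape in isolation. FakeRequestContext and fake_rpc are hypothetical stand-ins I made up for this example, not part of the library; the only assumption carried over from the code above is that request_headers() returns a mutable mapping.

```python
from typing import Callable, Dict, TypeVar

REQ = TypeVar("REQ")
RES = TypeVar("RES")


class FakeRequestContext:
    """Hypothetical stand-in for the library's RequestContext (test-only)."""

    def __init__(self) -> None:
        self._headers: Dict[str, str] = {}

    def request_headers(self) -> Dict[str, str]:
        # Assumption: the real context hands back a mutable header mapping.
        return self._headers


class AuthInterceptorSync:
    """Same shape as the working interceptor from this issue."""

    def __init__(self, token: str):
        self.token = token

    def intercept_unary_sync(
        self,
        call_next: Callable[[REQ, FakeRequestContext], RES],
        request: REQ,
        ctx: FakeRequestContext,
    ) -> RES:
        # Set the header first, then hand off to the next step in the chain.
        ctx.request_headers()["authorization"] = self.token
        return call_next(request, ctx)


def fake_rpc(request: str, ctx: FakeRequestContext) -> str:
    """Hypothetical terminal handler that echoes the header it receives."""
    return f"{request}:{ctx.request_headers().get('authorization', '<missing>')}"


interceptor = AuthInterceptorSync("Bearer abc123")
ctx = FakeRequestContext()
result = interceptor.intercept_unary_sync(fake_rpc, "ping", ctx)
print(result)  # ping:Bearer abc123
```

This only shows that the header is set before call_next runs; it says nothing about why the sync client skips on_start.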
