Do Interceptors support intercepting server-streamed/bidirectional events? #55

@pirate

Could we do something like:

import logging

from connectrpc.interceptor import Interceptor

class LoggingInterceptor(Interceptor):
    async def intercept(self, method, request, context, next_handler):
        response = await next_handler(request, context)
        if response.stream:
            async for event in response.stream:
                if event.event_type == 'LogEvent':
                    logging.info(event.message)
                    # (server sends LogEvents when client should print something to console)
        return response


client = HelloServiceClient(
    base_url="https://api.example.com",
    session=session,
    interceptors=[LoggingInterceptor()]
)
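
One caveat with the snippet above: if `response.stream` is a one-shot async iterator, looping over it inside the interceptor would drain it before the caller sees any events. A self-contained sketch of the pass-through behaviour I have in mind is below. It uses only `asyncio`; `Event`, `fake_server_stream`, and `log_events` are hypothetical names for illustration and are not part of the connectrpc API.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Event:
    # Hypothetical event shape, not a connectrpc type.
    event_type: str
    message: str


async def fake_server_stream() -> AsyncIterator[Event]:
    # Stand-in for a server-streamed response.
    yield Event("LogEvent", "starting job")
    yield Event("Result", "42")
    yield Event("LogEvent", "job done")


async def log_events(stream: AsyncIterator[Event]) -> AsyncIterator[Event]:
    # Wrap the stream rather than draining it: log LogEvents as they
    # arrive, then re-yield every event so the caller still receives it.
    async for event in stream:
        if event.event_type == "LogEvent":
            logging.info(event.message)
        yield event


async def main() -> list[Event]:
    # The caller iterates the wrapped stream exactly as it would the raw one.
    return [event async for event in log_events(fake_server_stream())]


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    events = asyncio.run(main())
    print([e.event_type for e in events])
```

If interceptors could return a wrapped stream like this, logging would happen lazily as the caller consumes events, and nothing would be buffered or lost.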