-
Notifications
You must be signed in to change notification settings - Fork 33
Open
Labels
bugSomething isn't workingSomething isn't working
Description
Questions
When I using GrpcServiceBridgeImpl, some of the behavior of vertx-grpc is not as expected when the network is abnormal.
Expect Behavior
vertx-grpc/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServiceBridgeImpl.java
Lines 148 to 157 in fca746c
| void init(ServerCall.Listener<Req> listener) { | |
| this.listener = listener; | |
| req.errorHandler(error -> { | |
| if (error == GrpcError.CANCELLED && !closed) { | |
| listener.onCancel(); | |
| } | |
| }); | |
| readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor)); | |
| writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor)); | |
| } |
if the network between client and server was interrupted, listener.onCancel() should be invoked.
Actual Behavior
nothing happened.
Version
4.5.7
The errorHandler is an method from GrpcReadStream, it was invoked only when the httpStream has a StreamResetException, but when client was killed, an IOException occurred.
vertx-grpc/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcReadStreamBase.java
Lines 71 to 81 in fca746c
| public void init() { | |
| stream.handler(this); | |
| stream.endHandler(v -> queue.write(END_SENTINEL)); | |
| stream.exceptionHandler(err -> { | |
| if (err instanceof StreamResetException) { | |
| handleReset(((StreamResetException)err).getCode()); | |
| } else { | |
| handleException(err); | |
| } | |
| }); | |
| queue.drainHandler(v -> stream.resume()); |
java.io.IOException: Broken pipe
at java.base/sun.nio.ch.SocketDispatcher.writev0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:66)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:227)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:158)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:574)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:430)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:359)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:941)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.checkFlush(VertxHttp2ConnectionHandler.java:247)
at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.writeData(VertxHttp2ConnectionHandler.java:242)
at io.vertx.core.http.impl.VertxHttp2Stream.doWriteData(VertxHttp2Stream.java:244)
at io.vertx.core.http.impl.VertxHttp2Stream.writeData(VertxHttp2Stream.java:216)
at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:469)
at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:347)
at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:48)
at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:242)
at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:112)
at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:102)
at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:248)
at io.vertx.core.streams.impl.PipeImpl.lambda$to$1(PipeImpl.java:81)
at io.vertx.grpc.server.impl.GrpcServerRequestImpl.lambda$handler$0(GrpcServerRequestImpl.java:79)
at io.vertx.grpc.common.impl.GrpcReadStreamBase.handleMessage(GrpcReadStreamBase.java:207)
at io.vertx.grpc.common.impl.GrpcReadStreamBase.lambda$init$3(GrpcReadStreamBase.java:89)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:125)
at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:39)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
at io.vertx.core.http.impl.HttpEventHandler.handleChunk(HttpEventHandler.java:51)
at io.vertx.core.http.impl.Http2ServerRequest.handleData(Http2ServerRequest.java:148)
at io.vertx.core.http.impl.Http2ServerStream.handleData(Http2ServerStream.java:206)
at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:75)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:242)
at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:295)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't working