Skip to content

Commit 9749b6e

Browse files
committed
Harvest written byte count in WebSocket transport
1 parent a1abbad commit 9749b6e

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class TcpTransport implements Transport {
7272
protected final SslOptions sslOptions;
7373
protected final Bootstrap bootstrap;
7474
private final IntConsumer readBytesConsumer;
75-
private final IntConsumer writtenBytesConsumer;
75+
protected final IntConsumer writtenBytesConsumer;
7676

7777
protected Channel channel;
7878
protected volatile IOException failureCause;

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.charset.StandardCharsets;
2323
import java.util.concurrent.TimeUnit;
2424

25+
import io.netty.buffer.ByteBuf;
2526
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
2627
import org.apache.qpid.protonj2.client.SslOptions;
2728
import org.apache.qpid.protonj2.client.TransportOptions;
@@ -87,10 +88,12 @@ public WebSocketTransport write(ProtonBuffer output, Runnable onComplete) throws
8788

8889
LOG.trace("Attempted write of: {} bytes", length);
8990

91+
ByteBuf nettyBuf = toOutputBuffer(output);
92+
this.writtenBytesConsumer.accept(nettyBuf.writableBytes());
9093
if (onComplete == null) {
91-
channel.write(new BinaryWebSocketFrame(toOutputBuffer(output)), channel.voidPromise());
94+
channel.write(new BinaryWebSocketFrame(nettyBuf), channel.voidPromise());
9295
} else {
93-
channel.write(new BinaryWebSocketFrame(toOutputBuffer(output)), channel.newPromise().addListener(new GenericFutureListener<Future<? super Void>>() {
96+
channel.write(new BinaryWebSocketFrame(nettyBuf), channel.newPromise().addListener(new GenericFutureListener<Future<? super Void>>() {
9497

9598
@Override
9699
public void operationComplete(Future<? super Void> future) throws Exception {

0 commit comments

Comments
 (0)