33
44package com .azure .messaging .webpubsub .client .implementation .websocket ;
55
6+ import com .azure .core .util .BinaryData ;
67import com .azure .core .util .logging .ClientLogger ;
78import com .azure .messaging .webpubsub .client .implementation .MessageDecoder ;
9+ import com .azure .messaging .webpubsub .client .implementation .models .WebPubSubMessage ;
10+ import io .netty .buffer .ByteBufAllocator ;
11+ import io .netty .buffer .CompositeByteBuf ;
812import io .netty .channel .Channel ;
913import io .netty .channel .ChannelFuture ;
1014import io .netty .channel .ChannelHandlerContext ;
1115import io .netty .channel .ChannelPromise ;
1216import io .netty .channel .SimpleChannelInboundHandler ;
1317import io .netty .handler .codec .http .FullHttpResponse ;
1418import io .netty .handler .codec .http .websocketx .CloseWebSocketFrame ;
19+ import io .netty .handler .codec .http .websocketx .ContinuationWebSocketFrame ;
1520import io .netty .handler .codec .http .websocketx .PingWebSocketFrame ;
1621import io .netty .handler .codec .http .websocketx .PongWebSocketFrame ;
1722import io .netty .handler .codec .http .websocketx .TextWebSocketFrame ;
1823import io .netty .handler .codec .http .websocketx .WebSocketClientHandshaker ;
1924import io .netty .handler .codec .http .websocketx .WebSocketFrame ;
2025import io .netty .handler .codec .http .websocketx .WebSocketHandshakeException ;
21- import io .netty .util .CharsetUtil ;
2226
27+ import java .nio .ByteBuffer ;
28+ import java .util .Arrays ;
2329import java .util .concurrent .CompletableFuture ;
2430import java .util .concurrent .atomic .AtomicReference ;
2531import java .util .function .Consumer ;
2632
2733final class WebSocketClientHandler extends SimpleChannelInboundHandler <Object > {
2834
2935 private final WebSocketClientHandshaker handshaker ;
30- private ChannelPromise handshakeFuture ;
31-
3236 private final AtomicReference <ClientLogger > loggerReference ;
3337 private final MessageDecoder messageDecoder ;
34- private final Consumer <Object > messageHandler ;
38+ private final Consumer <WebPubSubMessage > messageHandler ;
39+
40+ private ChannelPromise handshakeFuture ;
41+ private CompositeByteBuf compositeByteBuf ;
3542
3643 WebSocketClientHandler (WebSocketClientHandshaker handshaker , AtomicReference <ClientLogger > loggerReference ,
37- MessageDecoder messageDecoder , Consumer <Object > messageHandler ) {
44+ MessageDecoder messageDecoder , Consumer <WebPubSubMessage > messageHandler ) {
3845 this .handshaker = handshaker ;
3946 this .loggerReference = loggerReference ;
4047 this .messageDecoder = messageDecoder ;
@@ -46,80 +53,131 @@ ChannelFuture handshakeFuture() {
4653 }
4754
4855 @ Override
49- public void handlerAdded (ChannelHandlerContext ctx ) {
50- handshakeFuture = ctx .newPromise ();
56+ public void handlerAdded (ChannelHandlerContext context ) {
57+ handshakeFuture = context .newPromise ();
58+ compositeByteBuf = context .alloc ().compositeBuffer ();
59+ }
60+
61+ @ Override
62+ public void handlerRemoved (ChannelHandlerContext ctx ) {
63+ publishBuffer ();
5164 }
5265
5366 @ Override
54- public void channelActive (ChannelHandlerContext ctx ) {
55- handshaker .handshake (ctx .channel ());
67+ public void channelActive (ChannelHandlerContext context ) {
68+ handshaker .handshake (context .channel ());
5669 }
5770
5871 @ Override
59- protected void channelRead0 (ChannelHandlerContext ctx , Object msg ) {
60- Channel ch = ctx .channel ();
72+ protected void channelRead0 (ChannelHandlerContext context , Object message ) {
6173 if (handshakeFuture != null && !handshaker .isHandshakeComplete ()) {
74+ Channel channel = context .channel ();
75+
6276 try {
63- handshaker .finishHandshake (ch , (FullHttpResponse ) msg );
77+ handshaker .finishHandshake (channel , (FullHttpResponse ) message );
6478 handshakeFuture .setSuccess ();
6579 } catch (WebSocketHandshakeException e ) {
6680 handshakeFuture .setFailure (e );
6781 }
6882 return ;
6983 }
7084
71- if (msg instanceof FullHttpResponse ) {
72- FullHttpResponse response = (FullHttpResponse ) msg ;
73- throw loggerReference .get ()
74- .logExceptionAsError (new IllegalStateException ("Unexpected FullHttpResponse (getStatus="
75- + response .status () + ", content=" + response .content ().toString (CharsetUtil .UTF_8 ) + ')' ));
85+ if (!(message instanceof WebSocketFrame ) || !processMessage (context , (WebSocketFrame ) message )) {
86+ loggerReference .get ()
87+ .atWarning ()
88+ .addKeyValue ("messageType" , message .getClass ())
89+ .log ("Unknown message type. Skipping." );
90+
91+ context .fireChannelRead (message );
92+ }
93+ }
94+
95+ @ Override
96+ public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) {
97+ if (handshakeFuture != null && !handshakeFuture .isDone ()) {
98+ handshakeFuture .setFailure (cause );
7699 }
100+ ctx .close ();
101+ release (compositeByteBuf );
102+ }
77103
78- WebSocketFrame frame = (WebSocketFrame ) msg ;
79- if (frame instanceof TextWebSocketFrame ) {
80- // Text
81- TextWebSocketFrame textFrame = (TextWebSocketFrame ) frame ;
82- loggerReference .get ().atVerbose ().addKeyValue ("text" , textFrame .text ()).log ("Received TextWebSocketFrame" );
83- Object wpsMessage = messageDecoder .decode (textFrame .text ());
84- messageHandler .accept (wpsMessage );
85- } else if (frame instanceof PingWebSocketFrame ) {
104+ /**
105+ * Attempts to process the web socket frame.
106+ *
107+ * @param context Channel for message.
108+ * @param webSocketFrame Frame to process.
109+ * @return true if the frame was processed, false otherwise.
110+ */
111+ private boolean processMessage (ChannelHandlerContext context , WebSocketFrame webSocketFrame ) {
112+ Channel channel = context .channel ();
113+
114+ if (webSocketFrame instanceof PingWebSocketFrame ) {
86115 // Ping, reply Pong
87116 loggerReference .get ().atVerbose ().log ("Received PingWebSocketFrame" );
88117 loggerReference .get ().atVerbose ().log ("Send PongWebSocketFrame" );
89- ch .writeAndFlush (new PongWebSocketFrame ());
90- } else if (frame instanceof PongWebSocketFrame ) {
118+ channel .writeAndFlush (new PongWebSocketFrame ());
119+ return true ;
120+ } else if (webSocketFrame instanceof PongWebSocketFrame ) {
91121 // Pong
92122 loggerReference .get ().atVerbose ().log ("Received PongWebSocketFrame" );
93- } else if (frame instanceof CloseWebSocketFrame ) {
123+ return true ;
124+ } else if (webSocketFrame instanceof CloseWebSocketFrame ) {
94125 // Close
95- CloseWebSocketFrame closeFrame = (CloseWebSocketFrame ) frame ;
126+ final CloseWebSocketFrame closeFrame = (CloseWebSocketFrame ) webSocketFrame ;
127+
96128 loggerReference .get ()
97129 .atVerbose ()
98130 .addKeyValue ("statusCode" , closeFrame .statusCode ())
99131 .addKeyValue ("reasonText" , closeFrame .reasonText ())
100132 .log ("Received CloseWebSocketFrame" );
101-
102- this .serverCloseWebSocketFrame = closeFrame .retain (); // retain for SessionNettyImpl
133+ serverCloseWebSocketFrame = closeFrame .retain (); // retain for SessionNettyImpl
103134
104135 if (closeCallbackFuture == null ) {
105136 // close initiated from server, reply CloseWebSocketFrame, then close connection
106137 loggerReference .get ().atVerbose ().log ("Send CloseWebSocketFrame" );
107138 closeFrame .retain (); // retain before write it back
108- ch .writeAndFlush (closeFrame ).addListener (future -> ch .close ());
139+ channel .writeAndFlush (closeFrame ).addListener (future -> channel .close ());
109140 } else {
110141 // close initiated from client, client already sent CloseWebSocketFrame
111- ch .close ();
142+ channel .close ();
143+ }
144+
145+ return true ;
146+ } else if (webSocketFrame instanceof TextWebSocketFrame
147+ || webSocketFrame instanceof ContinuationWebSocketFrame ) {
148+ if (compositeByteBuf == null ) {
149+ compositeByteBuf = ByteBufAllocator .DEFAULT .compositeBuffer ();
150+ }
151+
152+ compositeByteBuf .addComponent (true , webSocketFrame .content ().retain ());
153+
154+ if (!webSocketFrame .isFinalFragment ()) {
155+ return true ;
112156 }
157+
158+ publishBuffer ();
159+ return true ;
160+ } else {
161+ return false ;
113162 }
114163 }
115164
116- @ Override
117- public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) {
118- cause .printStackTrace ();
119- if (handshakeFuture != null && !handshakeFuture .isDone ()) {
120- handshakeFuture .setFailure (cause );
165+ private void publishBuffer () {
166+ final ByteBuffer [] nioBuffers = compositeByteBuf .nioBuffers ();
167+
168+ if (nioBuffers .length == 0 ) {
169+ return ;
170+ }
171+
172+ try {
173+ final BinaryData data = BinaryData .fromListByteBuffer (Arrays .asList (nioBuffers ));
174+ final String collected = data .toString ();
175+ final WebPubSubMessage deserialized = messageDecoder .decode (collected );
176+
177+ messageHandler .accept (deserialized );
178+ } finally {
179+ release (compositeByteBuf );
121180 }
122- ctx .close ();
123181 }
124182
125183 // as side effect, if it is not null, the close (aka CloseWebSocketFrame) is initiated by client
@@ -138,4 +196,11 @@ public CompletableFuture<Void> getClientCloseCallbackFuture() {
138196 CloseWebSocketFrame getServerCloseWebSocketFrame () {
139197 return this .serverCloseWebSocketFrame ;
140198 }
199+
200+ private static void release (CompositeByteBuf buffer ) {
201+ if (buffer .refCnt () > 0 ) {
202+ buffer .release ();
203+ buffer .clear ();
204+ }
205+ }
141206}
0 commit comments