11package io .scalecube .services .transport .rsocket ;
22
3+ import static io .scalecube .services .transport .rsocket .ReferenceCountUtil .safestRelease ;
34import static java .util .function .Function .identity ;
45import static java .util .stream .Collectors .collectingAndThen ;
56import static java .util .stream .Collectors .toMap ;
@@ -102,7 +103,7 @@ public <T> T encodeAndTransform(
102103 DataCodec dataCodec = getDataCodec (message .dataFormatOrDefault ());
103104 dataCodec .encode (new ByteBufOutputStream (dataBuffer ), message .data ());
104105 } catch (Throwable ex ) {
105- ReferenceCountUtil . safestRelease (dataBuffer );
106+ safestRelease (dataBuffer );
106107 LOGGER .error (
107108 "Failed to encode service message data on: {}, cause: {}" , message , ex .toString ());
108109 throw new MessageCodecException ("Failed to encode service message data" , ex );
@@ -114,8 +115,8 @@ public <T> T encodeAndTransform(
114115 try {
115116 headersCodec .encode (new ByteBufOutputStream (headersBuffer ), message .headers ());
116117 } catch (Throwable ex ) {
117- ReferenceCountUtil . safestRelease (headersBuffer );
118- ReferenceCountUtil . safestRelease (dataBuffer ); // release data buf as well
118+ safestRelease (headersBuffer );
119+ safestRelease (dataBuffer ); // release data buf as well
119120 LOGGER .error (
120121 "Failed to encode service message headers on: {}, cause: {}" , message , ex .toString ());
121122 throw new MessageCodecException ("Failed to encode service message headers" , ex );
@@ -144,7 +145,7 @@ public ServiceMessage decode(ByteBuf dataBuffer, ByteBuf headersBuffer)
144145 try (ByteBufInputStream stream = new ByteBufInputStream (headersBuffer , true )) {
145146 builder .headers (headersCodec .decode (stream ));
146147 } catch (Throwable ex ) {
147- ReferenceCountUtil . safestRelease (dataBuffer ); // release data buf as well
148+ safestRelease (dataBuffer ); // release data buf as well
148149 throw new MessageCodecException ("Failed to decode service message headers" , ex );
149150 }
150151 }
@@ -163,12 +164,15 @@ public ServiceMessage decode(ByteBuf dataBuffer, ByteBuf headersBuffer)
163164 */
164165 public static ServiceMessage decodeData (ServiceMessage message , Type dataType )
165166 throws MessageCodecException {
166- if (dataType == null || !message .hasData (ByteBuf .class )) {
167+ if (dataType == null
168+ || dataType == ByteBuf .class
169+ || message .data () == null
170+ || !(message .data () instanceof ByteBuf )) {
167171 return message ;
168172 }
169173
170174 final ByteBuf dataBuffer = message .data ();
171- if (dataBuffer .readableBytes () == 0 || dataType == ByteBuf . class ) {
175+ if (dataBuffer .readableBytes () == 0 ) {
172176 return message ;
173177 }
174178 if (dataType == byte [].class ) {
@@ -177,17 +181,14 @@ public static ServiceMessage decodeData(ServiceMessage message, Type dataType)
177181 return ServiceMessage .from (message ).data (bytes ).build ();
178182 }
179183
180- Object data ;
181- Type targetType = message .isError () ? ErrorData .class : dataType ;
182-
183184 try (ByteBufInputStream inputStream = new ByteBufInputStream (dataBuffer , true )) {
184- DataCodec dataCodec = DataCodec .getInstance (message .dataFormatOrDefault ());
185- data = dataCodec .decode (inputStream , targetType );
185+ final var targetType = message .isError () ? ErrorData .class : dataType ;
186+ final var dataCodec = DataCodec .getInstance (message .dataFormatOrDefault ());
187+ final var decodedData = dataCodec .decode (inputStream , targetType );
188+ return ServiceMessage .from (message ).data (decodedData ).build ();
186189 } catch (Throwable ex ) {
187190 throw new MessageCodecException ("Failed to decode service message data" , ex );
188191 }
189-
190- return ServiceMessage .from (message ).data (data ).build ();
191192 }
192193
193194 private DataCodec getDataCodec (String contentType ) {
0 commit comments