@@ -115,10 +115,10 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
115115 logger .info ("[{}] Message queue in-flight limit of {} reached. Putting the message into incoming " +
116116 "waiting queue" , id , settings .getMaxSendBufferMessagesCount ());
117117 }
118- } else if (availableSizeBytes < message .getLength ()) {
118+ } else if (availableSizeBytes < message .getSize ()) {
119119 if (instant ) {
120120 String errorMessage = "[" + id + "] Rejecting a message of " +
121- message .getLength () +
121+ message .getSize () +
122122 " bytes: not enough space in message queue. Buffer currently has " + currentInFlightCount +
123123 " messages with " + availableSizeBytes + " / " + settings .getMaxSendBufferMemorySize () +
124124 " bytes available" ;
@@ -129,7 +129,7 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
129129 } else {
130130 logger .info ("[{}] Can't accept a message of {} bytes into message queue. Buffer currently has " +
131131 "{} messages with {} / {} bytes available. Putting the message into incoming " +
132- "waiting queue." , id , message .getLength (), currentInFlightCount ,
132+ "waiting queue." , id , message .getSize (), currentInFlightCount ,
133133 availableSizeBytes , settings .getMaxSendBufferMemorySize ());
134134 }
135135 } else if (incomingQueue .isEmpty ()) {
@@ -149,10 +149,10 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
149149 private void acceptMessageIntoSendingQueue (EnqueuedMessage message ) {
150150 this .lastAcceptedMessageFuture = message .getFuture ();
151151 this .currentInFlightCount ++;
152- this .availableSizeBytes -= message .getOriginLength ();
152+ this .availableSizeBytes -= message .getOriginalSize ();
153153 if (logger .isDebugEnabled ()) {
154154 logger .debug ("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, " +
155- "AvailableSizeBytes: {} ({} / {} acquired)" , id , message .getOriginLength (),
155+ "AvailableSizeBytes: {} ({} / {} acquired)" , id , message .getOriginalSize (),
156156 currentInFlightCount , availableSizeBytes , maxSendBufferMemorySize - availableSizeBytes ,
157157 maxSendBufferMemorySize );
158158 }
@@ -189,15 +189,15 @@ private void moveEncodedMessagesToSendingQueue() {
189189 IOException error = msg .getCompressError ();
190190 if (error != null ) { // just skip
191191 logger .warn ("[{}] Message wasn't sent because of processing error" , id , error );
192- free (1 , msg .getOriginLength ());
192+ free (1 , msg .getOriginalSize ());
193193 continue ;
194194 }
195195
196- if (msg .getOriginLength () != msg .getLength ()) {
197- logger .trace ("[{}] Message compressed from {} to {} bytes" , id , msg .getOriginLength (),
198- msg .getLength ());
196+ if (msg .getOriginalSize () != msg .getSize ()) {
197+ logger .trace ("[{}] Message compressed from {} to {} bytes" , id , msg .getOriginalSize (),
198+ msg .getSize ());
199199 // message was actually encoded. Need to free some bytes
200- long bytesFreed = msg .getOriginLength () - msg .getLength ();
200+ long bytesFreed = msg .getOriginalSize () - msg .getSize ();
201201 // bytesFreed can be less than 0
202202 free (0 , bytesFreed );
203203 }
@@ -288,7 +288,7 @@ private void free(int messageCount, long sizeBytes) {
288288 if (incomingMessage == null ) {
289289 break ;
290290 }
291- if (incomingMessage .message .getOriginLength () > availableSizeBytes
291+ if (incomingMessage .message .getOriginalSize () > availableSizeBytes
292292 || currentInFlightCount >= settings .getMaxSendBufferMessagesCount ()) {
293293 logger .trace ("[{}] There are messages in incomingQueue still, but no space in send buffer" , id );
294294 return ;
@@ -431,7 +431,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
431431 }
432432 if (sentMessage .getSeqNo () == ack .getSeqNo ()) {
433433 inFlightFreed ++;
434- bytesFreed += sentMessage .getLength ();
434+ bytesFreed += sentMessage .getSize ();
435435 sentMessages .remove ();
436436 processWriteAck (sentMessage , ack );
437437 break ;
@@ -443,7 +443,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
443443 sentMessage .getFuture ().completeExceptionally (
444444 new RuntimeException ("Didn't get ack from server for this message" ));
445445 inFlightFreed ++;
446- bytesFreed += sentMessage .getLength ();
446+ bytesFreed += sentMessage .getSize ();
447447 sentMessages .remove ();
448448 // Checking next message waiting for ack
449449 } else {
0 commit comments