@@ -115,10 +115,10 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
115115 logger .info ("[{}] Message queue in-flight limit of {} reached. Putting the message into incoming " +
116116 "waiting queue" , id , settings .getMaxSendBufferMessagesCount ());
117117 }
118- } else if (availableSizeBytes < message .getLength ()) {
118+ } else if (availableSizeBytes < message .getSize ()) {
119119 if (instant ) {
120120 String errorMessage = "[" + id + "] Rejecting a message of " +
121- message .getLength () +
121+ message .getSize () +
122122 " bytes: not enough space in message queue. Buffer currently has " + currentInFlightCount +
123123 " messages with " + availableSizeBytes + " / " + settings .getMaxSendBufferMemorySize () +
124124 " bytes available" ;
@@ -129,7 +129,7 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
129129 } else {
130130 logger .info ("[{}] Can't accept a message of {} bytes into message queue. Buffer currently has " +
131131 "{} messages with {} / {} bytes available. Putting the message into incoming " +
132- "waiting queue." , id , message .getLength (), currentInFlightCount ,
132+ "waiting queue." , id , message .getSize (), currentInFlightCount ,
133133 availableSizeBytes , settings .getMaxSendBufferMemorySize ());
134134 }
135135 } else if (incomingQueue .isEmpty ()) {
@@ -149,10 +149,10 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
149149 private void acceptMessageIntoSendingQueue (EnqueuedMessage message ) {
150150 this .lastAcceptedMessageFuture = message .getFuture ();
151151 this .currentInFlightCount ++;
152- this .availableSizeBytes -= message .getOriginLength ();
152+ this .availableSizeBytes -= message .gitOriginSize ();
153153 if (logger .isDebugEnabled ()) {
154154 logger .debug ("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, " +
155- "AvailableSizeBytes: {} ({} / {} acquired)" , id , message .getOriginLength (),
155+ "AvailableSizeBytes: {} ({} / {} acquired)" , id , message .gitOriginSize (),
156156 currentInFlightCount , availableSizeBytes , maxSendBufferMemorySize - availableSizeBytes ,
157157 maxSendBufferMemorySize );
158158 }
@@ -189,15 +189,14 @@ private void moveEncodedMessagesToSendingQueue() {
189189 IOException error = msg .getCompressError ();
190190 if (error != null ) { // just skip
191191 logger .warn ("[{}] Message wasn't sent because of processing error" , id , error );
192- free (1 , msg .getOriginLength ());
192+ free (1 , msg .gitOriginSize ());
193193 continue ;
194194 }
195195
196- if (msg .getOriginLength () != msg .getLength ()) {
197- logger .trace ("[{}] Message compressed from {} to {} bytes" , id , msg .getOriginLength (),
198- msg .getLength ());
196+ if (msg .gitOriginSize () != msg .getSize ()) {
197+ logger .trace ("[{}] Message compressed from {} to {} bytes" , id , msg .gitOriginSize (), msg .getSize ());
199198 // message was actually encoded. Need to free some bytes
200- long bytesFreed = msg .getOriginLength () - msg .getLength ();
199+ long bytesFreed = msg .gitOriginSize () - msg .getSize ();
201200 // bytesFreed can be less than 0
202201 free (0 , bytesFreed );
203202 }
@@ -288,7 +287,7 @@ private void free(int messageCount, long sizeBytes) {
288287 if (incomingMessage == null ) {
289288 break ;
290289 }
291- if (incomingMessage .message .getOriginLength () > availableSizeBytes
290+ if (incomingMessage .message .gitOriginSize () > availableSizeBytes
292291 || currentInFlightCount >= settings .getMaxSendBufferMessagesCount ()) {
293292 logger .trace ("[{}] There are messages in incomingQueue still, but no space in send buffer" , id );
294293 return ;
@@ -431,7 +430,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
431430 }
432431 if (sentMessage .getSeqNo () == ack .getSeqNo ()) {
433432 inFlightFreed ++;
434- bytesFreed += sentMessage .getLength ();
433+ bytesFreed += sentMessage .getSize ();
435434 sentMessages .remove ();
436435 processWriteAck (sentMessage , ack );
437436 break ;
@@ -443,7 +442,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
443442 sentMessage .getFuture ().completeExceptionally (
444443 new RuntimeException ("Didn't get ack from server for this message" ));
445444 inFlightFreed ++;
446- bytesFreed += sentMessage .getLength ();
445+ bytesFreed += sentMessage .getSize ();
447446 sentMessages .remove ();
448447 // Checking next message waiting for ack
449448 } else {
0 commit comments