@@ -37,7 +37,6 @@ class NatsConnectionWriter implements Runnable {
3737
3838 private final NatsConnection connection ;
3939
40- private final ReentrantLock writerLock ;
4140 private Future <Boolean > stopped ;
4241 private Future <DataPort > dataPortFuture ;
4342 private DataPort dataPort ;
@@ -51,10 +50,10 @@ class NatsConnectionWriter implements Runnable {
5150 private final MessageQueue outgoing ;
5251 private final MessageQueue reconnectOutgoing ;
5352 private final long reconnectBufferSize ;
53+ private final AtomicBoolean flushBuffer ;
5454
5555 NatsConnectionWriter (NatsConnection connection , NatsConnectionWriter sourceWriter ) {
5656 this .connection = connection ;
57- writerLock = new ReentrantLock ();
5857
5958 this .running = new AtomicBoolean (false );
6059 this .reconnectMode = new AtomicBoolean (sourceWriter != null );
@@ -77,6 +76,8 @@ class NatsConnectionWriter implements Runnable {
7776 reconnectOutgoing = new MessageQueue (true , options .getRequestCleanupInterval (),
7877 sourceWriter == null ? null : sourceWriter .reconnectOutgoing );
7978 reconnectBufferSize = options .getReconnectBufferSize ();
79+
80+ flushBuffer = new AtomicBoolean (false );
8081 }
8182
8283 // Should only be called if the current thread has exited.
@@ -122,60 +123,58 @@ boolean isRunning() {
122123 }
123124
124125 void sendMessageBatch (NatsMessage msg , DataPort dataPort , StatisticsCollector stats ) throws IOException {
125- writerLock .lock ();
126- try {
127- int sendPosition = 0 ;
128- int sbl = sendBufferLength .get ();
129-
130- while (msg != null ) {
131- long size = msg .getSizeInBytes ();
132-
133- if (sendPosition + size > sbl ) {
134- if (sendPosition > 0 ) {
135- dataPort .write (sendBuffer , sendPosition );
136- connection .getNatsStatistics ().registerWrite (sendPosition );
137- sendPosition = 0 ;
138- }
139- if (size > sbl ) { // have to resize b/c can't fit 1 message
140- sbl = bufferAllocSize ((int ) size , BUFFER_BLOCK_SIZE );
141- sendBufferLength .set (sbl );
142- sendBuffer = new byte [sbl ];
143- }
144- }
126+ int sendPosition = 0 ;
127+ int sbl = sendBufferLength .get ();
145128
146- ByteArrayBuilder bab = msg .getProtocolBab ();
147- int babLen = bab .length ();
148- System .arraycopy (bab .internalArray (), 0 , sendBuffer , sendPosition , babLen );
149- sendPosition += babLen ;
129+ while (msg != null ) {
130+ long size = msg .getSizeInBytes ();
150131
151- sendBuffer [sendPosition ++] = CR ;
152- sendBuffer [sendPosition ++] = LF ;
132+ if (sendPosition + size > sbl ) {
133+ if (sendPosition > 0 ) {
134+ dataPort .write (sendBuffer , sendPosition );
135+ connection .getNatsStatistics ().registerWrite (sendPosition );
136+ sendPosition = 0 ;
137+ }
138+ if (size > sbl ) { // have to resize b/c can't fit 1 message
139+ sbl = bufferAllocSize ((int ) size , BUFFER_BLOCK_SIZE );
140+ sendBufferLength .set (sbl );
141+ sendBuffer = new byte [sbl ];
142+ }
143+ }
153144
154- if (!msg .isProtocol ()) {
155- sendPosition += msg .copyNotEmptyHeaders (sendPosition , sendBuffer );
145+ ByteArrayBuilder bab = msg .getProtocolBab ();
146+ int babLen = bab .length ();
147+ System .arraycopy (bab .internalArray (), 0 , sendBuffer , sendPosition , babLen );
148+ sendPosition += babLen ;
156149
157- byte [] bytes = msg .getData (); // guaranteed to not be null
158- if (bytes .length > 0 ) {
159- System .arraycopy (bytes , 0 , sendBuffer , sendPosition , bytes .length );
160- sendPosition += bytes .length ;
161- }
150+ sendBuffer [sendPosition ++] = CR ;
151+ sendBuffer [sendPosition ++] = LF ;
162152
163- sendBuffer [sendPosition ++] = CR ;
164- sendBuffer [sendPosition ++] = LF ;
165- }
153+ if (!msg .isProtocol ()) {
154+ sendPosition += msg .copyNotEmptyHeaders (sendPosition , sendBuffer );
166155
167- stats .incrementOutMsgs ();
168- stats .incrementOutBytes (size );
156+ byte [] bytes = msg .getData (); // guaranteed to not be null
157+ if (bytes .length > 0 ) {
158+ System .arraycopy (bytes , 0 , sendBuffer , sendPosition , bytes .length );
159+ sendPosition += bytes .length ;
160+ }
169161
170- msg = msg .next ;
162+ sendBuffer [sendPosition ++] = CR ;
163+ sendBuffer [sendPosition ++] = LF ;
171164 }
172165
173- dataPort .write (sendBuffer , sendPosition );
174- connection .getNatsStatistics ().registerWrite (sendPosition );
166+ stats .incrementOutMsgs ();
167+ stats .incrementOutBytes (size );
168+
169+ msg = msg .next ;
175170 }
176- finally {
177- writerLock .unlock ();
171+
172+ // no need to write if there are no bytes
173+ if (sendPosition > 0 ) {
174+ dataPort .write (sendBuffer , sendPosition );
178175 }
176+
177+ connection .getNatsStatistics ().registerWrite (sendPosition );
179178 }
180179
181180 @ Override
@@ -197,11 +196,14 @@ public void run() {
197196 msg = this .outgoing .accumulate (sendBufferLength .get (), maxAccumulate , waitForMessage );
198197 }
199198
200- if (msg == null ) { // Make sure we are still running
201- continue ;
199+ if (msg != null ) {
200+ sendMessageBatch ( msg , dataPort , stats ) ;
202201 }
203202
204- sendMessageBatch (msg , dataPort , stats );
203+ if (flushBuffer .get ()) {
204+ flushBuffer .set (false );
205+ dataPort .flush ();
206+ }
205207 }
206208 } catch (IOException | BufferOverflowException io ) {
207209 // if already not running, an IOE is not unreasonable in a transition state
@@ -237,18 +239,8 @@ void queueInternalMessage(NatsMessage msg) {
237239 }
238240
239241 void flushBuffer () {
240- // Since there is no connection level locking, we rely on synchronization
241- // of the APIs here.
242- writerLock .lock ();
243- try {
244- if (this .running .get ()) {
245- dataPort .flush ();
246- }
247- } catch (Exception e ) {
248- // NOOP;
249- }
250- finally {
251- writerLock .unlock ();
242+ if (running .get ()) {
243+ flushBuffer .set (true );
252244 }
253245 }
254246}
0 commit comments