@@ -37,6 +37,7 @@ class NatsConnectionWriter implements Runnable {
3737
3838 private final NatsConnection connection ;
3939
40+ private final ReentrantLock writerLock ;
4041 private Future <Boolean > stopped ;
4142 private Future <DataPort > dataPortFuture ;
4243 private DataPort dataPort ;
@@ -50,10 +51,10 @@ class NatsConnectionWriter implements Runnable {
5051 private final MessageQueue outgoing ;
5152 private final MessageQueue reconnectOutgoing ;
5253 private final long reconnectBufferSize ;
53- private final AtomicBoolean flushBuffer ;
5454
5555 NatsConnectionWriter (NatsConnection connection , NatsConnectionWriter sourceWriter ) {
5656 this .connection = connection ;
57+ writerLock = new ReentrantLock ();
5758
5859 this .running = new AtomicBoolean (false );
5960 this .reconnectMode = new AtomicBoolean (sourceWriter != null );
@@ -76,8 +77,6 @@ class NatsConnectionWriter implements Runnable {
7677 reconnectOutgoing = new MessageQueue (true , options .getRequestCleanupInterval (),
7778 sourceWriter == null ? null : sourceWriter .reconnectOutgoing );
7879 reconnectBufferSize = options .getReconnectBufferSize ();
79-
80- flushBuffer = new AtomicBoolean (false );
8180 }
8281
8382 // Should only be called if the current thread has exited.
@@ -123,65 +122,60 @@ boolean isRunning() {
123122 }
124123
125124 void sendMessageBatch (NatsMessage msg , DataPort dataPort , StatisticsCollector stats ) throws IOException {
126- int sendPosition = 0 ;
127- int sbl = sendBufferLength .get ();
128-
129- while (msg != null ) {
130- long size = msg .getSizeInBytes ();
131-
132- if (sendPosition + size > sbl ) {
133- if (sendPosition > 0 ) {
134- dataPort .write (sendBuffer , sendPosition );
135- connection .getNatsStatistics ().registerWrite (sendPosition );
136- sendPosition = 0 ;
137- }
138- if (size > sbl ) { // have to resize b/c can't fit 1 message
139- sbl = bufferAllocSize ((int ) size , BUFFER_BLOCK_SIZE );
140- sendBufferLength .set (sbl );
141- sendBuffer = new byte [sbl ];
125+ writerLock .lock ();
126+ try {
127+ int sendPosition = 0 ;
128+ int sbl = sendBufferLength .get ();
129+
130+ while (msg != null ) {
131+ long size = msg .getSizeInBytes ();
132+
133+ if (sendPosition + size > sbl ) {
134+ if (sendPosition > 0 ) {
135+ dataPort .write (sendBuffer , sendPosition );
136+ connection .getNatsStatistics ().registerWrite (sendPosition );
137+ sendPosition = 0 ;
138+ }
139+ if (size > sbl ) { // have to resize b/c can't fit 1 message
140+ sbl = bufferAllocSize ((int ) size , BUFFER_BLOCK_SIZE );
141+ sendBufferLength .set (sbl );
142+ sendBuffer = new byte [sbl ];
143+ }
142144 }
143- }
144-
145- ByteArrayBuilder bab = msg .getProtocolBab ();
146- int babLen = bab .length ();
147- System .arraycopy (bab .internalArray (), 0 , sendBuffer , sendPosition , babLen );
148- sendPosition += babLen ;
149145
150- sendBuffer [sendPosition ++] = CR ;
151- sendBuffer [sendPosition ++] = LF ;
152-
153- if (!msg .isProtocol ()) {
154- sendPosition += msg .copyNotEmptyHeaders (sendPosition , sendBuffer );
155-
156- byte [] bytes = msg .getData (); // guaranteed to not be null
157- if (bytes .length > 0 ) {
158- System .arraycopy (bytes , 0 , sendBuffer , sendPosition , bytes .length );
159- sendPosition += bytes .length ;
160- }
146+ ByteArrayBuilder bab = msg .getProtocolBab ();
147+ int babLen = bab .length ();
148+ System .arraycopy (bab .internalArray (), 0 , sendBuffer , sendPosition , babLen );
149+ sendPosition += babLen ;
161150
162151 sendBuffer [sendPosition ++] = CR ;
163152 sendBuffer [sendPosition ++] = LF ;
164- }
165153
166- stats . incrementOutMsgs ();
167- stats . incrementOutBytes ( size );
154+ if (! msg . isProtocol ()) {
155+ sendPosition += msg . copyNotEmptyHeaders ( sendPosition , sendBuffer );
168156
169- msg = msg .next ;
170- }
157+ byte [] bytes = msg .getData (); // guaranteed to not be null
158+ if (bytes .length > 0 ) {
159+ System .arraycopy (bytes , 0 , sendBuffer , sendPosition , bytes .length );
160+ sendPosition += bytes .length ;
161+ }
171162
172- // no need to write if there are no bytes
173- if (sendPosition > 0 ) {
174- dataPort .write (sendBuffer , sendPosition );
175- }
163+ sendBuffer [sendPosition ++] = CR ;
164+ sendBuffer [sendPosition ++] = LF ;
165+ }
176166
177- try {
178- if ( flushBuffer . get ()) {
179- dataPort . flush ();
180- flushBuffer . set ( false ) ;
167+ stats . incrementOutMsgs ();
168+ stats . incrementOutBytes ( size );
169+
170+ msg = msg . next ;
181171 }
182- } catch (Exception ignore ) {}
183172
184- connection .getNatsStatistics ().registerWrite (sendPosition );
173+ dataPort .write (sendBuffer , sendPosition );
174+ connection .getNatsStatistics ().registerWrite (sendPosition );
175+ }
176+ finally {
177+ writerLock .unlock ();
178+ }
185179 }
186180
187181 @ Override
@@ -243,8 +237,18 @@ void queueInternalMessage(NatsMessage msg) {
243237 }
244238
245239 void flushBuffer () {
246- if (running .get ()) {
247- flushBuffer .set (true );
240+ // Since there is no connection level locking, we rely on synchronization
241+ // of the APIs here.
242+ writerLock .lock ();
243+ try {
244+ if (this .running .get ()) {
245+ dataPort .flush ();
246+ }
247+ } catch (Exception e ) {
248+ // NOOP;
249+ }
250+ finally {
251+ writerLock .unlock ();
248252 }
249253 }
250254}
0 commit comments