@@ -42,6 +42,8 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
4242 protected int batchInterval = 3000 ;
4343 protected int connectTimeout = 5000 ;
4444 protected int readTimeout = 10000 ;
45+ protected int maxRetries = 5 ;
46+ protected int retrySleepMilliseconds = 300 ;
4547
4648 protected PatternLayoutEncoder encoder ;
4749
@@ -55,7 +57,9 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
5557 protected ScheduledExecutorService scheduledExecutorService ;
5658 protected ScheduledFuture <?> scheduledFuture ;
5759 protected ObjectMapper dataMapper ;
58- protected Logger errorLog ;
60+ protected Logger logger ;
61+ protected int retrySize = 0 ;
62+ protected int retries = 0 ;
5963 protected boolean disabled = false ;
6064
6165 protected ThreadFactory threadFactory = r -> {
@@ -66,7 +70,7 @@ public class LogtailAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
6670 };
6771
6872 public LogtailAppender () {
69- errorLog = LoggerFactory .getLogger (LogtailAppender .class );
73+ logger = LoggerFactory .getLogger (LogtailAppender .class );
7074
7175 dataMapper = new ObjectMapper ()
7276 .setSerializationInclusion (JsonInclude .Include .NON_NULL )
@@ -85,9 +89,9 @@ protected void append(ILoggingEvent event) {
8589 return ;
8690
8791 if (this .ingestUrl .isEmpty () || this .sourceToken == null || this .sourceToken .isEmpty ()) {
88- // Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append
92+ // Prevent potential dead-lock, when a blocking logger is configured - avoid using logger directly in append
8993 startThread ("logtail-warning-logger" , () -> {
90- errorLog .warn ("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java " );
94+ logger .warn ("Missing Source token for Better Stack - disabling LogtailAppender. Find out how to fix this at: https://betterstack.com/docs/logs/java " );
9195 });
9296 this .disabled = true ;
9397 return ;
@@ -99,9 +103,9 @@ protected void append(ILoggingEvent event) {
99103
100104 if (warnAboutMaxQueueSize && batch .size () == maxQueueSize ) {
101105 this .warnAboutMaxQueueSize = false ;
102- // Prevent potential dead-lock, when a blocking logger is configured - avoid using errorLog directly in append
106+ // Prevent potential dead-lock, when a blocking logger is configured - avoid using logger directly in append
103107 startThread ("logtail-error-logger" , () -> {
104- errorLog .error ("Maximum number of messages in queue reached ({}). New messages will be dropped." , maxQueueSize );
108+ logger .error ("Maximum number of messages in queue reached ({}). New messages will be dropped." , maxQueueSize );
105109 });
106110 }
107111
@@ -123,38 +127,84 @@ protected void flush() {
123127 if (batch .isEmpty ())
124128 return ;
125129
130+ // Guaranteed to not be running concurrently
126131 if (isFlushing .getAndSet (true ))
127132 return ;
128133
129134 mustReflush = false ;
130135
136+ int flushedSize = batch .size ();
137+ if (flushedSize > batchSize ) {
138+ flushedSize = batchSize ;
139+ mustReflush = true ;
140+ }
141+ if (retries > 0 && flushedSize > retrySize ) {
142+ flushedSize = retrySize ;
143+ mustReflush = true ;
144+ }
145+
146+ if (!flushLogs (flushedSize )) {
147+ mustReflush = true ;
148+ }
149+
150+ isFlushing .set (false );
151+
152+ if (mustReflush || batch .size () >= batchSize )
153+ flush ();
154+ }
155+
156+ protected boolean flushLogs (int flushedSize ) {
157+ retrySize = flushedSize ;
158+
131159 try {
132- int flushedSize = batch .size ();
133- if (flushedSize > batchSize ) {
134- flushedSize = batchSize ;
135- mustReflush = true ;
160+ if (retries > maxRetries ) {
161+ batch .subList (0 , flushedSize ).clear ();
162+ logger .error ("Dropped batch of {} logs." , flushedSize );
163+ warnAboutMaxQueueSize = true ;
164+ retries = 0 ;
165+
166+ return true ;
167+ }
168+
169+ if (retries > 0 ) {
170+ logger .info ("Retrying to send {} logs to Better Stack ({} / {})" , flushedSize , retries , maxRetries );
171+ try {
172+ TimeUnit .MILLISECONDS .sleep (retrySleepMilliseconds );
173+ } catch (InterruptedException e ) {
174+ // Continue
175+ }
136176 }
137177
138178 LogtailResponse response = callHttpURLConnection (flushedSize );
139179
140- if (response .getStatus () >= 200 && response .getStatus () < 300 ) {
141- batch .subList (0 , flushedSize ).clear ();
142- this .warnAboutMaxQueueSize = true ;
143- } else {
144- errorLog .error ("Error calling Better Stack : {} ({})" , response .getError (), response .getStatus ());
145- mustReflush = true ;
180+ if (response .getStatus () >= 300 || response .getStatus () < 200 ) {
181+ logger .error ("Error calling Better Stack : {} ({})" , response .getError (), response .getStatus ());
182+ retries ++;
183+
184+ return false ;
146185 }
186+
187+ batch .subList (0 , flushedSize ).clear ();
188+ warnAboutMaxQueueSize = true ;
189+ retries = 0 ;
190+
191+ return true ;
192+
193+ } catch (ConcurrentModificationException e ) {
194+ logger .error ("Error clearing {} logs from batch, will retry immediately." , flushedSize , e );
195+ retries = maxRetries ; // No point in retrying to send the data
196+
147197 } catch (JsonProcessingException e ) {
148- errorLog .error ("Error processing JSON data : {}" , e .getMessage (), e );
198+ logger .error ("Error processing JSON data : {}" , e .getMessage (), e );
199+ retries = maxRetries ; // No point in retrying when batch cannot be processed into JSON
149200
150201 } catch (Exception e ) {
151- errorLog .error ("Error trying to call Better Stack : {}" , e .getMessage (), e );
202+ logger .error ("Error trying to call Better Stack : {}" , e .getMessage (), e );
152203 }
153204
154- isFlushing . set ( false ) ;
205+ retries ++ ;
155206
156- if (mustReflush || batch .size () >= batchSize )
157- flush ();
207+ return false ;
158208 }
159209
160210 protected LogtailResponse callHttpURLConnection (int flushedSize ) throws IOException {
@@ -163,7 +213,7 @@ protected LogtailResponse callHttpURLConnection(int flushedSize) throws IOExcept
163213 try {
164214 connection .connect ();
165215 } catch (Exception e ) {
166- errorLog .error ("Error trying to call Better Stack : {}" , e .getMessage (), e );
216+ logger .error ("Error trying to call Better Stack : {}" , e .getMessage (), e );
167217 }
168218
169219 try (OutputStream os = connection .getOutputStream ()) {
@@ -280,7 +330,7 @@ protected Object getMetaValue(String type, String value) {
280330 return Boolean .valueOf (value );
281331 }
282332 } catch (NumberFormatException e ) {
283- errorLog .error ("Error getting meta value - {}" , e .getMessage (), e );
333+ logger .error ("Error getting meta value - {}" , e .getMessage (), e );
284334 }
285335
286336 return value ;
@@ -292,7 +342,10 @@ public void run() {
292342 try {
293343 flush ();
294344 } catch (Exception e ) {
295- errorLog .error ("Error trying to flush : {}" , e .getMessage (), e );
345+ logger .error ("Error trying to flush : {}" , e .getMessage (), e );
346+ if (isFlushing .get ()) {
347+ isFlushing .set (false );
348+ }
296349 }
297350 }
298351 }
@@ -426,6 +479,26 @@ public void setReadTimeout(int readTimeout) {
426479 this .readTimeout = readTimeout ;
427480 }
428481
482+ /**
483+ * Sets the maximum number of retries for sending logs to Better Stack. After that, current batch of logs will be dropped.
484+ *
485+ * @param maxRetries
486+ * max number of retries for sending logs
487+ */
488+ public void setMaxRetries (int maxRetries ) {
489+ this .maxRetries = maxRetries ;
490+ }
491+
492+ /**
493+ * Sets the number of milliseconds to sleep before retrying to send logs to Better Stack.
494+ *
495+ * @param retrySleepMilliseconds
496+ * number of milliseconds to sleep before retry
497+ */
498+ public void setRetrySleepMilliseconds (int retrySleepMilliseconds ) {
499+ this .retrySleepMilliseconds = retrySleepMilliseconds ;
500+ }
501+
429502 public void setEncoder (PatternLayoutEncoder encoder ) {
430503 this .encoder = encoder ;
431504 }
0 commit comments