Skip to content

Commit ef2a8bc

Browse files
authored
add exponential backoff for statefultailer when the file doesnt exist (#90)
1 parent 6aa5f65 commit ef2a8bc

File tree

4 files changed

+34
-6
lines changed

4 files changed

+34
-6
lines changed

src/main/java/com/arpnetworking/metrics/common/tailer/StatefulTailer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,11 @@ protected boolean isRunning() {
110110

111111
private void fileLoop() {
112112
InitialPosition nextInitialPosition = _initialPosition;
113+
int openFileAttempt = 0;
113114
try {
114115
while (isRunning()) {
115116
// Attempt to open the file
117+
openFileAttempt++;
116118
try (SeekableByteChannel reader = Files.newByteChannel(_file, StandardOpenOption.READ)) {
117119
LOGGER.trace()
118120
.setMessage("Opened file")
@@ -131,9 +133,10 @@ private void fileLoop() {
131133

132134
// Reset per file state
133135
_hash = Optional.empty();
136+
openFileAttempt = 0;
134137
} catch (final NoSuchFileException e) {
135138
_listener.fileNotFound();
136-
_trigger.waitOnTrigger();
139+
_trigger.waitOnFileNotFoundTrigger(openFileAttempt);
137140
}
138141
}
139142
// Clients may elect to kill the stateful tailer on an exception by calling stop, or they
@@ -312,7 +315,7 @@ private void readLoop(final SeekableByteChannel reader) throws IOException, Inte
312315
// rotated.
313316

314317
// Read interval
315-
_trigger.waitOnTrigger();
318+
_trigger.waitOnReadTrigger();
316319
}
317320
}
318321

@@ -347,7 +350,7 @@ private Attributes getAttributes(final Path file, final Optional<Long> lastCheck
347350
private void rotate(final Optional<SeekableByteChannel> reader, final String reason) throws InterruptedException, IOException {
348351
// Allow a full read interval before calling it quits on the old file
349352
if (reader.isPresent()) {
350-
_trigger.waitOnTrigger();
353+
_trigger.waitOnReadTrigger();
351354
readLines(reader.get());
352355
}
353356

src/main/java/com/arpnetworking/utility/TimerTrigger.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,19 @@ public TimerTrigger(final Duration duration) {
3636
}
3737

3838
@Override
39-
public void waitOnTrigger() throws InterruptedException {
39+
public void waitOnReadTrigger() throws InterruptedException {
4040
Thread.sleep(_duration.getMillis());
4141
}
4242

43+
@Override
44+
public void waitOnFileNotFoundTrigger(final int attempt) throws InterruptedException {
45+
// Max time = 1.3^n * base (n capped at 20)
46+
final double maxBackoff = Math.pow(1.3, Math.min(attempt, 20)) * _duration.getMillis();
47+
// Sleep duration is random from 0 to max, capped at 30 seconds
48+
final int sleepDurationMillis = (int) Math.max(Math.random() * maxBackoff, 30000);
49+
Thread.sleep(sleepDurationMillis);
50+
}
51+
4352
/**
4453
* Generate a Steno log compatible representation.
4554
*

src/main/java/com/arpnetworking/utility/Trigger.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,13 @@ public interface Trigger {
2626
*
2727
* @throws InterruptedException thrown when the wait is interrupted.
2828
*/
29-
void waitOnTrigger() throws InterruptedException;
29+
void waitOnReadTrigger() throws InterruptedException;
30+
31+
/**
32+
* Blocks the current thread.
33+
*
34+
* @param attempt The attempt number to open or find the file. Used for exponential backoff.
35+
* @throws InterruptedException thrown when the wait is interrupted.
36+
*/
37+
void waitOnFileNotFoundTrigger(int attempt) throws InterruptedException;
3038
}

src/test/java/com/arpnetworking/utility/ManualSingleThreadedTrigger.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,15 @@
3131
*/
3232
public class ManualSingleThreadedTrigger implements Trigger {
3333
@Override
34-
public void waitOnTrigger() throws InterruptedException {
34+
public void waitOnReadTrigger() throws InterruptedException {
35+
if (_enabled) {
36+
_waiter.release();
37+
_semaphore.acquire();
38+
}
39+
}
40+
41+
@Override
42+
public void waitOnFileNotFoundTrigger(final int attempt) throws InterruptedException {
3543
if (_enabled) {
3644
_waiter.release();
3745
_semaphore.acquire();

0 commit comments

Comments
 (0)