Skip to content

Commit c223ee1

Browse files
authored
Improve WatchService error handling & logging (#153)
* Improve WatchService error handling & logging * Add license header * Revert cleanup log level to debug
1 parent cfa4a5f commit c223ee1

File tree

5 files changed

+123
-12
lines changed

5 files changed

+123
-12
lines changed

agent/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ dependencies {
1212
implementation("org.apache.avro:avro:${avroVersion}")
1313
compileOnly("org.slf4j:slf4j-api:${slf4jVersion}")
1414
testImplementation("org.junit-pioneer:junit-pioneer:1.4.2")
15+
testImplementation "com.datastax.oss:dsbulk-tests:${dsbulkVersion}"
1516

1617
implementation("${pulsarGroup}:pulsar-client:${pulsarVersion}")
1718
}

agent/src/main/java/com/datastax/oss/cdc/agent/AbstractDirectoryWatcher.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ public abstract class AbstractDirectoryWatcher {
3636
private final WatchService watchService;
3737
private final Duration pollInterval;
3838
private final Path directory;
39-
private final Set<WatchEvent.Kind<Path>> kinds;
39+
private final Set<WatchEvent.Kind<?>> kinds;
4040

41-
public AbstractDirectoryWatcher(Path directory, Duration pollInterval, Set<WatchEvent.Kind<Path>> kinds) throws IOException {
41+
public AbstractDirectoryWatcher(Path directory, Duration pollInterval, Set<WatchEvent.Kind<?>> kinds) throws IOException {
4242
this(FileSystems.getDefault().newWatchService(), directory, pollInterval, kinds);
4343
}
4444

45-
AbstractDirectoryWatcher(WatchService watchService, Path directory, Duration pollInterval, Set<WatchEvent.Kind<Path>> kinds) throws IOException {
45+
AbstractDirectoryWatcher(WatchService watchService, Path directory, Duration pollInterval, Set<WatchEvent.Kind<?>> kinds) throws IOException {
4646
this.watchService = watchService;
4747
this.pollInterval = pollInterval;
4848
this.directory = directory;
@@ -55,16 +55,28 @@ public void poll() throws InterruptedException, IOException {
5555
WatchKey key = watchService.poll(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
5656

5757
if (key != null) {
58-
for (WatchEvent<?> event : key.pollEvents()) {
59-
Path relativePath = (Path) event.context();
60-
Path absolutePath = directory.resolve(relativePath);
58+
try {
59+
for (WatchEvent<?> event : key.pollEvents()) {
60+
if (event.kind() == java.nio.file.StandardWatchEventKinds.OVERFLOW) {
61+
log.warn("Overflow event detected: kind={}, context={}, count={}",
62+
event.kind(), event.context(), event.count());
63+
continue;
64+
}
65+
Path relativePath = (Path) event.context();
66+
Path absolutePath = directory.resolve(relativePath);
6167

62-
if (kinds.contains(event.kind())) {
63-
log.debug("Detected new commitlog file={}", absolutePath);
64-
handleEvent(event, absolutePath);
68+
if (kinds.contains(event.kind())) {
69+
log.debug("Detected new commitlog file={}", absolutePath);
70+
handleEvent(event, absolutePath);
71+
}
72+
}
73+
} catch (Exception ex) {
74+
log.error("Error while handling WatchKey", ex);
75+
} finally {
76+
if (!key.reset()) {
77+
log.warn("WatchKey is no longer valid");
6578
}
6679
}
67-
key.reset();
6880
}
6981
}
7082

agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
3030
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
31+
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
3132

3233
/**
3334
* Detect and read commitlogs files in the cdc_raw directory.
@@ -36,7 +37,7 @@
3637
public class CommitLogProcessor extends AbstractProcessor implements AutoCloseable {
3738
private static final String NAME = "Commit Log Processor";
3839

39-
private static final Set<WatchEvent.Kind<Path>> watchedEvents = Stream.of(ENTRY_CREATE, ENTRY_MODIFY).collect(Collectors.toSet());
40+
private static final Set<WatchEvent.Kind<?>> watchedEvents = Stream.of(ENTRY_CREATE, ENTRY_MODIFY, OVERFLOW).collect(Collectors.toSet());
4041

4142
private final AbstractDirectoryWatcher newCommitLogWatcher;
4243
private final CommitLogTransfer commitLogTransfer;

agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public void finish(TaskStatus taskStatus, int lastSentPosition) {
300300
}
301301

302302
public void cleanup(TaskStatus status) {
303-
log.debug("Cleanup task={}", this, status);
303+
log.debug("Cleanup task={}, status={}", this, status);
304304
File file = getFile();
305305
switch (status) {
306306
case SUCCESS:
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Copyright DataStax, Inc 2021.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.cdc.agent;
17+
18+
import com.datastax.oss.dsbulk.tests.utils.FileUtils;
19+
import org.junit.jupiter.api.AfterEach;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.nio.file.Files;
26+
import java.nio.file.Path;
27+
import java.nio.file.WatchEvent;
28+
import java.time.Duration;
29+
import java.util.Set;
30+
import java.util.concurrent.BlockingQueue;
31+
import java.util.concurrent.LinkedBlockingQueue;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.Stream;
34+
35+
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
36+
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
37+
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
38+
import static org.junit.jupiter.api.Assertions.assertEquals;
39+
import static org.junit.jupiter.api.Assertions.assertNotNull;
40+
41+
public class AbstractDirectoryWatcherTest {
42+
43+
private Path cdcDir;
44+
45+
@BeforeEach
46+
public void init() throws IOException {
47+
cdcDir = Files.createTempDirectory("cdc");
48+
}
49+
50+
@AfterEach
51+
void deleteTempDirs() {
52+
if (cdcDir != null && Files.exists(cdcDir)) {
53+
FileUtils.deleteDirectory(cdcDir);
54+
}
55+
}
56+
57+
@Test
58+
public void testPoll() throws InterruptedException, IOException {
59+
// given
60+
Set<WatchEvent.Kind<?>> watchedEvents = Stream.of(ENTRY_CREATE, ENTRY_MODIFY, OVERFLOW).collect(Collectors.toSet());
61+
final BlockingQueue<File> commitLogQueue = new LinkedBlockingQueue<>();
62+
63+
AbstractDirectoryWatcher watcher = new AbstractDirectoryWatcher(cdcDir,
64+
Duration.ofSeconds(20), // avoid flakiness, usually completes in ~10 seconds
65+
watchedEvents) {
66+
@Override
67+
void handleEvent(WatchEvent<?> event, Path path) {
68+
commitLogQueue.add(path.toFile());
69+
}
70+
};
71+
File commitLog = new File(cdcDir.toFile(), "CommitLog-6-1.log");
72+
commitLog.createNewFile();
73+
74+
// when
75+
watcher.poll(); // can block up to 20 seconds
76+
77+
// then
78+
assertEventHandled(commitLogQueue, commitLog);
79+
80+
// write another file
81+
File commitLog2 = new File(cdcDir.toFile(), "CommitLog-6-2.log");
82+
commitLog2.createNewFile();
83+
84+
// when
85+
watcher.poll(); // can block up to 20 seconds
86+
87+
// then
88+
assertEventHandled(commitLogQueue, commitLog2);
89+
}
90+
91+
private void assertEventHandled(BlockingQueue<File> commitLogQueue, File expectedFile) {
92+
File actualFile = commitLogQueue.poll();
93+
assertNotNull(actualFile);
94+
assertEquals(commitLogQueue.size(), 0);
95+
assertEquals(expectedFile, actualFile);
96+
}
97+
}

0 commit comments

Comments
 (0)