Skip to content

Commit 850cb84

Browse files
authored
Release all semaphore permits when the current task instance is to be retried (#158)
1 parent 6f02e87 commit 850cb84

File tree

2 files changed

+137
-1
lines changed

2 files changed

+137
-1
lines changed

agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,16 @@ public void finish(TaskStatus taskStatus, int lastSentPosition) {
280280
} catch (Throwable e) {
281281
// eventually resubmit self after 10s
282282
log.error("Task segment={} completed={} syncPosition={} failed, retrying:", segment, completed, syncPosition, e);
283-
pendingTasks.putIfAbsent(segment, this);
283+
pendingTasks.computeIfAbsent(segment, k -> {
284+
// release all permits to avoid blocking retrying that task. Please note that the following code
285+
// path will not be exercised there are other pending tasks for the same segment, in which case,
286+
// that other pending would have its own, unused inflightMessagesSemaphore instance and there
287+
// is no need to reset the semaphore on the current task.
288+
log.debug("Task segment={} resubmitted, all inflightMessagesSemaphore permits will be released", segment);
289+
inflightMessagesSemaphore.release(config.maxInflightMessagesPerTask -
290+
inflightMessagesSemaphore.availablePermits());
291+
return this;
292+
});
284293
}
285294
}
286295

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Copyright DataStax, Inc 2021.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.cdc.agent;
17+
18+
import org.junit.jupiter.api.Test;
19+
import org.mockito.Mockito;
20+
21+
import java.io.File;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.ConcurrentMap;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.LinkedBlockingDeque;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
34+
public class CommitLogReaderServiceTest {
35+
36+
@Test
37+
public void finishTaskRetryTest() throws InterruptedException {
38+
// given
39+
AgentConfig config = new AgentConfig();
40+
config.maxInflightMessagesPerTask = 10;
41+
config.cdcConcurrentProcessors = 1;
42+
MutationSender<?> mutationSender = Mockito.mock(MutationSender.class);
43+
SegmentOffsetWriter segmentOffsetWriter = Mockito.mock(SegmentOffsetWriter.class);
44+
CommitLogTransfer commitLogTransfer = Mockito.mock(CommitLogTransfer.class);
45+
ExecutorService executorService = Executors.newFixedThreadPool(1);
46+
MyCommitLogService commitLogReaderService = new MyCommitLogService(
47+
config, mutationSender, segmentOffsetWriter, commitLogTransfer, executorService);
48+
CommitLogReaderService.Task task = commitLogReaderService.createTask("filename", 1, 1, true);
49+
50+
// when
51+
commitLogReaderService.addPendingTask(task);
52+
53+
// then
54+
boolean allRetriesCompleted = commitLogReaderService.getRetrylatch().await(10, TimeUnit.SECONDS); // wait for the task to be executed and retied
55+
assertTrue(allRetriesCompleted, "all retries should've been completed");
56+
executorService.shutdown();
57+
boolean terminated = executorService.awaitTermination(10, TimeUnit.SECONDS); // if semaphore permits where not released, awaitTermination will timeout
58+
assertTrue(terminated, "executor service should've been terminated");
59+
assertTrue(commitLogReaderService.getPendingTasks().isEmpty(), "pending tasks should be empty");
60+
assertEquals(3, commitLogReaderService.getAvailablePermitsRecorder().size());
61+
int recordPermitsCount = 0;
62+
while (!commitLogReaderService.getAvailablePermitsRecorder().isEmpty()) {
63+
recordPermitsCount++;
64+
int permits = commitLogReaderService.getAvailablePermitsRecorder().poll();
65+
if (recordPermitsCount == 3) { // this is the last recorded permit, should be 0
66+
assertEquals(0, permits);
67+
} else {
68+
assertEquals(config.maxInflightMessagesPerTask, permits);
69+
}
70+
}
71+
}
72+
73+
class MyCommitLogService extends CommitLogReaderService {
74+
public MyCommitLogService(AgentConfig config, MutationSender<?> mutationSender, SegmentOffsetWriter segmentOffsetWriter, CommitLogTransfer commitLogTransfer, ExecutorService executorService) {
75+
super(config, mutationSender, segmentOffsetWriter, commitLogTransfer);
76+
this.tasksExecutor = executorService;
77+
78+
}
79+
80+
File mockFile = Mockito.mock(File.class);
81+
CountDownLatch retryLatch = new CountDownLatch(3); // fail twice, succeed on third retry
82+
BlockingQueue<Integer> availablePermitsRecorder = new LinkedBlockingDeque<>();
83+
84+
@Override
85+
public Task createTask(String commitlogName, long seg, int pos, boolean completed) {
86+
return new CommitLogReaderService.Task("filename", 1, 1, true) {
87+
public void run() {
88+
// this is where the code will block if the permits are not released properly. The idea
89+
// is to limit the number of inflight messages per task, but it is up to the implementation to
90+
// adhere to that.
91+
inflightMessagesSemaphore.acquireUninterruptibly();
92+
try {
93+
if (retryLatch.getCount() > 1) {
94+
lastException = new RuntimeException("failed to send to pulsar");
95+
} else {
96+
lastException = null;
97+
}
98+
Thread.sleep(50L); // noop, could be any rpc call in reality
99+
} catch (InterruptedException e) {
100+
throw new RuntimeException(e);
101+
}
102+
inflightMessagesSemaphore.release();
103+
super.finish(TaskStatus.SUCCESS, -1);
104+
availablePermitsRecorder.add(inflightMessagesSemaphore.availablePermits());
105+
retryLatch.countDown();
106+
}
107+
108+
@Override
109+
public File getFile() {
110+
return mockFile;
111+
}
112+
};
113+
}
114+
115+
public CountDownLatch getRetrylatch() {
116+
return retryLatch;
117+
}
118+
119+
public ConcurrentMap<Long, Task> getPendingTasks() {
120+
return pendingTasks;
121+
}
122+
123+
public BlockingQueue<Integer> getAvailablePermitsRecorder() {
124+
return availablePermitsRecorder;
125+
}
126+
}
127+
}

0 commit comments

Comments
 (0)