Skip to content

Commit 78d2d99

Browse files
committed
Fixed deadlock on single thread executor
1 parent 798f5a9 commit 78d2d99

File tree

2 files changed

+115
-1
lines changed

2 files changed

+115
-1
lines changed

topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,6 @@ public CompletableFuture<Void> shutdown() {
194194
private void logUserThrowableAndStopWorking(Throwable th, String callbackName) {
195195
String errorMessage = "Unhandled throwable in " + callbackName + " user callback: " + th;
196196
logger.error(errorMessage);
197-
shutdownImpl(errorMessage).join();
197+
shutdownImpl(errorMessage);
198198
}
199199
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
6+
import java.util.concurrent.Semaphore;
7+
import java.util.concurrent.TimeUnit;
8+
9+
import org.junit.AfterClass;
10+
import org.junit.BeforeClass;
11+
import org.junit.ClassRule;
12+
import org.junit.Test;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import tech.ydb.core.Status;
17+
import tech.ydb.test.junit4.GrpcTransportRule;
18+
import tech.ydb.topic.TopicClient;
19+
import tech.ydb.topic.description.Consumer;
20+
import tech.ydb.topic.read.AsyncReader;
21+
import tech.ydb.topic.settings.CreateTopicSettings;
22+
import tech.ydb.topic.settings.ReadEventHandlersSettings;
23+
import tech.ydb.topic.settings.ReaderSettings;
24+
import tech.ydb.topic.settings.TopicReadSettings;
25+
import tech.ydb.topic.settings.WriterSettings;
26+
import tech.ydb.topic.write.Message;
27+
import tech.ydb.topic.write.SyncWriter;
28+
29+
/**
30+
*
31+
* @author Aleksandr Gorshenin
32+
*/
33+
public class TopicReadersIntegrationTest {
34+
private final static Logger logger = LoggerFactory.getLogger(YdbTopicsIntegrationTest.class);
35+
36+
@ClassRule
37+
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();
38+
39+
private final static String TEST_TOPIC = "topic_readers_test";
40+
41+
private final static String TEST_CONSUMER1 = "consumer";
42+
43+
private static TopicClient client;
44+
45+
@BeforeClass
46+
public static void initTopic() {
47+
logger.info("Create test table {} ...", TEST_TOPIC);
48+
49+
client = TopicClient.newClient(ydbTransport).build();
50+
client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder()
51+
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build())
52+
.build()
53+
).join().expectSuccess("can't create a new topic");
54+
}
55+
56+
@AfterClass
57+
public static void dropTopic() {
58+
logger.info("Drop test topic {} ...", TEST_TOPIC);
59+
Status dropStatus = client.dropTopic(TEST_TOPIC).join();
60+
client.close();
61+
dropStatus.expectSuccess("can't drop test topic");
62+
}
63+
64+
private void sendMessages(Message... messages) throws Exception {
65+
WriterSettings settings = WriterSettings.newBuilder()
66+
.setTopicPath(TEST_TOPIC)
67+
.setProducerId("helper")
68+
.build();
69+
SyncWriter writer = client.createSyncWriter(settings);
70+
writer.initAndWait();
71+
for (Message message : messages) {
72+
writer.send(message);
73+
}
74+
75+
writer.flush();
76+
writer.shutdown(10, TimeUnit.SECONDS);
77+
}
78+
79+
@Test
80+
public void singleThreadExecutorTest() throws Exception {
81+
ReaderSettings readerSettings = ReaderSettings.newBuilder()
82+
.addTopic(TopicReadSettings.newBuilder()
83+
.setPath(TEST_TOPIC)
84+
.build())
85+
.setConsumerName(TEST_CONSUMER1)
86+
.build();
87+
88+
Semaphore messageCount = new Semaphore(0);
89+
CompletableFuture<Boolean> processing = new CompletableFuture<>();
90+
91+
ExecutorService executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "test-executor"));
92+
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
93+
.setExecutor(executor)
94+
.setEventHandler((event) -> {
95+
messageCount.release();
96+
processing.join();
97+
}).build()
98+
);
99+
100+
reader.init().join();
101+
102+
sendMessages(Message.of("test".getBytes()));
103+
104+
// wait for message committing
105+
messageCount.acquireUninterruptibly();
106+
107+
// stop reader
108+
CompletableFuture<Void> f = reader.shutdown();
109+
processing.completeExceptionally(new RuntimeException("shutdown"));
110+
f.get(5, TimeUnit.SECONDS);
111+
112+
executor.shutdownNow();
113+
}
114+
}

0 commit comments

Comments
 (0)