Skip to content

Commit 1993b31

Browse files
committed
Fixed deadlock on single thread executor
1 parent 798f5a9 commit 1993b31

File tree

2 files changed

+110
-1
lines changed

2 files changed

+110
-1
lines changed

topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,6 @@ public CompletableFuture<Void> shutdown() {
194194
private void logUserThrowableAndStopWorking(Throwable th, String callbackName) {
195195
String errorMessage = "Unhandled throwable in " + callbackName + " user callback: " + th;
196196
logger.error(errorMessage);
197-
shutdownImpl(errorMessage).join();
197+
shutdownImpl(errorMessage);
198198
}
199199
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.Semaphore;
6+
import java.util.concurrent.TimeUnit;
7+
8+
import org.junit.AfterClass;
9+
import org.junit.BeforeClass;
10+
import org.junit.ClassRule;
11+
import org.junit.Test;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import tech.ydb.core.Status;
16+
import tech.ydb.test.junit4.GrpcTransportRule;
17+
import tech.ydb.topic.TopicClient;
18+
import tech.ydb.topic.description.Consumer;
19+
import tech.ydb.topic.read.AsyncReader;
20+
import tech.ydb.topic.settings.CreateTopicSettings;
21+
import tech.ydb.topic.settings.ReadEventHandlersSettings;
22+
import tech.ydb.topic.settings.ReaderSettings;
23+
import tech.ydb.topic.settings.TopicReadSettings;
24+
import tech.ydb.topic.settings.WriterSettings;
25+
import tech.ydb.topic.write.Message;
26+
import tech.ydb.topic.write.SyncWriter;
27+
28+
/**
29+
*
30+
* @author Aleksandr Gorshenin
31+
*/
32+
public class TopicReadersIntegrationTest {
33+
private final static Logger logger = LoggerFactory.getLogger(YdbTopicsIntegrationTest.class);
34+
35+
@ClassRule
36+
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();
37+
38+
private final static String TEST_TOPIC = "topic_readers_test";
39+
40+
private final static String TEST_CONSUMER1 = "consumer";
41+
42+
private static TopicClient client;
43+
44+
@BeforeClass
45+
public static void initTopic() {
46+
logger.info("Create test table {} ...", TEST_TOPIC);
47+
48+
client = TopicClient.newClient(ydbTransport).build();
49+
client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder()
50+
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build())
51+
.build()
52+
).join().expectSuccess("can't create a new topic");
53+
}
54+
55+
@AfterClass
56+
public static void dropTopic() {
57+
logger.info("Drop test topic {} ...", TEST_TOPIC);
58+
Status dropStatus = client.dropTopic(TEST_TOPIC).join();
59+
client.close();
60+
dropStatus.expectSuccess("can't drop test topic");
61+
}
62+
63+
private void sendMessages(Message... messages) throws Exception {
64+
WriterSettings settings = WriterSettings.newBuilder()
65+
.setTopicPath(TEST_TOPIC)
66+
.setProducerId("helper")
67+
.build();
68+
SyncWriter writer = client.createSyncWriter(settings);
69+
writer.initAndWait();
70+
for (Message message : messages) {
71+
writer.send(message);
72+
}
73+
74+
writer.flush();
75+
writer.shutdown(10, TimeUnit.SECONDS);
76+
}
77+
78+
@Test
79+
public void singleThreadExecutorTest() throws Exception {
80+
ReaderSettings readerSettings = ReaderSettings.newBuilder()
81+
.addTopic(TopicReadSettings.newBuilder()
82+
.setPath(TEST_TOPIC)
83+
.build())
84+
.setConsumerName(TEST_CONSUMER1)
85+
.build();
86+
87+
Semaphore messageCount = new Semaphore(0);
88+
89+
ExecutorService executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "test-executor"));
90+
AsyncReader reader = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
91+
.setExecutor(executor)
92+
.setEventHandler((event) -> {
93+
messageCount.release();
94+
event.commit().join();
95+
}).build()
96+
);
97+
98+
reader.init().join();
99+
100+
sendMessages(Message.of("test".getBytes()));
101+
102+
// wait for message committing
103+
messageCount.acquireUninterruptibly();
104+
105+
reader.shutdown().get(5, TimeUnit.SECONDS);
106+
107+
executor.shutdownNow();
108+
}
109+
}

0 commit comments

Comments
 (0)