Skip to content

Commit ce0160f

Browse files
committed
Some additional order guarantee
1 parent ac82f40 commit ce0160f

File tree

1 file changed

+19
-0
lines changed

1 file changed

+19
-0
lines changed

topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.List;
44
import java.util.concurrent.CompletableFuture;
5+
import java.util.concurrent.CountDownLatch;
56
import java.util.concurrent.ExecutionException;
67
import java.util.concurrent.TimeUnit;
78
import java.util.concurrent.TimeoutException;
@@ -51,6 +52,12 @@ public class YdbTopicsIntegrationTest {
5152
private final static String TEST_CONSUMER2 = "other_consumer";
5253

5354
private static TopicClient client;
55+
final static CountDownLatch latch1 = new CountDownLatch(1);
56+
final static CountDownLatch latch2 = new CountDownLatch(1);
57+
final static CountDownLatch latch3WithCommit = new CountDownLatch(1);
58+
final static CountDownLatch latch3WithoutCommit = new CountDownLatch(1);
59+
final static CountDownLatch latch4 = new CountDownLatch(1);
60+
final static CountDownLatch latch5 = new CountDownLatch(1);
5461

5562
private final static byte[][] TEST_MESSAGES = new byte[][] {
5663
"Test message".getBytes(),
@@ -98,10 +105,12 @@ public void step01_writeWithoutDeduplication() throws InterruptedException, Exec
98105

99106
writer.flush();
100107
writer.shutdown(1, TimeUnit.MINUTES);
108+
latch1.countDown();
101109
}
102110

103111
@Test
104112
public void step02_readHalfWithoutCommit() throws InterruptedException {
113+
latch1.await(5, TimeUnit.SECONDS);
105114
ReaderSettings settings = ReaderSettings.newBuilder()
106115
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
107116
.setConsumerName(TEST_CONSUMER1)
@@ -116,10 +125,12 @@ public void step02_readHalfWithoutCommit() throws InterruptedException {
116125
}
117126

118127
reader.shutdown();
128+
latch2.countDown();
119129
}
120130

121131
@Test
122132
public void step03_readHalfWithCommit() throws InterruptedException {
133+
latch2.await(5, TimeUnit.SECONDS);
123134
ReaderSettings settings = ReaderSettings.newBuilder()
124135
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
125136
.setConsumerName(TEST_CONSUMER1)
@@ -135,10 +146,12 @@ public void step03_readHalfWithCommit() throws InterruptedException {
135146
}
136147

137148
reader.shutdown();
149+
latch3WithCommit.countDown();
138150
}
139151

140152
@Test
141153
public void step03_readNextHalfWithoutCommit() throws InterruptedException {
154+
latch3WithCommit.await(5, TimeUnit.SECONDS);
142155
ReaderSettings settings = ReaderSettings.newBuilder()
143156
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
144157
.setConsumerName(TEST_CONSUMER1)
@@ -153,10 +166,12 @@ public void step03_readNextHalfWithoutCommit() throws InterruptedException {
153166
}
154167

155168
reader.shutdown();
169+
latch3WithoutCommit.countDown();
156170
}
157171

158172
@Test
159173
public void step04_readNextHalfWithCommit() throws InterruptedException {
174+
latch3WithoutCommit.await(5, TimeUnit.SECONDS);
160175
ReaderSettings settings = ReaderSettings.newBuilder()
161176
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
162177
.setConsumerName(TEST_CONSUMER1)
@@ -174,10 +189,12 @@ public void step04_readNextHalfWithCommit() throws InterruptedException {
174189

175190
committer.commit();
176191
reader.shutdown();
192+
latch4.countDown();
177193
}
178194

179195
@Test
180196
public void step05_describeTopic() throws InterruptedException {
197+
latch4.await(5, TimeUnit.SECONDS);
181198
TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue();
182199

183200
Assert.assertNull(description.getTopicStats());
@@ -186,10 +203,12 @@ public void step05_describeTopic() throws InterruptedException {
186203

187204
Assert.assertEquals(TEST_CONSUMER1, consumers.get(0).getName());
188205
Assert.assertEquals(TEST_CONSUMER2, consumers.get(1).getName());
206+
latch5.countDown();
189207
}
190208

191209
@Test
192210
public void step06_readAllByAsyncReader() throws InterruptedException {
211+
latch5.await(5, TimeUnit.SECONDS);
193212
ReaderSettings settings = ReaderSettings.newBuilder()
194213
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
195214
.setConsumerName(TEST_CONSUMER2)

0 commit comments

Comments
 (0)