Skip to content

Commit 3d3d696

Browse files
authored
Merge pull request #546 from alex268/master
Added validation of message size
2 parents df87343 + 0a2e5c6 commit 3d3d696

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ private IncomingMessage(EnqueuedMessage message) {
100100
}
101101

102102
public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean instant) {
103+
if (message.getSize() > settings.getMaxSendBufferMemorySize()) {
104+
String errorMessage = "Rejecting a message of " + message.getSize()
105+
+ " bytes: not enough space in message queue. The maximum size of buffer is "
106+
+ settings.getMaxSendBufferMemorySize() + " bytes";
107+
logger.info("[{}] {}", id, errorMessage);
108+
throw new IllegalArgumentException(errorMessage);
109+
}
110+
103111
incomingQueueLock.lock();
104112

105113
try {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.AfterClass;
7+
import org.junit.Assert;
8+
import org.junit.BeforeClass;
9+
import org.junit.ClassRule;
10+
import org.junit.Test;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import tech.ydb.core.Status;
15+
import tech.ydb.test.junit4.GrpcTransportRule;
16+
import tech.ydb.topic.TopicClient;
17+
import tech.ydb.topic.settings.CreateTopicSettings;
18+
import tech.ydb.topic.settings.WriterSettings;
19+
import tech.ydb.topic.write.Message;
20+
import tech.ydb.topic.write.SyncWriter;
21+
22+
/**
23+
*
24+
* @author Aleksandr Gorshenin
25+
*/
26+
public class TopicWritersIntegrationTest {
27+
private final static Logger logger = LoggerFactory.getLogger(YdbTopicsIntegrationTest.class);
28+
29+
@ClassRule
30+
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();
31+
32+
private final static String TEST_TOPIC = "topic_writers_test";
33+
34+
private final static String TEST_PRODUCER1 = "producer";
35+
36+
private static TopicClient client;
37+
38+
@BeforeClass
39+
public static void initTopic() {
40+
logger.info("Create test table {} ...", TEST_TOPIC);
41+
42+
client = TopicClient.newClient(ydbTransport).build();
43+
client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder().build())
44+
.join().expectSuccess("can't create a new topic");
45+
}
46+
47+
@AfterClass
48+
public static void dropTopic() {
49+
logger.info("Drop test topic {} ...", TEST_TOPIC);
50+
Status dropStatus = client.dropTopic(TEST_TOPIC).join();
51+
client.close();
52+
dropStatus.expectSuccess("can't drop test topic");
53+
}
54+
55+
@Test
56+
public void messageBufferOverflowTest() throws Exception {
57+
WriterSettings settings = WriterSettings.newBuilder()
58+
.setTopicPath(TEST_TOPIC)
59+
.setProducerId(TEST_PRODUCER1)
60+
.setMaxSendBufferMemorySize(1000)
61+
.build();
62+
63+
SyncWriter writer = client.createSyncWriter(settings);
64+
writer.initAndWait();
65+
66+
byte[] msg1 = new byte[1000];
67+
byte[] msg2 = new byte[1001];
68+
Arrays.fill(msg1, (byte) 0x10);
69+
Arrays.fill(msg2, (byte) 0x11);
70+
71+
writer.send(Message.of(msg1));
72+
writer.send(Message.of(msg1));
73+
writer.send(Message.of(msg1));
74+
writer.flush();
75+
76+
IllegalArgumentException ex = Assert.assertThrows(IllegalArgumentException.class,
77+
() -> writer.send(Message.of(msg2))
78+
);
79+
Assert.assertEquals("Rejecting a message of 1001 bytes: not enough space in message queue. "
80+
+ "The maximum size of buffer is 1000 bytes", ex.getMessage());
81+
82+
writer.send(Message.of(msg1));
83+
writer.send(Message.of(msg1));
84+
writer.send(Message.of(msg1));
85+
writer.flush();
86+
writer.shutdown(10, TimeUnit.SECONDS);
87+
}
88+
}

0 commit comments

Comments
 (0)