Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
Expand Down Expand Up @@ -272,35 +273,42 @@ private void internalPublishMessages(TopicName topicName, ProducerMessages reque
private CompletableFuture<Position> publishSingleMessageToPartition(String topic, Message message) {
CompletableFuture<Position> publishResult = new CompletableFuture<>();
pulsar().getBrokerService().getTopic(topic, false)
.thenAccept(t -> {
// TODO: Check message backlog and fail if backlog too large.
if (!t.isPresent()) {
// Topic not found, and remove from owning partition list.
publishResult.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic not "
+ "owned by current broker."));
TopicName topicName = TopicName.get(topic);
pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName())
.remove(topicName.getPartitionIndex());
} else {
try {
ByteBuf headersAndPayload = messageToByteBuf(message);
try {
Topic topicObj = t.get();
topicObj.publishMessage(headersAndPayload,
RestMessagePublishContext.get(publishResult, topicObj, System.nanoTime()));
} finally {
headersAndPayload.release();
.thenCompose(tOpt -> {
if (tOpt.isEmpty()) {
publishResult.completeExceptionally(
new BrokerServiceException.TopicNotFoundException("Topic not "
+ "owned by current broker."));
TopicName tn = TopicName.get(topic);
pulsar().getBrokerService().getOwningTopics().get(tn.getPartitionedTopicName())
.remove(tn.getPartitionIndex());
return CompletableFuture.completedFuture(null);
}
} catch (Exception e) {
Topic topicObj = tOpt.get();
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
topicObj.checkBacklogQuotaExceeded(message.getProducerName(),
BacklogQuota.BacklogQuotaType.destination_storage),
topicObj.checkBacklogQuotaExceeded(message.getProducerName(),
BacklogQuota.BacklogQuotaType.message_age));
return backlogQuotaCheckFuture.thenRun(() -> {
ByteBuf headersAndPayload = messageToByteBuf(message);
try {
topicObj.publishMessage(headersAndPayload,
RestMessagePublishContext.get(publishResult, topicObj, System.nanoTime()));
} finally {
headersAndPayload.release();
}
});
})
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (log.isDebugEnabled()) {
log.debug("Fail to publish single messages to topic {}: {} ",
topicName, e.getCause());
log.debug("Fail to publish single message to topic {}: {}", topic, cause.getMessage());
}
publishResult.completeExceptionally(e);
}
}
});

if (!publishResult.isDone()) {
publishResult.completeExceptionally(cause);
}
return null;
});
return publishResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
Expand All @@ -82,6 +84,7 @@
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.KeyValue;
Expand Down Expand Up @@ -931,4 +934,84 @@ public void testProduceWithAutoConsumeSchema() throws Exception {
}
}

@Test
public void testProduceWithBacklogQuotaSizeExceeded() throws Exception {
String namespaceName = testTenant + "/" + testNamespace;
String topicName = "persistent://" + namespaceName + "/" + testTopicName;
admin.topics().createNonPartitionedTopic(topicName);
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitSize(0)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build();
admin.namespaces().setBacklogQuota(namespaceName, backlogQuota,
BacklogQuota.BacklogQuotaType.destination_storage);

AsyncResponse asyncResponse = mock(AsyncResponse.class);
ProducerMessages producerMessages = new ProducerMessages();
String message = "[{\"payload\":\"rest-produce\"}]";
producerMessages.setMessages(createMessages(message));
topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName,
false, producerMessages);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
Object responseEntity = responseCaptor.getValue().getEntity();
Assert.assertTrue(responseEntity instanceof ProducerAcks);
ProducerAcks response = (ProducerAcks) responseEntity;
Assert.assertEquals(response.getMessagePublishResults().size(), 1);
for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 2);
Assert.assertTrue(response.getMessagePublishResults().get(index).getErrorMsg()
.contains("Cannot create producer on topic with backlog quota exceeded"));
}
}

@Test
public void testProduceWithBacklogQuotaTimeExceeded() throws Exception {
pulsar.getConfiguration().setPreciseTimeBasedBacklogQuotaCheck(true);
String namespaceName = testTenant + "/" + testNamespace;
String topicName = "persistent://" + namespaceName + "/" + testTopicName;
admin.topics().createNonPartitionedTopic(topicName);
BacklogQuota backlogQuota = BacklogQuota.builder()
.limitTime(1)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build();
admin.namespaces().setBacklogQuota(namespaceName, backlogQuota,
BacklogQuota.BacklogQuotaType.message_age);
admin.topics().createSubscription(topicName, "time-quota-sub", MessageId.earliest);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();
producer.send("backlog-message");
Topic topic = pulsar.getBrokerService().getTopic(topicName, false)
.get()
.orElseThrow(() -> new IllegalStateException("Topic not loaded: " + topicName));
PersistentTopic persistentTopic = (PersistentTopic) topic;
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
Assert.assertTrue(persistentTopic.checkTimeBacklogExceeded(true).get()));

AsyncResponse asyncResponse = mock(AsyncResponse.class);
ProducerMessages producerMessages = new ProducerMessages();
String message = "["
+ "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+ "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
producerMessages.setMessages(createMessages(message));
topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName,
false, producerMessages);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
Object responseEntity = responseCaptor.getValue().getEntity();
Assert.assertTrue(responseEntity instanceof ProducerAcks);
ProducerAcks response = (ProducerAcks) responseEntity;
Assert.assertEquals(response.getMessagePublishResults().size(), 2);
for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 2);
Assert.assertTrue(response.getMessagePublishResults().get(index).getErrorMsg()
.contains("Cannot create producer on topic with backlog quota exceeded"));
}
}

}