|
100 | 100 | import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
101 | 101 | import org.apache.pulsar.broker.service.schema.SchemaRegistryService; |
102 | 102 | import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; |
| 103 | +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; |
103 | 104 | import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter; |
104 | 105 | import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache; |
105 | 106 | import org.apache.pulsar.broker.web.RestException; |
@@ -1730,11 +1731,17 @@ protected void handleProducer(final CommandProducer cmdProducer) { |
1730 | 1731 | BrokerServiceException.getClientErrorCode(exception), |
1731 | 1732 | message); |
1732 | 1733 | } |
| 1734 | + |
1733 | 1735 | var cause = FutureUtil.unwrapCompletionException(exception); |
1734 | | - if (!(cause instanceof IncompatibleSchemaException)) { |
| 1736 | + if (cause instanceof IncompatibleSchemaException) { |
| 1737 | + // ignore it |
| 1738 | + } else if (cause instanceof InvalidSchemaDataException) { |
| 1739 | + log.warn("Try add schema failed due to invalid schema data, " |
| 1740 | + + "remote address {}, topic {}, producerId {}", |
| 1741 | + remoteAddress, topicName, producerId); |
| 1742 | + } else { |
1735 | 1743 | log.error("Try add schema failed, remote address {}, topic {}, producerId {}", |
1736 | | - remoteAddress, |
1737 | | - topicName, producerId, exception); |
| 1744 | + remoteAddress, topicName, producerId, exception); |
1738 | 1745 | } |
1739 | 1746 | producers.remove(producerId, producerFuture); |
1740 | 1747 | return null; |
|
0 commit comments