|
101 | 101 | import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
102 | 102 | import org.apache.pulsar.broker.service.schema.SchemaRegistryService; |
103 | 103 | import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; |
| 104 | +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; |
104 | 105 | import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter; |
105 | 106 | import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache; |
106 | 107 | import org.apache.pulsar.broker.web.RestException; |
@@ -1728,11 +1729,17 @@ protected void handleProducer(final CommandProducer cmdProducer) { |
1728 | 1729 | BrokerServiceException.getClientErrorCode(exception), |
1729 | 1730 | message); |
1730 | 1731 | } |
| 1732 | + |
1731 | 1733 | var cause = FutureUtil.unwrapCompletionException(exception); |
1732 | | - if (!(cause instanceof IncompatibleSchemaException)) { |
| 1734 | + if (cause instanceof IncompatibleSchemaException) { |
| 1735 | + // ignore it |
| 1736 | + } else if (cause instanceof InvalidSchemaDataException) { |
| 1737 | + log.warn("Try add schema failed due to invalid schema data, " |
| 1738 | + + "remote address {}, topic {}, producerId {}", |
| 1739 | + remoteAddress, topicName, producerId); |
| 1740 | + } else { |
1733 | 1741 | log.error("Try add schema failed, remote address {}, topic {}, producerId {}", |
1734 | | - remoteAddress, |
1735 | | - topicName, producerId, exception); |
| 1742 | + remoteAddress, topicName, producerId, exception); |
1736 | 1743 | } |
1737 | 1744 | producers.remove(producerId, producerFuture); |
1738 | 1745 | return null; |
|
0 commit comments