Skip to content

Commit ae05982

Browse files
authored
Update to Pulsar 3.3.2 (#875)
This commit updates the version of Pulsar to 3.3.2. Also, in Pulsar 3.3.2 the schema validation of an outgoing message value happens later than it did previously. This requires the PulsarTemplate to widen the try/catch net so that when this happens the producer is closed properly. The backing change in Pulsar 3.3.2 can be seen here apache/pulsar@f3c177e#diff-095bc2359e03726e031d8c2f210560c6a3218f48a969b0f75a2482e25cd54744R69
1 parent 473642e commit ae05982

File tree

9 files changed

+23
-27
lines changed

9 files changed

+23
-27
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ micrometer = "1.14.0-M3"
99
micrometer-docs-gen = "1.0.4"
1010
micrometer-tracing = "1.4.0-M3"
1111
protobuf = "3.25.5"
12-
pulsar = "3.3.1"
12+
pulsar = "3.3.2"
1313
pulsar-reactive = "0.5.7"
1414
reactor = "2024.0.0-M6"
1515
spring = "6.2.0-RC1"

spring-pulsar-sample-apps/sample-failover-custom-router/compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
pulsar:
3-
image: 'apachepulsar/pulsar:3.3.1'
3+
image: 'apachepulsar/pulsar:3.3.2'
44
ports:
55
- '6650'
66
- '8080'

spring-pulsar-sample-apps/sample-imperative-produce-consume/compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
pulsar:
3-
image: 'apachepulsar/pulsar:3.3.1'
3+
image: 'apachepulsar/pulsar:3.3.2'
44
ports:
55
- '6650'
66
- '8080'

spring-pulsar-sample-apps/sample-pulsar-binder/compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
pulsar:
3-
image: 'apachepulsar/pulsar:3.3.1'
3+
image: 'apachepulsar/pulsar:3.3.2'
44
ports:
55
- '6650'
66
- '8080'

spring-pulsar-sample-apps/sample-pulsar-functions/download-connectors.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
mkdir connectors
44
cd connectors
5-
wget https://archive.apache.org/dist/pulsar/pulsar-3.3.1/connectors/pulsar-io-cassandra-3.3.1.nar
6-
wget https://archive.apache.org/dist/pulsar/pulsar-3.3.1/connectors/pulsar-io-rabbitmq-3.3.1.nar
5+
wget https://archive.apache.org/dist/pulsar/pulsar-3.3.2/connectors/pulsar-io-cassandra-3.3.2.nar
6+
wget https://archive.apache.org/dist/pulsar/pulsar-3.3.2/connectors/pulsar-io-rabbitmq-3.3.2.nar
77
cd ..

spring-pulsar-sample-apps/sample-pulsar-reader/compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
pulsar:
3-
image: 'apachepulsar/pulsar:3.3.1'
3+
image: 'apachepulsar/pulsar:3.3.2'
44
ports:
55
- '6650'
66
- '8080'

spring-pulsar-sample-apps/sample-reactive/compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
pulsar:
3-
image: 'apachepulsar/pulsar:3.3.1'
3+
image: 'apachepulsar/pulsar:3.3.2'
44
ports:
55
- '6650'
66
- '8080'

spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.pulsar.client.api.Producer;
3131
import org.apache.pulsar.client.api.PulsarClientException;
3232
import org.apache.pulsar.client.api.Schema;
33-
import org.apache.pulsar.client.api.TypedMessageBuilder;
3433
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
3534
import org.apache.pulsar.client.api.transaction.Transaction;
3635

@@ -285,25 +284,19 @@ private CompletableFuture<MessageId> doSendAsync(@Nullable String topic, @Nullab
285284

286285
PulsarMessageSenderContext senderContext = PulsarMessageSenderContext.newContext(topicName, this.beanName);
287286
Observation observation = newObservation(senderContext);
287+
Producer<T> producer = null;
288288
try {
289289
observation.start();
290-
Producer<T> producer = prepareProducerForSend(topicName, message, schema, encryptionKeys,
291-
producerCustomizer);
292-
TypedMessageBuilder<T> messageBuilder;
293-
try {
294-
var txn = getTransaction();
295-
messageBuilder = (txn != null) ? producer.newMessage(txn) : producer.newMessage();
296-
messageBuilder = messageBuilder.value(message);
297-
if (typedMessageBuilderCustomizer != null) {
298-
typedMessageBuilderCustomizer.customize(messageBuilder);
299-
}
300-
// propagate props to message
301-
senderContext.properties().forEach(messageBuilder::property);
302-
}
303-
catch (RuntimeException ex) {
304-
ProducerUtils.closeProducerAsync(producer, this.logger);
305-
throw ex;
290+
producer = prepareProducerForSend(topicName, message, schema, encryptionKeys, producerCustomizer);
291+
var txn = getTransaction();
292+
var messageBuilder = (txn != null) ? producer.newMessage(txn) : producer.newMessage();
293+
messageBuilder = messageBuilder.value(message);
294+
if (typedMessageBuilderCustomizer != null) {
295+
typedMessageBuilderCustomizer.customize(messageBuilder);
306296
}
297+
// propagate props to message
298+
senderContext.properties().forEach(messageBuilder::property);
299+
var finalProducer = producer;
307300
return messageBuilder.sendAsync().whenComplete((msgId, ex) -> {
308301
if (ex == null) {
309302
this.logger.trace(() -> "Sent msg to '%s' topic".formatted(topicName));
@@ -314,10 +307,13 @@ private CompletableFuture<MessageId> doSendAsync(@Nullable String topic, @Nullab
314307
observation.error(ex);
315308
observation.stop();
316309
}
317-
ProducerUtils.closeProducerAsync(producer, this.logger);
310+
ProducerUtils.closeProducerAsync(finalProducer, this.logger);
318311
});
319312
}
320313
catch (RuntimeException ex) {
314+
if (producer != null) {
315+
ProducerUtils.closeProducerAsync(producer, this.logger);
316+
}
321317
observation.error(ex);
322318
observation.stop();
323319
throw ex;

tools/pulsar/docker/standalone/pulsar-start.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
docker run -it -p 6650:6650 -p 8080:8080 \
44
--mount source=pulsardata,target=/pulsar/data \
55
--mount source=pulsarconf,target=/pulsar/conf \
6-
apachepulsar/pulsar:3.3.1 \
6+
apachepulsar/pulsar:3.3.2 \
77
bin/pulsar standalone

0 commit comments

Comments
 (0)