Skip to content

Commit cfec027

Browse files
author
Attila Tóth
committed
[pulsar-spark] added option for configuring Pulsar client
1 parent 9801c43 commit cfec027

File tree

4 files changed

+83
-21
lines changed

4 files changed

+83
-21
lines changed

pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.concurrent.ConcurrentHashMap;
5858
import java.util.concurrent.TimeUnit;
5959
import java.util.concurrent.atomic.AtomicInteger;
60+
import java.util.function.Function;
6061

6162
import static org.mockito.Mockito.any;
6263
import static org.mockito.Mockito.mock;

pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@
2222
import static com.google.common.base.Preconditions.checkNotNull;
2323

2424
import java.io.Serializable;
25+
import java.util.HashMap;
26+
import java.util.Map;
2527

26-
import org.apache.pulsar.client.api.Authentication;
27-
import org.apache.pulsar.client.api.Consumer;
28-
import org.apache.pulsar.client.api.MessageListener;
29-
import org.apache.pulsar.client.api.PulsarClient;
30-
import org.apache.pulsar.client.api.PulsarClientException;
28+
import org.apache.pulsar.client.api.*;
3129
import org.apache.pulsar.client.impl.PulsarClientImpl;
30+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
3231
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
3332
import org.apache.spark.storage.StorageLevel;
3433
import org.apache.spark.streaming.receiver.Receiver;
@@ -43,34 +42,52 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
4342
private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
4443

4544
private String serviceUrl;
46-
private ConsumerConfigurationData<byte[]> conf;
45+
private Map<String,Object> clientConfig;
46+
private ConsumerConfigurationData<byte[]> consumerConfig;
4747
private Authentication authentication;
4848
private PulsarClient pulsarClient;
4949
private Consumer<byte[]> consumer;
5050

5151
public SparkStreamingPulsarReceiver(
5252
String serviceUrl,
53-
ConsumerConfigurationData<byte[]> conf,
53+
ConsumerConfigurationData<byte[]> consumerConfig,
5454
Authentication authentication) {
55-
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf, authentication);
55+
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, new HashMap<>(), consumerConfig, authentication);
56+
}
57+
58+
public SparkStreamingPulsarReceiver(
59+
String serviceUrl,
60+
Map<String,Object> clientConfig,
61+
ConsumerConfigurationData<byte[]> consumerConfig,
62+
Authentication authentication) {
63+
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, clientConfig, consumerConfig, authentication);
64+
}
65+
66+
public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
67+
String serviceUrl,
68+
ConsumerConfigurationData<byte[]> consumerConf,
69+
Authentication authentication) {
70+
this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, new HashMap<>(), consumerConf, authentication);
5671
}
5772

5873
public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
5974
String serviceUrl,
60-
ConsumerConfigurationData<byte[]> conf,
75+
Map<String,Object> clientConfig,
76+
ConsumerConfigurationData<byte[]> consumerConfig,
6177
Authentication authentication) {
6278
super(storageLevel);
6379

6480
checkNotNull(serviceUrl, "serviceUrl must not be null");
65-
checkNotNull(conf, "ConsumerConfigurationData must not be null");
66-
checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set a value.");
67-
checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be null");
81+
checkNotNull(consumerConfig, "ConsumerConfigurationData must not be null");
82+
checkNotNull(clientConfig, "Client configuration map must not be null");
83+
checkArgument(consumerConfig.getTopicNames().size() > 0, "TopicNames must be set a value.");
84+
checkNotNull(consumerConfig.getSubscriptionName(), "SubscriptionName must not be null");
6885

6986
this.serviceUrl = serviceUrl;
7087
this.authentication = authentication;
7188

72-
if (conf.getMessageListener() == null) {
73-
conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
89+
if (consumerConfig.getMessageListener() == null) {
90+
consumerConfig.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
7491
try {
7592
store(msg.getData());
7693
consumer.acknowledgeAsync(msg);
@@ -80,13 +97,18 @@ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
8097
}
8198
});
8299
}
83-
this.conf = conf;
100+
this.clientConfig = clientConfig;
101+
this.consumerConfig = consumerConfig;
84102
}
85103

86104
public void onStart() {
87105
try {
88-
pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
89-
consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join();
106+
ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication);
107+
if (!clientConfig.isEmpty()) {
108+
builder.loadConf(clientConfig);
109+
}
110+
pulsarClient = builder.build();
111+
consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(consumerConfig).join();
90112
} catch (Exception e) {
91113
LOG.error("Failed to start subscription : {}", e.getMessage());
92114
restart("Restart a consumer");

tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
* under the License.
1818
*/
1919
package org.apache.pulsar.tests.integration.compat.kafka;
20-
2120
import org.apache.kafka.clients.producer.KafkaProducer;
2221
import org.apache.kafka.clients.producer.Producer;
2322
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -37,8 +36,8 @@ private String getPlainTextServiceUrl() {
3736
return container.getPlainTextServiceUrl();
3837
}
3938

40-
@Override
41-
public void setUpCluster() throws Exception {
39+
@BeforeTest
40+
private void setupThreadSafeTest() {
4241
super.setUpCluster();
4342
Properties producerProperties = new Properties();
4443
producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());

tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,51 @@ public void testSharedSubscription(Supplier<String> serviceUrl) throws Exception
147147
assertEquals(receveidCounts.size(), 2);
148148
}
149149

150+
@Test(expectedExceptions = NullPointerException.class,
151+
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
152+
dataProvider = "ServiceUrls")
153+
public void testReceiverWhenConsumerConfigurationIsNull(Supplier<String> serviceUrl) {
154+
new SparkStreamingPulsarReceiver(
155+
serviceUrl.get(),
156+
null,
157+
new AuthenticationDisabled());
158+
}
159+
160+
@Test(dataProvider = "ServiceUrls")
161+
public void testOverrideServiceUrlWithClientConfiguration(Supplier<String> serviceUrl) {
162+
Map<String,Object> testClientConfig = new HashMap<>();
163+
testClientConfig.put("serviceUrl",serviceUrl.get());
164+
165+
ConsumerConfigurationData<byte[]> testConsumerConfig = new ConsumerConfigurationData<>();
166+
Set<String> set = new HashSet<>();
167+
set.add(TOPIC);
168+
testConsumerConfig.setTopicNames(set);
169+
testConsumerConfig.setSubscriptionName(SUBS);
170+
testConsumerConfig.setSubscriptionType(SubscriptionType.Shared);
171+
testConsumerConfig.setReceiverQueueSize(1);
172+
173+
String deliberatelyWrongServiceUrl = "http://invalid.service.url:1234";
174+
175+
SparkStreamingPulsarReceiver testReceiver = new SparkStreamingPulsarReceiver(
176+
deliberatelyWrongServiceUrl,
177+
testClientConfig,
178+
testConsumerConfig,
179+
new AuthenticationDisabled());
180+
181+
testReceiver.onStart();
182+
waitForTransmission();
183+
testReceiver.onStop();
184+
}
185+
150186
@Test(expectedExceptions = NullPointerException.class,
151187
expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
152188
dataProvider = "ServiceUrls")
153189
public void testReceiverWhenClientConfigurationIsNull(Supplier<String> serviceUrl) {
154-
new SparkStreamingPulsarReceiver(serviceUrl.get(), null, new AuthenticationDisabled());
190+
new SparkStreamingPulsarReceiver(
191+
serviceUrl.get(),
192+
null,
193+
null,
194+
new AuthenticationDisabled());
155195
}
156196

157197
private static void waitForTransmission() {

0 commit comments

Comments
 (0)