2222import static com .google .common .base .Preconditions .checkNotNull ;
2323
2424import java .io .Serializable ;
25+ import java .util .HashMap ;
26+ import java .util .Map ;
2527
2628import org .apache .pulsar .client .api .Authentication ;
2729import org .apache .pulsar .client .api .Consumer ;
2830import org .apache .pulsar .client .api .MessageListener ;
2931import org .apache .pulsar .client .api .PulsarClient ;
3032import org .apache .pulsar .client .api .PulsarClientException ;
33+ import org .apache .pulsar .client .api .ClientBuilder ;
3134import org .apache .pulsar .client .impl .PulsarClientImpl ;
3235import org .apache .pulsar .client .impl .conf .ConsumerConfigurationData ;
3336import org .apache .spark .storage .StorageLevel ;
@@ -43,34 +46,52 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
4346 private static final Logger LOG = LoggerFactory .getLogger (SparkStreamingPulsarReceiver .class );
4447
4548 private String serviceUrl ;
46- private ConsumerConfigurationData <byte []> conf ;
49+ private Map <String ,Object > clientConfig ;
50+ private ConsumerConfigurationData <byte []> consumerConfig ;
4751 private Authentication authentication ;
4852 private PulsarClient pulsarClient ;
4953 private Consumer <byte []> consumer ;
5054
5155 public SparkStreamingPulsarReceiver (
5256 String serviceUrl ,
53- ConsumerConfigurationData <byte []> conf ,
57+ ConsumerConfigurationData <byte []> consumerConfig ,
5458 Authentication authentication ) {
55- this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , conf , authentication );
59+ this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , new HashMap <>(), consumerConfig , authentication );
60+ }
61+
62+ public SparkStreamingPulsarReceiver (
63+ String serviceUrl ,
64+ Map <String ,Object > clientConfig ,
65+ ConsumerConfigurationData <byte []> consumerConfig ,
66+ Authentication authentication ) {
67+ this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , clientConfig , consumerConfig , authentication );
68+ }
69+
70+ public SparkStreamingPulsarReceiver (StorageLevel storageLevel ,
71+ String serviceUrl ,
72+ ConsumerConfigurationData <byte []> consumerConf ,
73+ Authentication authentication ) {
74+ this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , new HashMap <>(), consumerConf , authentication );
5675 }
5776
5877 public SparkStreamingPulsarReceiver (StorageLevel storageLevel ,
5978 String serviceUrl ,
60- ConsumerConfigurationData <byte []> conf ,
79+ Map <String ,Object > clientConfig ,
80+ ConsumerConfigurationData <byte []> consumerConfig ,
6181 Authentication authentication ) {
6282 super (storageLevel );
6383
6484 checkNotNull (serviceUrl , "serviceUrl must not be null" );
65- checkNotNull (conf , "ConsumerConfigurationData must not be null" );
66- checkArgument (conf .getTopicNames ().size () > 0 , "TopicNames must be set a value." );
67- checkNotNull (conf .getSubscriptionName (), "SubscriptionName must not be null" );
85+ checkNotNull (consumerConfig , "ConsumerConfigurationData must not be null" );
86+ checkNotNull (clientConfig , "Client configuration map must not be null" );
87+ checkArgument (consumerConfig .getTopicNames ().size () > 0 , "TopicNames must be set a value." );
88+ checkNotNull (consumerConfig .getSubscriptionName (), "SubscriptionName must not be null" );
6889
6990 this .serviceUrl = serviceUrl ;
7091 this .authentication = authentication ;
7192
72- if (conf .getMessageListener () == null ) {
73- conf .setMessageListener ((MessageListener <byte []> & Serializable ) (consumer , msg ) -> {
93+ if (consumerConfig .getMessageListener () == null ) {
94+ consumerConfig .setMessageListener ((MessageListener <byte []> & Serializable ) (consumer , msg ) -> {
7495 try {
7596 store (msg .getData ());
7697 consumer .acknowledgeAsync (msg );
@@ -80,13 +101,18 @@ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
80101 }
81102 });
82103 }
83- this .conf = conf ;
104+ this .clientConfig = clientConfig ;
105+ this .consumerConfig = consumerConfig ;
84106 }
85107
86108 public void onStart () {
87109 try {
88- pulsarClient = PulsarClient .builder ().serviceUrl (serviceUrl ).authentication (authentication ).build ();
89- consumer = ((PulsarClientImpl ) pulsarClient ).subscribeAsync (conf ).join ();
110+ ClientBuilder builder = PulsarClient .builder ().serviceUrl (serviceUrl ).authentication (authentication );
111+ if (!clientConfig .isEmpty ()) {
112+ builder .loadConf (clientConfig );
113+ }
114+ pulsarClient = builder .build ();
115+ consumer = ((PulsarClientImpl ) pulsarClient ).subscribeAsync (consumerConfig ).join ();
90116 } catch (Exception e ) {
91117 LOG .error ("Failed to start subscription : {}" , e .getMessage ());
92118 restart ("Restart a consumer" );
0 commit comments