@@ -27,7 +27,8 @@ case class CassandraConnectorConf(
27
27
connectTimeoutMillis : Int = CassandraConnectorConf .ConnectionTimeoutParam .default,
28
28
readTimeoutMillis : Int = CassandraConnectorConf .ReadTimeoutParam .default,
29
29
connectionFactory : CassandraConnectionFactory = DefaultConnectionFactory ,
30
- cassandraSSLConf : CassandraConnectorConf .CassandraSSLConf = CassandraConnectorConf .DefaultCassandraSSLConf
30
+ cassandraSSLConf : CassandraConnectorConf .CassandraSSLConf = CassandraConnectorConf .DefaultCassandraSSLConf ,
31
+ jmxEnabled : Boolean = true
31
32
) {
32
33
33
34
@ transient
@@ -87,6 +88,12 @@ object CassandraConnectorConf extends Logging {
87
88
default = 9042 ,
88
89
description = """ Cassandra native connection port""" )
89
90
91
+ val JmxEnabledParam = ConfigParameter [Boolean ](
92
+ name = " spark.cassandra.connection.jmxEnabled" ,
93
+ section = ReferenceSection ,
94
+ default = true ,
95
+ description = """ Cassandra JMX Reporting""" )
96
+
90
97
val LocalDCParam = ConfigParameter [Option [String ]](
91
98
name = " spark.cassandra.connection.local_dc" ,
92
99
section = ReferenceSection ,
@@ -219,6 +226,7 @@ object CassandraConnectorConf extends Logging {
219
226
ConnectionPortParam ,
220
227
LocalDCParam ,
221
228
ConnectionTimeoutParam ,
229
+ JmxEnabledParam ,
222
230
KeepAliveMillisParam ,
223
231
MinReconnectionDelayParam ,
224
232
MaxReconnectionDelayParam ,
@@ -254,10 +262,11 @@ object CassandraConnectorConf extends Logging {
254
262
hostName <- hostsStr.split(" ," ).toSet[String ]
255
263
hostAddress <- resolveHost(hostName.trim)
256
264
} yield hostAddress
257
-
265
+
258
266
val port = conf.getInt(ConnectionPortParam .name, ConnectionPortParam .default)
259
267
260
268
val authConf = AuthConf .fromSparkConf(conf)
269
+ val jmxEnabled = conf.getOption(JmxEnabledParam .name).map(_.toBoolean).getOrElse(JmxEnabledParam .default)
261
270
val keepAlive = conf.getInt(KeepAliveMillisParam .name, KeepAliveMillisParam .default)
262
271
263
272
val localDC = conf.getOption(LocalDCParam .name)
@@ -312,7 +321,8 @@ object CassandraConnectorConf extends Logging {
312
321
connectTimeoutMillis = connectTimeout,
313
322
readTimeoutMillis = readTimeout,
314
323
connectionFactory = connectionFactory,
315
- cassandraSSLConf = cassandraSSLConf
324
+ cassandraSSLConf = cassandraSSLConf,
325
+ jmxEnabled = jmxEnabled
316
326
)
317
327
}
318
328
}
0 commit comments