From 28e63cb1b52377d1cbbde8dbe0b8dd0eef851ff2 Mon Sep 17 00:00:00 2001 From: wentingwu Date: Mon, 15 May 2023 11:16:12 -0400 Subject: [PATCH] max frame length --- .../connector/cql/CassandraConnectionFactory.scala | 1 + .../connector/cql/CassandraConnectorConf.scala | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala b/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala index 2f684037a..7d8766a11 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectionFactory.scala @@ -68,6 +68,7 @@ object DefaultConnectionFactory extends CassandraConnectionFactory { .withInt(MultipleRetryPolicy.MaxRetryCount, conf.queryRetryCount) .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, Duration.ofMillis(conf.readTimeoutMillis)) .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofMillis(conf.readTimeoutMillis)) + .withInt(PROTOCOL_MAX_FRAME_LENGTH, conf.maxFrameLengthInMB * 1024 * 1024) } // compression option cannot be set to NONE (default) diff --git a/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala b/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala index 78589987b..acbca042e 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnectorConf.scala @@ -62,7 +62,8 @@ case class CassandraConnectorConf( connectionFactory: CassandraConnectionFactory = DefaultConnectionFactory, quietPeriodBeforeCloseMillis: Int = CassandraConnectorConf.QuietPeriodBeforeCloseParam.default, timeoutBeforeCloseMillis: Int = CassandraConnectorConf.TimeoutBeforeCloseParam.default, - resolveContactPoints: Boolean = CassandraConnectorConf.ResolveContactPoints.default + resolveContactPoints: Boolean = CassandraConnectorConf.ResolveContactPoints.default, + maxFrameLengthInMB: Int = CassandraConnectorConf.MaxFrameLengthInMB.default, ) { override def hashCode: Int = HashCodeBuilder.reflectionHashCode(this, false) @@ -332,6 +333,12 @@ object CassandraConnectorConf extends Logging { default = DefaultCassandraSSLConf.keyStoreType, description = """Key store type""") + val MaxFrameLengthInMB = ConfigParameter[Int]( + name = "spark.cassandra.protocol.max-frame-length-mb", + section = ReferenceSection, + default = 256, + description = """The maximum length, in MB, of the frames supported by the driver. """) + private def maybeResolveHostAndPort(hostAndPort: String, defaultPort: Int, resolveContactPoints: Boolean): Option[InetSocketAddress] = { val (hostName, port) = if (hostAndPort.contains(":")) { @@ -429,6 +436,8 @@ object CassandraConnectorConf extends Logging { val connectionFactory = CassandraConnectionFactory.fromSparkConf(conf) + val maxFrameLengthInMB = conf.getInt(MaxFrameLengthInMB.name, MaxFrameLengthInMB.default) + CassandraConnectorConf( contactInfo = getContactInfoFromSparkConf(conf), localDC = localDC, @@ -444,7 +453,8 @@ object CassandraConnectorConf extends Logging { connectionFactory = connectionFactory, quietPeriodBeforeCloseMillis = quietPeriodBeforeClose, timeoutBeforeCloseMillis = timeoutBeforeClose, - resolveContactPoints = resolveContactPoints + resolveContactPoints = resolveContactPoints, + maxFrameLengthInMB = maxFrameLengthInMB ) }