diff --git a/sdk/cosmos/azure-cosmos-spark_3/pom.xml b/sdk/cosmos/azure-cosmos-spark_3/pom.xml index e620df1b0373..f01429ebb7a1 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/pom.xml +++ b/sdk/cosmos/azure-cosmos-spark_3/pom.xml @@ -55,12 +55,6 @@ 2.12.19 provided - - commons-io - commons-io - 2.4 - provided - com.azure azure-cosmos @@ -288,7 +282,6 @@ org.apache.spark:spark-sql_2.12:[3.3.0] org.apache.spark:spark-sql_2.12:[3.4.0] org.apache.spark:spark-sql_2.12:[3.5.0] - commons-io:commons-io:[2.4] org.scala-lang:scala-library:[2.12.19] org.scala-lang.modules:scala-java8-compat_2.12:[0.8.0] io.projectreactor:reactor-scala-extensions_2.12:[0.8.0] diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala index 141c8b431f77..8f22181a4fab 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedInitialOffsetWriter.scala @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.cosmos.spark -import org.apache.commons.io.IOUtils import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, MetadataVersionUtil} @@ -25,7 +24,7 @@ private class ChangeFeedInitialOffsetWriter } override def deserialize(in: InputStream): String = { - val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8)) + val content = readerToString(new InputStreamReader(in, StandardCharsets.UTF_8)) // HDFSMetadataLog would never create a partial file. require(content.nonEmpty) val indexOfNewLine = content.indexOf("\n") @@ -37,4 +36,25 @@ private class ChangeFeedInitialOffsetWriter MetadataVersionUtil.validateVersion(content.substring(0, indexOfNewLine), VERSION) content.substring(indexOfNewLine + 1) } + + private def readerToString(reader: java.io.Reader): String = { + val writer = new StringBuilderWriter + val buffer = new Array[Char](4096) + Stream.continually(reader.read(buffer)).takeWhile(_ != -1).foreach(writer.write(buffer, 0, _)) + writer.toString + } + + private class StringBuilderWriter extends java.io.Writer { + private val stringBuilder = new StringBuilder + + override def write(cbuf: Array[Char], off: Int, len: Int): Unit = { + stringBuilder.appendAll(cbuf, off, len) + } + + override def flush(): Unit = {} + + override def close(): Unit = {} + + override def toString: String = stringBuilder.toString() + } }