
Commit 0a0f68b

MaxGekk authored and HyukjinKwon committed
[SPARK-24881][SQL] New Avro option - compression
## What changes were proposed in this pull request?

In the PR, I added a new option for the Avro datasource: `compression`. The option allows specifying the compression codec for saved Avro files, and is similar to the `compression` option in other datasources like `JSON` and `CSV`. I also added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`, and put them into `SQLConf`. If the `compression` option is not specified by the user, the first SQL config is taken into account.

## How was this patch tested?

I added a new test which reads the meta info from written Avro files and checks the `avro.codec` property.

Author: Maxim Gekk <[email protected]>

Closes apache#21837 from MaxGekk/avro-compression.
1 parent c9bec1d commit 0a0f68b
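
For reference, a hedged usage sketch of what this change enables; the paths and level value are illustrative, while the option and config names come from the patch below:

// Per-write option (new in this patch):
val df = spark.read.format("avro").load("/tmp/events")
df.write.format("avro").option("compression", "deflate").save("/tmp/out-deflate")

// Session-wide defaults via the new SQL configs; the option, when present, wins:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
df.write.format("avro").save("/tmp/out-conf")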

File tree: 4 files changed, +67 −14 lines


external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
Lines changed: 2 additions & 5 deletions

@@ -117,11 +117,9 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)

     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
-    val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
     val COMPRESS_KEY = "mapred.output.compress"

-    spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match {
+    parsedOptions.compression match {
       case "uncompressed" =>
         log.info("writing uncompressed Avro records")
         job.getConfiguration.setBoolean(COMPRESS_KEY, false)

@@ -132,8 +130,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)

       case "deflate" =>
-        val deflateLevel = spark.conf.get(
-          AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
+        val deflateLevel = spark.sessionState.conf.avroDeflateLevel
         log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
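
A minimal sketch of the codec dispatch above, pulled out as a free function. `configureCodec` and its parameters are hypothetical stand-ins for `parsedOptions.compression` and `spark.sessionState.conf.avroDeflateLevel`; the deflate-level key is assumed to be Avro's `AvroOutputFormat.DEFLATE_LEVEL_KEY`, since the hunk's trailing context is not shown here:

import org.apache.avro.file.DataFileConstants
import org.apache.avro.mapred.AvroOutputFormat
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.mapreduce.Job

// Hypothetical free-function version of the match above.
def configureCodec(job: Job, compression: String, deflateLevel: Int): Unit = {
  val compressKey = "mapred.output.compress"
  compression match {
    case "uncompressed" =>
      job.getConfiguration.setBoolean(compressKey, false)
    case "snappy" =>
      job.getConfiguration.setBoolean(compressKey, true)
      job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
    case "deflate" =>
      job.getConfiguration.setBoolean(compressKey, true)
      job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
      // Assumed: the deflate level is forwarded via Avro's standard Hadoop key.
      job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
    case unknown =>
      throw new IllegalArgumentException(s"Unsupported Avro compression codec: $unknown")
  }
}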

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
Lines changed: 11 additions & 0 deletions

@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf

 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.

@@ -68,4 +69,14 @@ class AvroOptions(
       .map(_.toBoolean)
       .getOrElse(!ignoreFilesWithoutExtension)
   }
+
+  /**
+   * The `compression` option allows to specify a compression codec used in write.
+   * Currently supported codecs are `uncompressed`, `snappy` and `deflate`.
+   * If the option is not set, the `spark.sql.avro.compression.codec` config is taken into
+   * account. If the former one is not set too, the `snappy` codec is used by default.
+   */
+  val compression: String = {
+    parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
+  }
 }
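
A minimal standalone model of the fallback chain documented above; `resolveCompression` is a hypothetical helper, and "snappy" mirrors the config's default declared in SQLConf.scala below:

// Hypothetical model: option -> spark.sql.avro.compression.codec -> "snappy".
def resolveCompression(options: Map[String, String],
    sessionConf: Option[String]): String = {
  options.get("compression") // 1. the per-write `compression` option
    .orElse(sessionConf)     // 2. the session's spark.sql.avro.compression.codec
    .getOrElse("snappy")     // 3. the built-in default
}

assert(resolveCompression(Map("compression" -> "deflate"), Some("snappy")) == "deflate")
assert(resolveCompression(Map.empty, Some("deflate")) == "deflate")
assert(resolveCompression(Map.empty, None) == "snappy")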

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
Lines changed: 35 additions & 9 deletions

@@ -27,13 +27,14 @@ import scala.collection.JavaConverters._

 import org.apache.avro.Schema
 import org.apache.avro.Schema.{Field, Type}
-import org.apache.avro.file.DataFileWriter
-import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
+import org.apache.avro.file.{DataFileReader, DataFileWriter}
+import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils

 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._

@@ -364,21 +365,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }

-  test("write with compression") {
+  test("write with compression - sql configs") {
     withTempPath { dir =>
-      val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
-      val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
       val uncompressDir = s"$dir/uncompress"
       val deflateDir = s"$dir/deflate"
       val snappyDir = s"$dir/snappy"

       val df = spark.read.format("avro").load(testAvro)
-      spark.conf.set(AVRO_COMPRESSION_CODEC, "uncompressed")
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "uncompressed")
       df.write.format("avro").save(uncompressDir)
-      spark.conf.set(AVRO_COMPRESSION_CODEC, "deflate")
-      spark.conf.set(AVRO_DEFLATE_LEVEL, "9")
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "deflate")
+      spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")
       df.write.format("avro").save(deflateDir)
-      spark.conf.set(AVRO_COMPRESSION_CODEC, "snappy")
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "snappy")
       df.write.format("avro").save(snappyDir)

       val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))

@@ -890,4 +889,31 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
+
+  test("SPARK-24881: write with compression - avro options") {
+    def getCodec(dir: String): Option[String] = {
+      val files = new File(dir)
+        .listFiles()
+        .filter(_.isFile)
+        .filter(_.getName.endsWith("avro"))
+      files.map { file =>
+        val reader = new DataFileReader(file, new GenericDatumReader[Any]())
+        val r = reader.getMetaString("avro.codec")
+        r
+      }.map(v => if (v == "null") "uncompressed" else v).headOption
+    }
+    def checkCodec(df: DataFrame, dir: String, codec: String): Unit = {
+      val subdir = s"$dir/$codec"
+      df.write.option("compression", codec).format("avro").save(subdir)
+      assert(getCodec(subdir) == Some(codec))
+    }
+    withTempPath { dir =>
+      val path = dir.toString
+      val df = spark.read.format("avro").load(testAvro)
+
+      checkCodec(df, path, "uncompressed")
+      checkCodec(df, path, "deflate")
+      checkCodec(df, path, "snappy")
+    }
+  }
 }
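
The test's metadata check can be reproduced outside the suite with Avro's Java API; a hedged standalone sketch (the file argument is illustrative, and "null" is Avro's codec name for uncompressed data):

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader

// Returns the codec recorded in the Avro container file's metadata.
def codecOf(avroFile: File): String = {
  val reader = new DataFileReader(avroFile, new GenericDatumReader[Any]())
  try {
    Option(reader.getMetaString("avro.codec")) match {
      case Some("null") | None => "uncompressed" // Avro calls no-compression "null"
      case Some(codec) => codec                  // e.g. "deflate", "snappy"
    }
  } finally reader.close()
}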

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Lines changed: 19 additions & 0 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.{Locale, NoSuchElementException, Properties, TimeZone}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
+import java.util.zip.Deflater

 import scala.collection.JavaConverters._
 import scala.collection.immutable

@@ -1434,6 +1435,20 @@ object SQLConf {
       "This only takes effect when spark.sql.repl.eagerEval.enabled is set to true.")
     .intConf
     .createWithDefault(20)
+
+  val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
+    .doc("Compression codec used in writing of AVRO files. Default codec is snappy.")
+    .stringConf
+    .checkValues(Set("uncompressed", "deflate", "snappy"))
+    .createWithDefault("snappy")
+
+  val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
+    .doc("Compression level for the deflate codec used in writing of AVRO files. " +
+      "Valid value must be in the range of from 1 to 9 inclusive or -1. " +
+      "The default value is -1 which corresponds to 6 level in the current implementation.")
+    .intConf
+    .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
+    .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 }

@@ -1820,6 +1835,10 @@ class SQLConf extends Serializable with Logging {

   def replEagerEvalTruncate: Int = getConf(SQLConf.REPL_EAGER_EVAL_TRUNCATE)

+  def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)
+
+  def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
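
A hedged usage note on the two configs: values outside the sets declared by checkValues should be rejected when the config is set (the eager validation and exception type are assumptions about Spark's conf machinery, not shown in this patch):

// Accepted values (per the checkValues declarations above):
spark.conf.set("spark.sql.avro.compression.codec", "deflate") // uncompressed | deflate | snappy
spark.conf.set("spark.sql.avro.deflate.level", "9")           // 1..9, or -1 for Deflater's default

// Assumed: an out-of-range value fails fast, e.g.
// spark.conf.set("spark.sql.avro.deflate.level", "12")
//   => IllegalArgumentException raised by the checkValues guard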
