@@ -19,7 +19,6 @@ package org.apache.spark.sql.avro
 
 import java.io._
 import java.net.URI
-import java.util.zip.Deflater
 
 import scala.util.control.NonFatal
 
@@ -31,18 +30,18 @@ import org.apache.avro.mapreduce.AvroJob
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
-import org.slf4j.LoggerFactory
 
 import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
-  private val log = LoggerFactory.getLogger(getClass)
+private[avro] class AvroFileFormat extends FileFormat
+  with DataSourceRegister with Logging with Serializable {
 
   override def equals(other: Any): Boolean = other match {
     case _: AvroFileFormat => true
@@ -121,23 +120,23 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     parsedOptions.compression match {
       case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
+        logInfo("writing uncompressed Avro records")
         job.getConfiguration.setBoolean(COMPRESS_KEY, false)
 
       case "snappy" =>
-        log.info("compressing Avro output using Snappy")
+        logInfo("compressing Avro output using Snappy")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
 
       case "deflate" =>
         val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
+        logInfo(s"compressing Avro output using deflate (level=$deflateLevel)")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
 
       case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        logError(s"unsupported compression codec $unknown")
     }
 
     new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
@@ -157,7 +156,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
     val parsedOptions = new AvroOptions(options, hadoopConf)
 
     (file: PartitionedFile) => {
-      val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
      val conf = broadcastedConf.value.value
      val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
 
@@ -176,7 +174,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
           DataFileReader.openReader(in, datumReader)
         } catch {
           case NonFatal(e) =>
-            log.error("Exception while opening DataFileReader", e)
+            logError("Exception while opening DataFileReader", e)
             in.close()
             throw e
         }
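
The core of the change is swapping a hand-rolled SLF4J field for Spark's internal `Logging` mix-in (and adding `Serializable`, since the format instance is captured by the reader closure built for each `PartitionedFile`). For illustration, here is a minimal sketch of what such a mix-in looks like; the real `org.apache.spark.internal.Logging` is `private[spark]` and more elaborate, so the trait below is a simplified stand-in, not Spark's actual code:

```scala
import org.slf4j.{Logger, LoggerFactory}

// Simplified stand-in for org.apache.spark.internal.Logging (for
// illustration only; the real trait is private[spark] and has more levels).
trait Logging {
  // @transient + lazy: the logger is not serialized with closures that
  // capture `this`; it is re-created on first use after deserialization.
  @transient private lazy val log: Logger =
    LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

  // By-name `msg` defers building the string until the level is enabled.
  protected def logInfo(msg: => String): Unit =
    if (log.isInfoEnabled) log.info(msg)

  protected def logError(msg: => String): Unit =
    if (log.isErrorEnabled) log.error(msg)

  protected def logError(msg: => String, throwable: Throwable): Unit =
    if (log.isErrorEnabled) log.error(msg, throwable)
}
```

This is also why the per-task `val log = LoggerFactory.getLogger(classOf[AvroFileFormat])` inside the `(file: PartitionedFile) => ...` closure can simply be deleted: the closure now calls `logError` on the serializable format instance, and the transient lazy logger is rebuilt on each executor.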
0 commit comments
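
For context, the `prepareWrite` branch above keys off the data source's `compression` option, with the deflate level read from the session conf via `avroDeflateLevel`. A hedged usage sketch, assuming a build where this source registers as `"avro"` and assuming `spark.sql.avro.deflate.level` is the conf key behind `avroDeflateLevel`:

```scala
// Hypothetical write: `spark`, `df`, and the output path are placeholders.
spark.conf.set("spark.sql.avro.deflate.level", "7") // assumed conf key
df.write
  .format("avro")
  .option("compression", "deflate") // matches the "deflate" case above
  .save("/tmp/avro-out")
```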