
Commit e9d23c9

Merge pull request #1272 from data-integrations/cherry_fix_emptybqsink
[6.9.2][Cherry-pick] PLUGIN-1445: BQ sink with an empty task: use a supplier and initialize the output stream only when there are records to write
2 parents 275a35e + 58619df commit e9d23c9
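
The change follows a lazy-initialization pattern: instead of opening the destination stream when the record writer is constructed, the writer holds a Supplier<OutputStream> and invokes it only when the first record arrives, so an empty task never creates an output file at all. A minimal standalone sketch of the pattern, independent of the plugin's classes (all names here are illustrative):

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

// Illustrative sketch only: a writer that defers stream creation to the first write.
class LazyStreamWriter {
  private final Supplier<OutputStream> streamSupplier;
  private OutputStream stream; // stays null if nothing is ever written

  LazyStreamWriter(Supplier<OutputStream> streamSupplier) {
    this.streamSupplier = streamSupplier;
  }

  void write(byte[] bytes) throws IOException {
    if (stream == null) {
      stream = streamSupplier.get(); // the stream is created here, on first use
    }
    stream.write(bytes);
  }

  void close() throws IOException {
    if (stream != null) { // empty task: supplier never called, nothing to close
      stream.close();
    }
  }
}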

File tree

3 files changed: +130 −13 lines changed


src/main/java/io/cdap/plugin/gcp/bigquery/sink/AvroOutputFormat.java

Lines changed: 23 additions & 5 deletions
@@ -28,14 +28,19 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.util.function.Supplier;
 
 /**
  * avro output format.
  */
 public class AvroOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
+
   public AvroOutputFormat() {
     super();
   }
@@ -65,21 +70,34 @@ public RecordWriter<AvroKey<GenericRecord>, NullWritable> getRecordWriter(TaskAt
     }
 
     GenericData dataModel = AvroSerialization.createDataModel(conf);
-    return create(writerSchema, dataModel, getCompressionCodec(context),
-        getAvroFileOutputStream(context), getSyncInterval(context));
+
+    return create(writerSchema, dataModel, getCompressionCodec(context), getOutputStreamSupplier(context),
+        getSyncInterval(context));
+  }
+
+  // Create a supplier for the output stream; it is invoked only when there is a record to write.
+  // This avoids creating an output stream for empty tasks (no writes).
+  private Supplier<OutputStream> getOutputStreamSupplier(TaskAttemptContext context) {
+    return () -> {
+      try {
+        return getAvroFileOutputStream(context);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    };
   }
 
   /**
    * Creates a new record writer instance.
    *
    * @param writerSchema The writer schema for the records to write.
    * @param compressionCodec The compression type for the writer file.
-   * @param outputStream The target output stream for the records.
+   * @param outputStreamSupplier A supplier that provides the target output stream for the records.
    * @param syncInterval The sync interval for the writer file.
    */
   private RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
     Schema writerSchema, GenericData dataModel, CodecFactory compressionCodec,
-    OutputStream outputStream, int syncInterval) throws IOException {
-    return new AvroRecordWriter(writerSchema, dataModel, compressionCodec, outputStream, syncInterval);
+    Supplier<OutputStream> outputStreamSupplier, int syncInterval) {
+    return new AvroRecordWriter(writerSchema, dataModel, compressionCodec, outputStreamSupplier, syncInterval);
   }
 }
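
One wrinkle in the AvroOutputFormat change: java.util.function.Supplier.get() cannot throw checked exceptions, so the lambda wraps the IOException from getAvroFileOutputStream in an UncheckedIOException. A caller that wants the original checked exception back can unwrap it; a sketch under that assumption (the helper below is hypothetical, not part of this commit):

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

final class SupplierIo {
  // Hypothetical helper: invoke the supplier and rethrow the original
  // checked IOException if stream creation failed.
  static OutputStream openChecked(Supplier<OutputStream> supplier) throws IOException {
    try {
      return supplier.get();
    } catch (UncheckedIOException e) {
      throw e.getCause(); // UncheckedIOException.getCause() returns IOException
    }
  }
}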

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AvroRecordWriter.java

Lines changed: 10 additions & 8 deletions
@@ -30,6 +30,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.function.Supplier;
 
 /**
  * avro record writer
@@ -40,36 +41,36 @@ public class AvroRecordWriter extends RecordWriter<AvroKey<GenericRecord>, NullW
   private Schema prevSchema;
   private GenericData dataModel;
   private CodecFactory compressionCodec;
-  private OutputStream outputStream;
+  private Supplier<OutputStream> outputStreamSupplier;
   private int syncInterval;
 
   /**
    * Constructor.
    *
    * @param writerSchema The writer schema for the records in the Avro container file.
    * @param compressionCodec A compression codec factory for the Avro container file.
-   * @param outputStream The output stream to write the Avro container file to.
+   * @param outputStreamSupplier A supplier that provides the output stream to write the Avro container file to.
    * @param syncInterval The sync interval for the Avro container file.
    * @throws IOException If the record writer cannot be opened.
    */
   public AvroRecordWriter(Schema writerSchema, GenericData dataModel, CodecFactory compressionCodec,
-                          OutputStream outputStream, int syncInterval) throws IOException {
+                          Supplier<OutputStream> outputStreamSupplier, int syncInterval) {
     this.dataModel = dataModel;
     this.compressionCodec = compressionCodec;
-    this.outputStream = outputStream;
     this.syncInterval = syncInterval;
+    this.outputStreamSupplier = outputStreamSupplier;
   }
   /**
    * Constructor.
    *
    * @param writerSchema The writer schema for the records in the Avro container file.
    * @param compressionCodec A compression codec factory for the Avro container file.
-   * @param outputStream The output stream to write the Avro container file to.
+   * @param outputStreamSupplier A supplier that provides the output stream to write the Avro container file to.
    * @throws IOException If the record writer cannot be opened.
    */
   public AvroRecordWriter(Schema writerSchema, GenericData dataModel,
-                          CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
-    this(writerSchema, dataModel, compressionCodec, outputStream,
+                          CodecFactory compressionCodec, Supplier<OutputStream> outputStreamSupplier) {
+    this(writerSchema, dataModel, compressionCodec, outputStreamSupplier,
         DataFileConstants.DEFAULT_SYNC_INTERVAL);
   }
 
@@ -78,6 +79,7 @@ public AvroRecordWriter(Schema writerSchema, GenericData dataModel,
   public void write(AvroKey<GenericRecord> record, NullWritable ignore) throws IOException {
     // Create an Avro container file and a writer to it.
     Schema writerSchema = record.datum().getSchema();
+
     if (mAvroFileWriter == null) {
       createFileWriter(writerSchema);
     }
@@ -94,7 +96,7 @@ private void createFileWriter(Schema writerSchema) throws IOException {
     mAvroFileWriter = new DataFileWriter<GenericRecord>(dataModel.createDatumWriter(writerSchema));
     mAvroFileWriter.setCodec(compressionCodec);
     mAvroFileWriter.setSyncInterval(syncInterval);
-    mAvroFileWriter.create(writerSchema, outputStream);
+    mAvroFileWriter.create(writerSchema, outputStreamSupplier.get());
     prevSchema = writerSchema;
   }
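The write path in AvroRecordWriter is otherwise unchanged: the first write() still builds the DataFileWriter; the only difference is that the stream is now obtained from outputStreamSupplier.get() at that moment. A standalone sketch of the same create sequence against Avro's public API (the schema literal and in-memory stream are stand-ins for the plugin's real inputs):

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

public class LazyCreateDemo {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    // Stand-in for getAvroFileOutputStream(context) in the plugin.
    Supplier<OutputStream> supplier = ByteArrayOutputStream::new;

    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
    writer.setSyncInterval(16000);
    writer.create(schema, supplier.get()); // the stream is only obtained here

    GenericRecord record = new GenericData.Record(schema);
    record.put("id", 1L);
    writer.append(record);
    writer.close();
  }
}
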
src/test/java/io/cdap/plugin/gcp/bigquery/sink/AvroRecordWriterTest.java

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+package io.cdap.plugin.gcp.bigquery.sink;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.format.avro.StructuredToAvroTransformer;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.function.Supplier;
+
+public class AvroRecordWriterTest {
+
+  @Test
+  public void testSuccessfulWrites() throws IOException {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    Supplier<OutputStream> outputStreamSupplier = () -> byteArrayOutputStream;
+
+    AvroRecordWriter avroRecordWriter = new AvroRecordWriter(
+      null,
+      GenericData.get(),
+      CodecFactory.nullCodec(),
+      outputStreamSupplier,
+      33);
+
+    // Create a generic record.
+    Schema schema = Schema.recordOf("record",
+      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
+      Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
+      Schema.Field.of("time", Schema.of(Schema.LogicalType.TIME_MICROS)),
+      Schema.Field.of("timestamp",
+                      Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));
+
+    StructuredRecord sourceRecord = StructuredRecord.builder(schema)
+      .set("id", 1L)
+      .set("name", "alice")
+      .set("price", 1.2d)
+      .setDate("dt", LocalDate.of(2018, 11, 11))
+      .setTime("time", LocalTime.of(11, 11, 11))
+      .set("timestamp", 1464181635000000L).build();
+    GenericRecord sourceGenericRecord = new StructuredToAvroTransformer(schema).transform(sourceRecord);
+    AvroKey<GenericRecord> record = new AvroKey<>(sourceGenericRecord);
+
+    // Write using avroRecordWriter.
+    avroRecordWriter.write(record, Mockito.mock(NullWritable.class));
+    avroRecordWriter.close(null);
+
+    // Now read the Avro record back from the output stream.
+    GenericDatumReader<GenericRecord> genericDatumReader =
+      new GenericDatumReader<>(sourceGenericRecord.getSchema());
+    SeekableByteArrayInput inputStream = new SeekableByteArrayInput(byteArrayOutputStream.toByteArray());
+    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(inputStream, genericDatumReader);
+
+    // Check that a record was written.
+    Assert.assertTrue(dataFileReader.iterator().hasNext());
+    // The record read back must match sourceGenericRecord.
+    Assert.assertEquals(dataFileReader.iterator().next(), sourceGenericRecord);
+  }
+
+  /**
+   * If there are no records, the output stream supplier should not be called.
+   * If it is called, it throws an exception and the test fails.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testZeroWrites() throws IOException {
+    Supplier<OutputStream> outputStreamSupplier = () -> {
+      throw new IllegalStateException("This supplier should not be called");
+    };
+
+    AvroRecordWriter avroRecordWriter = new AvroRecordWriter(
+      null,
+      GenericData.get(),
+      CodecFactory.nullCodec(),
+      outputStreamSupplier,
+      33);
+
+    avroRecordWriter.close(null);
+  }
+}
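
Assuming the repository's standard Maven setup, the new test class can be run in isolation with Surefire's test filter:

mvn test -Dtest=AvroRecordWriterTest

testZeroWrites is the regression guard for PLUGIN-1445: its supplier throws if invoked, so the test fails if closing an empty writer ever touches the output stream again.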
