Skip to content

Commit 3e62b4a

Browse files
If file already exists, read the schema from CSV file
1 parent 61cae16 commit 3e62b4a

10 files changed

+53
-49
lines changed

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
286286
// Write data
287287
int response = cache.writeRecord(outputPath, record);
288288

289-
if (response == FileCacheStore.CACHE_AND_NO_WRITE) {
289+
if (response == FileCacheStore.CACHE_AND_NO_WRITE || response == FileCacheStore.NO_CACHE_AND_NO_WRITE) {
290290
// Write was unsuccessful due to different number of columns,
291291
// try again with new file name
292292
writeRecord(record, topicName, cache, ++suffix);

src/main/java/org/radarcns/util/CsvAvroConverter.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.radarcns.util;
1818

19+
import com.fasterxml.jackson.databind.MappingIterator;
20+
import com.fasterxml.jackson.databind.ObjectReader;
1921
import com.fasterxml.jackson.databind.ObjectWriter;
2022
import com.fasterxml.jackson.dataformat.csv.CsvFactory;
2123
import com.fasterxml.jackson.dataformat.csv.CsvGenerator;
@@ -28,9 +30,9 @@
2830
import org.apache.avro.generic.GenericRecord;
2931

3032
import java.io.IOException;
33+
import java.io.Reader;
3134
import java.io.Writer;
3235
import java.nio.ByteBuffer;
33-
import java.util.Iterator;
3436
import java.util.LinkedHashMap;
3537
import java.util.List;
3638
import java.util.Map;
@@ -46,8 +48,8 @@ public static RecordConverterFactory getFactory() {
4648
CsvFactory factory = new CsvFactory();
4749
return new RecordConverterFactory() {
4850
@Override
49-
public RecordConverter converterFor(Writer writer, GenericRecord record, boolean writeHeader) throws IOException {
50-
return new CsvAvroConverter(factory, writer, record, writeHeader);
51+
public RecordConverter converterFor(Writer writer, GenericRecord record, boolean writeHeader, Reader reader) throws IOException {
52+
return new CsvAvroConverter(factory, writer, record, writeHeader, reader);
5153
}
5254

5355
@Override
@@ -62,21 +64,37 @@ public boolean hasHeader() {
6264
private final CsvGenerator generator;
6365
private final int numOfColumns;
6466

65-
public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record, boolean writeHeader)
67+
public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record, boolean writeHeader, Reader reader)
6668
throws IOException {
6769
map = new LinkedHashMap<>();
68-
Map<String, Object> value = convertRecord(record);
70+
71+
CsvMapper mapper = new CsvMapper(factory);
72+
Map<String, Object> value;
73+
74+
CsvSchema schema = CsvSchema.emptySchema().withHeader();
75+
if (!writeHeader) {
76+
// If file already exists read the schema from the CSV file
77+
ObjectReader objectReader = mapper.readerFor(Map.class).with(schema);
78+
MappingIterator<Map<String,Object>> iterator = objectReader.readValues(reader);
79+
value = iterator.next();
80+
} else {
81+
value = convertRecord(record);
82+
}
83+
6984
CsvSchema.Builder builder = new CsvSchema.Builder();
7085
for (String key : value.keySet()) {
7186
builder.addColumn(key);
7287
}
73-
CsvSchema schema = builder.build();
88+
schema = builder.build();
89+
7490
if (writeHeader) {
7591
schema = schema.withHeader();
7692
}
77-
numOfColumns = schema.size();
93+
7894
generator = factory.createGenerator(writer);
79-
csvWriter = new CsvMapper(factory).writer(schema);
95+
csvWriter = mapper.writer(schema);
96+
numOfColumns = schema.size();
97+
8098
}
8199

82100
/**

src/main/java/org/radarcns/util/FileCache.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,11 @@
1616

1717
package org.radarcns.util;
1818

19-
import java.io.BufferedOutputStream;
20-
import java.io.Closeable;
21-
import java.io.FileOutputStream;
22-
import java.io.Flushable;
23-
import java.io.IOException;
24-
import java.io.OutputStream;
25-
import java.io.OutputStreamWriter;
26-
import java.io.Writer;
19+
import java.io.*;
2720
import java.nio.file.Files;
2821
import java.nio.file.Path;
2922
import java.nio.file.StandardOpenOption;
23+
import java.util.zip.GZIPInputStream;
3024
import java.util.zip.GZIPOutputStream;
3125
import javax.annotation.Nonnull;
3226
import org.apache.avro.generic.GenericRecord;
@@ -57,15 +51,19 @@ public FileCache(RecordConverterFactory converterFactory, Path path,
5751

5852
OutputStream outFile = Files.newOutputStream(path,
5953
StandardOpenOption.APPEND, StandardOpenOption.CREATE);
54+
InputStream inputStream = new BufferedInputStream(Files.newInputStream(path));
6055
OutputStream bufOut = new BufferedOutputStream(outFile);
6156
if (gzip) {
6257
bufOut = new GZIPOutputStream(bufOut);
58+
if (!fileIsNew) {
59+
inputStream = new GZIPInputStream(inputStream);
60+
}
6361
}
6462

6563
this.writer = new OutputStreamWriter(bufOut);
6664

67-
try {
68-
this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew);
65+
try (Reader reader = new InputStreamReader(inputStream)) {
66+
this.recordConverter = converterFactory.converterFor(writer, record, fileIsNew, reader);
6967
} catch (IOException ex) {
7068
try {
7169
writer.close();

src/main/java/org/radarcns/util/FileCacheStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public int writeRecord(Path path, GenericRecord record) throws IOException {
8585
if(cache.writeRecord(record)) {
8686
return NO_CACHE_AND_WRITE;
8787
} else {
88+
// The file path was not in cache but the file exists and this write is
89+
// unsuccessful because of different number of columns
8890
return NO_CACHE_AND_NO_WRITE;
8991
}
9092

src/main/java/org/radarcns/util/JsonAvroConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public final class JsonAvroConverter implements RecordConverter {
4242

4343
public static RecordConverterFactory getFactory() {
4444
JsonFactory factory = new JsonFactory();
45-
return (writer, record, writeHeader) -> new JsonAvroConverter(factory, writer);
45+
return (writer, record, writeHeader, reader) -> new JsonAvroConverter(factory, writer);
4646
}
4747

4848
private final ObjectWriter jsonWriter;

src/main/java/org/radarcns/util/RecordConverter.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717
package org.radarcns.util;
1818

1919
import java.io.Closeable;
20-
import java.io.File;
2120
import java.io.Flushable;
2221
import java.io.IOException;
23-
import java.nio.file.Path;
24-
import java.util.List;
2522
import java.util.Map;
2623
import org.apache.avro.generic.GenericRecord;
2724

src/main/java/org/radarcns/util/RecordConverterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public interface RecordConverterFactory {
5353
* @return RecordConverter that is ready to be used
5454
* @throws IOException if the converter could not be created
5555
*/
56-
RecordConverter converterFor(Writer writer, GenericRecord record, boolean writeHeader) throws IOException;
56+
RecordConverter converterFor(Writer writer, GenericRecord record, boolean writeHeader, Reader reader) throws IOException;
5757

5858
default boolean hasHeader() {
5959
return false;

src/test/java/org/radarcns/util/CsvAvroConverterTest.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,9 @@
2424

2525
import com.fasterxml.jackson.databind.JsonMappingException;
2626

27-
import java.io.BufferedReader;
28-
import java.io.BufferedWriter;
29-
import java.io.IOException;
30-
import java.io.InputStream;
31-
import java.io.InputStreamReader;
32-
import java.io.OutputStream;
33-
import java.io.OutputStreamWriter;
34-
import java.io.Reader;
35-
import java.io.StringWriter;
36-
import java.io.Writer;
27+
import java.io.*;
3728
import java.nio.file.Files;
3829
import java.nio.file.Path;
39-
import java.util.ArrayList;
4030
import java.util.Arrays;
4131
import java.util.Iterator;
4232
import java.util.LinkedHashSet;
@@ -77,7 +67,7 @@ public void writeRecord() throws IOException {
7767

7868
StringWriter writer = new StringWriter();
7969
RecordConverterFactory factory = CsvAvroConverter.getFactory();
80-
RecordConverter converter = factory.converterFor(writer, record, true);
70+
RecordConverter converter = factory.converterFor(writer, record, true, new StringReader("test"));
8171

8272
Map<String, Object> map = converter.convertRecord(record);
8373
List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i.some",
@@ -120,7 +110,7 @@ public void differentSchema() throws IOException {
120110
GenericRecord recordA = new GenericRecordBuilder(schemaA).set("a", "something").build();
121111

122112
StringWriter writer = new StringWriter();
123-
RecordConverter converter = CsvAvroConverter.getFactory().converterFor(writer, recordA, true);
113+
RecordConverter converter = CsvAvroConverter.getFactory().converterFor(writer, recordA, true, new StringReader("test"));
124114
converter.writeRecord(recordA);
125115

126116
Schema schemaB = SchemaBuilder.record("B").fields().name("b").type("string").noDefault().endRecord();
@@ -140,7 +130,7 @@ public void subSchema() throws IOException {
140130
.build();
141131

142132
StringWriter writer = new StringWriter();
143-
RecordConverter converter = CsvAvroConverter.getFactory().converterFor(writer, recordA, true);
133+
RecordConverter converter = CsvAvroConverter.getFactory().converterFor(writer, recordA, true, new StringReader("test"));
144134
converter.writeRecord(recordA);
145135

146136
Schema schemaB = SchemaBuilder.record("B").fields().name("b").type("string").noDefault().endRecord();

src/test/java/org/radarcns/util/FileCacheStoreTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
package org.radarcns.util;
1818

1919
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertFalse;
21-
import static org.junit.Assert.assertTrue;
2220

23-
import java.io.File;
2421
import java.io.IOException;
2522
import java.nio.file.Files;
2623
import java.nio.file.Path;
@@ -44,6 +41,7 @@ public void appendLine() throws IOException {
4441
Path f3 = folder.newFile().toPath();
4542
Path d4 = folder.newFolder().toPath();
4643
Path f4 = d4.resolve("f4.txt");
44+
Path newFile = folder.newFile().toPath();
4745

4846
Files.delete(f1);
4947
Files.delete(d4);
@@ -82,12 +80,16 @@ record = new GenericRecordBuilder(simpleSchema).set("a", "f3").build();
8280
record = new GenericRecordBuilder(conflictSchema).set("a", "f3"). set("b", "conflict").build();
8381
assertEquals(cache.writeRecord(f3, record), FileCacheStore.CACHE_AND_NO_WRITE);
8482
record = new GenericRecordBuilder(conflictSchema).set("a", "f1"). set("b", "conflict").build();
85-
assertEquals(cache.writeRecord(f1, record), FileCacheStore.NO_CACHE_AND_WRITE);
83+
// Cannot write to file even though the file is not in cache since schema is different
84+
assertEquals(cache.writeRecord(f1, record), FileCacheStore.NO_CACHE_AND_NO_WRITE);
85+
// Can write the same record to a new file
86+
assertEquals(cache.writeRecord(newFile, record), FileCacheStore.NO_CACHE_AND_WRITE);
8687
}
8788

88-
assertEquals("a\nsomething\nsomethingElse\nthird\nf1,conflict\n", new String(Files.readAllBytes(f1)));
89+
assertEquals("a\nsomething\nsomethingElse\nthird\n", new String(Files.readAllBytes(f1)));
8990
assertEquals("a\nsomething\nf2\n", new String(Files.readAllBytes(f2)));
9091
assertEquals("a\nf3\nf3\nf3\n", new String(Files.readAllBytes(f3)));
9192
assertEquals("a\nf4\n", new String(Files.readAllBytes(f4)));
93+
assertEquals("a,b\nf1,conflict\n", new String(Files.readAllBytes(newFile)));
9294
}
9395
}

src/test/java/org/radarcns/util/JsonAvroConverterTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,8 @@
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.fasterxml.jackson.databind.ObjectWriter;
2424
import com.fasterxml.jackson.databind.SerializationFeature;
25-
import java.io.BufferedReader;
26-
import java.io.BufferedWriter;
27-
import java.io.IOException;
28-
import java.io.InputStreamReader;
29-
import java.io.StringWriter;
25+
26+
import java.io.*;
3027
import java.nio.file.Files;
3128
import java.nio.file.Path;
3229
import java.util.Arrays;
@@ -55,7 +52,7 @@ public void fullAvroTest() throws IOException {
5552
JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, getClass().getResourceAsStream("full.json"));
5653
GenericRecord record = reader.read(null, decoder);
5754

58-
Map<String, Object> map = JsonAvroConverter.getFactory().converterFor(new StringWriter(), record, false).convertRecord(record);
55+
Map<String, Object> map = JsonAvroConverter.getFactory().converterFor(new StringWriter(), record, false, new StringReader("test")).convertRecord(record);
5956
ObjectWriter writer = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT).writer();
6057
String result = writer.writeValueAsString(map);
6158

0 commit comments

Comments
 (0)