16
16
17
17
package org .radarcns .util ;
18
18
19
+ import com .fasterxml .jackson .databind .JsonMappingException ;
20
+ import com .fasterxml .jackson .databind .MappingIterator ;
21
+ import com .fasterxml .jackson .databind .ObjectReader ;
19
22
import com .fasterxml .jackson .databind .ObjectWriter ;
20
23
import com .fasterxml .jackson .dataformat .csv .CsvFactory ;
21
24
import com .fasterxml .jackson .dataformat .csv .CsvGenerator ;
28
31
import org .apache .avro .generic .GenericRecord ;
29
32
30
33
import java .io .IOException ;
34
+ import java .io .Reader ;
31
35
import java .io .Writer ;
32
36
import java .nio .ByteBuffer ;
33
- import java .util .LinkedHashMap ;
34
- import java .util .List ;
35
- import java .util .Map ;
37
+ import java .util .*;
36
38
37
39
/**
38
40
* Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the
@@ -45,8 +47,8 @@ public static RecordConverterFactory getFactory() {
45
47
CsvFactory factory = new CsvFactory ();
46
48
return new RecordConverterFactory () {
47
49
@ Override
48
- public RecordConverter converterFor (Writer writer , GenericRecord record , boolean writeHeader ) throws IOException {
49
- return new CsvAvroConverter (factory , writer , record , writeHeader );
50
+ public RecordConverter converterFor (Writer writer , GenericRecord record , boolean writeHeader , Reader reader ) throws IOException {
51
+ return new CsvAvroConverter (factory , writer , record , writeHeader , reader );
50
52
}
51
53
52
54
@ Override
@@ -59,28 +61,68 @@ public boolean hasHeader() {
59
61
private final ObjectWriter csvWriter ;
60
62
private final Map <String , Object > map ;
61
63
private final CsvGenerator generator ;
64
+ private CsvSchema schema ;
62
65
63
- public CsvAvroConverter (CsvFactory factory , Writer writer , GenericRecord record , boolean writeHeader )
66
+ public CsvAvroConverter (CsvFactory factory , Writer writer , GenericRecord record , boolean writeHeader , Reader reader )
64
67
throws IOException {
65
68
map = new LinkedHashMap <>();
66
- Map <String , Object > value = convertRecord (record );
69
+
70
+ CsvMapper mapper = new CsvMapper (factory );
71
+ Map <String , Object > value ;
72
+
73
+ schema = CsvSchema .emptySchema ().withHeader ();
74
+ if (!writeHeader ) {
75
+ // If file already exists read the schema from the CSV file
76
+ ObjectReader objectReader = mapper .readerFor (Map .class ).with (schema );
77
+ MappingIterator <Map <String ,Object >> iterator = objectReader .readValues (reader );
78
+ value = iterator .next ();
79
+ } else {
80
+ value = convertRecord (record );
81
+ }
82
+
67
83
CsvSchema .Builder builder = new CsvSchema .Builder ();
68
84
for (String key : value .keySet ()) {
69
85
builder .addColumn (key );
70
86
}
71
- CsvSchema schema = builder .build ();
87
+ schema = builder .build ();
88
+
72
89
if (writeHeader ) {
73
90
schema = schema .withHeader ();
74
91
}
92
+
75
93
generator = factory .createGenerator (writer );
76
- csvWriter = new CsvMapper (factory ).writer (schema );
94
+ csvWriter = mapper .writer (schema );
95
+
77
96
}
78
97
98
+ /**
99
+ * Write AVRO record to CSV file.
100
+ * @param record the AVRO record to be written to CSV file
101
+ * @return true if write was successful, false if cannot write record to the current CSV file
102
+ * @throws IOException for other IO and Mapping errors
103
+ */
79
104
@ Override
80
- public void writeRecord (GenericRecord record ) throws IOException {
105
+ public boolean writeRecord (GenericRecord record ) throws IOException {
81
106
Map <String , Object > localMap = convertRecord (record );
107
+
108
+ if (localMap .size () > schema .size ()) {
109
+ // Cannot write to same file so return false
110
+ return false ;
111
+ } else {
112
+ Iterator <String > localColumnIterator = localMap .keySet ().iterator ();
113
+ for (int i = 0 ; i < schema .size (); i ++) {
114
+ if (!schema .columnName (i ).equals (localColumnIterator .next ())) {
115
+ /* The order or name of columns is different and
116
+ thus cannot write to this csv file. return false.
117
+ */
118
+ return false ;
119
+ }
120
+ }
121
+ }
122
+
82
123
csvWriter .writeValue (generator , localMap );
83
124
localMap .clear ();
125
+ return true ;
84
126
}
85
127
86
128
public Map <String , Object > convertRecord (GenericRecord record ) {
0 commit comments