 package org.radarcns.util;

+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.MappingIterator;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
@@ ... @@
 import java.io.Reader;
 import java.io.Writer;
 import java.nio.ByteBuffer;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;

 /**
  * Converts deep hierarchical Avro records into flat CSV format. It uses a simple dot syntax in the
@@ -62,7 +61,7 @@ public boolean hasHeader() {
     private final ObjectWriter csvWriter;
     private final Map<String, Object> map;
     private final CsvGenerator generator;
-    private final int numOfColumns;
+    private CsvSchema schema;

     public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record, boolean writeHeader, Reader reader)
             throws IOException {
@@ -71,7 +70,7 @@ public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record,
         CsvMapper mapper = new CsvMapper(factory);
         Map<String, Object> value;

-        CsvSchema schema = CsvSchema.emptySchema().withHeader();
+        schema = CsvSchema.emptySchema().withHeader();
         if (!writeHeader) {
             // If file already exists read the schema from the CSV file
             ObjectReader objectReader = mapper.readerFor(Map.class).with(schema);
@@ -93,7 +92,6 @@ public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record,

         generator = factory.createGenerator(writer);
         csvWriter = mapper.writer(schema);
-        numOfColumns = schema.size();

     }

@@ -107,10 +105,21 @@ public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record,
     public boolean writeRecord(GenericRecord record) throws IOException {
         Map<String, Object> localMap = convertRecord(record);

-        if (localMap.size() > numOfColumns) {
+        if (localMap.size() > schema.size()) {
             // Cannot write to same file so return false
             return false;
+        } else {
+            Iterator<String> localColumnIterator = localMap.keySet().iterator();
+            for (int i = 0; i < schema.size(); i++) {
+                if (!schema.columnName(i).equals(localColumnIterator.next())) {
+                    /* The names or order of the columns differ, so the record
+                       cannot be written to this CSV file; return false. */
+                    return false;
+                }
+            }
         }
+
         csvWriter.writeValue(generator, localMap);
         localMap.clear();
         return true;
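
For context, here is a minimal standalone sketch of the column-compatibility check that writeRecord now performs before appending to an existing file. The class and method names (SchemaCheck, matchesSchema) are hypothetical and not part of this change, and this variant requires the record's key count to match the schema size exactly rather than only checking that it is not larger.

import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.util.Iterator;
import java.util.Map;

// Hypothetical helper, for illustration only: returns true when the flattened
// record has the same columns, in the same order, as the CSV schema.
final class SchemaCheck {
    static boolean matchesSchema(CsvSchema schema, Map<String, Object> flatRecord) {
        // Requiring an exact size match keeps the iterator below from being
        // exhausted before the loop finishes.
        if (flatRecord.size() != schema.size()) {
            return false;
        }
        Iterator<String> keys = flatRecord.keySet().iterator();
        for (int i = 0; i < schema.size(); i++) {
            // CsvSchema.columnName(i) is the header name at position i
            if (!schema.columnName(i).equals(keys.next())) {
                return false;
            }
        }
        return true;
    }
}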