Skip to content

Commit 0d96453

Browse files
committed
Added Avro record to Map converter for use with Jackson
1 parent 3390bb3 commit 0d96453

File tree

6 files changed

+202
-10
lines changed

6 files changed

+202
-10
lines changed

build.gradle

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,29 @@ run {
99
args = ['webhdfs://radar-test.thehyve.net:50070', '/topicAndroidPhoneNew/android_phone_sensor_acceleration', "${projectDir}/data"]
1010
}
1111

12-
sourceCompatibility = 1.8
13-
targetCompatibility = 1.8
12+
sourceCompatibility = '1.8'
13+
targetCompatibility = '1.8'
14+
15+
ext.avroVersion = '1.8.1'
16+
ext.jacksonVersion = '2.8.5'
17+
ext.hadoopVersion = '2.7.3'
18+
ext.log4jVersion = '2.8.1'
1419

1520
repositories {
1621
jcenter()
1722
}
1823

1924
dependencies {
20-
compile 'org.apache.hadoop:hadoop-common:2.7.3'
21-
compile 'org.apache.avro:avro:1.8.1'
22-
compile 'org.apache.avro:avro-mapred:1.8.1'
25+
compile group: 'org.apache.hadoop', name: 'hadoop-common', version: hadoopVersion
26+
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
27+
compile group: 'org.apache.avro', name: 'avro-mapred', version: avroVersion
28+
compile group: 'com.fasterxml.jackson.core' , name: 'jackson-databind', version: jacksonVersion
2329

24-
runtime 'org.apache.logging.log4j:log4j-core:2.8.1'
25-
runtime 'org.apache.logging.log4j:log4j-slf4j-impl:2.8.1'
26-
runtime 'org.apache.hadoop:hadoop-hdfs:2.7.3'
30+
runtime group: 'org.apache.logging.log4j', name: 'log4j-core', version: log4jVersion
31+
runtime group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: log4jVersion
32+
runtime group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: hadoopVersion
2733

28-
testCompile group: 'junit', name: 'junit', version: '4.11'
34+
testCompile group: 'junit', name: 'junit', version: '4.12'
2935
}
3036

3137
//create a single Jar with all dependencies

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
package org.radarcns;
22

3+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
4+
import com.fasterxml.jackson.core.JsonFactory;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.fasterxml.jackson.databind.ObjectWriter;
37
import java.io.BufferedReader;
48
import java.io.File;
9+
import java.io.FileOutputStream;
510
import java.io.FileReader;
611
import java.io.IOException;
712
import java.text.SimpleDateFormat;
813
import java.util.Date;
914
import java.util.HashSet;
1015
import java.util.Set;
1116
import org.apache.avro.file.DataFileReader;
17+
import org.apache.avro.generic.GenericData;
1218
import org.apache.avro.generic.GenericDatumReader;
19+
import org.apache.avro.generic.GenericDatumWriter;
1320
import org.apache.avro.generic.GenericRecord;
1421
import org.apache.avro.io.DatumReader;
22+
import org.apache.avro.io.EncoderFactory;
23+
import org.apache.avro.io.JsonEncoder;
1524
import org.apache.avro.mapred.FsInput;
1625
import org.apache.commons.io.FilenameUtils;
1726
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +47,7 @@ public class RestructureAvroRecords {
3847

3948
private final Configuration conf = new Configuration();
4049
private final DatumReader<GenericRecord> datumReader;
50+
private final ObjectWriter writer;
4151

4252
private int processedFileCount;
4353
private int processedRecordsCount;
@@ -74,6 +84,16 @@ public RestructureAvroRecords(String inputPath, String outputPath) {
7484
this.setInputWebHdfsURL(inputPath);
7585
this.setOutputPath(outputPath);
7686
datumReader = new GenericDatumReader<>();
87+
88+
ObjectMapper mapper = new ObjectMapper(new JsonFactory());
89+
// only serialize fields, not getters, etc.
90+
mapper.setVisibility(mapper.getSerializationConfig().getDefaultVisibilityChecker()
91+
.withFieldVisibility(JsonAutoDetect.Visibility.ANY)
92+
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
93+
.withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
94+
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
95+
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
96+
writer = mapper.writer();
7797
}
7898

7999
public void setInputWebHdfsURL(String fileSystemURL) {
@@ -172,7 +192,8 @@ record = dataFileReader.next(record);
172192
processedFileCount++;
173193
}
174194

175-
private void writeRecord(GenericRecord record, String topicName, FileCache cache) throws IOException {
195+
private void writeRecord(GenericRecord record, String topicName, FileCache cache)
196+
throws IOException {
176197
GenericRecord keyField = (GenericRecord) record.get("keyField");
177198
GenericRecord valueField = (GenericRecord) record.get("valueField");
178199

@@ -236,4 +257,6 @@ private void readSeenOffsets() {
236257
logger.info("Offsets file does not exist yet, will be created.");
237258
}
238259
}
260+
261+
239262
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.radarcns.util;
2+
3+
import java.nio.ByteBuffer;
4+
import java.util.ArrayList;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
import org.apache.avro.Schema;
9+
import org.apache.avro.Schema.Field;
10+
import org.apache.avro.generic.GenericData;
11+
import org.apache.avro.generic.GenericFixed;
12+
import org.apache.avro.generic.GenericRecord;
13+
14+
/**
15+
* Created by joris on 20/04/2017.
16+
*/
17+
public class AvroConverter {
18+
public static Map<String, Object> convertRecord(GenericRecord record) {
19+
Map<String, Object> map = new HashMap<>();
20+
Schema schema = record.getSchema();
21+
for (Field field : schema.getFields()) {
22+
map.put(field.name(), convertAvro(record.get(field.pos()), field.schema()));
23+
}
24+
return map;
25+
}
26+
27+
private static Object convertAvro(Object data, Schema schema) {
28+
switch (schema.getType()) {
29+
case RECORD:
30+
return convertRecord((GenericRecord) data);
31+
case MAP: {
32+
Map<String, Object> value = new HashMap<>();
33+
Schema valueType = schema.getValueType();
34+
for (Map.Entry<?, ?> entry : ((Map<?, ?>)data).entrySet()) {
35+
value.put(entry.getKey().toString(), convertAvro(entry.getValue(), valueType));
36+
}
37+
return value;
38+
}
39+
case ARRAY: {
40+
List<?> origList = (List<?>)data;
41+
Schema itemType = schema.getElementType();
42+
List<Object> list = new ArrayList<>(origList.size());
43+
for (Object orig : origList) {
44+
list.add(convertAvro(orig, itemType));
45+
}
46+
return list;
47+
}
48+
case UNION: {
49+
int type = new GenericData().resolveUnion(schema, data);
50+
return convertAvro(data, schema.getTypes().get(type));
51+
}
52+
case BYTES:
53+
return ((ByteBuffer)data).array();
54+
case FIXED:
55+
return ((GenericFixed)data).bytes();
56+
case ENUM:
57+
case STRING:
58+
return data.toString();
59+
case INT:
60+
case LONG:
61+
case DOUBLE:
62+
case FLOAT:
63+
case BOOLEAN:
64+
case NULL:
65+
return data;
66+
default:
67+
throw new IllegalArgumentException("Cannot parse field type " + schema.getType());
68+
}
69+
}
70+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package org.radarcns.util;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.fasterxml.jackson.databind.ObjectWriter;
7+
import com.fasterxml.jackson.databind.SerializationFeature;
8+
import java.io.BufferedReader;
9+
import java.io.IOException;
10+
import java.io.InputStreamReader;
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.stream.Collectors;
15+
import org.apache.avro.Schema;
16+
import org.apache.avro.Schema.Parser;
17+
import org.apache.avro.generic.GenericDatumReader;
18+
import org.apache.avro.generic.GenericRecord;
19+
import org.apache.avro.io.DecoderFactory;
20+
import org.apache.avro.io.JsonDecoder;
21+
import org.junit.Test;
22+
23+
/**
 * Round-trip test for {@code AvroConverter}: decodes an Avro-JSON fixture, converts the
 * resulting record to a Map, serializes it back with Jackson, and compares the output
 * line-by-line against the original JSON fixture.
 */
public class AvroConverterTest {
    @Test
    public void fullAvroTest() throws IOException {
        // Parse the schema and decode the JSON fixture into a GenericRecord.
        Parser parser = new Parser();
        Schema schema = parser.parse(getClass().getResourceAsStream("full.avsc"));
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, getClass().getResourceAsStream("full.json"));
        GenericRecord record = reader.read(null, decoder);

        // Convert to a plain Map and serialize with Jackson, pretty-printed so the output
        // can be compared line-by-line against the pretty-printed fixture.
        Map<String, Object> map = AvroConverter.convertRecord(record);
        ObjectWriter writer = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT).writer();
        String result = writer.writeValueAsString(map);

        // Read the original fixture text as the expected output.
        String expected = new BufferedReader(new InputStreamReader(
                getClass().getResourceAsStream("full.json")))
                .lines().collect(Collectors.joining("\n"));

        System.out.println(result);

        String[] expectedLines = expected.split("\n");
        String[] resultLines = result.split("\n");
        assertEquals(expectedLines.length, resultLines.length);

        // Lines 2, 3 and 13 are skipped: presumably the bytes/fixed fields ("b", "c") and
        // the union array field ("j"), whose Jackson rendering (e.g. base64 for byte[],
        // unwrapped union values) differs from Avro's JSON encoding — TODO confirm
        // against the fixture's line layout.
        List<Integer> ignoreLines = Arrays.asList(2, 3, 13);
        for (int i = 0; i < expectedLines.length; i++) {
            if (ignoreLines.contains(i)) {
                continue;
            }
            assertEquals(expectedLines[i], resultLines[i]);
        }
    }
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"type": "record",
3+
"name": "full",
4+
"fields": [
5+
{"name": "a", "type": "string"},
6+
{"name": "b", "type": "bytes"},
7+
{"name": "c", "type": {"type": "fixed", "size": 1, "name": "md5"}},
8+
{"name": "d", "type": "long"},
9+
{"name": "e", "type": "double"},
10+
{"name": "f", "type": "float"},
11+
{"name": "g", "type": "int"},
12+
{"name": "h", "type": "null"},
13+
{"name": "i", "type": {"type": "map", "values": "int"}},
14+
{"name": "j", "type": {"type": "array", "items": ["null", "string", "full"]}},
15+
{"name": "k", "type": {"name": "KEnum", "type": "enum", "symbols": ["S", "Y", "M"]}},
16+
{"name": "l", "type": {"name": "LRecord", "type": "record", "fields": [{"name": "la", "type": ["null", "string"]}]}},
17+
{"name": "m", "type": "boolean"}
18+
]
19+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"a" : "a",
3+
"b" : "\u00FF",
4+
"c" : "\u00FF",
5+
"d" : 1000000000000000000,
6+
"e" : 1.21322421E-15,
7+
"f" : 0.1213231,
8+
"g" : 132101,
9+
"h" : null,
10+
"i" : {
11+
"some" : 1,
12+
"other" : -1
13+
},
14+
"j" : [ null, { "string": "some" } ],
15+
"k" : "Y",
16+
"l" : {
17+
"la" : null
18+
},
19+
"m" : false
20+
}

0 commit comments

Comments
 (0)