17
17
18
18
import com .fasterxml .jackson .core .JsonFactory ;
19
19
import com .fasterxml .jackson .core .JsonParser ;
20
+ import com .fasterxml .jackson .core .JsonProcessingException ;
20
21
import com .fasterxml .jackson .core .JsonToken ;
22
+ import com .fasterxml .jackson .databind .ObjectMapper ;
23
+ import com .jayway .jsonpath .JsonPath ;
24
+
21
25
import org .metafacture .framework .MetafactureException ;
22
26
import org .metafacture .framework .StreamReceiver ;
23
27
import org .metafacture .framework .helpers .DefaultObjectPipe ;
24
28
25
29
import java .io .IOException ;
30
+ import java .util .Arrays ;
31
+ import java .util .List ;
32
+ import java .util .stream .Collectors ;
26
33
27
34
/**
28
35
* Decodes a record in JSON format.
@@ -38,6 +45,8 @@ public final class JsonDecoder extends DefaultObjectPipe<String, StreamReceiver>
38
45
39
46
public static final String DEFAULT_RECORD_ID = "%d" ;
40
47
48
+ public static final String DEFAULT_ROOT_PATH = "" ;
49
+
41
50
private final JsonFactory jsonFactory = new JsonFactory ();
42
51
43
52
private JsonParser jsonParser ;
@@ -46,12 +55,15 @@ public final class JsonDecoder extends DefaultObjectPipe<String, StreamReceiver>
46
55
private String recordId ;
47
56
private int recordCount ;
48
57
58
+ private String recordPath ;
59
+
49
60
public JsonDecoder () {
50
61
super ();
51
62
52
63
setArrayMarker (DEFAULT_ARRAY_MARKER );
53
64
setArrayName (DEFAULT_ARRAY_NAME );
54
65
setRecordId (DEFAULT_RECORD_ID );
66
+ setRecordPath (DEFAULT_ROOT_PATH );
55
67
56
68
resetRecordCount ();
57
69
}
@@ -96,25 +108,45 @@ public int getRecordCount() {
96
108
return recordCount ;
97
109
}
98
110
111
+ public void setRecordPath (final String recordPath ) {
112
+ this .recordPath = recordPath ;
113
+ }
114
+
115
+ public String getRecordPath () {
116
+ return recordPath ;
117
+ }
118
+
99
119
public void resetRecordCount () {
100
120
setRecordCount (0 );
101
121
}
102
122
103
123
@ Override
104
- public void process (final String string ) {
124
+ public void process (final String json ) {
105
125
assert !isClosed ();
106
-
107
- createParser (string );
108
-
109
- try {
110
- decode ();
111
- }
112
- catch (final IOException e ) {
113
- throw new MetafactureException (e );
114
- }
115
- finally {
116
- closeParser ();
117
- }
126
+ final List <String > records = recordPath .isEmpty () ? Arrays .asList (json )
127
+ : matches (JsonPath .read (json , recordPath ));
128
+ records .forEach (record -> {
129
+ createParser (record );
130
+ try {
131
+ decode ();
132
+ } catch (final IOException e ) {
133
+ throw new MetafactureException (e );
134
+ } finally {
135
+ closeParser ();
136
+ }
137
+ });
138
+ }
139
+
140
+ private List <String > matches (Object obj ) {
141
+ final List <?> records = (obj instanceof List <?>) ? ((List <?>) obj ) : Arrays .asList (obj );
142
+ return records .stream ().map (doc -> {
143
+ try {
144
+ return new ObjectMapper ().writeValueAsString (doc );
145
+ } catch (JsonProcessingException e ) {
146
+ e .printStackTrace ();
147
+ return doc .toString ();
148
+ }
149
+ }).collect (Collectors .toList ());
118
150
}
119
151
120
152
@ Override
0 commit comments