13
13
* See the License for the specific language governing permissions and
14
14
* limitations under the License.
15
15
*/
16
+
16
17
package org .metafacture .elasticsearch ;
17
18
19
+ import org .metafacture .framework .FluxCommand ;
20
+ import org .metafacture .framework .ObjectReceiver ;
21
+ import org .metafacture .framework .annotations .In ;
22
+ import org .metafacture .framework .annotations .Out ;
23
+ import org .metafacture .framework .helpers .DefaultObjectPipe ;
24
+
25
+ import com .fasterxml .jackson .databind .ObjectMapper ;
26
+
18
27
import java .io .IOException ;
19
28
import java .io .StringWriter ;
20
29
import java .util .Arrays ;
24
33
import java .util .Set ;
25
34
import java .util .regex .Pattern ;
26
35
27
- import org .metafacture .framework .FluxCommand ;
28
- import org .metafacture .framework .ObjectReceiver ;
29
- import org .metafacture .framework .annotations .In ;
30
- import org .metafacture .framework .annotations .Out ;
31
- import org .metafacture .framework .helpers .DefaultObjectPipe ;
32
-
33
- import com .fasterxml .jackson .databind .ObjectMapper ;
34
-
35
36
/**
36
37
* Add Elasticsearch bulk indexing metadata to JSON input.
37
38
*
42
43
@ In (String .class )
43
44
@ Out (String .class )
44
45
@ FluxCommand ("json-to-elasticsearch-bulk" )
45
- public class JsonToElasticsearchBulk extends
46
- DefaultObjectPipe <String , ObjectReceiver <String >> {
47
-
48
- /**
49
- * Use a MultiMap with Jackson to collect values from multiple fields with
50
- * identical names under a single key.
51
- */
52
- static class MultiMap extends HashMap <String , Object > {
53
- private static final long serialVersionUID = 490682490432334605L ;
54
-
55
- MultiMap () {
56
- // default constructor for Jackson
57
- }
58
-
59
- @ Override
60
- public Object put (String key , Object value ) {
61
- if (containsKey (key )) {
62
- Object oldValue = get (key );
63
- if (oldValue instanceof Set ) {
64
- @ SuppressWarnings ("unchecked" )
65
- Set <Object > vals = ((Set <Object >) oldValue );
66
- vals .add (value );
67
- return super .put (key , vals );
68
- }
69
- HashSet <Object > set = new HashSet <>(Arrays .asList (oldValue , value ));
70
- return super .put (key , set .size () == 1 ? value : set );
71
- }
72
- return super .put (key , value );
73
- }
74
- }
46
+ public class JsonToElasticsearchBulk extends DefaultObjectPipe <String , ObjectReceiver <String >> {
75
47
76
48
private ObjectMapper mapper = new ObjectMapper ();
77
49
private String [] idPath ;
78
50
private String type ;
79
51
private String index ;
80
52
81
- public void setIdKey (String idKey ) {
82
- this .idPath = new String []{idKey };
83
- }
84
-
85
- public void setType (String type ) {
86
- this .type = type ;
87
- }
88
-
89
- public void setIndex (String index ) {
90
- this .index = index ;
91
- }
92
-
93
53
public JsonToElasticsearchBulk () {
94
- super ();
95
54
this .idPath = new String []{};
96
55
this .type = null ;
97
56
this .index = null ;
@@ -103,16 +62,16 @@ public JsonToElasticsearchBulk() {
103
62
* @param type The Elasticsearch index type
104
63
* @param index The Elasticsearch index name
105
64
*/
106
- public JsonToElasticsearchBulk (String type , String index ) {
107
- this (new String [] { }, type , index );
65
+ public JsonToElasticsearchBulk (final String type , final String index ) {
66
+ this (new String [] {}, type , index );
108
67
}
109
68
110
69
/**
111
70
* @param idPath The key path of the JSON value to be used as the ID for the record
112
71
* @param type The Elasticsearch index type
113
72
* @param index The Elasticsearch index name
114
73
*/
115
- public JsonToElasticsearchBulk (String [] idPath , String type , String index ) {
74
+ public JsonToElasticsearchBulk (final String [] idPath , final String type , final String index ) {
116
75
this .idPath = idPath ;
117
76
this .type = type ;
118
77
this .index = index ;
@@ -123,7 +82,7 @@ public JsonToElasticsearchBulk(String[] idPath, String type, String index) {
123
82
* @param type The Elasticsearch index type
124
83
* @param index The Elasticsearch index name
125
84
*/
126
- public JsonToElasticsearchBulk (String idKey , String type , String index ) {
85
+ public JsonToElasticsearchBulk (final String idKey , final String type , final String index ) {
127
86
this (new String []{idKey }, type , index );
128
87
}
129
88
@@ -133,42 +92,96 @@ public JsonToElasticsearchBulk(String idKey, String type, String index) {
133
92
* @param index The Elasticsearch index name
134
93
* @param entitySeparator The separator between entity names in idKey
135
94
*/
136
- public JsonToElasticsearchBulk (String idKey , String type , String index , String entitySeparator ) {
95
+ public JsonToElasticsearchBulk (final String idKey , final String type , final String index , final String entitySeparator ) {
137
96
this (idKey .split (Pattern .quote (entitySeparator )), type , index );
138
97
}
139
98
99
+ public void setIdKey (final String idKey ) {
100
+ this .idPath = new String []{idKey };
101
+ }
102
+
103
+ public void setType (final String type ) {
104
+ this .type = type ;
105
+ }
106
+
107
+ public void setIndex (final String index ) {
108
+ this .index = index ;
109
+ }
110
+
140
111
@ Override
141
- public void process (String obj ) {
142
- StringWriter stringWriter = new StringWriter ();
112
+ public void process (final String obj ) {
113
+ final StringWriter stringWriter = new StringWriter ();
143
114
try {
144
- Map <String , Object > json = mapper .readValue (obj , MultiMap .class );
145
- Map <String , Object > detailsMap = new HashMap <String , Object >();
146
- Map <String , Object > indexMap = new HashMap <String , Object >();
115
+ final Map <String , Object > json = mapper .readValue (obj , MultiMap .class );
116
+ final Map <String , Object > detailsMap = new HashMap <String , Object >();
117
+ final Map <String , Object > indexMap = new HashMap <String , Object >();
147
118
indexMap .put ("index" , detailsMap );
148
- if (idPath .length > 0 ) detailsMap .put ("_id" , findId (json ));
119
+ if (idPath .length > 0 ) {
120
+ detailsMap .put ("_id" , findId (json ));
121
+ }
149
122
detailsMap .put ("_type" , type );
150
123
detailsMap .put ("_index" , index );
151
124
mapper .writeValue (stringWriter , indexMap );
152
125
stringWriter .write ("\n " );
153
126
mapper .writeValue (stringWriter , json );
154
- } catch (IOException e ) {
127
+ }
128
+ catch (final IOException e ) {
155
129
e .printStackTrace ();
156
130
}
157
131
getReceiver ().process (stringWriter .toString ());
158
132
}
159
133
160
- private Object findId (Object value ) {
134
+ private Object findId (final Object value ) {
135
+ Object newValue = value ;
136
+
161
137
for (final String key : idPath ) {
162
- if (value instanceof Map ) {
138
+ if (newValue instanceof Map ) {
163
139
@ SuppressWarnings ("unchecked" )
164
- final Map <String , Object > nestedMap = (Map <String , Object >) value ;
165
- value = nestedMap .get (key );
140
+ final Map <String , Object > nestedMap = (Map <String , Object >) newValue ;
141
+ newValue = nestedMap .get (key );
166
142
}
167
143
else {
168
144
return null ;
169
145
}
170
146
}
171
147
172
- return value ;
148
+ return newValue ;
149
+ }
150
+
151
+ /**
152
+ * Use a MultiMap with Jackson to collect values from multiple fields with
153
+ * identical names under a single key.
154
+ */
155
+ static class MultiMap extends HashMap <String , Object > { // checkstyle-disable-line IllegalType
156
+ private static final long serialVersionUID = 490682490432334605L ;
157
+
158
+ MultiMap () {
159
+ // default constructor for Jackson
160
+ }
161
+
162
+ @ Override
163
+ public Object put (final String key , final Object value ) {
164
+ final Object newValue ;
165
+
166
+ if (containsKey (key )) {
167
+ final Object oldValue = get (key );
168
+ if (oldValue instanceof Set ) {
169
+ @ SuppressWarnings ("unchecked" )
170
+ final Set <Object > vals = (Set <Object >) oldValue ;
171
+ vals .add (value );
172
+ newValue = vals ;
173
+ }
174
+ else {
175
+ final Set <Object > set = new HashSet <>(Arrays .asList (oldValue , value ));
176
+ newValue = set .size () == 1 ? value : set ;
177
+ }
178
+ }
179
+ else {
180
+ newValue = value ;
181
+ }
182
+
183
+ return super .put (key , newValue );
184
+ }
173
185
}
186
+
174
187
}
0 commit comments