@@ -51,12 +51,12 @@ public void configure(Map<String, ?> configs) {
5151
5252 @ Override
5353 public R apply (R record ) {
54- if (record . valueSchema ( ) == null ) {
54+ if (operatingSchema ( record ) == null ) {
5555 LOGGER .info ("Schemaless records are not supported" );
5656 return null ;
5757 }
5858
59- Object recordValue = record . value ( );
59+ Object recordValue = operatingValue ( record );
6060 if (recordValue == null ) {
6161 LOGGER .info ("Record is null" );
6262 LOGGER .info (record .toString ());
@@ -94,20 +94,26 @@ private static HashMap<String, String> stringifyFields(Struct value, List<String
9494 String [] pathArr = field .split ("\\ ." );
9595 List <String > path = Arrays .asList (pathArr );
9696
97- if (valueSchema .field (field ) == null ) {
98- LOGGER .warn ("target field {} not present in the record schema" , field );
97+ Object fieldSchema = getSchemaField (path , valueSchema );
98+ if (fieldSchema == null ) {
99+ LOGGER .warn ("target field {} is not present in the record schema {}" , field , fieldSchema );
99100 continue ;
100101 }
101102
102103 Object fieldValue = getFieldValue (path , value );
103104 if (fieldValue == null ) {
104- LOGGER . info ( "target field {} is null, nothing to stringify" , field );
105+ result . put ( field , "null" );
105106 continue ;
106107 }
107108
108- String strValue ;
109- Schema .Type fieldValueType = Values .inferSchema (fieldValue ).type ();
109+ Schema fieldValueSchema = Values .inferSchema (fieldValue );
110+ if (fieldValueSchema == null ) {
111+ result .put (field , fieldValue .toString ());
112+ continue ;
113+ }
114+ Schema .Type fieldValueType = fieldValueSchema .type ();
110115
116+ String strValue ;
111117 if (fieldValueType .equals (Schema .Type .STRUCT )) {
112118 strValue = structToJSONObject ((Struct ) fieldValue ).toString ();
113119
@@ -123,6 +129,7 @@ private static HashMap<String, String> stringifyFields(Struct value, List<String
123129 } else {
124130 strValue = String .valueOf (fieldValue );
125131 }
132+
126133 result .put (field , strValue );
127134 }
128135
@@ -137,6 +144,10 @@ private static HashMap<String, String> stringifyFields(Struct value, List<String
137144 * @return New schema for output record.
138145 */
139146 private Schema makeUpdatedSchema (String parentKey , Struct value , HashMap <String , String > stringifiedFields ) {
147+ if (value == null || value .schema () == null ) {
148+ return null ;
149+ }
150+
140151 final SchemaBuilder builder = SchemaBuilder .struct ();
141152
142153 for (Field field : value .schema ().fields ()) {
@@ -145,8 +156,10 @@ private Schema makeUpdatedSchema(String parentKey, Struct value, HashMap<String,
145156
146157 if (stringifiedFields .containsKey (absoluteKey )) {
147158 fieldSchema = field .schema ().isOptional () ? Schema .OPTIONAL_STRING_SCHEMA : Schema .STRING_SCHEMA ;
159+
148160 } else if (field .schema ().type ().equals (Schema .Type .STRUCT )) {
149161 fieldSchema = makeUpdatedSchema (absoluteKey , value .getStruct (field .name ()), stringifiedFields );
162+
150163 } else {
151164 fieldSchema = field .schema ();
152165 }
@@ -171,6 +184,7 @@ private Struct makeUpdatedValue(String parentKey, Struct value, Schema updatedSc
171184 for (Field field : value .schema ().fields ()) {
172185 final Object fieldValue ;
173186 final String absoluteKey = joinKeys (parentKey , field .name ());
187+
174188 if (stringifiedFields .containsKey (absoluteKey )) {
175189 fieldValue = stringifiedFields .get (absoluteKey );
176190 } else if (field .schema ().type ().equals (Schema .Type .STRUCT )) {
@@ -179,6 +193,7 @@ private Struct makeUpdatedValue(String parentKey, Struct value, Schema updatedSc
179193 } else {
180194 fieldValue = value .get (field .name ());
181195 }
196+
182197 updatedValue .put (field .name (), fieldValue );
183198 }
184199
@@ -187,7 +202,10 @@ private Struct makeUpdatedValue(String parentKey, Struct value, Schema updatedSc
187202
188203 @ SuppressWarnings ("unchecked" )
189204 public static String arrayValueToString (List <Object > value ) {
190- if (value == null || value .size () == 0 ) {
205+ if (value == null ) {
206+ return "null" ;
207+ }
208+ if (value .size () == 0 ) {
191209 return "[]" ;
192210 }
193211
@@ -197,9 +215,13 @@ public static String arrayValueToString(List<Object> value) {
197215 if (builder .toString ().length () != 0 ) {
198216 builder .append (", " );
199217 }
218+ if (elem == null ) {
219+ builder .append ("null" );
220+ continue ;
221+ }
200222 Schema valueSchema = Values .inferSchema (elem );
201223 if (valueSchema == null ) {
202- builder .append ("null" );
224+ builder .append (elem . toString () );
203225 continue ;
204226 }
205227 Schema .Type valueType = valueSchema .type ();
@@ -346,6 +368,16 @@ private static JSONArray listToJSONArray(List<Object> value) {
346368 return result ;
347369 }
348370
371+ private static Object getSchemaField (List <String > path , Schema schema ) {
372+ if (path .isEmpty ()) {
373+ return null ;
374+ } else if (path .size () == 1 ) {
375+ return schema .field (path .get (0 ));
376+ } else {
377+ return getSchemaField (path .subList (1 , path .size ()), schema .field (path .get (0 )).schema ());
378+ }
379+ }
380+
349381 private static Object getFieldValue (List <String > path , Struct value ) {
350382 if (path .isEmpty ()) {
351383 return null ;
0 commit comments