@@ -259,80 +259,4 @@ private GenericRecord generateRecord() throws ConnectException {
259
259
@ Override
260
260
public void stop () {
261
261
}
262
-
263
- private Schema getOptionalSchema (
264
- final Schema schema
265
- ) {
266
- switch (schema .type ()) {
267
- case BOOLEAN :
268
- return Schema .OPTIONAL_BOOLEAN_SCHEMA ;
269
- case INT32 :
270
- return Schema .OPTIONAL_INT32_SCHEMA ;
271
- case INT64 :
272
- return Schema .OPTIONAL_INT64_SCHEMA ;
273
- case FLOAT64 :
274
- return Schema .OPTIONAL_FLOAT64_SCHEMA ;
275
- case STRING :
276
- return Schema .OPTIONAL_STRING_SCHEMA ;
277
- case ARRAY :
278
- return SchemaBuilder .array (getOptionalSchema (schema .valueSchema ())).optional ().build ();
279
- case MAP :
280
- return SchemaBuilder .map (
281
- getOptionalSchema (schema .keySchema ()),
282
- getOptionalSchema (schema .valueSchema ())
283
- ).optional ().build ();
284
- case STRUCT :
285
- final SchemaBuilder schemaBuilder = SchemaBuilder .struct ();
286
- for (Field field : schema .fields ()) {
287
- schemaBuilder .field (
288
- field .name (),
289
- getOptionalSchema (field .schema ())
290
- );
291
- }
292
- return schemaBuilder .optional ().build ();
293
- default :
294
- throw new ConnectException ("Unsupported type: " + schema );
295
- }
296
- }
297
-
298
- private Object getOptionalValue (
299
- final Schema schema ,
300
- final Object value
301
- ) {
302
- switch (schema .type ()) {
303
- case BOOLEAN :
304
- case INT32 :
305
- case INT64 :
306
- case FLOAT64 :
307
- case STRING :
308
- return value ;
309
- case ARRAY :
310
- final List <?> list = (List <?>) value ;
311
- return list .stream ()
312
- .map (listItem -> getOptionalValue (schema .valueSchema (), listItem ))
313
- .collect (Collectors .toList ());
314
- case MAP :
315
- final Map <?, ?> map = (Map <?, ?>) value ;
316
- return map .entrySet ().stream ()
317
- .collect (Collectors .toMap (
318
- k -> getOptionalValue (schema .keySchema (), k ),
319
- v -> getOptionalValue (schema .valueSchema (), v )
320
- ));
321
- case STRUCT :
322
- final Struct struct = (Struct ) value ;
323
- final Struct optionalStruct = new Struct (getOptionalSchema (schema ));
324
- for (Field field : schema .fields ()) {
325
- optionalStruct .put (
326
- field .name (),
327
- getOptionalValue (
328
- field .schema (),
329
- struct .get (field .name ())
330
- )
331
- );
332
- }
333
- return optionalStruct ;
334
- default :
335
- throw new ConnectException ("Invalid value schema: " + schema + ", value = " + value );
336
- }
337
- }
338
262
}
0 commit comments