@@ -50,11 +50,15 @@ void nullSchema(final boolean skipMissingOrNull) {
5050
5151 @ParameterizedTest
5252 @ValueSource(booleans = { true, false })
53- void noFieldName_NonString(final boolean skipMissingOrNull) {
54- final SinkRecord originalRecord = record(SchemaBuilder.INT8_SCHEMA, "some");
53+ void noFieldName_UnsupportedType(final boolean skipMissingOrNull) {
54+ final Schema schema = SchemaBuilder.struct().build();
55+ final SinkRecord originalRecord = record(schema, new Struct(schema));
5556 final Throwable e = assertThrows(DataException.class,
5657 () -> transformation(null, skipMissingOrNull).apply(originalRecord));
57- assertEquals(dataPlace() + " schema type must be STRING if field name is not specified: "
58+ assertEquals(dataPlace()
59+ + " schema type must be "
60+ + "[INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING]"
61+ + " if field name is not specified: "
5862 + originalRecord,
5963 e.getMessage());
6064 }
@@ -80,11 +84,27 @@ void noFieldName_NullOrEmptyValue_Skip(final String value) {
8084 }
8185
8286 @Test
83- void noFieldName_NormalValue() {
87+ void noFieldName_NormalInt64Value() {
88+ final Schema schema = SchemaBuilder.INT64_SCHEMA;
89+ final SinkRecord originalRecord = record(schema, 123L);
90+ final SinkRecord result = transformation(null, false).apply(originalRecord);
91+ assertEquals(setNewTopic(originalRecord, "123"), result);
92+ }
93+
94+ @Test
95+ void noFieldName_NormalBooleanValue() {
96+ final Schema schema = SchemaBuilder.BOOLEAN_SCHEMA;
97+ final SinkRecord originalRecord = record(schema, false);
98+ final SinkRecord result = transformation(null, false).apply(originalRecord);
99+ assertEquals(setNewTopic(originalRecord, "false"), result);
100+ }
101+
102+ @Test
103+ void noFieldName_NormalStringValue() {
84104 final Schema schema = SchemaBuilder.STRING_SCHEMA;
85105 final SinkRecord originalRecord = record(schema, NEW_TOPIC);
86106 final SinkRecord result = transformation(null, false).apply(originalRecord);
87- assertEquals(setNewTopic(originalRecord), result);
107+ assertEquals(setNewTopic(originalRecord, NEW_TOPIC ), result);
88108 }
89109
90110 @ParameterizedTest
@@ -113,15 +133,18 @@ void fieldName_NullStruct(final boolean skipMissingOrNull) {
113133
114134 @ParameterizedTest
115135 @ValueSource(booleans = { true, false })
116- void fieldName_NonStringInField(final boolean skipMissingOrNull) {
136+ void fieldName_UnsupportedTypeInField(final boolean skipMissingOrNull) {
137+ final Schema innerSchema = SchemaBuilder.struct().build();
117138 final Schema schema = SchemaBuilder.struct()
118- .field(FIELD, SchemaBuilder.INT8_SCHEMA )
139+ .field(FIELD, innerSchema )
119140 .schema();
120141 final SinkRecord originalRecord = record(
121- schema, new Struct(schema).put(FIELD, (byte) 0 ));
142+ schema, new Struct(schema).put(FIELD, new Struct(innerSchema) ));
122143 final Throwable e = assertThrows(DataException.class,
123144 () -> transformation(FIELD, skipMissingOrNull).apply(originalRecord));
124- assertEquals(FIELD + " schema type in " + dataPlace() + " must be STRING: " + originalRecord,
145+ assertEquals(FIELD + " schema type in " + dataPlace() + " must be "
146+ + "[INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING]"
147+ + ": " + originalRecord,
125148 e.getMessage());
126149 }
127150
@@ -178,13 +201,33 @@ void fieldName_MissingFieldInSchema_Skip() {
178201 }
179202
180203 @Test
181- void fieldName_NormalValue() {
204+ void fieldName_NormalIntValue() {
205+ final Schema schema = SchemaBuilder.struct()
206+ .field(FIELD, SchemaBuilder.INT64_SCHEMA)
207+ .schema();
208+ final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, 123L));
209+ final SinkRecord result = transformation(FIELD, true).apply(originalRecord);
210+ assertEquals(setNewTopic(originalRecord, "123"), result);
211+ }
212+
213+ @Test
214+ void fieldName_NormalBooleanValue() {
215+ final Schema schema = SchemaBuilder.struct()
216+ .field(FIELD, SchemaBuilder.BOOLEAN_SCHEMA)
217+ .schema();
218+ final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, false));
219+ final SinkRecord result = transformation(FIELD, true).apply(originalRecord);
220+ assertEquals(setNewTopic(originalRecord, "false"), result);
221+ }
222+
223+ @Test
224+ void fieldName_NormalStringValue() {
182225 final Schema schema = SchemaBuilder.struct()
183226 .field(FIELD, SchemaBuilder.STRING_SCHEMA)
184227 .schema();
185228 final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, NEW_TOPIC));
186229 final SinkRecord result = transformation(FIELD, true).apply(originalRecord);
187- assertEquals(setNewTopic(originalRecord), result);
230+ assertEquals(setNewTopic(originalRecord, NEW_TOPIC ), result);
188231 }
189232
190233 private ExtractTopic<SinkRecord> transformation(final String fieldName, final boolean skipMissingOrNull) {
@@ -215,8 +258,8 @@ protected SinkRecord record(final Schema keySchema,
215258 456L, TimestampType.CREATE_TIME);
216259 }
217260
218- private SinkRecord setNewTopic(final SinkRecord record) {
219- return record.newRecord(NEW_TOPIC ,
261+ private SinkRecord setNewTopic(final SinkRecord record, final String newTopic ) {
262+ return record.newRecord(newTopic ,
220263 record.kafkaPartition(),
221264 record.keySchema(),
222265 record.key(),
0 commit comments