Skip to content

Commit ab87e75

Browse files
authored
Add Debezium Money converter (#36)
add Debezium Money converter
1 parent e836707 commit ab87e75

File tree

5 files changed

+362
-0
lines changed

5 files changed

+362
-0
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ targetCompatibility = JavaVersion.VERSION_11
4343
ext {
4444
kafkaVersion = "2.0.1"
4545
testcontainersVersion = "1.15.1"
46+
debeziumVersion = "1.3.0.Final"
4647
}
4748

4849
distributions {
@@ -77,13 +78,15 @@ configurations {
7778

7879
dependencies {
7980
compileOnly "org.apache.kafka:connect-api:$kafkaVersion"
81+
compileOnly "io.debezium:debezium-api:$debeziumVersion"
8082

8183
implementation "org.slf4j:slf4j-api:1.7.25"
8284

8385
testImplementation "org.junit.jupiter:junit-jupiter:5.5.1"
8486
testImplementation "org.hamcrest:hamcrest:2.1"
8587
testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
8688
testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
89+
testImplementation "io.debezium:debezium-api:$debeziumVersion"
8790

8891
testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:2.12.1"
8992
testRuntime "org.apache.logging.log4j:log4j-api:2.12.1"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2021 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.debezium.converters;
18+
19+
import java.math.BigDecimal;
20+
import java.util.Properties;
21+
22+
import org.apache.kafka.connect.data.SchemaBuilder;
23+
24+
import io.debezium.spi.converter.CustomConverter;
25+
import io.debezium.spi.converter.RelationalColumn;
26+
27+
public class MoneyConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
28+
/**
29+
* Convert money type to a correct format.
30+
* Source: https://github.com/moyphilip/MoneyConverter
31+
*/
32+
private SchemaBuilder moneySchema;
33+
34+
@Override
35+
public void configure(final Properties props) {
36+
moneySchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
37+
}
38+
39+
@Override
40+
public void converterFor(final RelationalColumn column,
41+
final CustomConverter.ConverterRegistration<SchemaBuilder> registration) {
42+
43+
if ("money".equals(column.typeName())) {
44+
registration.register(moneySchema, data -> {
45+
if (data == null) {
46+
if (column.isOptional()) {
47+
return null;
48+
} else {
49+
throw new IllegalArgumentException("Money column is not optional, but data is null");
50+
}
51+
}
52+
if (data instanceof BigDecimal) {
53+
// Expected type
54+
return String.format("%.2f", data);
55+
} else if (data instanceof Number) {
56+
return String.format("%.2f", ((Number) data).floatValue());
57+
} else {
58+
throw new IllegalArgumentException("Money type should have BigDecimal type");
59+
}
60+
61+
});
62+
}
63+
}
64+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright 2021 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.debezium.converters;
18+
19+
import java.math.BigDecimal;
20+
import java.util.Properties;
21+
22+
import org.apache.kafka.connect.data.Schema;
23+
import org.apache.kafka.connect.data.SchemaBuilder;
24+
25+
import io.aiven.kafka.connect.debezium.converters.utils.DummyRelationalColumn;
26+
import io.aiven.kafka.connect.debezium.converters.utils.MoneyTestRelationalColumn;
27+
28+
import io.debezium.spi.converter.CustomConverter;
29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.BeforeEach;
31+
import org.junit.jupiter.api.Test;
32+
33+
import static org.junit.jupiter.api.Assertions.assertEquals;
34+
import static org.junit.jupiter.api.Assertions.assertNull;
35+
import static org.junit.jupiter.api.Assertions.assertThrows;
36+
37+
public class MoneyConverterTest {
38+
39+
private MoneyConverter transform;
40+
private StubConverterRegistration registration;
41+
private Properties prop;
42+
43+
44+
@BeforeEach
45+
void init() {
46+
transform = new MoneyConverter();
47+
registration = new StubConverterRegistration();
48+
prop = new Properties();
49+
prop.setProperty("schema.name", "price");
50+
}
51+
52+
@AfterEach
53+
void teardown() {
54+
transform = null;
55+
registration = null;
56+
prop = null;
57+
}
58+
59+
@Test
60+
void shouldRegisterCorrectSchema() {
61+
transform.configure(prop);
62+
assertNull(registration.currFieldSchema);
63+
transform.converterFor(new MoneyTestRelationalColumn(), registration);
64+
65+
assertEquals(registration.currFieldSchema.schema().name(), "price");
66+
assertEquals(registration.currFieldSchema.schema().type(), Schema.Type.STRING);
67+
}
68+
69+
@Test
70+
void shouldDoNothingIfColumnIsNotMoney() {
71+
transform.configure(prop);
72+
73+
transform.converterFor(new DummyRelationalColumn(), registration);
74+
75+
assertNull(registration.currFieldSchema);
76+
assertNull(registration.currConverter);
77+
}
78+
79+
@Test
80+
void shouldFormatDataToMoneyFormat() {
81+
assertNull(registration.currConverter);
82+
transform.converterFor(new MoneyTestRelationalColumn(), registration);
83+
84+
final String result = (String) registration.currConverter.convert(BigDecimal.valueOf(103.6999));
85+
assertEquals(result, "103.70");
86+
87+
final String result2 = (String) registration.currConverter.convert((long) 103);
88+
assertEquals(result2, "103.00");
89+
}
90+
91+
@Test
92+
void shouldFailIfDataIsNotBigDecimal() {
93+
assertNull(registration.currConverter);
94+
transform.converterFor(new MoneyTestRelationalColumn(), registration);
95+
96+
final Throwable e = assertThrows(IllegalArgumentException.class,
97+
() -> registration.currConverter.convert("103.6999"));
98+
assertEquals(e.getMessage(), "Money type should have BigDecimal type");
99+
}
100+
101+
@Test
102+
void shouldFailIfDataIsMissing() {
103+
assertNull(registration.currConverter);
104+
transform.converterFor(new MoneyTestRelationalColumn(), registration);
105+
106+
final Throwable e = assertThrows(IllegalArgumentException.class,
107+
() -> registration.currConverter.convert(null));
108+
assertEquals(e.getMessage(), "Money column is not optional, but data is null");
109+
}
110+
111+
@Test
112+
void shouldDoNothingIfColumnIsOptional() {
113+
transform.configure(prop);
114+
final MoneyTestRelationalColumn moneyColumn = new MoneyTestRelationalColumn();
115+
moneyColumn.isOptional = true;
116+
117+
transform.converterFor(moneyColumn, registration);
118+
119+
final String result = (String) registration.currConverter.convert(null);
120+
assertNull(result);
121+
}
122+
123+
class StubConverterRegistration implements CustomConverter.ConverterRegistration<SchemaBuilder> {
124+
SchemaBuilder currFieldSchema;
125+
CustomConverter.Converter currConverter;
126+
127+
@Override
128+
public void register(final SchemaBuilder fieldSchema,
129+
final CustomConverter.Converter converter) {
130+
currFieldSchema = fieldSchema;
131+
currConverter = converter;
132+
}
133+
}
134+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2021 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.debezium.converters.utils;
18+
19+
import java.util.OptionalInt;
20+
21+
import io.debezium.spi.converter.RelationalColumn;
22+
23+
24+
public class DummyRelationalColumn implements RelationalColumn {
25+
@Override
26+
public int jdbcType() {
27+
return 0;
28+
}
29+
30+
@Override
31+
public int nativeType() {
32+
return 0;
33+
}
34+
35+
@Override
36+
public String typeName() {
37+
return "dummy";
38+
}
39+
40+
@Override
41+
public String typeExpression() {
42+
return null;
43+
}
44+
45+
@Override
46+
public OptionalInt length() {
47+
return null;
48+
}
49+
50+
@Override
51+
public OptionalInt scale() {
52+
return null;
53+
}
54+
55+
@Override
56+
public boolean isOptional() {
57+
return false;
58+
}
59+
60+
@Override
61+
public Object defaultValue() {
62+
return null;
63+
}
64+
65+
@Override
66+
public boolean hasDefaultValue() {
67+
return false;
68+
}
69+
70+
@Override
71+
public String name() {
72+
return null;
73+
}
74+
75+
@Override
76+
public String dataCollection() {
77+
return null;
78+
}
79+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2021 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.debezium.converters.utils;
18+
19+
import java.util.OptionalInt;
20+
21+
import io.debezium.spi.converter.RelationalColumn;
22+
23+
24+
public class MoneyTestRelationalColumn implements RelationalColumn {
25+
26+
public boolean isOptional = false;
27+
28+
@Override
29+
public int jdbcType() {
30+
return 0;
31+
}
32+
33+
@Override
34+
public int nativeType() {
35+
return 0;
36+
}
37+
38+
@Override
39+
public String typeName() {
40+
return "money";
41+
}
42+
43+
@Override
44+
public String typeExpression() {
45+
return null;
46+
}
47+
48+
@Override
49+
public OptionalInt length() {
50+
return null;
51+
}
52+
53+
@Override
54+
public OptionalInt scale() {
55+
return null;
56+
}
57+
58+
@Override
59+
public boolean isOptional() {
60+
return isOptional;
61+
}
62+
63+
@Override
64+
public Object defaultValue() {
65+
return null;
66+
}
67+
68+
@Override
69+
public boolean hasDefaultValue() {
70+
return false;
71+
}
72+
73+
@Override
74+
public String name() {
75+
return null;
76+
}
77+
78+
@Override
79+
public String dataCollection() {
80+
return null;
81+
}
82+
}

0 commit comments

Comments
 (0)